1) If I use the following code, I get a NullPointerException in both local and cluster mode:

// ABC is assumed to be a simple case class, e.g. case class ABC(a: String, b: String, c: Int)
import sparkSession.implicits._
val testDS = sparkSession.createDataFrame(
  Seq(
    ABC("1","2", 1),
    ABC("3","9", 3),
    ABC("8","2", 2),
    ABC("1","2", 3),
    ABC("3","9", 1),
    ABC("2","7", 1),
    ABC("1","3", 2))
).as[ABC]

val t = testDS
  .rdd
  .groupBy(_.c)
  .foreachPartition(
    p => p.foreach(
      a => {
        val id = a._1
        println("inside foreach, id: " + id)
        val itABC = a._2

        val itSeq = itABC.toSeq
        println(itSeq.size)

        val itDS = itSeq.toDS // Get "Caused by: java.lang.NullPointerException" here
        itDS.show()

        funcA(itDS, id)
      }
    )
  )
println(t.toString)

Or

import sparkSession.implicits._
val testDS = sparkSession.createDataFrame(
  Seq(
    ABC("1","2", 1),
    ABC("3","9", 3),
    ABC("8","2", 2),
    ABC("1","2", 3),
    ABC("3","9", 1),
    ABC("2","7", 1),
    ABC("1","3", 2))
).as[ABC]

testDS
  .rdd
  .groupBy(_.c)
  .foreachPartition(
    p => p.foreach(
      a => {
        val id = a._1
        println("inside foreach, id: " + id)
        val itABC = a._2

        import sparkSession.implicits._
        val itDS = sparkSession.createDataFrame( 
          sparkSession.sparkContext.parallelize(itABC.toList, numSlices=200)) // get "NullPointerException" here
        itDS.show()

        funcA(itDS, id)
      }
    )
  )

Here's the output log for 1):

    17/10/26 15:07:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:>                                                          (0 + 4) / 4]17/10/26 15:07:29 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 8, 10.142.17.137, executor 0): java.lang.NullPointerException
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/10/26 15:07:29 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 12, 10.142.17.137, executor 0): java.lang.NullPointerException
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
    at com.a.data_pipeline.SL.generateScaleGraphs(SL.scala:165)
    at com.a.data_pipeline.GA$$anonfun$generateGraphsDataScale$1.apply(GA.scala:23)
    at com.a.data_pipeline.GA$$anonfun$generateGraphsDataScale$1.apply(GA.scala:21)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at com.a.data_pipeline.GA$.generateGraphsDataScale(GA.scala:21)
    at com.a.data_pipeline.GA$.main(GA.scala:52)
    at com.a.data_pipeline.GA.main(GA.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2) But if I use the following code, it works fine in local mode; in cluster mode I get a NullPointerException or Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration:

import sparkSession.implicits._
val testDS = sparkSession.createDataFrame(
  Seq(
    ABC("1","2", 1),
    ABC("3","9", 3),
    ABC("8","2", 2),
    ABC("1","2", 3),
    ABC("3","9", 1),
    ABC("2","7", 1),
    ABC("1","3", 2))
).as[ABC]

val test = testDS
  .rdd
  .groupBy(_.c)
  .foreachPartition(
    p => p.foreach(
      a => {
        val id = a._1
        println("inside foreach, id: " + id)
        val itABC = a._2
        val ss = SparkSessionUtil.getInstance(clusterMode)
        import ss.implicits._
        val itDS = ss.createDataFrame(
          ss.sparkContext.parallelize(itABC.toList, numSlices = 200)).as[ABC]
        itDS.show()
        funcA(itDS, id)  // in funcA, I'd like to use this itDS (Dataset) to do some calculation, like itDS.groupBy().agg().filter()
      }
    )
  )

Here's the console output for 2):

17/10/26 14:19:12 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
inside foreach, id: 1
17/10/26 14:19:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  1|
|  3|  9|  1|
|  2|  7|  1|
+---+---+---+

inside foreach, id: 2
17/10/26 14:19:14 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
17/10/26 14:19:14 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  8|  2|  2|
|  1|  3|  2|
+---+---+---+

inside foreach, id: 3
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  3|  9|  3|
|  1|  2|  3|
+---+---+---+

I would like to use the id-related Dataset (itDS) in funcA(itDS, id) to calculate something like itDS.groupBy().agg().filter(). How should I solve this problem? Thank you in advance!

  • Could you please share the stack trace associated with the NullPointerException? Out of curiosity, what happens if you remove the as[ABC] from the innermost foreach of the first code snippet? Commented Oct 26, 2017 at 21:54
  • Thank you, @AlexandreDupriez! I updated the stack trace; in the meantime, I removed the as[ABC] as you mentioned, but the NullPointerException still happens. Commented Oct 26, 2017 at 22:17
  • I can see the input arguments of funcA from the question code. I wanted to see the logic inside funcA so that I can help you write it in a different way. Commented Oct 27, 2017 at 3:57
  • This kind of nested Dataset processing is a no-go, and why on earth would you even want to? Just pass the Seq[ABC] to your funcA (without transforming it to a Dataset) - you can do groupBy and filter on a Seq, so just do that. Datasets are for distributed processing of BIG datasets... Commented Oct 27, 2017 at 10:55
  • @faustineinsun In this case, why don't you apply funcA to your original Dataset, after the groupBy? Have you considered using a udf to wrap funcA and then applying it to the columns of the Dataset which the function operates on? Otherwise, as the previous comments suggest, I do not think converting the sequence of rows to a Dataset from within foreachPartition is the right way to go (should it even work). Commented Oct 27, 2017 at 11:57
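
A hedged sketch of the approach suggested in the comments above: keep the computation on the original Dataset and let Spark do the grouping, instead of creating a new Dataset per group inside foreachPartition. The aggregations below are only placeholders for whatever funcA actually computes:

import org.apache.spark.sql.functions._
import sparkSession.implicits._

// Group on the driver-side Dataset; no SparkSession is referenced inside any closure.
val perGroup = testDS
  .groupBy($"c")                              // one group per id
  .agg(count($"a").as("cnt"),                 // placeholder aggregations -
       collect_list($"b").as("bs"))           // replace with what funcA needs
  .filter($"cnt" > 1)

perGroup.show()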

1 Answer

I recently encountered the same issue, and since there was no answer I am trying to answer this question. faustineinsun's comment is the answer:

Thank you, @AlexandreDupriez! The problem has been solved by restructuring the code from sparkSession.sql() to Seq[ABC], so that sparkSession isn't referenced in the map/foreach function closure; sparkSession isn't serializable, and it is designed to run on the driver, not on the workers.

Conclusion: Within foreach, foreachPartition, map, or mapPartitions you CANNOT create a new DataFrame with the SparkSession (e.g. via .read or .sql); doing so will throw a NullPointerException.
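
For reference, a minimal sketch of that restructuring applied to the question's example, assuming funcA is rewritten to accept a plain Seq[ABC] instead of a Dataset (funcA itself is hypothetical and not shown):

// Nothing from the SparkSession is referenced inside the closure; each group is
// handled as an ordinary in-memory Scala collection.
testDS
  .rdd
  .groupBy(_.c)
  .foreachPartition(partition =>
    partition.foreach { case (id, itABC) =>
      val itSeq: Seq[ABC] = itABC.toSeq       // local copy of this group's rows
      // groupBy / filter / aggregation on the Seq instead of on a Dataset
      val grouped = itSeq.groupBy(_.b).filter { case (_, rows) => rows.nonEmpty }
      println(s"id: $id, keys: ${grouped.keys.mkString(",")}")
      // funcA(itSeq, id)                      // funcA now takes a Seq[ABC]
    }
  )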

Also have a look at:

How to use SQLContext and SparkContext inside foreachPartition
