1) If I use either of the following snippets, I get a NullPointerException in both local and cluster mode:
    import sparkSession.implicits._
    val testDS = sparkSession.createDataFrame(
      Seq(
        ABC("1","2", 1),
        ABC("3","9", 3),
        ABC("8","2", 2),
        ABC("1","2", 3),
        ABC("3","9", 1),
        ABC("2","7", 1),
        ABC("1","3", 2))
    ).as[ABC]
    val t = testDS
      .rdd
      .groupBy(_.c)
      .foreachPartition(
        p => p.foreach(
          a => {
            val id = a._1
            println("inside foreach, id: " + id)
            val itABC = a._2
            val itSeq = itABC.toSeq
            println(itSeq.size)
            val itDS = itSeq.toDS // Get "Caused by: java.lang.NullPointerException" here
            itDS.show()
            funcA(itDS, id)
          }
        )
      )
    println(t.toString)
Or
    import sparkSession.implicits._
    val testDS = sparkSession.createDataFrame(
      Seq(
        ABC("1","2", 1),
        ABC("3","9", 3),
        ABC("8","2", 2),
        ABC("1","2", 3),
        ABC("3","9", 1),
        ABC("2","7", 1),
        ABC("1","3", 2))
    ).as[ABC]
    testDS
      .rdd
      .groupBy(_.c)
      .foreachPartition(
        p => p.foreach(
          a => {
            val id = a._1
            println("inside foreach, id: " + id)
            val itABC = a._2
            import sparkSession.implicits._
            val itDS = sparkSession.createDataFrame(
              sparkSession.sparkContext.parallelize(itABC.toList, numSlices=200)) // get "NullPointerException" here
            itDS.show()
            funcA(itDS, id)
          }
        )
      )
Here's the output log for 1):
17/10/26 15:07:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:> (0 + 4) / 4]17/10/26 15:07:29 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 8, 10.142.17.137, executor 0): java.lang.NullPointerException
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/10/26 15:07:29 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 12, 10.142.17.137, executor 0): java.lang.NullPointerException
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at com.a.data_pipeline.SL.generateScaleGraphs(SL.scala:165)
at com.a.data_pipeline.GA$$anonfun$generateGraphsDataScale$1.apply(GA.scala:23)
at com.a.data_pipeline.GA$$anonfun$generateGraphsDataScale$1.apply(GA.scala:21)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.a.data_pipeline.GA$.generateGraphsDataScale(GA.scala:21)
at com.a.data_pipeline.GA$.main(GA.scala:52)
at com.a.data_pipeline.GA.main(GA.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2) But if I use the following code, it works fine in local mode, while in cluster mode I get either a NullPointerException or "Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration":
    import sparkSession.implicits._
    val testDS = sparkSession.createDataFrame(
      Seq(
        ABC("1","2", 1),
        ABC("3","9", 3),
        ABC("8","2", 2),
        ABC("1","2", 3),
        ABC("3","9", 1),
        ABC("2","7", 1),
        ABC("1","3", 2))
    ).as[ABC]
    val test = testDS
      .rdd
      .groupBy(_.c)
      .foreachPartition(
        p => p.foreach(
          a => {
            val id = a._1
            println("inside foreach, id: " + id)
            val itABC = a._2
            val ss = SparkSessionUtil.getInstance(clusterMode)
            import ss.implicits._
            val itDS = ss.createDataFrame(
              ss.sparkContext.parallelize(itABC.toList, numSlices=200)).as[ABC]
            itDS.show()
            funcA(itDS, id) // in funcA, I'd like to use this itDS(Dataset) to do some calculation, like itDS.groupby().agg().filter()
          }
        )
      )
Here's the standard output log for 2):
17/10/26 14:19:12 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
inside foreach, id: 1
17/10/26 14:19:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 1|
| 3| 9| 1|
| 2| 7| 1|
+---+---+---+
inside foreach, id: 2
17/10/26 14:19:14 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
17/10/26 14:19:14 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
+---+---+---+
| a| b| c|
+---+---+---+
| 8| 2| 2|
| 1| 3| 2|
+---+---+---+
inside foreach, id: 3
+---+---+---+
| a| b| c|
+---+---+---+
| 3| 9| 3|
| 1| 2| 3|
+---+---+---+
I would like to use the Dataset for each id (itDS) in funcA(itDS, id) to calculate something like itDS.groupBy().agg().filter(). How should I solve this problem? Thank you in advance.
[...] NullPointerException? Out of curiosity, what happens if you remove the as[ABC] from the innermost foreach of the first code snippet?
[...] as[ABC] as you mentioned, but the NullPointerException error still happens
[...] seq[ABC] to your funcA (without transforming it to a DataSet) - you can do groupBy and filter on Seq so just do that. DataSets are for distributed processing of BIG datasets...
[...] funcA on your original Dataset, after the groupBy? Have you considered using a udf to wrap funcA, and then apply it on the columns of the dataset which the function operates on? Otherwise, as previous comments suggest, I do not think converting the sequence of rows to a dataset from within the foreachPartitions is the right way to go (should it even work).
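To illustrate the suggestion of passing a plain Seq to funcA: the closure given to foreachPartition runs on the executors, while SparkSession and SparkContext are only usable on the driver, which is most likely why toDS and createDataFrame fail there. Below is a minimal sketch in plain Scala collections, with no Spark dependency so it can be run as-is. Everything in it is an assumption made for illustration: the field types of ABC are guessed from the sample data, and this funcA (sum b per a, keep sums above a threshold) is a hypothetical stand-in for the real calculation, as are the names GroupLocally, run and minTotal.

```scala
// Field types assumed from the sample rows in the question.
final case class ABC(a: String, b: String, c: Int)

object GroupLocally {
  // The seven sample rows from the question.
  val sample: Seq[ABC] = Seq(
    ABC("1", "2", 1),
    ABC("3", "9", 3),
    ABC("8", "2", 2),
    ABC("1", "2", 3),
    ABC("3", "9", 1),
    ABC("2", "7", 1),
    ABC("1", "3", 2)
  )

  // Hypothetical funcA: works on a local Seq instead of a Dataset.
  // groupBy / map / filter on collections play the role of
  // Dataset.groupBy / agg / filter. The id is unused here, but kept so the
  // signature mirrors funcA(itDS, id) from the question.
  def funcA(rows: Seq[ABC], id: Int, minTotal: Int = 2): Map[String, Int] =
    rows
      .groupBy(_.a)                                               // group rows by column a
      .map { case (a, group) => a -> group.map(_.b.toInt).sum }   // aggregate: sum of b per a
      .filter { case (_, total) => total > minTotal }             // keep only large enough sums

  // Local simulation of testDS.rdd.groupBy(_.c): one funcA call per id.
  // In Spark, the same call would sit inside the foreach over (id, rows)
  // pairs, using rows.toSeq and no SparkSession at all.
  def run(data: Seq[ABC]): Map[Int, Map[String, Int]] =
    data.groupBy(_.c).map { case (id, rows) => id -> funcA(rows, id) }

  def main(args: Array[String]): Unit =
    run(sample).toSeq.sortBy(_._1).foreach(println)
}
```

If funcA really needs the Dataset API, the alternative raised in the comments is to keep the work on the driver and express it on the original Dataset (grouping by c together with the other keys), rather than building a new Dataset per group inside the executors.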