I have two DataFrames, df1 and df2, which share the same column names. I want to run a for loop over the unique dates from df1 and apply the same date filter to df2. I created a list of unique dates and then tried to iterate through it, but what I have throws an error.
Here is what I have:
val unique_weeks = df1.select(df1("date")).distinct
for (week <- unique_weeks) {
  val df1_filtered = df1.filter($"date" === week)
  val df2_filtered = df2.filter($"date" === week)
  // will run a join here and more code
}
I think the <- part may be incorrect, but I'm not sure how to filter the DataFrames using another method.
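One alternative I was wondering about (untested, so just a sketch) is collecting the distinct dates to the driver as plain values first, and only then looping. Something like this, assuming the date column is a DateType and that import spark.implicits._ is in scope as in the code above:

// Untested sketch: collect the distinct dates as plain values (not Rows),
// then loop over them locally on the driver.
val weeks: Array[java.sql.Date] = df1
  .select($"date")
  .distinct()
  .collect()
  .map(_.getDate(0)) // _.getTimestamp(0) instead if the column is a TimestampType

for (week <- weeks) {
  val df1_filtered = df1.filter($"date" === week)
  val df2_filtered = df2.filter($"date" === week)
  // join and more code here
}

But I'm not sure whether collecting to the driver is the right approach here.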
Here is the error:
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 3.0 failed 1 times, most recent failure: Lost task 35.0 in stage 3.0 (TID 399, localhost, executor driver): java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error] at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error] at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error] at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error] at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error] at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] at java.lang.Thread.run(Thread.java:748)
[error]
[error] Driver stacktrace:
[error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
[error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[error] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[error] at scala.Option.foreach(Option.scala:257)
[error] at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
[error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
[error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
[error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
[error] at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[error] at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
[error] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[error] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[error] at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
[error] at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
[error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2286)
[error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
[error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
[error] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
[error] at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
[error] at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2285)
[error] at spark_pkg.SparkMain$.main(SparkMain.scala:878)
[error] at spark_pkg.SparkMain.main(SparkMain.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error] at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error] at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error] at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error] at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error] at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] at java.lang.Thread.run(Thread.java:748)
[error] stack trace is suppressed; run 'last Compile / bgRun' for the full output
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 137 s (02:17), completed Aug 20, 2020 1:16:02 PM