
I have two DataFrames, df1 and df2, which both have the same column names. I want to run a for loop over the unique dates from df1 and apply the same date filter to df2. I created a list of unique dates and tried to iterate through it, but what I have is throwing errors.

Here is what I have:

val unique_weeks = df1.select(df1("date")).distinct

for( week <- unique_weeks) {
  val df1_filtered = df1.filter($"date" === week)
  val df2_filtered = df2.filter($"date" === week)
  // will run a join here and more code
}

I think the `<-` part may be incorrect, but I'm not sure how else I can filter the DataFrames.

Here is the error:

[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 3.0 failed 1 times, most recent failure: Lost task 35.0 in stage 3.0 (TID 399, localhost, executor driver): java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error]     at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error]     at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error]     at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error]     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error]     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error]     at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error]     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error]     at java.lang.Thread.run(Thread.java:748)
[error] 
[error] Driver stacktrace:
[error] org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 3.0 failed 1 times, most recent failure: Lost task 35.0 in stage 3.0 (TID 399, localhost, executor driver): java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error]     at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error]     at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error]     at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error]     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error]     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error]     at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error]     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error]     at java.lang.Thread.run(Thread.java:748)
[error] 
[error] Driver stacktrace:
[error]     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
[error]     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error]     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[error]     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[error]     at scala.Option.foreach(Option.scala:257)
[error]     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
[error]     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[error]     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
[error]     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[error]     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[error]     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
[error]     at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
[error]     at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2286)
[error]     at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
[error]     at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
[error]     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
[error]     at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
[error]     at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2285)
[error]     at spark_pkg.SparkMain$.main(SparkMain.scala:878)
[error]     at spark_pkg.SparkMain.main(SparkMain.scala)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]     at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error]     at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error]     at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error]     at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error]     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error]     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error]     at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error]     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error]     at java.lang.Thread.run(Thread.java:748)
[error] stack trace is suppressed; run 'last Compile / bgRun' for the full output
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 137 s (02:17), completed Aug 20, 2020 1:16:02 PM
  • What exceptions are getting thrown? Commented Aug 20, 2020 at 19:46
  • I haven't run the full program since it's long, but my IDE is giving me a red line under <- Commented Aug 20, 2020 at 19:54
  • It's a bit misleading to say it's throwing errors, which could reasonably be interpreted as happening at runtime. Commented Aug 20, 2020 at 20:03
  • In any event, what does the IDE say the error under the red line is? Commented Aug 20, 2020 at 20:07
  • @LeviRamsey I updated the question to include the runtime error Commented Aug 20, 2020 at 20:17

1 Answer


A DataFrame is not an iterator, so you cannot run a driver-side for loop over it. You can run something like this, but I don't think it will do what you're hoping to achieve, based on the rest of your code.

import org.apache.spark.sql.Row

unique_weeks.foreachPartition { weeks: Iterator[Row] =>
  // this closure runs on the executors, once per partition
  for (week <- weeks) {
    // each `week` here is a Row; note that df1 and df2 cannot be used inside this closure
  }
}

Your question suggests your mental model of what a DataFrame is and how Spark works isn't quite complete. Think of a DataFrame more as a List[List[YourData]], except that each inner List[YourData] lives on a separate partition, possibly on a different machine, and doesn't know about or interact with the other lists until you collect them back to the driver.
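If the goal really is to process each date separately, one option is to pull the distinct dates back to the driver first, since df1 and df2 themselves cannot be used inside executor-side code. Here is a minimal sketch of that approach, assuming the column and DataFrame names from the question and the usual spark.implicits._ import; whether the values come back as java.sql.Date or java.sql.Timestamp depends on your schema:

// Collect the distinct dates to the driver as plain JVM values
// (java.sql.Date or java.sql.Timestamp, depending on the schema).
val uniqueWeeks: Array[Any] =
  df1.select($"date").distinct().collect().map(_.get(0))

// Driver-side loop: each `week` is now a plain value, so lit() accepts it.
for (week <- uniqueWeeks) {
  val df1Filtered = df1.filter($"date" === week)
  val df2Filtered = df2.filter($"date" === week)
  // join df1Filtered and df2Filtered here, as in the original code
}

If the per-date loop only exists to feed a join, joining df1 and df2 directly on the date column is usually simpler and cheaper than running a separate filter-and-join pass for every date.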


1 Comment

Spark 3.0's scaladoc indicates that dataframes have a foreach method.
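That foreach path is in fact what the question's for comprehension compiles to: the stack trace shows Dataset.foreach calling Column.$eq$eq$eq and then functions.lit, which is why the error complains about an unsupported literal of type GenericRowWithSchema: each `week` handed to the loop body is a whole Row, not a date value. A minimal sketch of the difference (names taken from the question; the extracted value's type depends on your schema):

val firstWeekRow: org.apache.spark.sql.Row = unique_weeks.head()  // a one-field Row
val firstWeek = firstWeekRow.get(0)                               // the underlying Date/Timestamp value
val filtered = df1.filter($"date" === firstWeek)                  // lit() now receives a supported type

Even with the value extracted, calling df1.filter inside a foreach closure still won't work, because that closure runs on the executors where df1 is not usable; the driver-side collect approach sketched above avoids that.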
