
I have an ML dataframe that I read from CSV files. It contains three types of columns:

ID Timestamp Feature1 Feature2...Feature_n

where n is ~500 (500 features, in ML parlance). The total number of rows in the dataset is ~160 million.

As this is the result of a previous full join, there are many features which do not have values set.

My aim is to run a "fill" function (fillna-style, as in Python pandas), where each empty feature value gets set to the previously available value for that column, per ID, ordered by date.
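On a toy example, the behaviour I am after would look like this (the data, values and app name below are made up purely for illustration):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, last}

    val spark = SparkSession.builder().appName("fill-demo").master("local[*]").getOrCreate()
    import spark.implicits._

    // Toy data: Feature1 has gaps that should take the last non-null value per ID
    val toy = Seq(
      ("a", 1, Some(1.0)),
      ("a", 2, None),       // should become 1.0
      ("a", 3, Some(2.0)),  // stays 2.0
      ("b", 1, None)        // stays null: no earlier value for this ID
    ).toDF("ID", "DATE", "Feature1")

    val w = Window.partitionBy("ID").orderBy("DATE")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    toy.withColumn("Feature1", last(col("Feature1"), ignoreNulls = true).over(w)).show()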

I am trying to achieve this with the following Spark 2.2.1 code:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{coalesce, col, last}

    val rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)

    // Look back over up to the 50,000 previous rows per ID for the last non-null value
    val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(-50000, -1)

    val columns = Array(...) // first 30 columns initially, just to see it working

    val rawDataSetFilled = columns.foldLeft(rawDataset) { (originalDF, columnToFill) =>
      originalDF.withColumn(columnToFill, coalesce(col(columnToFill), last(col(columnToFill), ignoreNulls = true).over(window)))
    }

I am running this job on 4 m4.large instances on Amazon EMR, with Spark 2.2.1 and dynamic allocation enabled.

The job runs for over 2h without completing.

Am I doing something wrong at the code level? Given the size of the data and the instances, I would assume it should finish in a reasonable amount of time. And I haven't even tried with the full 500 columns, just with about 30!

Looking in the container logs, all I see are many logs like this:

INFO codegen.CodeGenerator: Code generated in 166.677493 ms

INFO execution.ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

I have tried setting the parameter spark.sql.windowExec.buffer.spill.threshold to something larger, without any impact. Is there some other setting I should know about? Those two lines are the only ones I see in any container log.
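For reference, a sketch of one way such a setting can be passed (the app name is a placeholder and the threshold value is an arbitrary example, not a recommendation):

    import org.apache.spark.sql.SparkSession

    val sparkSession = SparkSession.builder()
      .appName("feature-fill") // placeholder name
      .config("spark.sql.windowExec.buffer.spill.threshold", "1048576") // example value only
      .getOrCreate()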

In Ganglia, I see most of the CPU cores peaking around full usage, but the memory usage is lower than the maximum available. All executors are allocated and are doing work.

  • Did you look at the execution plan? If your dataframe is not repartitioned by ID and sorted within partitions by ID and DATE, there will be a shuffle and sort before the foldLeft. Could this be the reason? Also, is the -50000 limit really necessary? Maybe you should try first with some smaller value, like -10. Commented Feb 21, 2018 at 16:25
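A sketch of what that plan check and pre-partitioning could look like (column names are assumed from the question):

    import org.apache.spark.sql.functions.col

    // Look for an Exchange (shuffle) followed by a Sort before the window operator
    rawDataset.explain(true)

    // Partition by ID and sort within partitions so the window does not need its own shuffle/sort
    val prePartitioned = rawDataset
      .repartition(col("ID"))
      .sortWithinPartitions(col("ID"), col("DATE"))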

1 Answer


I have managed to rewrite the foldLeft logic without using withColumn calls. Apparently these can be very slow for a large number of columns, and I was also getting StackOverflowErrors because of that.

I would be curious to know why there is such a massive difference, and what exactly happens behind the scenes during query-plan execution that makes repeated withColumn calls so slow.

Links that proved very helpful: this Spark JIRA issue and this Stack Overflow question.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{coalesce, col, last}

    var rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)
    val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    // One select over all columns, instead of one withColumn call per column
    rawDataset = rawDataset.select(rawDataset.columns.map(column => coalesce(col(column), last(col(column), ignoreNulls = true).over(window)).alias(column)): _*)
    rawDataset.write.option("header", "true").csv(outputLocation)
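A variation of the same single-select approach, reusing rawDataset and window from above, in case ID and DATE should be passed through unchanged (a sketch; the key/feature column split is assumed):

    val keyColumns = Seq("ID", "DATE")
    val featureColumns = rawDataset.columns.toSeq.filterNot(keyColumns.contains)

    // Keep the key columns as-is, fill only the feature columns
    val filled = rawDataset.select(
      (keyColumns.map(col) ++ featureColumns.map(c =>
        coalesce(col(c), last(col(c), ignoreNulls = true).over(window)).alias(c)
      )): _*
    )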

2 Comments

Nice solution. How fast is it now?
You can see a very nice blog post about this here where you also get a nice benchmark.
