
We have a use case in a Spark job where:

  • We iterate over the partitions of an external table
    • Load the data of one partition (roughly the same data volume in each partition)
    • Do transformations (self-joins, no UDFs) on the data
    • Save the data to an external location
    • Repeat

In the process, we do not cache any dataframes.

We are seeing a steady increase in memory usage after each iteration. After some time the job exits with an out-of-heap-memory error.

I am looking for some pointers that might help in debugging this issue.

The code looks very similar to this.

while (date < endDate) {
    val df = spark.sql(s"SELECT * FROM tbl JOIN tbl2 ON tbl.date = '${date}' AND tbl2.date = '${date}'")
    df.write.partitionBy("date").mode("overwrite").save("s3://bucket/path")
    date = increment(date)
}
  • Is there any specific reason you want to iterate over each partition individually? Why don't you load everything, apply the transformations and partition the data during the writing process? Commented Mar 4, 2023 at 16:28
  • Please provide enough code so others can better understand or reproduce the problem. Commented Mar 5, 2023 at 10:51
  • We are restricted on how many nodes in a cluster we can add. Hence it's not possible to load everything at once. Commented Mar 6, 2023 at 3:28

1 Answer


Say we have data that cannot be joined in a single job but can be split by date (the join logic does not cross date boundaries). To join it, we split the data across separate jobs that can run in parallel. Here is an example where the unit of work is a year:

empty = EmptyOperator(task_id="sync")
for year in range(start_year, end_year):
    spark_submit = LivyOperator(task_id=f"spark_{year}")  # or whatever submit operator you use
    spark_submit >> empty

    ...
    val yearProcessing = spark.conf.get("spark.start_interval.year")
    val dataSourceOne = spark.read.parquet(...).where(year(col("date")) === yearProcessing)
    val dataSourceTwo = spark.read.parquet(...).where(year(col("date")) === yearProcessing)
    dataSourceOne
        .join(dataSourceTwo, dataSourceOne("date") === dataSourceTwo("date"))
        .select(
            dataSourceOne.columns.map(c => dataSourceOne(c).alias(s"do_$c")) ++
            dataSourceTwo.columns.map(c => dataSourceTwo(c).alias(s"dt_$c")): _*
        )
        .write
        .mode("overwrite")
        .partitionBy("dt_date")
        .option("partitionOverwriteMode", "dynamic")
        .parquet("path/to/dist")

That is what I did in my scenario. I have also split by day, but in that case I wrote a bash script, because Airflow couldn't render a DAG with 5*365 tasks.
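The per-day variant can be sketched as a shell loop that submits one Spark job per date. This is only a sketch: the conf key (`spark.start_interval.date`), the jar name, and the class are illustrative assumptions, and the `spark-submit` call is echoed rather than executed.

```shell
#!/usr/bin/env bash
# Sketch: one spark-submit per day, instead of one Airflow task per day.
# The conf key, jar, and class names below are placeholders.
start="2023-01-01"
end="2023-01-04"   # exclusive upper bound
d="$start"
submitted=""
while [ "$d" != "$end" ]; do
    # The real script would run this command instead of echoing it:
    echo "spark-submit --conf spark.start_interval.date=$d --class Job job.jar"
    submitted="$submitted$d "
    d=$(date -d "$d + 1 day" +%F)   # GNU date
done
```

Each submitted job then reads its date from the Spark conf, the same way the yearly example above reads `spark.start_interval.year`.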


2 Comments

- We are using the flag you mentioned. I updated the code snippet to better represent the situation. We can't join the whole table at once because of its size.
- Okay, I had the same problem in production. Just split the join by year or month, wherever you can split it. Don't put the while loop in the same code where you call the join; move it a level up. I wrote a DAG in Airflow just to have more control and a nicer UI to watch progress. But there is a restriction depending on how many date units you choose.
