I have an issue when running a spark-submit job. I'm not sure whether the problem lies in my Spark script or in my resources (though I suspect it's resource-related).

I have 9 main processes:

  1. A function to fetch data from an API, then convert it to RDD, and finally to a Pandas DataFrame (there are 7 APIs).
  2. Fetch data from the APIs using the function above, then convert them into Spark DataFrames (7 datasets are fetched).
  3. Process A, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
  4. Process B, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
  5. Process C, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
  6. Process D, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
  7. Process E, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
  8. Perform a union of results from A, B, C, D, and E.
  9. Send the union result to the API.

In process 2, I added .cache() to every spark.sql call, and I did the same in every sub-process of processes A, B, C, D, and E. The queries are complex, and sub-process 2 additionally uses UDFs for the algorithm.

Example of caching in process 2:

get_first_api_df(API_LIMIT).createOrReplaceTempView("first_api") 
first_ = spark.sql(""" 
    SELECT fa.* 
    FROM first_api fa 
    INNER JOIN (...{}...{}...)
""".format(API_DATE_FORMAT, API_DATE_FORMAT)).cache()

Example of caching in processes 3, 4, 5, 6, and 7 (A to E).

Sub-process 1:

print("process query_cost_A")
cost_A = spark.sql(query_cost_A).cache()
cost_A.createOrReplaceTempView("cost_matrix_A")

Sub-process 2:

print("process solver_result_all_A") 
solver_A = spark.sql(solver_result_all_A).cache() 
solver_A.createOrReplaceTempView("solver_result_all_A")

Sub-process 3:

print("process db_solver_result_A") 
db_solver_A = spark.sql(db_solver_result_A).cache() 
db_solver_A.createOrReplaceTempView("db_solver_result_A")

The union operation (process 8) combines the data from db_solver_result_A, db_solver_result_B, db_solver_result_C, and so on through db_solver_result_E.
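Concretely, processes 8 and 9 look roughly like this (the exact union query and the send_to_api helper are simplified placeholders):

# Process 8: union the per-process results registered as temp views above
union_df = spark.sql("""
    SELECT * FROM db_solver_result_A
    UNION ALL SELECT * FROM db_solver_result_B
    UNION ALL SELECT * FROM db_solver_result_C
    UNION ALL SELECT * FROM db_solver_result_D
    UNION ALL SELECT * FROM db_solver_result_E
""")

# Process 9: send the combined result back to the API
send_to_api(union_df.toPandas().to_dict(orient="records"))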

The issue is that I keep encountering a Java heap space error during process D, specifically in the first sub-process. Processes A to C run smoothly, but it fails during process D.

Spark session configuration:

spark = SparkSession.builder \
    .appName("Project - GPU - Cache") \ 
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
    .config("spark.rapids.sql.enabled", "true") \
    .config("spark.rapids.sql.concurrentGpuTasks", "2") \
    .config("spark.rapids.sql.hasExtendedYearValues", "false") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.maxExecutors", "5") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.2") \
    .getOrCreate()

Spark submit command:

time spark-submit --master local[*] \
    --conf "spark.driver.memoryOverhead=4G" \
    --conf "spark.executor.memoryOverhead=10G" \
    --driver-memory 20G \
    --executor-memory 40G \
    --jars "/home/rapids-4-spark_2.12-24.06.1.jar" \
    --conf spark.sql.session.timeZone=UTC \
    --conf spark.rapids.sql.exec.InMemoryTableScanExec=true \
    "/home/script_gpu_cache.py" >> /home/log.txt

Any suggestions or advice would be greatly appreciated.

I have tried increasing the Java heap space using -Xms and -Xmx, but the issue persists. I have also tried removing .cache() from each process, yet the result is the same. I'm unsure whether I should increase the driver memory to 30G and the executor memory to 60G, or even to 40G and 80G.

  • This setup sounds pretty resource-heavy; maybe there’s a way to make it run lighter?
    • Pandas DF: could you go straight from RDD to Spark DF instead?
    • cache(): if you’re reading the DF multiple times, how about adding new columns to the DF during each process and only formatting it into the final ‘tall’ format at the end?
    • UDF: any chance you could swap out UDFs for native PySpark or Spark SQL functions? Maybe define the API result with a schema and use Spark HOFs?
    • API payload: could you send separate calls for each of the 7 inputs instead of one combined payload?
    Commented Dec 6, 2024 at 21:34
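For the first suggestion (skipping the Pandas conversion), a minimal sketch could look like the following, assuming the API returns a list of JSON records and using a hypothetical schema:

import requests
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema for one of the API responses
api_schema = StructType([
    StructField("id", StringType(), True),
    StructField("cost", DoubleType(), True),
])

# Build the Spark DataFrame directly from the API records,
# without going through an RDD or a Pandas DataFrame first
records = requests.get(API_URL, params={"limit": API_LIMIT}).json()
first_api_df = spark.createDataFrame(records, schema=api_schema)
first_api_df.createOrReplaceTempView("first_api")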
