I have an issue when running a Spark job via spark-submit. I'm not sure whether the problem lies in my Spark script or in my resources (though I suspect it's resource-related).
I have 9 main processes:
- A function to fetch data from an API, convert it to an RDD, and finally to a Pandas DataFrame (there are 7 APIs).
- Fetch data from the APIs using the function above, then convert them into Spark DataFrames (7 datasets are fetched).
- Process A, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
- Process B, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
- Process C, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
- Process D, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
- Process E, which consists of 3 sub-processes: calculating costs, running the algorithm, and generating results (each sub-process uses Spark SQL).
- Perform a union of results from A, B, C, D, and E.
- Send the union result to the API.
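For reference, processes 1 and 2 follow roughly this shape. This is a minimal sketch, not my real code: the sample payload and column names are placeholders, and the real script goes through an RDD before reaching Pandas.

```python
import pandas as pd

def api_records_to_pandas(records):
    # 'records' stands in for the JSON payload already fetched from one API;
    # in the real script this arrives via an RDD first.
    return pd.DataFrame.from_records(records)

# Placeholder payload standing in for one of the 7 API responses.
sample_records = [
    {"id": 1, "cost": 10.5},
    {"id": 2, "cost": 7.25},
]
pdf = api_records_to_pandas(sample_records)

# In process 2, each Pandas DataFrame is then promoted to a Spark DataFrame
# and registered as a temp view (shown here as comments, since it needs a
# live SparkSession):
# sdf = spark.createDataFrame(pdf)
# sdf.createOrReplaceTempView("first_api")
```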
In process 2, I added .cache() to every spark.sql. Similarly, in every sub-process of processes A, B, C, D, and E, I also added .cache() to every spark.sql. The queries are complex, and there are additional UDFs for the algorithm used in sub-process 2.
Example of caching in process 2:
get_first_api_df(API_LIMIT).createOrReplaceTempView("first_api")
first_ = spark.sql("""
    SELECT fa.*
    FROM first_api fa
    INNER JOIN (...{}...{}...)
""".format(API_DATE_FORMAT, API_DATE_FORMAT)).cache()
Example of caching in processes 3 through 7 (A through E):

Sub-process 1:
print("process query_cost_A")
cost_A = spark.sql(query_cost_A).cache()
cost_A.createOrReplaceTempView("cost_matrix_A")
Sub-process 2:
print("process solver_result_all_A")
solver_A = spark.sql(solver_result_all_A).cache()
solver_A.createOrReplaceTempView("solver_result_all_A")
Sub-process 3:
print("process db_solver_result_A")
db_solver_A = spark.sql(db_solver_result_A).cache()
db_solver_A.createOrReplaceTempView("db_solver_result_A")
The union operation retrieves data from db_solver_result_A, db_solver_result_B, db_solver_result_C, and so on.
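Concretely, the union in process 8 just stitches those temp views together. A minimal sketch of how the query is assembled (the view names match the temp views registered in sub-process 3 of each process; the string-building approach here is illustrative, not my exact code):

```python
# Build the union query over the per-process result views (process 8).
process_labels = ["A", "B", "C", "D", "E"]
views = ["db_solver_result_{}".format(label) for label in process_labels]
union_query = "\nUNION ALL\n".join("SELECT * FROM {}".format(v) for v in views)

# Running it needs a live SparkSession, so shown as comments:
# union_df = spark.sql(union_query)   # result of process 8
# ...process 9 then sends union_df back to the API
```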
The issue is that I keep encountering a Java heap space error during process D, specifically in its first sub-process. Processes A through C run smoothly, but the job fails at process D.
Spark session configuration:
spark = SparkSession.builder \
    .appName("Project - GPU - Cache") \
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
    .config("spark.rapids.sql.enabled", "true") \
    .config("spark.rapids.sql.concurrentGpuTasks", "2") \
    .config("spark.rapids.sql.hasExtendedYearValues", "false") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.maxExecutors", "5") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.2") \
    .getOrCreate()
Spark submit command:
time spark-submit --master local[*] \
    --conf "spark.driver.memoryOverhead=4G" \
    --conf "spark.executor.memoryOverhead=10G" \
    --driver-memory 20G \
    --executor-memory 40G \
    --jars "/home/rapids-4-spark_2.12-24.06.1.jar" \
    --conf spark.sql.session.timeZone=UTC \
    --conf spark.rapids.sql.exec.InMemoryTableScanExec=true \
    "/home/script_gpu_cache.py" >> /home/log.txt
Any suggestions or advice would be greatly appreciated.
I have tried increasing the Java heap space via -Xms and -Xmx, but the issue persists. I have also tried removing the cache calls in each process, yet the result remains the same. I'm unsure whether I should increase driver memory to 30G and executor memory to 60G, or even to 40G and 80G.