I am seeing different write behavior when I run queries from an EMR Notebook (correct behavior) versus when I submit a Spark application to the EMR cluster with spark-submit (incorrect behavior).
When I submit a Spark application (whose logic appends data) to the EMR cluster using spark-submit, the data in the Iceberg table is overwritten instead of appended. The exact same code works fine, and data is indeed only appended, when I run it from an EMR Notebook instead of via spark-submit. Below is the context:
I created an Iceberg table via EMR Notebook using:
CREATE TABLE IF NOT EXISTS raw_data.civ (
date timestamp,
marketplace_id int,
... some more columns
)
USING ICEBERG
PARTITIONED BY (
marketplace_id,
days(date)
)
TBLPROPERTIES (
'write.sort-order' = 'dimension'
)
Now I read some source data and build a DataFrame in my Scala code. I have tried the following ways of writing to my Iceberg table, and all of them result in the existing data being deleted, with only the new data remaining.
Approach 1:
df.write.format("iceberg").mode("append").saveAsTable(outputTableName)
Approach 2:
df.writeTo(outputTableName).append()
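For context, both approaches take the table identifier as a plain string and write the same DataFrame. A minimal sketch of how they are invoked, with illustrative stand-in values (not my real table name or source path):
// Sketch only -- the table name and source path below are illustrative stand-ins.
val outputTableName = "my_catalog.raw_data.civ"
val df = spark.read.parquet("s3://<source_location>/")
// Approach 1: DataFrameWriter (v1 API) with explicit append mode.
df.write.format("iceberg").mode("append").saveAsTable(outputTableName)
// Approach 2: DataFrameWriterV2 API; append() issues an Iceberg append.
df.writeTo(outputTableName).append()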
Approach 3:
spark.sql(
s"""
|MERGE INTO ${input.outputTableName} destination
|USING inputDfTable source
|ON
|<some conditions>
|WHEN MATCHED THEN
|UPDATE SET *
|WHEN NOT MATCHED THEN
|INSERT *
|""".stripMargin)
Configs that I used with spark-submit:
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.my_catalog.type=glue
--conf spark.sql.catalog.my_catalog.warehouse=s3://<some_location>/
--conf spark.sql.sources.partitionOverwriteMode=dynamic
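For reference, the in-code equivalent of those configs would look roughly like the sketch below. One line is my own assumption: as far as I understand from the Iceberg docs, MERGE INTO (Approach 3) also needs the Iceberg SQL extensions, which are not in the spark-submit list above.
import org.apache.spark.sql.SparkSession

// Rough in-code equivalent of the spark-submit configs; the extensions entry is
// an assumption on my part (needed for MERGE INTO per the Iceberg docs).
val sparkSession = SparkSession.builder()
  .appName("iceberg-append-job") // illustrative name
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.my_catalog.type", "glue")
  .config("spark.sql.catalog.my_catalog.warehouse", "s3://<some_location>/")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()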
When I use these exact same configs in the EMR Notebook, the result is as expected and the data is actually appended.
I verify this with the following code in the notebook:
import org.apache.spark.sql.functions.col

val icebergDf = spark.table("my_catalog.raw_data.civ")
icebergDf.select("snapshot_day").distinct().orderBy(col("snapshot_day").desc).show(500)
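To dig further, my understanding is that the Iceberg snapshots metadata table records which operation each commit performed, so it should show whether the spark-submit runs produced append or overwrite/delete commits -- a sketch of that query:
// Sketch: inspect the snapshots metadata table; the "operation" column should
// read "append", "overwrite" or "delete" for each commit.
spark.table("my_catalog.raw_data.civ.snapshots")
  .select("committed_at", "snapshot_id", "operation")
  .orderBy(col("committed_at").desc)
  .show(false)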
One other thing: before doing any of the above, I renamed a partition column using the query below.
ALTER TABLE my_catalog.raw_data.civ RENAME COLUMN date TO snapshot_day
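In case the rename is relevant, a sketch of how I can check the schema and partition spec afterwards (as I understand it, DESCRIBE on an Iceberg table includes a "# Partitioning" section):
// Verify the current schema and partition spec after the column rename.
spark.sql("DESCRIBE TABLE EXTENDED my_catalog.raw_data.civ").show(100, false)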
Can anyone please let me know what I might be doing wrong here, and what would be the easiest way to determine the root cause? I didn't find the Spark UI helpful, but I may not have been looking in the right place.
EDIT:
df.write.format("iceberg").mode("append").saveAsTable(outputTableName) did not work, but df.writeTo(outputTableName).append() did. The root cause was that I was not building the jar correctly.