I am observing different write behavior when executing the same queries from an EMR Notebook (correct behavior) versus submitting a Spark application to the EMR cluster with spark-submit (incorrect behavior).

When I submit a Spark application (with logic to append data) to the EMR cluster using spark-submit, the data in the Iceberg table is overwritten instead of appended. However, if I execute the same code from an EMR Notebook instead of via spark-submit, everything works as expected and data is only appended. Here is the context:

I created an Iceberg table via EMR Notebook using:

CREATE TABLE IF NOT EXISTS raw_data.civ (
    date timestamp,
    marketplace_id int,
    ... some more columns
)
USING ICEBERG
PARTITIONED BY (
    marketplace_id,
    days(date)
)
TBLPROPERTIES (
    'write.sort-order' = 'dimension'
)

Now I read from some source data and create a DataFrame in my Scala code. I have tried the following ways of writing to my Iceberg table, and all of them delete the existing data and leave only the new data present.

Approach 1:

df.write.format("iceberg").mode("append").saveAsTable(outputTableName)

Approach 2:

df.writeTo(outputTableName).append()

Approach 3:

spark.sql(
  s"""
     |MERGE INTO ${input.outputTableName} destination
     |USING inputDfTable source
     |ON <some conditions>
     |WHEN MATCHED THEN
     |  UPDATE SET *
     |WHEN NOT MATCHED THEN
     |  INSERT *
     |""".stripMargin)
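As a side note on the MERGE statement above (unrelated to the append bug itself): `stripMargin` only strips whitespace on lines that begin with a `|` marker, so if the margin markers are inconsistent, stray indentation survives into the generated SQL. A minimal sketch, using placeholder table names and no Spark dependency:

```scala
// stripMargin strips leading whitespace up to and including the first '|'
// on each line; lines WITHOUT a leading '|' keep their indentation verbatim.
val consistent = "|MERGE INTO t dst\n    |USING s src".stripMargin
// -> "MERGE INTO t dst\nUSING s src"

val inconsistent = "|MERGE INTO t dst\n    USING s src".stripMargin
// -> "MERGE INTO t dst\n    USING s src" (second line unchanged: no '|')

println(consistent)
println(inconsistent)
```

Spark's SQL parser tolerates the extra whitespace, but printing the interpolated string before running it is a quick way to check the statement is what you intended.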

Configs that I used with spark-submit:

--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog 
--conf spark.sql.catalog.my_catalog.type=glue 
--conf spark.sql.catalog.my_catalog.warehouse=s3://<some_location>/ 
--conf spark.sql.sources.partitionOverwriteMode=dynamic
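One thing worth double-checking (an assumption on my part, since the notebook environment may register it for you): MERGE INTO and other Iceberg DML require the Iceberg SQL extensions to be enabled on the session. If the spark-submit invocation does not set them, a sketch of the extra config would be:

--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions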

When I use these exact same configs in an EMR Notebook, the result is as expected and the data is actually appended.

I verify this with the following code in the notebook:

import org.apache.spark.sql.functions.col

val icebergDf = spark.table("my_catalog.raw_data.civ")
icebergDf.select("snapshot_day").distinct().orderBy(col("snapshot_day").desc).show(500)

One thing I did before any of the above: I renamed a partition column using the query below.

ALTER TABLE my_catalog.raw_data.civ RENAME COLUMN date TO snapshot_day

Can anyone please let me know what I might be doing wrong here, and what would be the easiest way to determine the root cause? I didn't find the Spark UI helpful, but I may not have been looking in the right place.
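For determining the root cause, Iceberg itself records what each commit did: the table's snapshots metadata table has an operation column (append, overwrite, delete, etc.), so you can see directly whether a given job appended or overwrote. For example:

SELECT committed_at, snapshot_id, operation, summary
FROM my_catalog.raw_data.civ.snapshots
ORDER BY committed_at DESC;

An unexpected overwrite entry at the time of a spark-submit run would confirm which write path is misbehaving.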

EDIT:

df.write.format("iceberg").mode("append").saveAsTable(outputTableName) did not work, but df.writeTo(outputTableName).append() did. The root cause was that I was not building the jar correctly.
