
I'm currently building an application with Apache Spark (pyspark), and I have the following use case:

  • Run pyspark in local mode (spark-submit with --master local[*]).
  • Write the results of my Spark job to S3 in the form of partitioned Parquet files.
  • Ensure that each job overwrites only the particular partition it is writing to, in order to keep jobs idempotent.
  • Ensure that Spark staging files are written to local disk before being committed to S3, since staging in S3 and then committing via a rename operation is very expensive.

For various internal reasons, all four of the above bullet points are non-negotiable.
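
To make this concrete, the write is roughly along these lines; the bucket, table path, and the "ds" partition column are placeholders rather than the real job's names:

    # Rough sketch of the intended write pattern; paths and the "ds" partition
    # column are placeholders.
    df = spark.createDataFrame(
        [("2022-01-17", 1), ("2022-01-17", 2)],
        ["ds", "value"],
    )

    # With spark.sql.sources.partitionOverwriteMode=dynamic, this should replace
    # only the ds=2022-01-17 partition rather than truncating the whole table.
    (
        df.write
        .mode("overwrite")
        .partitionBy("ds")
        .parquet("s3a://my-bucket/my-table/")
    )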

I have everything but the last bullet point working. I'm running a pyspark application, and writing to S3 (actually an on-prem Ceph instance), ensuring that spark.sql.sources.partitionOverwriteMode is set to dynamic.

However, this means that my spark-staging files are being staged in S3, and then committed by using a delete-and-rename operation, which is very expensive.

I've tried using the Spark Directory Committer in order to stage files on my local disk. This works great unless spark.sql.sources.partitionOverwriteMode is set to dynamic.

After digging through the source code, it looks like the PathOutputCommitProtocol does not support dynamic partition overwrite.

At this point, I'm stuck. I want to be able to write my staging files to local disk, and then commit the results to S3. However, I also need to be able to dynamically overwrite a single partition without overwriting the entire Parquet table.

For reference, I'm running pyspark=3.1.2, and using the following spark-submit command:

spark-submit --repositories https://repository.cloudera.com/artifactory/cloudera-repos/ --packages com.amazonaws:aws-java-sdk:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0,org.apache.spark:spark-hadoop-cloud_2.12:3.1.1.3.1.7270.0-253

I get the following error when spark.sql.sources.partitionOverwriteMode is set to dynamic:

java.io.IOException: PathOutputCommitProtocol does not support dynamicPartitionOverwrite

My spark config is as follows:


        self.spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
        self.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

        self.spark.conf.set("spark.hadoop.fs.s3a.committer.name", "magic")

        self.spark.conf.set("spark.sql.sources.commitProtocolClass",
                            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")

        self.spark.conf.set("spark.sql.parquet.output.committer.class",
                            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

        self.spark.conf.set(
            "spark.sql.sources.partitionOverwriteMode", "dynamic"
        )

1 Answer

Afraid the S3A committers don't support the dynamic partition overwrite feature. That feature actually works by doing lots of renaming, so it misses the entire point of zero-rename committers.

the "partioned" committer was written by netflix for their use case of updating/overwriting single partitions in an active table. it should work for you as it is the same use case.

Consult the documentation.
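
If it helps, a rough sketch of the switch, based on the S3A committer documentation (property names are the documented ones; untested against your exact build):

    # Keep the commit protocol bindings from the question; only the committer
    # selection changes. The spark.hadoop. prefix passes these to the Hadoop config.
    spark.conf.set("spark.hadoop.fs.s3a.committer.name", "partitioned")
    # "replace" tells the partitioned committer to overwrite the partitions the
    # job writes into, giving per-partition, idempotent overwrites.
    spark.conf.set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
    # Leave spark.sql.sources.partitionOverwriteMode at its default ("static");
    # partition replacement is handled by the committer, not by Spark.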


6 Comments

Thank you for the pointer! I've set the following configurations: self.spark.conf.set("spark.hadoop.fs.s3a.committer.name", "partitioned") and self.spark.conf.set("fs.s3a.committer.staging.conflict-mode", "replace"). This appears to be working. However, I still see a line in the logs: 22/01/17 17:09:53 INFO FileOutputCommitter: Saved output of task 'attempt_202201171709455031776784011683221_0006_m_000000_10' to s3a://mybucket/_temporary/0/task_202201171709455031776784011683221_0006_m_000000. Why is it still saving temp data to S3?
There's more binding needed than that, I'm afraid; look at the S3A committer docs and/or other SO posts. When the SO posts and docs don't agree, go with the docs.
Steve, so, basically, if a person is running Spark in Kubernetes with small local storage and doesn't have HDFS, they are screwed and have to use the slow file committer?
I think the documentation that you provide should explicitly state that the magic committer should not be used when partitionOverwriteMode is set to dynamic, and that its behavior is wrong in that case. Or maybe it should fail when this setting is on. I just ran a Spark job with partitionOverwriteMode=dynamic and the magic committer and noticed that it appended data to partitions instead of replacing them.
I've actually tried to fix it in the past; see the Spark/Hadoop JIRAs. Now that Spark is Hadoop 3.3.5+ only, I can revisit that, as it will be possible to do properly.
