13

When I write out a dataframe to, say, csv, a .csv file is created for each partition. Suppose I want to limit the max size of each file to, say, 1 MB. I could do the write multiple times and increase the argument to repartition each time. Is there a way I can calculate ahead of time what argument to use for repartition to ensure the max size of each file is less than some specified size.

I imagine there might be pathological cases where all the data ends up on one partition. So make the weaker assumption that we only want to ensure that the average file size is less than some specified amount, say 1 MB.

5
  • 1
    do you need the output of the dataframe which partitioned to create output .csv files in each partition with file size less than 1MB and you are okay with mutiple part files within a partition ?? Commented Aug 27, 2018 at 17:26
  • Yes, I'm okay with multiple part files within a partition. (I can't tell if you're asking one or two questions.) Commented Aug 27, 2018 at 17:43
  • df.cache.foreach( => ) val catalyst_plan = df.queryExecution.logical val df_size_in_bytes = spark.sessionState.executePlan( catalyst_plan).optimizedPlan.stats.sizeInBytes Commented Aug 30, 2018 at 19:08
  • Bit tangential, but log4j/slf4j have the ability to automatically split files so they don't exceed a specific size. Might work for your case? Commented Sep 3, 2018 at 13:57
  • One of the reasons your question didn't receive much attention is that it was missing the apache-spark tag. (40K+ questions against 12K+) Commented Sep 3, 2018 at 15:06

2 Answers 2

19
+500

1. Single dataframe solution

I was trying to find out some clever idea that would not kill the cluster at the same time and the only thing that came to my mind was:

  1. Calculate the size of the serialized row
  2. Get no. of rows in your DataFrame
  3. Repartition, by dividing with the expected size
  4. Should work?

The code should look more like this:

val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)
val rowCount = df.count()
val partitionSize = 1000000 // million bytes in MB?
val noPartitions: Int = (rowSize * rowCount / partitionSize).toInt
df.repartition(noPartitions).write.format(...) // save to csv

// just helper function from https://stackoverflow.com/a/39371571/1549135
def getBytes(value: Any): Long = {
  val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(stream)
  oos.writeObject(value)
  oos.close
  stream.toByteArray.length
}

While my first choice was to calculate each row's byte size, that would be terribly inefficient. So, unless your data size in each row differs in size greatly, I would say that this solution will work. You can also calculate every n-th row size. You got the idea.

Also, I just 'hope' that Long will be big enough to support the expected size to calculate noPartitions. If not (if you have a lot of rows), maybe it would be better to change the operations order, f.e.:

val noPartitions: Int = (rowSize / partitionSize * rowCount).toInt

again this is just a drafted idea with no domain knowledge about your data.

2. Cross system solution

While going through the apache-spark docs I have found an interesting cross-system solution:

spark.sql.files.maxPartitionBytes which sets:

The maximum number of bytes to pack into a single partition when reading files.

The default value is 134217728 (128 MB).

So I suppose you could set it to 1000000 (1MB) and it will have a permanent effect on your DataFrames. However, too small partition size may greatly impact your performance!

You can set it up, during SparkSession creation:

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.sql.files.maxPartitionBytes", 100000)
  .getOrCreate()

All of above is only valid if (I remember correctly and) the csv is partitioned with the same number of files as there are partitions of DataFrame.

Sign up to request clarification or add additional context in comments.

6 Comments

SparkSQL uses Tungsten to serialize rows, which is vastly more efficient than java or Kryo serialization. So your size evaluation is probably much higher that what it really takes, but that's a start. databricks.com/blog/2015/04/28/…
I knew about Tungsten, but I am not using Spark on daily basis and I do not know how to use it for serialization. I just posted a general idea. You are right, though and it might give this solution a nice performance boost.
It's actually spark anti-pattern to create very small partitions.
@eliasah that is what OP wanted and I posted all my concerns along with the answer.
I know. I saw that in your highlighted comment. I just wanted to clear up that it's actually an anti-pattern.
|
1
    val df = spark.range(10000000)
    df.cache     
    val catalyst_plan = df.queryExecution.logical
    val df_size_in_bytes = spark.sessionState.executePlan(catalyst_plan).optimizedPlan.stats.sizeInBytes

df_size_in_bytes: BigInt = 80000000

The best solution would be take 100 records and estimate the size and apply for all the rows as the above example

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.