
My task is simple: I have a binary file that needs to be split into 8-byte chunks, where the first 4 bytes contain data (to be decoded in a later step) and the second 4 bytes contain an int (a time offset in ms). Some of these 8-byte blocks are repeated - I want them removed. If at the end I have a remainder of less than 8 bytes, I mark it. My problem is the runtime behavior: 200 KB -> 39 s, 1,200 KB -> 2:02 min, 6 MB -> 49 min.

This renders my approach useless. Here is the code:

def process(self, df: DataFrame):
    """
    Split byte arrays into 8-byte chunks before transforming to binary.
    """

    # Define temporary column names
    byte_offset_col = "byte_offset"
    chunk_raw_col = "chunk_raw"
    chunk_hex_col = "chunk_hex"
    wrong_length = "wrong_length"

    # Processing
    df_offset_bytes = df.withColumn(
        byte_offset_col,
        f.expr(f"sequence(0, length({GamaExtractionColumns.CONTENT}) - 1, {self.chunk_size})"),  # Generate offsets
    )

    df_split_to_chunk_array = df_offset_bytes.withColumn(
        chunk_raw_col,
        f.expr(
            f"transform({byte_offset_col}, x -> substring({GamaExtractionColumns.CONTENT}, x + 1, {self.chunk_size}))"
        ),  # Extract chunks
    )

    df_chunk_rows = df_split_to_chunk_array.withColumn(
        self.BYTE_CHUNKS,
        f.explode(f.col(chunk_raw_col)),  # Explode into rows
    )

    df_hex_values = df_chunk_rows.withColumn(
        chunk_hex_col,
        f.expr(f"hex({self.BYTE_CHUNKS})"),  # Convert chunk to hex
    )

    df_check_length = df_hex_values.withColumn(
        wrong_length,
        f.when((f.length(self.BYTE_CHUNKS) < self.chunk_size), f.lit(1)).otherwise(f.lit(0)),
    )

    df_wrong_length = df_check_length.filter(f.col(wrong_length) == 1)

    if not df_wrong_length.isEmpty():
        self.logger.error(
            f"Found entry with wrong length in file {df_wrong_length.select([GamaExtractionColumns.PATH]).first()[0]}."
        )

    df_result = df_check_length.filter(f.col(wrong_length) == 0).drop(
        GamaExtractionColumns.CONTENT, byte_offset_col, chunk_raw_col, chunk_hex_col, wrong_length
    )

    return df_result.distinct()

Removing distinct doubles the speed for small files; for the 6 MB file I gain 9 min. So it helps, but I need to get rid of this super-linear behavior. Any advice on how to approach this? I am open to completely reworking it if necessary. How do I get O(n)? (IMHO this should be possible.)

Update: My assumption is/was that Spark may not be the best fit for small data, like small files. But at the moment it works only with small files / small data. I hope I am just using Spark wrong?!

2nd: Currently I am using spark.read.format("binaryFile").option("pathGlobFilter", "*.gama").load(folder) to read the files. Mainly, it abstracts the file system (local works as well as HDFS or Azure, as long as the driver is present), so a possible solution should not lose this nice abstraction.

3 Answers


You’re blowing up your DataFrame with explode() and then calling distinct(), which will always kill performance as your data grows. If you’re only dealing with files of a few megabytes, you could drop Spark entirely and just stream through the file in Python, using a set() to filter duplicates in one linear pass.
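For files this small, the whole job is a one-pass loop. A minimal sketch of that pure-Python approach (function name and chunk size are illustrative, not from your code):

```python
def dedupe_chunks(data: bytes, chunk_size: int = 8):
    """Yield unique fixed-size chunks in one linear pass; flag a short tail."""
    seen = set()
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        if len(chunk) < chunk_size:
            # remainder shorter than chunk_size: mark it instead of emitting it
            yield ("remainder", chunk)
            continue
        if chunk not in seen:  # O(1) average membership test in a set
            seen.add(chunk)
            yield ("chunk", chunk)

# usage: stream one file without Spark at all
# with open("input.gama", "rb") as f:
#     chunks = list(dedupe_chunks(f.read()))
```

Both the split and the dedupe are a single pass over the bytes, so this is O(n) by construction.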

If you need to stay in Spark, use the low-level RDD API: read fixed-size records with sc.binaryRecords(path, 8), filter out any short tail, then map each chunk to a key and reduceByKey to collapse duplicates in a single shuffle. And if you really want DataFrames, repartition by the chunk column and dedupe within each partition (via mapInPandas or mapPartitions), which avoids the two-phase shuffle of distinct().


2 Comments

distinct seems not to be the main problem; removing it only gives a constant factor. Regarding the use of Spark: in this case, the bigger the file, the more useless Spark becomes - shouldn't it be the other way around?
You're right, distinct isn’t the real culprit but your explode()→distinct() forces a full shuffle of every 8 B chunk, so Spark’s JVM/scheduler/shuffle overhead swamps you as files grow. For small files, a one‑pass Python stream + set() is far faster. At scale, you can use Spark’s binaryRecords→reduceByKey or repartition‑and‑in‑partition dedupe to keep it O(n) with only a single shuffle.

As @Salt mentioned, distinct() will enforce a full shuffle and compare every column for differences. So the more columns you have, the slower it gets.

That said, if you can distinct on fewer columns (such as a primary key), that should make it performant.

If you need to distinct on all columns, you can create a hash value for a row and distinct on that hash column.

(df.withColumn('hash', F.sha(F.concat(*[F.col(x) for x in df.columns])))
 .dropDuplicates(['hash'])
).count()

With 60k records and 30 columns

%%time
df.distinct().count()

# CPU times: user 10.4 ms, sys: 17 ms, total: 27.3 ms
# Wall time: 21.4 s
%%time
(df.withColumn('hash', F.sha(F.concat(*[F.col(x) for x in df.columns])))
 .dropDuplicates(['hash'])
).count()

# CPU times: user 9.9 ms, sys: 9.01 ms, total: 18.9 ms
# Wall time: 3.2 s
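For intuition, the same first-occurrence-by-digest idea in plain Python (hashlib stands in for Spark's SHA functions here; this is illustrative, not part of the Spark pipeline):

```python
import hashlib

def dedupe_by_hash(rows):
    """Keep the first occurrence of each row, comparing one fixed-size
    digest per row instead of every column value individually."""
    seen = set()
    out = []
    for row in rows:
        # concatenate the stringified columns, as the Spark snippet does
        digest = hashlib.sha1("".join(map(str, row)).encode()).hexdigest()
        if digest not in seen:
            seen.add(digest)
            out.append(row)
    return out
```

The win is that equality checks now touch a short digest rather than 30 wide columns.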

4 Comments

As said, distinct is not the real problem; removing it speeds things up, but the super-linear increase stays - 6 MB going from 49 min to 40 min does not help. In my case some of the 8-byte blocks have duplicates. The main problem seems to be in explode. For me this is the only viable Spark way to read in a binary file - you need to split it.
There seem to be other issues. explode can be one, but before that: how many files do you have in each experiment, for 200 KB and 6 MB? If the 6 MB is fragmented over many small files, that also makes PySpark slow.
@dermoritz what does the Spark plan say? We could get some leads there.
Not sure what you mean by Spark plan. I looked into the Spark UI, and most of the time - nearly all of it - is spent in a function "isEmpty". This function is called multiple times and takes 99% of the runtime for bigger files. If you give me a hint what exactly I should include in my question, I will try.

With the hint from @Salt I looked into the RDD API, and my solution is now:

  • still reading using:

    spark.read.format("binaryFile").option("pathGlobFilter", "*.gama").load(folder)
    
  • for splitting, dropping down to the RDD API - this is what actually fixes the issue:

    rdd = df.rdd.flatMap(self.__split_chunks)

  • recreating a DataFrame using the original schema plus one field for the added column:

    schema = df.schema.add(StructField(time_offset_ms, IntegerType(), False))
    df_chunks = df.sparkSession.createDataFrame(rdd, schema)
    

The flatMap function is just pure Python with the signature

__split_chunks(self, row)
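The body of that splitter is not shown in the answer; a minimal sketch of what it might look like, assuming little-endian signed time offsets and per-file dedupe (both assumptions, not from the original):

```python
import struct

CHUNK_SIZE = 8  # 4 bytes of data + 4 bytes of time offset (assumed layout)

def split_chunks(content: bytes):
    """Pure-Python flatMap body: split one file's bytes into
    (data, time_offset_ms) tuples, skipping repeated chunks.
    A tail shorter than CHUNK_SIZE is silently dropped here."""
    seen = set()
    for i in range(0, len(content) - CHUNK_SIZE + 1, CHUNK_SIZE):
        chunk = content[i:i + CHUNK_SIZE]
        if chunk in seen:
            continue
        seen.add(chunk)
        data = chunk[:4]
        # '<i' assumes a little-endian signed int; adjust to the real format
        (time_offset_ms,) = struct.unpack("<i", chunk[4:])
        yield (data, time_offset_ms)
```

In the real flatMap the row's other fields (path etc.) would be carried along so the recreated DataFrame matches the extended schema.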

Not sure if staying in DataFrames and using a UDF would also be a good approach. In fact I don't know the real difference between a DataFrame with a UDF and an RDD with pure Python.

But the approach presented works flawlessly and has the O(n) complexity that was my main concern.

