My task is simple: I have a binary file that needs to be split into 8-byte chunks, where the first 4 bytes contain data (to be decoded in a later step) and the second 4 bytes contain an int (a time offset in ms). Some of these 8-byte blocks are repeated, and I want those duplicates removed. If there is a remainder of less than 8 bytes at the end, I mark it. My problem is the runtime behavior:

200 KB -> 39 s
1,200 KB -> 2:02 min
6 MB -> 49 min

This renders my approach useless. Here is the code:
# Imports used by this snippet
from pyspark.sql import DataFrame
import pyspark.sql.functions as f

def process(self, df: DataFrame):
    """
    Split byte arrays into 8-byte chunks before transforming to binary.
    """
    # Define temporary column names
    byte_offset_col = "byte_offset"
    chunk_raw_col = "chunk_raw"
    chunk_hex_col = "chunk_hex"
    wrong_length = "wrong_length"

    # Processing
    df_offset_bytes = df.withColumn(
        byte_offset_col,
        f.expr(f"sequence(0, length({GamaExtractionColumns.CONTENT}) - 1, {self.chunk_size})"),  # Generate offsets
    )
    df_split_to_chunk_array = df_offset_bytes.withColumn(
        chunk_raw_col,
        f.expr(
            f"transform({byte_offset_col}, x -> substring({GamaExtractionColumns.CONTENT}, x + 1, {self.chunk_size}))"
        ),  # Extract chunks
    )
    df_chunk_rows = df_split_to_chunk_array.withColumn(
        self.BYTE_CHUNKS,
        f.explode(f.col(chunk_raw_col)),  # Explode into rows
    )
    df_hex_values = df_chunk_rows.withColumn(
        chunk_hex_col,
        f.expr(f"hex({self.BYTE_CHUNKS})"),  # Convert chunk to hex
    )
    df_check_length = df_hex_values.withColumn(
        wrong_length,
        f.when(f.length(self.BYTE_CHUNKS) < self.chunk_size, f.lit(1)).otherwise(f.lit(0)),
    )
    df_wrong_length = df_check_length.filter(f.col(wrong_length) == 1)
    if not df_wrong_length.isEmpty():
        self.logger.error(
            f"Found entry with wrong length in file {df_wrong_length.select([GamaExtractionColumns.PATH]).first()[0]}."
        )
    df_result = df_check_length.filter(f.col(wrong_length) == 0).drop(
        GamaExtractionColumns.CONTENT, byte_offset_col, chunk_raw_col, chunk_hex_col, wrong_length
    )
    return df_result.distinct()
Removing distinct() doubles the speed for small files, and for the 6 MB file I gain 9 minutes. So it helps, but I need to get rid of this exponential behavior. Any advice on how to approach this? I am open to completely reworking it if necessary. How do I get O(n)? (IMHO this should be possible.)
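For reference, the intended per-file logic (fixed-size chunks, drop repeats, mark a short tail) is a single O(n) pass in plain Python. The function below is a hypothetical baseline for comparison, not the Spark solution:

```python
def chunk_and_dedup(data: bytes, chunk_size: int = 8):
    """Split data into chunk_size-byte pieces, drop repeated chunks
    (keeping the first occurrence), and flag a short remainder.
    One pass over the input, O(n) overall: set membership is O(1) per chunk."""
    seen = set()
    chunks = []
    remainder = None
    for offset in range(0, len(data), chunk_size):
        chunk = data[offset:offset + chunk_size]
        if len(chunk) < chunk_size:
            remainder = chunk  # the "marked" tail from the problem statement
            break
        if chunk not in seen:
            seen.add(chunk)
            chunks.append(chunk)
    return chunks, remainder
```

Whatever plan replaces distinct() should not need to do fundamentally more work per file than this.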
Update: My assumption is/was that Spark may not be the best fit for small data, i.e. small files. But at the moment it works only with small files / small data. I hope I am just using Spark wrong?!
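The growth is easy to quantify from the timings above (6 MB taken as 6,144 KB here):

```python
# Measured runtimes from the question
sizes_kb = [200, 1200, 6144]   # 200 KB, 1,200 KB, 6 MB (assumed 6,144 KB)
times_s = [39, 122, 2940]      # 39 s, 2:02 min, 49 min

# Data-growth vs. time-growth between consecutive measurements
ratios = [
    (sizes_kb[i + 1] / sizes_kb[i], times_s[i + 1] / times_s[i])
    for i in range(len(sizes_kb) - 1)
]
for data_x, time_x in ratios:
    print(f"{data_x:.1f}x data -> {time_x:.1f}x time")
```

The last step is roughly 5.1x more data for 24.1x more time, which looks closer to quadratic (5.1^2 ≈ 26) than to linear scaling.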
2nd:
Currently I am using spark.read.format("binaryFile").option("pathGlobFilter", "*.gama").load(folder) to read the files. Mainly, it abstracts the file system (local works as well as HDFS or Azure, as long as the driver is present).
So a possible solution should not lose this nice abstraction.
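As an aside, for the layout described at the top (first 4 bytes data, second 4 bytes an int time offset in ms), one valid 8-byte chunk can be decoded with struct. Byte order and signedness are not stated in the question, so big-endian unsigned is an assumption here:

```python
import struct

def decode_chunk(chunk: bytes):
    """Decode one 8-byte chunk: 4 opaque data bytes followed by a
    4-byte time offset in ms. Big-endian unsigned (">I") is an
    assumption; the question does not specify the byte order."""
    if len(chunk) != 8:
        raise ValueError("expected exactly 8 bytes")
    data = chunk[:4]
    (offset_ms,) = struct.unpack(">I", chunk[4:8])
    return data, offset_ms
```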