
For the same zstd compression level of 10 with Parquet, I get significantly smaller files from Pandas than from Apache Spark. The following files, for example, were first generated with Spark and then loaded and re-saved from the Python shell. Why is there such a large discrepancy in compressed size? I am using a native ext4 filesystem on Ubuntu.

Within Spark:
    df
      .coalesce(1)
      .write
        .option("compression", "zstd")
        .option("compressionLevel", "10")
        .mode("overwrite")
        .parquet(parquetPath)


In Python:
    >>> import pandas as pd
    >>> df = pd.read_parquet('results/data1.parquet')
    >>> df.to_parquet('data1.parquet', engine='pyarrow', compression="zstd", compression_level=10, index=False)
    >>> df = pd.read_parquet('results/data2.parquet')
    >>> df.to_parquet('data2.parquet', engine='pyarrow', compression="zstd", compression_level=10, index=False)

Stats (sizes in bytes):
    file                   | Apache Spark | Pandas    | Pandas/Spark size ratio
    ---------------------------------------------------------------------------
    results/data1.parquet  |   237780532  | 172442433 | 0.72
    results/data2.parquet  |    62052301  |  41917063 | 0.67

Software:
    Apache Spark-4.0.0-preview1
    scala-2.13.14
    Java 21.0.4
    python-3.12.4
    pandas-2.2.3
    pyarrow-19.0.1

PS: The file metadata is as follows:

pqt$ ls -l
-r--r--r-- 1 user user 237780532 May 20 11:55 spark.parquet

pqt$ python3
Python 3.12.4 | packaged by Anaconda, Inc. | (main, Jun 18 2024, 15:12:24) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow.parquet as pq
>>> import pandas as pd
>>> df = pd.read_parquet('spark.parquet')
>>> df.to_parquet('pandas.parquet', engine='pyarrow', compression="zstd", compression_level=10, index=False)
>>> parquet_file = pq.ParquetFile('spark.parquet')
>>> print(parquet_file.metadata)
<pyarrow._parquet.FileMetaData object at 0x11e8091de2a0>
  created_by: parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)
  num_columns: 3
  num_rows: 20000
  num_row_groups: 2
  format_version: 1.0
  serialized_size: 1001
>>> parquet_file = pq.ParquetFile('pandas.parquet')
>>> print(parquet_file.metadata)
<pyarrow._parquet.FileMetaData object at 0x11e808d57fb0>
  created_by: parquet-cpp-arrow version 19.0.1
  num_columns: 3
  num_rows: 20000
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 1905
>>>

pqt$ ls -l
-rw-rw-r-- 1 user user 172442433 May 20 12:01 pandas.parquet
-r--r--r-- 1 user user 237780532 May 20 11:55 spark.parquet

pqt$ bc
scale = 2
172442433 / 237780532
.72

pqt$
  • Did you have a chance to compare the metadata (number of row groups, compression algos, etc.) of the parquet files generated by both? Commented May 19 at 12:35
  • Could you please tell me what exactly I should be looking for? There are thousands of columns present, so I am not sure how to dump the metadata. Any suggestions, please? Commented May 19 at 23:31
  • @mazaneicha I have now added metadata information for a simple file with 20K lines. Commented May 20 at 6:37
  • So, apart from the apparent difference in parquet format versions (arrow's is much newer), there is one row group in arrow's file while spark's has two. Each row group carries its own metadata, which can at least partially explain the size difference. If you really want to get to the bottom of it, you can probably use something like parquet-tools meta <file> (renamed parquet-cli in later versions) to review and compare the file structure, including column encodings and sizes. github.com/apache/parquet-java/tree/parquet-1.10.x/parquet-cli Commented May 20 at 15:13
  • @mazaneicha this is a bug and I added it to my reply. Commented May 25 at 3:59

1 Answer


The previous answer, shown below, is wrong. The configuration parameters have to be set differently to actually get level-10 compression; I used the config parameters shown in SPARK-39743.

The wrong config was:

    .option("compression", "zstd")
    .option("compressionLevel", "10")

The correct config is:

    .config("spark.sql.parquet.compression.codec", "zstd")
    .config("parquet.compression.codec.zstd.level", 10)
    .config("parquet.enable.dictionary", "true")

PS: I still find that the files (checked via md5sum) are sometimes not byte-identical. Presumably the internal metadata arrangement is not deterministic?
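
If the goal is only to confirm that two parquet files contain the same data, regardless of byte layout, one option is to compare the loaded frames (the file names below are hypothetical):

    import pandas as pd

    # Raises an AssertionError if the two files contain different data.
    a = pd.read_parquet('run1.parquet')   # hypothetical first file
    b = pd.read_parquet('run2.parquet')   # hypothetical second file
    pd.testing.assert_frame_equal(a, b)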


Previous (wrong) answer.


Spark's Parquet output with zstd has an open bug and yields only level-3 compression. Unfortunately, it is still not fixed in Spark 4.0.0-preview1.

I verified this with the following steps to reproduce the issue.

First, the differences in metadata between the parquet files generated by pandas (left) and Spark (right):

    created_by: parquet-cpp-arrow version 19.0.1      created_by: parquet-mr version 1.13.1 (build db41...
    num_row_groups: 1                                 num_row_groups: 2
    format_version: 2.6                               format_version: 1.0
    serialized_size: 1905                             serialized_size: 1001
    max_definition_level: 1                           max_definition_level: 0
    compression: ZSTD (space_saved: 69%)              compression: ZSTD (space_saved: 84%)
    compression: ZSTD (space_saved: 94%)              compression: ZSTD (space_saved: 91%)
    compression: ZSTD (space_saved: 84%)              compression: ZSTD (space_saved: 79%)
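
The per-row-group and per-column figures like these can be pulled out of the file footers with pyarrow; a minimal sketch along those lines, using the same two file names:

    import pyarrow.parquet as pq

    # Dump per-row-group, per-column-chunk metadata for both files.
    for name in ('pandas.parquet', 'spark.parquet'):
        md = pq.ParquetFile(name).metadata
        print(name, 'row groups:', md.num_row_groups)
        for rg in range(md.num_row_groups):
            for col in range(md.num_columns):
                c = md.row_group(rg).column(col)
                saved = 1 - c.total_compressed_size / c.total_uncompressed_size
                print(f'  rg={rg} {c.path_in_schema}: {c.compression}, '
                      f'compressed={c.total_compressed_size} bytes, space_saved={saved:.0%}')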

Second, when pandas re-writes the same data at zstd level 3, the file size comes out nearly the same as Spark's:

    >>> import pandas as pd
    >>> df = pd.read_parquet('spark.parquet')
    >>> df.to_parquet('pandas.parquet', engine='pyarrow', compression="zstd", compression_level=3, index=False)
    >>>
    $ ls -l *t
    -rw-rw-r-- 1 bss bss 235850125 May 22 14:23 pandas.parquet
    -r--r--r-- 1 bss bss 237780532 May 20 11:55 spark.parquet
    $