
I have two large datasets stored in partitioned Parquet format on S3, partitioned by category_id. I need to join them on category_id and label_id using Polars and write the results to Postgres.

The problem:

  • Data is too large for memory: Calling .collect() on the joined dataframe is not feasible.
  • Writing per partition is too slow: Iterating over category_id and processing each partition takes too long (several seconds per partition).

My current approach:

import polars as pl

df_left = pl.scan_parquet(
    "s3://my-bucket/left/**/*.parquet",
    hive_partitioning=True,
)
df_right = pl.scan_parquet(
    "s3://my-bucket/right/**/*.parquet",
    hive_partitioning=True,
)

df_joined = df_left.join(df_right, on=["category_id", "label_id"], how="inner")

At this point, I would like to efficiently:

  • Process the data in a streaming fashion
  • Write the joined data to Postgres

What I have tried:

  1. Looping over unique category_ids and processing one by one.
    • Too slow, as reading Parquet partitions takes a few seconds each time.
  2. Using Polars' lazy execution (scan_parquet).
    • I cannot .collect() since it does not fit in memory.
    • I am unsure how to efficiently stream the data to Postgres.

Question

How can I efficiently join two large Parquet datasets using Polars and write the result to Postgres in a way that avoids memory issues and excessive partition reads?

Would using Polars streaming help here? Or is there a way to batch-process partitions efficiently without reading each partition multiple times?

2 Answers


What you really need is sink_generator, so that you can batch your data to Postgres rather than collecting it all at once. In principle, if we had sink_generator, then when you get the first batch of results you could COPY it asynchronously using psycopg3 while simultaneously getting the next batch from Polars.

TL;DR (do this)

In the absence of that, I think your best bet would be to sink_csv to an intermediate file and then use Postgres's COPY to load the CSV into the database.

pg copy vs insert

Postgres is dramatically faster with COPY than with INSERT, which is what .write_database does.

Another approach

PyArrow's Parquet reader lets you read a Parquet file one row group at a time. You could load your left and right datasets each into their own Postgres table by iterating over row groups, then do the join in Postgres, inserting into the final table. I'd treat this as a backup plan if sinking the join to a CSV doesn't work for some reason.


Comments


I have not tested this in any way, but it may help. You could use streaming to avoid having it all in memory at once when calling .collect(), and then process the result in chunks from there. Something like:

df_joined = (
    df_left.join(df_right, on=["category_id", "label_id"], how="inner")
    .collect(streaming=True)
)

for chunk in df_joined.iter_slices(n_rows=10000):
    ...  # write each chunk to Postgres, e.g. with chunk.write_database(...)

Or, if this still runs into memory issues and you have the disk space for it, you could write df_joined to disk using .sink_parquet, then read that file back with streaming and write the chunks the same way.

Without testing on your data, I don't know if this will actually help the situation, but I've had luck with approaches like this when faced with similar problems before.

Comments

OP said the data is too big to collect so collecting up front isn't going to work.
@DeanMacGregor Correct me if I'm wrong, but the purpose of using streaming=True is specifically so that it's not all collected into memory at once, according to the documentation
That needs to be qualified a bit. collect puts the results in memory whether it's with streaming or not. If the results are bigger than memory then that's not going to work. Where streaming is relevant is when your source data is bigger than memory but the result can fit in memory; then streaming helps. Alternatively, if you use one of the .sink_* methods, that streams results into a file, so even if the results are bigger than memory the query can still finish.
