
To load a large dataset into Polars efficiently, one can use the lazy API and the scan_* functions. This works well when we are performing an aggregation (a big input dataset but a small result). However, if I want to process a big dataset in its entirety (for example, change a value in each row of a column), it seems there is no way around calling collect and loading the whole result dataset into memory.

Is it instead possible to write a LazyFrame to disk directly, and have the processing operate on chunks of the dataset sequentially, in order to limit memory usage?

3 Answers


Edit (2023-01-08)

Polars has growing support for streaming/out-of-core processing.

To run a query in streaming mode, collect your LazyFrame with collect(streaming=True).

If the result does not fit into memory, try sinking it to disk with sink_parquet.

Old answer (no longer accurate).

Polars' algorithms are not streaming, so they need all the data in memory for operations like join, groupby, aggregations, etc. So writing to disk directly would still keep those intermediate DataFrames in memory.

There are of course things you can do. Depending on the type of query, it may lend itself to embarrassing parallelization. A sum, for instance, can easily be computed in chunks.

You could also process columns in smaller chunks. This still allows you to compute harder aggregations/computations.

Use lazy

If your query has many filters and Polars is able to apply them at the scan, your memory pressure is reduced to the selectivity ratio.


2 Comments

I'm getting this error: PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'. What would be the difference between lazy_frame.collect(streaming=True).write_parquet() and lazy_frame.sink_parquet()?
@Guz Same here. It seems sink_parquet only supports a limited set of predicates/filters/joins. It works fine if you send the output of scan_parquet with a .select() of columns directly to sink_parquet.

I just encountered a case where Polars manages memory much better when using the lazy API. When using the join function, I highly recommend scan_csv/scan_parquet/scan_ipc if memory is an issue.

import polars as pl

# combine datasets
PATH_1 = "/.../big_dataset.feather"
PATH_2 = "/.../other_big_dataset.feather"

big_dataset_1 = pl.scan_ipc(PATH_1)
big_dataset_2 = pl.scan_ipc(PATH_2)

big_dataset_expanded = big_dataset_1.join(
    big_dataset_2, right_on="id_1", left_on="id_2", how="left"
)
big_dataset_expanded = big_dataset_expanded.collect()

1 Comment

scan_ipc or scan_parquet itself returns a LazyFrame. In the end, when you collect it to convert it into a DataFrame, it throws an error if the result does not fit into memory.

I'll provide some examples since Ritchie already explained it. This shows how to write a LazyFrame to disk (streaming to file) without loading it all into memory:

(pl.LazyFrame({'a':'word', 'b':'word2'})
 .sink_parquet('test.parquet')
)

This is how to do the same for operations that aren't currently supported by streaming, like pl.concat_list():

(pl.LazyFrame({'a':'word', 'b':'word2'})
 .with_columns(joined = pl.concat_list(pl.col('a'), 
                                       pl.col('b'))
              )
 .collect(streaming=True)
 .write_parquet('test.parquet')
)
pl.scan_parquet('test.parquet').fetch()

shape: (1, 3)
┌──────┬───────┬───────────────────┐
│ a    ┆ b     ┆ joined            │
│ ---  ┆ ---   ┆ ---               │
│ str  ┆ str   ┆ list[str]         │
╞══════╪═══════╪═══════════════════╡
│ word ┆ word2 ┆ ["word", "word2"] │
└──────┴───────┴───────────────────┘

