
I am trying to import very large CSV files into Parquet files using Polars. I stream data and use lazy dataframes and sinks. No problem until...

...sorting the dataframe on a column and removing duplicates. A requirement that can't be skipped is that the data written to Parquet must be unique by the 'datetime' column and sorted by that same column. The bottleneck is sorting and removing duplicates: my understanding is that the data must be fully in memory to do either, and there is no guarantee the source data is already sorted or free of duplicates.

Writing the dataframe to Parquet unsorted and without checking for duplicates is no problem and results in Parquet files of about 3-4 GB. But reading them back in, sorting, and applying unique() explodes memory consumption to above 128 GB, which is the memory limit of my host (I run the code on Ubuntu in WSL2). I have already allocated the maximum amount of memory to WSL2 and confirmed that it has access to all of it. At some point sorting and removing duplicates crashes the WSL VM, so I do not seem to be able to sort and deduplicate efficiently.

Can you suggest a better approach than the one I currently take, shown below?

    def import_csv(self, symbol_id: str, data_source: DataSource, data_type: DataType, source_files: List[str], column_schema: List[ColumnSchema]) -> None:

        #ensure 1 or 2 source files are provided
        if len(source_files) != 1 and len(source_files) != 2:
            raise ValueError(f"Can only process 1 or 2 source files for symbol {symbol_id}")

        #obtain new df
        new_df = self._csv_to_dataframe(source_files, column_schema)

        #filter out duplicates and sort by datetime
        new_df = new_df.unique(subset="datetime")
        new_df = new_df.sort("datetime")

        #merge with existing data if it exists
        path_filename = self.base_directory / f"{symbol_id}_{data_source.value}_{data_type.value}.parquet"
        if path_filename.exists():
            old_df = pl.scan_parquet(path_filename, glob=False)
            df = pl.concat([old_df, new_df], how="vertical")
        else:
            df = new_df

        #write to parquet
        df.sink_parquet(path_filename, engine="streaming")
      
        #update metadata
        # self._update_metadata(symbol_id, data_source, data_type, len(df), df["datetime"].first(), df["datetime"].last())

        #logging
        # self.logger.info(f"Imported {len(df)} rows for {symbol_id} from {df["datetime"].first()} to {df["datetime"].last()}")


    def _csv_to_dataframe(self, source_files: list[str], column_schema: List[ColumnSchema]) -> pl.LazyFrame:

        # Generate Polars expressions for column transformations
        expressions = self._generate_polars_expressions(column_schema)

        dfs = []
        for source_file in source_files:
            df = pl.scan_csv(source_file, has_header=True, glob=False).select(expressions)
            dfs.append(df)
        
        if len(dfs) == 1:
            df = dfs[0]
        else:
            df = pl.concat(dfs, how="vertical")
            df = df.group_by("datetime").mean()

        return df

    def _generate_polars_expressions(self, schema: list[ColumnSchema]) -> list[pl.Expr]:
    
        expressions = []
        for col_schema in schema:
            # Create a base expression from the source column name
            expr = pl.col(col_schema.source_column_name)

            # Handle special cases based on the target data type
            if col_schema.dtype == pl.Datetime:
                
                # Ensure datetime format is provided
                if col_schema.datetime_format is None:
                    raise ValueError(
                        f"Datetime format is required for column '{col_schema.source_column_name}'"
                    )
                
                # For datetime, we first parse the string with the specified format
                expr = expr.str.to_datetime(format=col_schema.datetime_format, time_unit=self.time_unit, time_zone=col_schema.from_timezone)
                
                #always convert to default timezone
                expr = expr.dt.convert_time_zone(self.data_timezone)  
            else:
                # For other dtypes, a simple cast is sufficient
                expr = expr.cast(col_schema.dtype)

            # Alias the expression with the target column name
            final_expr = expr.alias(col_schema.target_column_name)

            # Add the final expression to the list
            expressions.append(final_expr)
        
        return expressions
  • Please remember that Stack Overflow is not your favourite Python forum, but rather a question and answer site for all programming-related questions. Thus, always include the tag of the language you are programming in; that way other users familiar with that language can more easily find your question. Take the tour and read up on How to Ask to get more information on how this site works, then edit the question with the relevant tags. Commented Sep 10 at 13:28
  • Have you considered a divide and conquer approach that, say, reads the current CSV and partitions unique rows into one of, say, 24 temp files (based on key hour, for example) that you could then sort and merge back into the main file? Commented Sep 10 at 16:11
  • (1.) Stream it to temp.csv, call /usr/bin/sort on that, and do a streaming import of the .csv. The sort program will put k-way mergesort fragments in $TMPDIR if RAM is limited. You may find the --buffer-size switch useful. (2.) Stream it to an RDBMS table, perhaps using the builtin sqlite. Then stream it out with an ORDER BY clause and let the DB worry about memory management details -- that's what it's there for. We use a relational database for problems that are "bigger than memory". (A sketch of this idea follows the comments.) Commented Sep 10 at 16:33
  • "Out-of-core" sort (or group_by) is not yet implemented in the new streaming engine: github.com/pola-rs/polars/issues/20947 Commented Sep 11 at 7:44

1 Answer

Here is a divide and conquer approach using the newish partition utility, in 2-4 rounds depending on what you know in advance and how you want the final output.

Imports/constant

from pathlib import Path

import numpy as np
import polars as pl
import polars.io.partition as part
from polars import col as c

NUM_ROWS_PER_FILE = 2500

NUM_ROWS_PER_FILE is something you almost certainly want to make much bigger, but I don't know what the right size is for your data/system.
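
For example, one rough way to size it (the 8 GB per-partition budget and the "unsorted.parquet" file name below are just placeholders, not recommendations) is to measure a small sample of your data:

# Estimate bytes per row from a small sample and size partitions to a memory budget.
sample = pl.scan_parquet("unsorted.parquet").head(100_000).collect()
bytes_per_row = sample.estimated_size() / sample.height
NUM_ROWS_PER_FILE = int(8 * 1024**3 / bytes_per_row)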

Mock data source

df = pl.select(
    dt=pl.datetime_range(pl.datetime(2025, 1, 1), pl.datetime(2025, 6, 1), "1m")
)
df = df.with_columns(rnd=np.random.uniform(-5, 4, df.height))
df = (
    df.with_columns(
        pl.col("dt").repeat_by(pl.col("rnd").cast(pl.Int32).clip(1, 10))
    )
    .explode("dt")
    .sample(fraction=1.0, shuffle=True)
)

It's just a df with a bunch of datetimes, some of which are randomly duplicated.

Round 1

rnd1_path = Path("./experiment/round1")
rnd1_path.mkdir(exist_ok=True, parents=True)
stats = []
rnd1_part = part.PartitionMaxSize(
    rnd1_path,
    max_size=NUM_ROWS_PER_FILE,
    per_partition_sort_by="dt",
    finish_callback=lambda x, stats=stats: stats.append(
        x
    ),  # this gives us overall statistics
)
df.lazy().sink_parquet(rnd1_part)
stats = stats[0].unnest("dt_stats")

In this round we use the PartitionMaxSize partition strategy to make sure we have manageable files we can work with one at a time.

Round 2

time_per_rows = stats.select(
    (c.upper_bound.max() - c.lower_bound.min()) / c.num_rows.sum()
).item()

## For consistency, we'll assume we want to split these up into approximately same number of files but
## partitioned by dt ranges

dt_thresholds = pl.select(i=pl.int_range(0, stats.height)).with_columns(
    stats.select(c.lower_bound.min()).item()
    + NUM_ROWS_PER_FILE * time_per_rows * c.i
)

part_expr = pl.when(
    c.dt.is_between(dt_thresholds.item(0, 1), dt_thresholds.item(1, 1))
).then(pl.lit("group0"))
for i in range(1, dt_thresholds.select(c.i.max()).item()):
    part_expr = part_expr.when(
        c.dt.is_between(dt_thresholds.item(i, 1), dt_thresholds.item(i + 1, 1))
    ).then(pl.lit(f"group{i}"))
part_expr = part_expr.otherwise(pl.lit("leftover"))
rnd2_path = Path("./experiment/round2")
rnd2_path.mkdir(exist_ok=True, parents=True)
rnd2_part = part.PartitionParted(
    rnd2_path,
    by=part_expr,
    per_partition_sort_by="dt",
)
pl.scan_parquet(rnd1_path).sink_parquet(rnd2_part, mkdir=True)

In this round, we first create the groups of datetime ranges that we want our files to represent, using the statistics from the previous round. If you know upfront what those datetime ranges ought to be, you can skip round 1. When calling PartitionParted (or PartitionByKey) you can set per_partition_sort_by, which makes round 3 faster because the sorting is already done. I don't know what the net effect on total time taken would be; you'd just have to experiment.

Round 3

for root, _, files in rnd2_path.walk():
    if len(files) == 0:
        continue
    for _file in files:
        new_target = str(root / _file).replace("round2", "round3")
        pl.scan_parquet(root / _file).unique(
            "dt", maintain_order=True
        ).sink_parquet(new_target, mkdir=True)

In this round we open each of the files from round 2, sort/deduplicate, and resave. Since each file represents a distinct datetime range, we can be confident that deduplicating the parts deduplicates the whole.

Round 4

rnd3_path = Path(str(rnd2_path).replace("round2", "round3"))
rnd4_path = Path(str(rnd2_path).replace("round2", "round4")) / "0000.parquet"

pl.scan_parquet(rnd3_path).drop("literal").sink_parquet(rnd4_path, mkdir=True)

In this round you just resave all the files from round 3, combined into a single file. This is, of course, optional; do it only if you want a single-file output.
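
As an optional sanity check (not part of the rounds above; it assumes the final file lives at rnd4_path and the key column is named "dt"), you can pull back just the key column and confirm it is unique and sorted:

# Load only the "dt" column of the final file and confirm it is sorted and unique.
dt = pl.scan_parquet(rnd4_path).select("dt").collect().get_column("dt")
assert dt.is_sorted()
assert dt.n_unique() == dt.len()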


1 Comment

Thank you, I appreciate the effort, but in the end I decided to go with a custom partitioning of files by date ranges. I read the massive CSV files in chunks and cache smaller blocks of parsed data grouped by those date ranges. Every now and then some of the groups are pushed from the cache into storage by reading the files with matching date ranges, then sorting and removing duplicates on those smaller subsets of data, which works well within my memory constraint of 128 GB.
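
For later readers, a minimal sketch of the chunk-and-flush approach this comment describes might look something like the following; the file names, the two-column "datetime"/"value" schema, the batch size, and the monthly bucketing are all assumptions for illustration, not the commenter's actual code.

from pathlib import Path

import polars as pl

STORE = Path("./store")
STORE.mkdir(exist_ok=True)

def flush_bucket(key: str, cached: list[pl.DataFrame]) -> None:
    # Merge the cached chunks for one date range with what is already on disk,
    # then sort/deduplicate only that small subset before writing it back.
    new_part = pl.concat(cached)
    target = STORE / f"{key}.parquet"
    if target.exists():
        new_part = pl.concat([pl.read_parquet(target), new_part])
    new_part.unique(subset="datetime").sort("datetime").write_parquet(target)

cache: dict[str, list[pl.DataFrame]] = {}
reader = pl.read_csv_batched("big.csv", batch_size=1_000_000)
while batches := reader.next_batches(10):
    for chunk in batches:
        # Assumes "datetime" arrives as strings; bucket rows by month.
        chunk = chunk.with_columns(
            pl.col("datetime").str.to_datetime()
        ).with_columns(bucket=pl.col("datetime").dt.strftime("%Y-%m"))
        for key in chunk.get_column("bucket").unique().to_list():
            part = chunk.filter(pl.col("bucket") == key).drop("bucket")
            cache.setdefault(key, []).append(part)
    # A real implementation would flush the largest buckets here whenever the
    # cache grows past some memory budget, not only at the end.

for key, cached in cache.items():
    flush_bucket(key, cached)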
