I am trying to import very large csv files into parquet files using polars. I stream data, use lazy dataframes and sinks. No problem until...
...sorting the dataframe on a column and removing duplicates. A requirement that can't be skipped is that the data written to Parquet must be unique on the 'datetime' column and sorted by that same column. The bottleneck is the sorting and the removal of duplicates. My understanding is that the data must be fully in memory in order to remove duplicates and sort, and there is no guarantee the source data is sorted or free of duplicates.
Writing the dataframe to Parquet unsorted and without checking for duplicates is no problem and produces Parquet files of about 3-4GB. But reading them back in, sorting, and applying unique() explodes memory consumption to above 128GB, which is the memory limit of my host (I run the code on Ubuntu in WSL2). I have already allocated the maximum amount of memory to WSL2 and confirmed that it has access to the entire amount. At some point the sorting and de-duplication crashes the WSL VM. I do not seem to be able to sort and remove duplicates efficiently.
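For reference, the step that blows up can be reproduced in isolation with something like the sketch below (the file names are placeholders, not my real paths):

import polars as pl

# Scan the unsorted Parquet written earlier, de-duplicate on 'datetime',
# sort on the same column, and sink the result back to disk.
# This is the query whose memory use climbs past 128GB before the VM dies.
(
    pl.scan_parquet("unsorted.parquet", glob=False)
    .unique(subset="datetime")
    .sort("datetime")
    .sink_parquet("sorted.parquet")
)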
Can you please suggest a better approach than the one I currently take, shown below:
def import_csv(self, symbol_id: str, data_source: DataSource, data_type: DataType, source_files: List[str], column_schema: List[ColumnSchema]) -> None:
    # ensure 1 or 2 source files are provided
    if len(source_files) != 1 and len(source_files) != 2:
        raise ValueError(f"Can only process 1 or 2 source files for symbol {symbol_id}")
    # obtain new df
    new_df = self._csv_to_dataframe(source_files, column_schema)
    # filter out duplicates and sort by datetime
    new_df = new_df.unique(subset="datetime")
    new_df = new_df.sort("datetime")
    # merge with existing data if it exists
    path_filename = self.base_directory / f"{symbol_id}_{data_source.value}_{data_type.value}.parquet"
    if path_filename.exists():
        old_df = pl.scan_parquet(path_filename, glob=False)
        df = pl.concat([old_df, new_df], how="vertical")
    else:
        df = new_df
    # write to parquet
    df.sink_parquet(path_filename, engine="streaming")
    # update metadata
    # self._update_metadata(symbol_id, data_source, data_type, len(df), df["datetime"].first(), df["datetime"].last())
    # logging
    # self.logger.info(f"Imported {len(df)} rows for {symbol_id} from {df["datetime"].first()} to {df["datetime"].last()}")

def _csv_to_dataframe(self, source_files: list[str], column_schema: List[ColumnSchema]) -> pl.LazyFrame:
    # Generate Polars expressions for column transformations
    expressions = self._generate_polars_expressions(column_schema)
    dfs = []
    for source_file in source_files:
        df = pl.scan_csv(source_file, has_header=True, glob=False).select(expressions)
        dfs.append(df)
    if len(dfs) == 1:
        df = dfs[0]
    else:
        df = pl.concat(dfs, how="vertical")
        df = df.group_by("datetime").mean()
    return df

def _generate_polars_expressions(self, schema: list[ColumnSchema]) -> list[pl.Expr]:
    expressions = []
    for col_schema in schema:
        # Create a base expression from the source column name
        expr = pl.col(col_schema.source_column_name)
        # Handle special cases based on the target data type
        if col_schema.dtype == pl.Datetime:
            # Ensure datetime format is provided
            if col_schema.datetime_format is None:
                raise ValueError(
                    f"Datetime format is required for column '{col_schema.source_column_name}'"
                )
            # For datetime, we first parse the string with the specified format
            expr = expr.str.to_datetime(format=col_schema.datetime_format, time_unit=self.time_unit, time_zone=col_schema.from_timezone)
            # always convert to default timezone
            expr = expr.dt.convert_time_zone(self.data_timezone)
        else:
            # For other dtypes, a simple cast is sufficient
            expr = expr.cast(col_schema.dtype)
        # Alias the expression with the target column name
        final_expr = expr.alias(col_schema.target_column_name)
        # Add the final expression to the list
        expressions.append(final_expr)
    return expressions
(1.) Pipe the data through the sort program. It will put k-way mergesort fragments in $TMPDIR if RAM is limited; you may find the --buffer-size switch useful.
(2.) Stream it to an RDBMS table, perhaps using the builtin sqlite. Then stream it out with an ORDER BY clause. Let the DB worry about the memory management details -- that's what it's there for. We use a relational database for problems that are "bigger than memory". A sketch of this approach follows.
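A minimal sketch of option (2.), assuming a recent Polars plus the standard-library sqlite3 module. The table layout (datetime, price, volume), the batch size, and the file paths are made up for illustration, and it assumes the datetime values are ISO-8601 strings so that lexical order equals chronological order. The PRIMARY KEY lets SQLite do the de-duplication and ORDER BY does the sort, both with the database's own disk-backed machinery:

import sqlite3

import polars as pl

CSV_PATH = "unsorted.csv"        # source CSV (placeholder)
DB_PATH = "staging.sqlite"       # on-disk staging database (placeholder)
PARQUET_PATH = "sorted.parquet"  # final output (placeholder)

con = sqlite3.connect(DB_PATH)
# PRIMARY KEY on datetime means duplicates are rejected by the database itself;
# INSERT OR IGNORE silently drops any row whose datetime was already seen.
con.execute(
    "CREATE TABLE IF NOT EXISTS ticks (datetime TEXT PRIMARY KEY, price REAL, volume REAL)"
)

# Stream the CSV into SQLite in bounded batches so memory use stays flat.
reader = pl.read_csv_batched(CSV_PATH, batch_size=500_000)
while (batches := reader.next_batches(1)):
    for batch in batches:
        con.executemany(
            "INSERT OR IGNORE INTO ticks (datetime, price, volume) VALUES (?, ?, ?)",
            batch.select(["datetime", "price", "volume"]).rows(),
        )
con.commit()

# Stream the now-unique rows back out already sorted and let Polars write Parquet.
df = pl.read_database("SELECT * FROM ticks ORDER BY datetime", connection=con)
df.write_parquet(PARQUET_PATH)
con.close()

If even the final, de-duplicated result is too large to hold at once, read_database also accepts iter_batches=True with a batch_size, so the export step can be chunked as well.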