I am working on a Python project where I need to process a large CSV file using Polars. Currently, I am reading the CSV file inside each parallel process, which works but is inefficient because the read_csv operation is repeated for each process.
To improve performance, I tried reading the CSV file once before running the parallel processes and passing the resulting DataFrame to each process. However, I noticed that the code took significantly longer to execute in this setup.
Here’s a simplified version of my workflow:
import polars as pl
from concurrent.futures import ProcessPoolExecutor

def process_data(df, numbers):
    # Filter and process data (example)
    return df.filter(pl.col("Points").is_in(numbers))

# Reading inside each process
def worker(task_param):
    df = pl.read_csv("path/to/file.csv")
    return process_data(df, task_param)

with ProcessPoolExecutor(max_workers=4) as executor:
    task_params_list = [[10, 20], [30, 40], [50, 60], [70, 80]]
    results = list(executor.map(worker, task_params_list))
When I moved the pl.read_csv call outside the worker function and passed the resulting DataFrame to each process, execution became significantly slower:
df = pl.read_csv("path/to/file.csv")

def process_data(df, numbers):
    # Filter and process data (example)
    return df.filter(pl.col("Points").is_in(numbers))

def worker(task_param):
    df = task_param[0]
    numbers = task_param[1:]
    return process_data(df, numbers)

with ProcessPoolExecutor(max_workers=4) as executor:
    task_params_list = [[df, 10, 20], [df, 30, 40], [df, 50, 60], [df, 70, 80]]
    results = list(executor.map(worker, task_params_list))
I suspect the slowdown comes from the DataFrame being pickled and copied into each worker process, but I'm not sure how to confirm or address this.
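To sanity-check that suspicion, I put together a rough measurement: since ProcessPoolExecutor pickles every argument it sends to a worker, timing pickle.dumps on the DataFrame should approximate the per-task overhead (the path is the same placeholder as above):

import pickle
import time

import polars as pl

df = pl.read_csv("path/to/file.csv")

# Each task submitted to ProcessPoolExecutor pickles its arguments,
# so this roughly measures the cost paid once per task when df is
# passed directly to the workers.
start = time.perf_counter()
payload = pickle.dumps(df)
elapsed = time.perf_counter() - start

print(f"pickled size: {len(payload) / 1e6:.1f} MB, pickle time: {elapsed:.3f} s")

On top of pickling, each worker also has to unpickle its own copy, so the real per-task cost is higher than this number suggests.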
My questions: how can I avoid re-reading the CSV file in every parallel process while maintaining good performance? And is there a better approach to sharing or reusing a Polars DataFrame across parallel processes? Any guidance or insights would be greatly appreciated.
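For reference, one workaround I've been considering (but haven't benchmarked properly) is to pass only the file path and push the filter into Polars' lazy API, so each worker scans the CSV and materializes just the rows it needs instead of receiving a pickled copy of the full DataFrame. A sketch of what I have in mind:

import polars as pl
from concurrent.futures import ProcessPoolExecutor

CSV_PATH = "path/to/file.csv"  # same placeholder path as above

def worker(numbers):
    # scan_csv builds a lazy query; only the rows matching the
    # filter are materialized inside this worker process.
    return (
        pl.scan_csv(CSV_PATH)
        .filter(pl.col("Points").is_in(numbers))
        .collect()
    )

if __name__ == "__main__":  # guard so this also works with the "spawn" start method
    task_params_list = [[10, 20], [30, 40], [50, 60], [70, 80]]
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(worker, task_params_list))

Is this the right direction, or is there a more idiomatic way to handle this in Polars?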