This is almost the same as in Memory Not Released After Each Request Despite Cleanup Attempts, but here we use a Pipe instead, because we want to receive the data produced by the processing step and keep using it further down the pipeline:
1- Process the first DataFrame df1 (its resources are freed once the values are sent), then send those values through a pipe to our two consumers (a minimal round-trip through a Pipe is sketched after this list).
2- Each consumer processes its own dataframe(s); once its processing is done, its resources are freed.
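
For context, this is all the Pipe primitive does on its own (a minimal sketch; the names are illustrative and unrelated to the actual dataframes):

from multiprocessing import Process, Pipe

def worker(conn):
    # Any picklable object can travel through the pipe
    conn.send("hello from child")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # -> "hello from child"
    p.join()

The full producer/consumer version follows: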
import polars as pl
from multiprocessing import Process, Pipe
import gc
import ctypes
def malloc_trim():
    # Ask glibc to return freed heap pages to the OS (no-op on non-glibc platforms)
    try:
        ctypes.CDLL("libc.so.6").malloc_trim(0)
    except (OSError, AttributeError):
        pass
# Producer: loads the input, derives df2 and df3, and sends them to the consumers
def producer_load_and_distribute(pipe_for_both, pipe_for_df3):
    df1 = pl.read_parquet("input.parquet")
    # Add processing logic for df2
    df2 = df1.filter(pl.col("value") > 10)
    # Add processing logic for df3
    df3 = df1.select(["id", "value"]).with_columns(
        (pl.col("value") * 2).alias("double_value")
    )
    # Serialize to Arrow so the tables can be pickled through the pipes
    df2_arrow = df2.to_arrow()
    df3_arrow = df3.to_arrow()
    # Send to both consumers
    pipe_for_both.send((df2_arrow, df3_arrow))
    pipe_for_df3.send(df3_arrow)
    # Cleanup: drop every reference, collect, and hand the pages back to the OS
    del df1, df2, df3, df2_arrow, df3_arrow
    gc.collect()
    malloc_trim()
# Consumer: needs both df2 and df3; receives them, processes, and writes output
def consumer_process_both(pipe_conn):
    df2_arrow, df3_arrow = pipe_conn.recv()
    # Deserialize from Arrow back into Polars
    df2 = pl.from_arrow(df2_arrow)
    df3 = pl.from_arrow(df3_arrow)
    # Add processing logic for df2 and df3
    df5 = df2.join(df3, on="id", how="inner")
    df5.write_parquet("df5.parquet")
    del df2_arrow, df3_arrow, df2, df3, df5
    gc.collect()
    malloc_trim()
# Consumer: needs only df3; receives it, processes, and writes output
def consumer_process_df3(pipe_conn):
    df3_arrow = pipe_conn.recv()
    df3 = pl.from_arrow(df3_arrow)
    # Add processing logic for df3
    df4 = df3.with_columns((pl.col("double_value") + 1).alias("incremented"))
    df4.write_parquet("df4.parquet")
    del df3_arrow, df3, df4
    gc.collect()
    malloc_trim()
if __name__ == "__main__":
    # One pipe per consumer
    parent_both, child_both = Pipe()
    parent_df3, child_df3 = Pipe()
    # Producer process
    p_producer = Process(target=producer_load_and_distribute, args=(child_both, child_df3))
    # Consumers
    p_consumer_both = Process(target=consumer_process_both, args=(parent_both,))
    p_consumer_df3 = Process(target=consumer_process_df3, args=(parent_df3,))
    # Start the consumers first so the producer's sends cannot block on a full pipe
    p_consumer_both.start()
    p_consumer_df3.start()
    p_producer.start()
    # Wait for the producer to finish so df1 is freed
    p_producer.join()
    # Wait for the consumers to finish
    p_consumer_both.join()
    p_consumer_df3.join()
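
To check that memory is actually handed back to the OS after each stage, you can log the resident set size from inside each process. A minimal sketch, assuming psutil is installed (psutil and the log_rss helper are illustrative additions, not part of the pipeline above):

import os
import psutil  # assumption: installed via `pip install psutil`

def log_rss(label):
    # Resident set size of the current process, in MiB
    rss = psutil.Process(os.getpid()).memory_info().rss
    print(f"[{label}] pid={os.getpid()} rss={rss / 1024**2:.1f} MiB")

Call it before and after the del / gc.collect() / malloc_trim() block in each worker: the RSS should drop in the producer once df1, df2, and df3 are released, and in each consumer after it writes its parquet file.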