My problem is part of a very large and complicated project. I don't want to share all the details here, as it would be too much, so I'll describe the main parts in an example program.
I have a large dataset in a class named ModelData. This dataset is shared among about 20 providers. The main goal is to perform various operations on the data to collect information. The providers are designed in such a way that they use data from other providers, so they interact with each other as well.
The problem now is that the initialization of the providers and data collection takes a very long time. I want to optimize this process.
I created a dependency tree to determine the order in which the providers should be initialized. My idea was to use Python's multiprocessing module to initialize multiple providers simultaneously, and I read some articles about multiprocessing.Manager for sharing data between processes. That led to the solution I'll paste here. I tried to use Events to coordinate the providers, but now it takes longer than when I initialized all the providers one at a time.
This is Main.py:

import sys

import numpy as np
import pandas as pd
from multiprocessing.managers import BaseManager

from ModelData import ModelData
from ProviderCollection import ProviderCollection


def generate_random_data(rows, cols):
    return pd.DataFrame(np.random.rand(rows, cols), columns=[f'col_{i}' for i in range(cols)])


def main():
    input_data = {
        f'dataframe_{i}': generate_random_data(3000, 3000)  # 3000 rows and 3000 columns for each DataFrame
        for i in range(20)
    }

    BaseManager.register("ModelData", ModelData)
    manager = BaseManager()
    manager.start()
    model_data = manager.ModelData(input_data)
    print(f"Size: {sys.getsizeof(model_data)}")

    provider_collection = ProviderCollection(model_data)
    provider_collection.initialize_providers()


if __name__ == "__main__":
    main()
This is ModelData.py:

import random
from typing import Any

from pandas import DataFrame


class ModelData:
    def __init__(self, input_data: dict[str, DataFrame]):
        self.data: dict[str, DataFrame] = input_data

    def get_len(self):
        return len(self.data)

    def get_random_data(self, num_items) -> dict[Any, DataFrame]:
        keys = list(self.data.keys())
        random_keys = random.sample(keys, num_items)
        return {key: self.data[key] for key in random_keys}
This is TestProvider.py:

from collections import defaultdict
from multiprocessing import Event
from typing import List, Optional

from pandas import DataFrame

from ModelData import ModelData


class TestProvider:
    def __init__(self, model_data: ModelData, name: str, dependencys: Optional[List['TestProvider']] = None):
        self.__input_data: ModelData = model_data
        self.dependencys = dependencys if dependencys is not None else []
        self.is_initialized = Event()
        self.name = name
        self.some_values: dict[str, DataFrame] = defaultdict(DataFrame)

    def initialize(self) -> None:
        print(f"Waiting for {self.name} to be initialized...")
        for dependency in self.dependencys:
            dependency.wait_for_initialization()
            dependency.do_something()
        print(f"Starting: {self.name}")
        self.is_initialized.set()

    def wait_for_initialization(self):
        return self.is_initialized.wait()

    def do_something(self):
        for key, value in self.__input_data.get_random_data(20).items():
            self.some_values[key] = value
        for dependency in self.dependencys:
            dependency.do_something()
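For context, the Event handshake I'm relying on in initialize() / wait_for_initialization() boils down to this minimal stand-alone toy (the names _producer and event_handshake are made up for the example, and the fork start method is assumed, so this won't run as-is on Windows):

```python
from multiprocessing import get_context


def _producer(ev):
    # Child process: simulate a dependency finishing its initialization.
    ev.set()


def event_handshake(timeout=5):
    """Return True if the parent saw the Event set by the child in time."""
    ctx = get_context("fork")  # fork assumed; not available on Windows
    ev = ctx.Event()
    child = ctx.Process(target=_producer, args=(ev,))
    child.start()
    seen = ev.wait(timeout)    # blocks, like wait_for_initialization()
    child.join()
    return seen
```

So the waiting side blocks until the other process calls set(), which is exactly how each provider waits for its dependencies.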
This is ProviderCollection.py:

from multiprocessing import Process
from multiprocessing.managers import BaseManager

from ModelData import ModelData
from TestProvider import TestProvider


class ProviderCollection:
    def __init__(self, model_data: ModelData):
        self.providers = []

        BaseManager.register("TestProvider", TestProvider)
        manager = BaseManager()
        manager.start()

        provider1 = TestProvider(model_data, "1")
        self.providers.append(provider1)
        provider2 = TestProvider(model_data, "2", dependencys=[provider1])
        self.providers.append(provider2)
        provider5 = TestProvider(model_data, "5", dependencys=[provider1, provider2])
        self.providers.append(provider5)

    def initialize_providers(self):
        processes = []
        for provider in self.get_providers():
            process = Process(target=provider.initialize)
            processes.append(process)
            process.start()
        print("Processes Started")
        for process in processes:
            process.join()

    def get_providers(self):
        return self.providers
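One thing I verified while debugging: each Process works on a copy of the provider object, so attributes filled in inside the child (like some_values in do_something) never show up in the parent. A tiny stand-alone demonstration with a made-up Holder class:

```python
from multiprocessing import get_context


class Holder:
    """Stand-in for a provider with a result attribute (hypothetical)."""

    def __init__(self):
        self.values = {}

    def fill(self):
        self.values["x"] = 1   # mutates the child's copy of the object


def child_mutation_is_lost():
    ctx = get_context("fork")  # fork assumed; not available on Windows
    holder = Holder()
    child = ctx.Process(target=holder.fill)
    child.start()
    child.join()
    return holder.values       # still empty in the parent
```

So even if the parallel initialization were fast, the collected values would stay in the child processes.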
I think the problem is that there are too many calls between the different providers and, consequently, between the different processes. How can I get around this? Does anyone have a better idea for solving this problem? I'm looking forward to any suggestions you might have.
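To back up that suspicion, I measured how much a single round trip through a BaseManager proxy costs compared to a direct call, since every proxy call pickles its return value and sends it over a pipe. This is a stand-alone sketch with made-up names (Store, time_proxy_vs_direct), assuming the fork start method:

```python
import time
from multiprocessing import get_context
from multiprocessing.managers import BaseManager


class Store:
    """Stand-in for ModelData: a dict of sizeable lists (hypothetical)."""

    def __init__(self):
        self.data = {f'key_{i}': list(range(10_000)) for i in range(10)}

    def get_all(self):
        return self.data


def time_proxy_vs_direct(calls=20):
    """Return (proxy_seconds, direct_seconds) for `calls` fetches."""
    BaseManager.register("Store", Store)
    # fork start method assumed (Linux); spawn would need extra setup
    manager = BaseManager(ctx=get_context("fork"))
    manager.start()
    try:
        proxy = manager.Store()
        t0 = time.perf_counter()
        for _ in range(calls):
            proxy.get_all()    # pickled and sent over a pipe on every call
        proxy_s = time.perf_counter() - t0
    finally:
        manager.shutdown()

    local = Store()
    t0 = time.perf_counter()
    for _ in range(calls):
        local.get_all()        # just returns a reference, no copy
    direct_s = time.perf_counter() - t0
    return proxy_s, direct_s
```

On my machine the proxy calls are orders of magnitude slower than the direct ones, and my real providers make many such calls with much larger DataFrames.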