
I have a class, defined and used as follows, with a method that gets called in the context of a multiprocessing pool:

import pandas as pd
from multiprocessing import Pool

class MyClass:
    def __init__(self, big_object):
        self.big_object = big_object

    def compute_one(self, myvar):
        return self.big_object.do_something_with(myvar)

    def compute_all(self, myvars):
        with Pool() as pool:
            results = pool.map(self.compute_one, myvars)
        return results


big_object = pd.read_csv('very_big_file.csv')
cls = MyClass(big_object)

vars = ['foo', 'bar', 'baz']
all_results = cls.compute_all(vars)

This "works" but the issue is that big_object takes several Gbs in RAM, and with the above code, this big object is copied in RAM for every process launched by multiprocessing.Pool.

How can I modify the above code so that big_object is shared among all processes, while still being defined at class instantiation?

-- EDIT

Some context into what I'm trying to do might help identify a different approach altogether.

Here, big_object is a Pandas dataframe with 1M+ rows and tens of columns, and compute_one computes statistics based on a particular column.

A high-level view of the algorithm (extremely summarized):

  • take one of the columns (e.g. current_col = manufacturer)
  • for each rem_col of the remaining categorical columns:
    • compute a big_object.groupby([current_col, rem_col]).size()

The end result of this would look like:

manufacturer    country_us    country_gb    client_male    client_female
bmw             15            18            74             13
mercedes        2             24            12             17
jaguar          48            102           23             22

So in essence, this is about computing statistics for every column, on every other column, over the entire source dataframe.
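
To make the shape of that computation concrete, here is a minimal sketch of the per-column pass (my own illustration with a hypothetical stats_for helper, covering only the counts part shown in the table above):

import pandas as pd

def stats_for(df, current_col, remaining_cols):
    pieces = []
    for rem_col in remaining_cols:
        # groupby().size() gives counts indexed by (current_col, rem_col);
        # unstack pivots the rem_col categories into columns of counts
        counts = df.groupby([current_col, rem_col]).size().unstack(rem_col, fill_value=0)
        counts.columns = [f'{rem_col}_{value}' for value in counts.columns]
        pieces.append(counts)
    return pd.concat(pieces, axis=1)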

Using multiprocessing here allows, for a given current_col, computing all the statistics on the remaining columns in parallel. Among these statistics are not only sums but also means (for the remaining numerical columns).

A dirty workaround using a global variable instead (a global big_object instantiated outside the class) brings the total running time from 5+ hours down to about 20 minutes. My only issue is that I'd like to avoid this global-object approach.
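
For reference, that global pattern typically looks like the sketch below (my reconstruction, not the OP's actual code; it relies on the fork start method, the default on Linux, where workers inherit the parent's memory copy-on-write instead of receiving a pickled copy of the dataframe):

import pandas as pd
from multiprocessing import Pool

big_object = None  # set in the parent before the pool forks

def compute_one(myvar):
    # forked workers inherit big_object from the parent without an explicit copy
    return big_object.do_something_with(myvar)

if __name__ == '__main__':
    big_object = pd.read_csv('very_big_file.csv')
    with Pool() as pool:
        all_results = pool.map(compute_one, ['foo', 'bar', 'baz'])

Under the spawn start method (Windows, and macOS by default) this fails, because each worker re-imports the module and sees big_object = None.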

  • Could you use threads instead of processes? Shared variables are the native way to share data between a bunch of threads, not so for processes. Commented May 9, 2022 at 10:30
  • @timgeb compute_one is CPU-bound, so unless I'm mistaken I think I need processes here, as the point is to use all available cores. Commented May 9, 2022 at 10:33
  • this looks promising, no? Commented May 9, 2022 at 10:34
  • Processes can share memory via the multiprocessing.shared_memory.SharedMemory class (see the sketch after this list). You can also define custom multiprocessing.managers that will act as a proxy to some (possibly big) object of a custom class (that runs in a separate thread). Commented May 9, 2022 at 10:36
  • A few GB of data dangerously approaches the limits of RAM anyway. I would suggest rewriting the algorithm into a "chunk by chunk" approach. Commented May 9, 2022 at 10:41
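
For completeness, a minimal sketch of the SharedMemory route mentioned in the comment above (my own illustration, not from the thread; it maps naturally onto NumPy arrays, whereas a whole Pandas dataframe would first have to be decomposed into such arrays):

import numpy as np
from multiprocessing import Pool, shared_memory

def worker(args):
    shm_name, shape, dtype, i = args
    # attach to the existing block by name: no copy of the data is made
    shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    row_sum = float(arr[i].sum())
    del arr      # drop the view before closing the block
    shm.close()
    return row_sum

if __name__ == '__main__':
    data = np.random.rand(1000, 1000)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared[:] = data[:]  # one copy into the shared block

    with Pool() as pool:
        tasks = [(shm.name, data.shape, str(data.dtype), i) for i in range(4)]
        print(pool.map(worker, tasks))

    del shared
    shm.close()
    shm.unlink()  # release the block once every process is done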

2 Answers


One solution is to make MyClass a managed class, just like, for example, a managed dictionary created with multiprocessing.Manager().dict(). To ensure that there is only one copy of the "big object", I would first modify MyClass.__init__ to take a CSV file path argument; in this way the "big object" is constructed only in the manager's process. Second, I would remove the compute_all logic from MyClass and invoke the multiprocessing.pool.Pool.map method in such a way that what is passed as the worker function is the managed object's proxy.

What you save in space you give up in some performance, since each invocation of compute_one results in more or less the equivalent of a remote method call to the manager's process, where it is actually executed.

from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import pandas as pd

class MyClassManager(BaseManager):
    pass

class MyClass:
    def __init__(self, path):
        self.big_object = pd.read_csv(path)

    def compute_one(self, myvar):
        # For demo purposes just check if column myvar exists in dataframe:
        return myvar in self.big_object

# Required for Windows:
if __name__ == '__main__':
    MyClassManager.register('MyClass', MyClass)
    with MyClassManager() as manager:
        cls = manager.MyClass('very_big_file.csv')

        # vars is a built-in function name and should not be used:
        my_vars = ['foo', 'bar', 'baz']
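    # cls is a proxy object; each compute_one call below is forwarded to
    # the single MyClass instance living in the manager's process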
        with Pool() as pool:
            all_results = pool.map(cls.compute_one, my_vars)
        print(all_results)

1 Comment

A comment was just incorrectly added as an answer: "I have found that based @Booboo 's answer, there is only one process is runing in actually, right?"

A minor addition to the previous reply.

The solution provided by @Booboo worked for me. However, depending on the size of the data, the manager.shutdown() at the end of the with ... manager block causes a PermissionError for some reason.

The error seems to appear only with very large cls objects: when cls occupies 5-10% of RAM there is no error; when it occupies 80% of RAM (~50 GB in my case), the error appears.

I wasn't able to clearly pin down the reason for this behavior, but deleting cls before the manager shuts down avoided the error:

if __name__ == '__main__':
    MyClassManager.register('MyClass', MyClass)
    with MyClassManager() as manager:
        cls = manager.MyClass('very_big_file.csv')

        my_vars = ['foo', 'bar', 'baz']
        with Pool() as pool:
            all_results = pool.map(cls.compute_one, my_vars)

        del cls  # <------- added line

    print(all_results)

Below is the traceback of the PermissionError:

Traceback (most recent call last):
  File "D:\folder\main.py", line 73, in <module>
    with MyClassManager() as manager:
  File "C:\...\Python311\Lib\multiprocessing\managers.py", line 657, in __exit__
    self.shutdown()
  File "C:\...\Python311\Lib\multiprocessing\util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\Python311\Lib\multiprocessing\managers.py", line 681, in _finalize_manager
    process.terminate()
  File "C:\...\Python311\Lib\multiprocessing\process.py", line 133, in terminate
    self._popen.terminate()
  File "C:\...\Python311\Lib\multiprocessing\popen_spawn_win32.py", line 124, in terminate
    _winapi.TerminateProcess(int(self._handle), TERMINATE)
PermissionError: [WinError 5] Permission denied

Process finished with exit code 1

