
Simplified scenario

tl;dr: See code snippet below. Here I just explain what it's supposed to do and why.

I have an object example_class_instance of a class ExampleClassWithTonOfData that holds a ton of random data (in the original problem, the data is read from a file at run time).

example_class_instance has a method change_inplace() that takes an object of another class, ObjectThatNeedsToBeModified, and modifies it in place using the stored data.

I have a function do_work that takes an object of ObjectThatNeedsToBeModified, passes it to example_class_instance.change_inplace() and returns the modified object.

I want to do this modification for a ton of objects and I want to do it using multiprocessing. I use the multiprocessing.Pool's map() function for this purpose.

As far as I know, map() will not begin yielding results until all tasks have been submitted, i.e. until every ObjectThatNeedsToBeModified instance has been dispatched to a worker process. To avoid memory issues, I therefore pass batches of objects to map() instead of a single iterator over all objects. To this end, I create an object_iterator generator that yields all objects I want to modify, and a batch_iterator generator that consumes object_iterator and yields batches of objects.

I then iterate over the created batches and pass each batch to the map() function with the do_work() function.

Here is the code for this minimal example:

import psutil
import random
import multiprocessing

random.seed(42)

BATCH_SIZE = 1000
N_PROCESSES = 8
STORED_DATA_SIZE = 100000000
N_OBJECTS = 10000000
N_OBJECTS_VAL_RANGE = 100000


class ObjectThatNeedsToBeModified:
    def __init__(self):
        self.c = None


class ExampleClassWithTonOfData:
    # Class that holds an object with a huge memory footprint
    def __init__(self):
        self.lot_of_data = [random.randint(0, 100000) for _ in range(STORED_DATA_SIZE)]

    def change_inplace(self, obj):
        # Reading a random element touches a page of the stored list.
        obj.c = self.lot_of_data[random.randint(0, STORED_DATA_SIZE - 1)]


def do_work(obj):
    # example_class_instance is a module-level global inherited by the
    # worker processes when the pool forks.
    example_class_instance.change_inplace(obj)
    return obj


def object_iterator():
    for _ in range(N_OBJECTS):
        yield ObjectThatNeedsToBeModified()


def batch_iterator(iterator):
    batch = []
    for obj in iterator:
        batch.append(obj)
        if len(batch) >= BATCH_SIZE:
            yield batch
            batch = []

    if batch:
        yield batch


def main():
    print()
    info = p.memory_full_info()
    print(f"PSS: {info.pss:> 10}, USS: {info.uss:> 10}, RSS: {info.rss:> 10}")

    print("Start working with %d processes." % N_PROCESSES)
    iterator = object_iterator()
    with multiprocessing.Pool(processes=N_PROCESSES) as executor:
        i = 0
        for batch in batch_iterator(iterator):
            for _ in executor.map(do_work, batch):
                if (i + 1) % (N_OBJECTS // 30) == 0:
                    info = p.memory_full_info()
                    print(f"PSS: {info.pss:> 10}, USS: {info.uss:> 10}, RSS: {info.rss:> 10}")
                i += 1


if __name__ == "__main__":
    p = psutil.Process()
    print("Loading data")
    example_class_instance = ExampleClassWithTonOfData()

    main()

Problem

The memory consumption increases as long as the program is running. The output using psutil looks something like this:

PSS:  4051436544, USS:  4050128896, RSS:  4056670208
Start working with 8 processes.
PSS:  474577920, USS:    4972544, RSS:  4057817088
PSS:  495186944, USS:    5177344, RSS:  4057817088
PSS:  516726784, USS:    5181440, RSS:  4057817088
PSS:  539525120, USS:    5181440, RSS:  4057817088
PSS:  563405824, USS:    5218304, RSS:  4057817088
PSS:  588656640, USS:    5283840, RSS:  4057817088
PSS:  615304192, USS:    5423104, RSS:  4057817088
PSS:  643209216, USS:    5787648, RSS:  4057817088
PSS:  672796672, USS:    6475776, RSS:  4057817088
PSS:  703847424, USS:    7737344, RSS:  4057817088
PSS:  736442368, USS:    9609216, RSS:  4057817088
PSS:  770696192, USS:   12574720, RSS:  4057817088
PSS:  806727680, USS:   16683008, RSS:  4057817088
...

Interestingly, htop does not show this memory increase for the individual processes, but the overall RAM consumption it displays does increase.

What I've tried

Deleting batch after it was used has no effect.

The problem seems to be related to the data loaded in ExampleClassWithTonOfData: if example_class_instance.change_inplace() assigns the object a constant value instead of reading from the stored data, the problem goes away.
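For concreteness, the constant-value control variant looks like this (a minimal sketch; the value 42 is arbitrary):

    def change_inplace(self, obj):
        # Control variant: no read from self.lot_of_data, so no page of
        # the shared list is touched in the worker process.
        obj.c = 42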

As far as I know, with fork the data of the parent process is copied only when a child process modifies it (copy-on-write). Here, however, the data of ExampleClassWithTonOfData is only written once, in its __init__ method.
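One detail worth knowing here (my illustration, not part of the original question): in CPython, every object carries a reference count in its header, and merely reading an object updates that count. After fork(), such a refcount update is an ordinary memory write from the kernel's point of view:

import sys

x = 12345
# Passing x to a function temporarily increments its refcount -- a write
# to the memory page holding x's object header. After fork(), a write
# like this forces the kernel to copy the COW'd page into the child,
# even though the Python-level value of x never changes.
print(sys.getrefcount(x))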

Can someone explain why the memory consumption is increasing so drastically over time and how to avoid it? Any help is appreciated.

The provided code snippet is a full code example and should produce the described results when copied & pasted.

  • You should probably just use map(..., chunksize=...) instead of rolling batch_iterator yourself. Also see .imap_unordered() if the order of iteration doesn't matter (a sketch follows these comments). Commented Jan 18, 2022 at 14:29
  • Also remember that the changes to example_class_instance's properties are not propagated across process boundaries, so your program in effect does nothing. Commented Jan 18, 2022 at 14:30
  • 1
    By the way, your example only works if multiprocessing's start method is "fork" (which it is on Linux, but not on other platforms by default), since otherwise example_class_instance isn't implicitly COW'd into the subprocesses. Commented Jan 18, 2022 at 14:50
  • 1
    I tried my modified code in a Docker container and can't observe a drastic increase in rss (uss and pss do fluctuate, but not a lot). Give gist.github.com/akx/3b651934ebbb8eae79293641c64ac343 a shot? Commented Jan 18, 2022 at 15:04
  • 1
    You're right, I was eyeballing the results wrong. However, interestingly the increase is not linear - if you look at e.g. psutil.virtual_memory().percent, the overall memory use will plateau after a while. With maxtasksperchild, it's fairly stable (with local fluctuation of course). Since each access to a Python object will also invalidate the COW'd page the object's refcount is in, that might explain it - over time all of the integers in lot_of_data will have been accessed, so all of the child processes have their own copies of the data... Commented Jan 19, 2022 at 6:12
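Following the first comment, the manual batching can be replaced by letting the pool chunk a lazy iterable itself. A sketch of what that could look like (imap_unordered and chunksize are standard multiprocessing.Pool features; the chunksize of 100 is an arbitrary choice, not a tuned value):

with multiprocessing.Pool(processes=N_PROCESSES) as pool:
    # imap_unordered yields results as workers finish them instead of
    # waiting for the whole input like map(); chunksize controls how many
    # objects are pickled and shipped to a worker per task.
    for i, _ in enumerate(pool.imap_unordered(do_work, object_iterator(), chunksize=100)):
        if (i + 1) % (N_OBJECTS // 30) == 0:
            info = p.memory_full_info()
            print(f"PSS: {info.pss:> 10}, USS: {info.uss:> 10}, RSS: {info.rss:> 10}")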

1 Answer


Setting the maxtasksperchild parameter in Pool() solves the issue.

From Python documentation:

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.
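Applied to the question's code, only the Pool construction changes; the value 100 below is an arbitrary example, not a tuned number:

with multiprocessing.Pool(processes=N_PROCESSES, maxtasksperchild=100) as executor:
    # Each worker exits after completing 100 tasks and is replaced by a
    # fresh fork of the parent, releasing any pages the old worker had
    # privately copied via copy-on-write.
    ...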

The fact that setting maxtasksperchild solves the issue seems to indicate that memory is leaking (see https://stackoverflow.com/a/54975030/7097579).

But I still don't understand where or why this is happening.


2 Comments

Python memory management is complicated... This is exactly the solution I would recommend anyway. If you're interested in further reading, iirc one of the big tech companies that runs their stack on Python (Twitter maybe?) went to a fairly significant effort to fork CPython such that large chunks of loaded memory stay static, so that they can fork processes and actually leverage copy-on-write memory allocation (normal garbage collection touches almost everything, so no data is particularly static; see the gc.freeze() sketch below). Even with that, they periodically restart worker processes to free resources.
some additional reading here as well...
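If you're on CPython 3.7+, a related mitigation worth knowing about (my addition, hinted at by the comment above; it reduces but does not eliminate the copy-on-write growth, since refcount updates still dirty pages) is gc.freeze():

import gc
import multiprocessing

example_class_instance = ExampleClassWithTonOfData()  # big read-only data

# Move everything currently tracked by the garbage collector into a
# permanent generation that the cyclic collector never scans, so GC in
# the children does not touch (and thereby copy) the shared pages.
gc.freeze()

with multiprocessing.Pool(processes=N_PROCESSES, maxtasksperchild=100) as pool:
    ...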
