
I need to accumulate the results of many trees for a query that produces a large output. Since all trees can be handled independently, the problem is embarrassingly parallel, except that the results need to be summed and I cannot store the intermediate results for all trees in memory. Below is a simple example of code for the problem which keeps all the intermediate results in memory (of course the functions are never the same in the real problem, since that would be duplicated work).

import numpy as np
from joblib import Parallel, delayed


functions = [[abs, np.round] for i in range(500)]  # Dummy functions
functions = [function for sublist in functions for function in sublist]
X = np.random.normal(size=(5, 5))  # Dummy data


def helper_function(function, X=X):
    return function(X)

results = Parallel(n_jobs=-1)(
    map(delayed(helper_function), functions))
results_out = np.zeros(results[0].shape)
for result in results:
    results_out += result

A solution could be the following modification:

import numpy as np
from joblib import Parallel, delayed

functions = [[abs, np.round] for i in range(500)]  # Dummy functions
functions = [function for sublist in functions for function in sublist]
X = np.random.normal(size=(5, 5))  # Dummy data
results_out = np.zeros(X.shape)  # accumulate into this array in place

def helper_function(function, X=X, results=results_out):
    result = function(X)
    results += result

Parallel(n_jobs=-1)(
    map(delayed(helper_function), functions))

But this might cause races (and with joblib's default process-based backend the workers would each update their own copy of `results_out`, so the in-place updates would be lost entirely). So it is not a good solution.

Do you have any suggestions for performing this without storing the intermediate results, while still keeping it parallel?

1 Answer


The answer is given in the joblib documentation:

from math import sqrt
from joblib import Parallel, delayed

with Parallel(n_jobs=2) as parallel:
    accumulator = 0.
    n_iter = 0
    while accumulator < 1000:
        results = parallel(delayed(sqrt)(accumulator + i ** 2)
                           for i in range(5))
        accumulator += sum(results)  # synchronization barrier
        n_iter += 1

You can do the calculation in chunks and reduce each chunk before you run out of memory.
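Applied to the question's example, that chunking strategy could look roughly like this (`chunk_size` is an assumed tuning knob; only one chunk of intermediate results is ever held in memory at a time):

```python
import numpy as np
from joblib import Parallel, delayed

X = np.random.normal(size=(5, 5))  # Dummy data
functions = [abs, np.round] * 500  # Same 1000 dummy functions as above

# Hyperparameter: at most chunk_size intermediate results are alive at once.
chunk_size = 100

results_out = np.zeros(X.shape)
with Parallel(n_jobs=2) as parallel:
    for start in range(0, len(functions), chunk_size):
        chunk = functions[start:start + chunk_size]
        results = parallel(delayed(f)(X) for f in chunk)
        results_out += np.sum(results, axis=0)  # reduce before the next chunk
```

Reusing the `Parallel` context manager across chunks avoids respawning the worker pool on every iteration; each `results_out += ...` is the synchronization barrier mentioned in the docs.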


3 Comments

Thanks, great answer. I did not see your response until after I added the solution with potential races.
Did it solve your problem? If so, please consider accepting the answer, if not I'd be curious why :-)
I think it solved the problem to 99%, since it works in practice. But it still leaves a hyperparameter to set for the application (the chunk size for the accumulator) and introduces a synchronization barrier. Just out of curiosity, I wonder if it is possible to do safe in-place operations, such that all threads can update the result. I think other parallel programming frameworks such as TensorFlow include something like this.
