
I need to parallelize a function that modifies one of its arguments in place. For example, take this simple code:

import numpy

def Test(i, a, length):
    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1

a = numpy.ndarray(shape=10, dtype=numpy.uint8)
Test(0, a, a.shape[0])
Test(1, a, a.shape[0])
print(a)

I tried joblib, using Parallel(n_jobs=2)(delayed(Test)(i, a, 10) for i in range(2)), and multiprocessing.Pool, with:

with multiprocessing.Pool(processes=2) as pool:
    data = [(0, a, a.shape[0]), (1, a, a.shape[0])]
    pool.starmap(Test, data)

But neither solution works: the worker processes receive pickled copies of the arguments, so the caller's array is never modified.

What would be my options to do such parallelization?

    With multithreading data are shared, but with multiprocessing they are pickled (i.e. copied). If you want to mutate the input, then you need to use multithreading or to explicitly use shared memory with multiprocessing. See multiprocessing.shared_memory. Commented Apr 1 at 0:31
  • why starmap? why not apply_async? Commented Apr 1 at 5:15
  • @SuramuthuR What would it change? Commented Apr 1 at 6:39
  • @FiReTiTi, No, I want to know whether there is any specific reason for using starmap as I got used to apply_async in most of the occasions. Commented Apr 1 at 6:41
  • @FiReTiTi, Sorry for the late reply. Check my answer. As I got used to apply_async and map, I had to find out about starmap from the internet, which took some time since I got immersed in a few other things on the net. Commented Apr 1 at 9:43
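As the first comment notes, threads share the process's memory, so a thread pool can mutate the array in place. A minimal sketch of that suggestion (note that these pure-Python loops still run under the GIL, so this shares memory but may not actually run faster):

```python
import numpy
from concurrent.futures import ThreadPoolExecutor

def Test(i, a, length):
    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1

a = numpy.zeros(shape=10, dtype=numpy.uint8)
with ThreadPoolExecutor(max_workers=2) as executor:
    # Threads share the process's address space, so each call mutates 'a' directly.
    list(executor.map(lambda i: Test(i, a, a.shape[0]), range(2)))
print(a)  # [2 1 2 1 2 1 2 1 2 1]
```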

1 Answer


We use two functions:

  1. np_array_to_shared_array: Takes a numpy array and initializes a multiprocessing.Array instance with the contents of the numpy array.
  2. shared_array_to_np_array: Takes a multiprocessing.Array instance created with np_array_to_shared_array and reconstructs a numpy array whose buffer is the sharable multiprocessing.Array instance.

We then create our multiprocessing pool, specifying a pool initializer that is passed the multiprocessing.Array instance created with np_array_to_shared_array. The initializer uses shared_array_to_np_array to reconstruct the sharable numpy array in each pool process as a global variable a:

import numpy as np
import multiprocessing

def np_array_to_shared_array(np_array: np.ndarray,
                             lock: bool=True
                             ) -> multiprocessing.Array:
    """Initialize a multiprocessing.Array instance with the contents of a numpy array."""

    shared_array = multiprocessing.Array('B', np_array.nbytes, lock=lock)
    buffer = shared_array.get_obj() if lock else shared_array
    arr = np.frombuffer(buffer, np_array.dtype)
    arr[:] = np_array.flatten(order='C')
    return shared_array

def shared_array_to_np_array(shared_array: multiprocessing.Array,
                             shape: tuple[int, ...],
                             dtype: object
                             ) -> np.ndarray:
    """Initialize a numpy array using a multiprocessing.Array instance as the buffer."""

    buffer = (
        shared_array.get_obj() if getattr(shared_array, 'get_obj', None)
        else shared_array
    )
    return np.ndarray(shape, dtype=dtype, buffer=buffer)

def init_pool(shared_array, shape, dtype):
    global a

    a = shared_array_to_np_array(shared_array, shape, dtype)

def test(i):
    length = a.shape[0]

    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1

def main():
    arr = np.zeros(shape=10, dtype=np.uint8)
    shape = arr.shape
    dtype = arr.dtype
    shared_array = np_array_to_shared_array(arr, lock=False)
    arr = shared_array_to_np_array(shared_array, shape, dtype)

    with multiprocessing.Pool(processes=2,
                              initializer=init_pool,
                              initargs=(shared_array, shape, dtype)
                             ) as pool:
        pool.map(test, [0, 1])

    print(arr)


if __name__ == '__main__':
    main()

Prints:

[2 1 2 1 2 1 2 1 2 1]
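For completeness, the comments also point at multiprocessing.shared_memory (Python 3.8+). The sketch below passes the shared block's name to each worker, which re-attaches and wraps it in a numpy array; this is my own hypothetical layout, not part of the answer above:

```python
import numpy as np
from multiprocessing import Pool, shared_memory

def worker(args):
    # Attach to the existing shared block by name and wrap it in an ndarray.
    name, shape, dtype, i = args
    shm = shared_memory.SharedMemory(name=name)
    a = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    if i % 2 == 0:
        a[0::2] = 2
    else:
        a[1::2] = 1
    shm.close()

def main():
    # Create a 10-byte shared block and view it as a uint8 numpy array.
    shm = shared_memory.SharedMemory(create=True, size=10)
    a = np.ndarray((10,), dtype=np.uint8, buffer=shm.buf)
    a[:] = 0
    with Pool(processes=2) as pool:
        pool.map(worker, [(shm.name, a.shape, 'uint8', i) for i in range(2)])
    result = a.copy()
    print(result)  # [2 1 2 1 2 1 2 1 2 1]
    shm.close()
    shm.unlink()
    return result

if __name__ == '__main__':
    main()
```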

3 Comments

Thank you for this complete answer. I have a question: in the function np_array_to_shared_array, what is the purpose of the two lines using arr (arr = np.frombuffer(buffer, np_array.dtype) and arr[:] = np_array.flatten(order='C'))?
arr = np.frombuffer(buffer, np_array.dtype) takes the contents of buffer interpreted as a one-dimensional numpy array of type np_array.dtype. But buffer is just multiprocessing.Array('B', np_array.nbytes, lock=lock), i.e. np_array.nbytes bytes of value 0. Next we execute arr[:] = np_array.flatten(order='C'), which uses method flatten to collapse np_array to a one-dimensional array and then copies its values byte for byte into arr. Thus arr will have the same values as np_array, if we consider np_array to have been reshaped to a one-dimensional array.
In short, we are initializing a multiprocessing.Array instance with the values of the numpy array. I suggest you look up numpy.frombuffer and numpy.ndarray.flatten in the numpy documentation.
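A standalone snippet may make those two lines concrete (the variable names here are illustrative, not from the answer):

```python
import multiprocessing
import numpy as np

src = np.arange(6, dtype=np.uint8).reshape(2, 3)

# A raw byte buffer of the right size; initially all zeros.
shared = multiprocessing.Array('B', src.nbytes, lock=False)

# View the buffer as a 1-D numpy array, then copy src into it byte for byte.
view = np.frombuffer(shared, dtype=src.dtype)
view[:] = src.flatten(order='C')

print(list(shared))  # [0, 1, 2, 3, 4, 5]
```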
