I'm trying to make use of multiprocessing to speed up my array-based calculations. General workflow is as follows:

  • I have three arrays:
    • id_array holds segment IDs; pixels with the same ID belong to the same segment
    • class_array is a classified array (integer class values from an image classification)
    • prob_array holds the probability for these classes
  • based on the segments I want to:
    • find the majority class within each segment
    • take the median of the probabilities within the segment, but only for the "pixels" that have the majority class

Here is my non-parallel example, which works fine:

import numpy as np

id_array = np.array([[1, 1, 2, 2, 2],
                     [1, 1, 2, 2, 4],
                     [3, 3, 4, 4, 4],
                     [3, 3, 4, 4, 4]])
class_array = np.array([[7, 7, 6, 8, 8],
                        [5, 7, 7, 8, 8],
                        [8, 8, 5, 5, 8],
                        [9, 9, 8, 7, 7]])
prob_array = np.array([[0.7, 0.3, 0.9, 0.5, 0.1],
                       [0.4, 0.6, 0.3, 0.5, 0.9],
                       [0.8, 0.6, 0.2, 0.2, 0.3],
                       [0.4, 0.4, 0.6, 0.3, 0.7]])

all_ids = np.unique(id_array)
dst_classes = np.zeros_like(class_array)
dst_probs = np.zeros_like(prob_array)

for my_id in all_ids:
    segment = np.where(id_array == my_id)
    class_data = class_array[segment]
    # get majority of classes within segment
    majority = np.bincount(class_data.flatten()).argmax()
    # get probabilities within segment
    prob_data = prob_array[segment]
    # get probabilities within segment where class equals majority
    majority_probs = prob_data[np.where(class_data == majority)]
    # get median of these probabilities
    median_prob = np.nanmedian(majority_probs)
    # write values
    dst_classes[segment] = majority
    dst_probs[segment] = median_prob

print(dst_classes)
print(dst_probs)

The problem is that my real data have something like 4 million segments and this then takes a week to compute. So I followed this tutorial and came up with this:

import numpy as np
import multiprocessing as mp

WORKER_DICT = dict()
NODATA = 0

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value:
        np.copyto(shared_array, np.full_like(data_array, init_value))
        return raw_array, shared_array
    else:
        np.copyto(shared_array, data_array)
        return raw_array, shared_array

def init_worker(id_array, class_array, prob_array, class_results, prob_results):
    WORKER_DICT['id_array'] = id_array
    WORKER_DICT['class_array'] = class_array
    WORKER_DICT['prob_array'] = prob_array
    WORKER_DICT['class_results'] = class_results
    WORKER_DICT['prob_results'] = prob_results
    WORKER_DICT['shape'] = id_array.shape
    mp.freeze_support()

def worker(id):
    id_array = WORKER_DICT['id_array']
    class_array = WORKER_DICT['class_array']
    prob_array = WORKER_DICT['prob_array']
    class_result = WORKER_DICT['class_results']
    prob_result = WORKER_DICT['prob_results']
    # array indices for "id"
    segment = np.where(id_array == id)
    # get data at these indices, mask nodata values
    class_data = np.ma.masked_equal(class_array[segment], NODATA)
    # get majority value
    majority_class = np.bincount(class_data.flatten()).argmax()
    # get probabilities
    probs = prob_array[segment]
    majority_probs = probs[np.where(class_array[segment] == majority_class)]
    med_majority_probs = np.nanmedian(majority_probs)
    class_result[segment] = majority_class
    prob_result[segment] = med_majority_probs
    return

if __name__ == '__main__':
    # segment IDs
    id_ra, id_array = shared_array_from_np_array(np.array(
        [[1, 1, 2, 2, 2],
         [1, 1, 2, 2, 4],
         [3, 3, 4, 4, 4],
         [3, 3, 4, 4, 4]]))
    # classification
    cl_ra, class_array = shared_array_from_np_array(np.array(
        [[7, 7, 6, 8, 8],
         [5, 7, 7, 8, 8],
         [8, 8, 5, 5, 8],
         [9, 9, 8, 7, 7]]))
    # probabilities
    pr_ra, prob_array = shared_array_from_np_array(np.array(
        [[0.7, 0.3, 0.9, 0.5, 0.1],
         [0.4, 0.6, 0.3, 0.5, 0.9],
         [0.8, 0.6, 0.2, 0.2, 0.3],
         [0.4, 0.4, 0.6, 0.3, 0.7]]))
    cl_res, class_results = shared_array_from_np_array(class_array, 0)
    pr_res, prob_results = shared_array_from_np_array(prob_array, 0.)
    unique_ids = np.unique(id_array)
    init_args = (id_ra, cl_ra, pr_ra, cl_res, pr_res, id_array.shape)
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map_async(worker, unique_ids)
    print('Majorities:', cl_res)
    print('Probabilities:', pr_res)

But I do not see how I can now get my results and whether they are correct. I tried

np.frombuffer(cl_res)
np.frombuffer(pr_res)

but that gives me only 10 values for cl_res (there should be 20) and they seem completely random, while pr_res has the exact same values as prob_array.

I have tried making use of other examples around here, like this, but can't get them to work either. That looks like a similar problem, but it already requires a lot of knowledge of how multiprocessing really works, and I don't have that (I'm a total beginner with multiprocessing).

1 Answer
Several things to fix:

  • You need to create the numpy arrays in init_worker(), which should also take a shape argument:
def init_worker(id_ra, cl_ra, pr_ra, cl_res, pr_res, shape):
    WORKER_DICT['id_array'] = np.ctypeslib.as_array(id_ra, shape)
    WORKER_DICT['class_array'] = np.ctypeslib.as_array(cl_ra, shape)
    WORKER_DICT['prob_array'] = np.ctypeslib.as_array(pr_ra, shape)
    WORKER_DICT['class_results'] = np.ctypeslib.as_array(cl_res, shape)
    WORKER_DICT['prob_results'] = np.ctypeslib.as_array(pr_res, shape)
  • You should check if init_value is not None instead of just init_value in shared_array_from_np_array(), as 0 evaluates to False.
  • mp.freeze_support() should only be called immediately after if __name__ == '__main__', as per its docs.
  • pool.map_async() returns an AsyncResult object that needs to be waited on; you probably want pool.map(), which blocks until the processing is done.

You can access the results directly in the main section with the class_results and prob_results arrays.


4 Comments

That's already a lot of great hints, thank you. However, although the pr_res is exactly what I want, the cl_res is still very strange. It is a multiprocessing.sharedctypes.c_long_Array_20 object with length 20 (as it should be), but when I do np.frombuffer(cl_res) it only has length 10. How can that be?
Ok, nevermind, it is np.frombuffer(cl_res, class_array.dtype).reshape(class_array.shape). Guess that solves it, will test it with my big dataset and then will probably accept this.
Oh, I forgot that part. I've updated the answer.
After 12 hours of processing nicely on 12 cores, I now had to cancel it (although I think it would have done what I need). Guess more than 4 million "segments", therefore unique IDs in id_array, are simply too much (a normal for-loop said it would take something like 2000 hours). Or I'm doing something really inefficient here. But since the code is actually doing what I asked for, this is still a very good blueprint.
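The slowdown described in the last comment is largely algorithmic rather than a multiprocessing issue: `id_array == my_id` scans every pixel, so 4 million segments mean 4 million full passes over the array. As a rough single-process sketch (not part of the accepted answer), the same result can be computed almost fully vectorized with `np.unique` and `np.bincount`, so each pixel is touched only a handful of times:

```python
import numpy as np

def segment_majority_probs(id_array, class_array, prob_array):
    """Per segment: majority class, and median probability over the
    majority-class pixels. Avoids one full-array scan per segment."""
    ids = id_array.ravel()
    cls = class_array.ravel()
    prob = prob_array.ravel()
    uniq, inv = np.unique(ids, return_inverse=True)  # inv maps pixel -> segment index
    n_seg, n_cls = len(uniq), cls.max() + 1
    # one bincount over (segment, class) pairs replaces all per-segment masks
    counts = np.bincount(inv * n_cls + cls, minlength=n_seg * n_cls)
    majority = counts.reshape(n_seg, n_cls).argmax(axis=1)
    # keep only pixels whose class is their segment's majority
    sel = cls == majority[inv]
    seg_sel = inv[sel]
    order = np.argsort(seg_sel, kind='stable')       # group selected pixels by segment
    seg_sorted = seg_sel[order]
    prob_sorted = prob[sel][order]
    starts = np.searchsorted(seg_sorted, np.arange(n_seg))
    ends = np.searchsorted(seg_sorted, np.arange(n_seg), side='right')
    # the only remaining Python loop slices each segment's own pixels once
    medians = np.array([np.nanmedian(prob_sorted[s:e])
                        for s, e in zip(starts, ends)])
    return (majority[inv].reshape(id_array.shape),
            medians[inv].reshape(id_array.shape))

id_array = np.array([[1, 1, 2, 2, 2], [1, 1, 2, 2, 4],
                     [3, 3, 4, 4, 4], [3, 3, 4, 4, 4]])
class_array = np.array([[7, 7, 6, 8, 8], [5, 7, 7, 8, 8],
                        [8, 8, 5, 5, 8], [9, 9, 8, 7, 7]])
prob_array = np.array([[0.7, 0.3, 0.9, 0.5, 0.1], [0.4, 0.6, 0.3, 0.5, 0.9],
                       [0.8, 0.6, 0.2, 0.2, 0.3], [0.4, 0.4, 0.6, 0.3, 0.7]])

dst_classes, dst_probs = segment_majority_probs(id_array, class_array, prob_array)
print(dst_classes)
print(dst_probs)
```

This matches the serial loop's output on the sample data. Note the `n_seg * n_cls` bincount assumes class values are small non-negative integers, and a segment whose majority-class probabilities are all NaN would still trigger a `nanmedian` warning, as in the original loop.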
