1

I am new to using parallel processing for data analysis. I have a fairly large array and I want to apply a function to each index of said array.

Here is the code I have so far:

import numpy as np
import statsmodels.api as sm
from statsmodels.regression.quantile_regression import QuantReg
import multiprocessing
from functools import partial

def fit_model(data,q):
    #data is a 1-D array holding precipitation values
    years = np.arange(1895,2018,1)
    res = QuantReg(exog=sm.add_constant(years),endog=data).fit(q=q)
    pointEstimate = res.params[1] #output slope of quantile q
    return pointEstimate

#precipAll is an array of shape (1405*621,123,12) (longitudes*latitudes,years,months)
#find all indices where there is data
nonNaN = np.where(~np.isnan(precipAll[:,0,0]))[0] #481631 indices
month = 4

#holder array for results
asyncResults = np.zeros((precipAll.shape[0])) * np.nan
def saveResult(result,pos):
    asyncResults[pos] = result

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=20) #my server has 24 CPUs
    for i in nonNaN:
        #use partial so I can also pass the index i so the result is
        #stored in the expected position

        new_callback_function = partial(saveResult, pos=i)
        pool.apply_async(fit_model, args=(precipAll[i,:,month],0.9),callback=new_callback_function)

    pool.close()
    pool.join()

When I ran this, I stopped it after it took longer than had I not used multiprocessing at all. The function, fit_model, is on the order of 0.02 seconds, so could the overhang associated with apply_async be causing the slowdown? I need to maintain order of the results as I am plotting this data onto a map after this processing is done. Any thoughts on where I need improvement is greatly appreciated!

1 Answer 1

1

If you need to use the multiprocessing module, you'll probably want to batch more rows together into each task that you give to the worker pool. However, for what you're doing, I'd suggest trying out Ray due to its efficient handling of large numerical data.

import numpy as np
import statsmodels.api as sm
from statsmodels.regression.quantile_regression import QuantReg
import ray

@ray.remote
def fit_model(precip_all, i, month, q):
    data = precip_all[i,:,month]
    years = np.arange(1895, 2018, 1)
    res = QuantReg(exog=sm.add_constant(years), endog=data).fit(q=q)
    pointEstimate = res.params[1]
    return pointEstimate

if __name__ == '__main__':
    ray.init()

    # Create an array and place it in shared memory so that the workers can
    # access it (in a read-only fashion) without creating copies.
    precip_all = np.zeros((100, 123, 12))
    precip_all_id = ray.put(precip_all)

    result_ids = []
    for i in range(precip_all.shape[0]):
        result_ids.append(fit_model.remote(precip_all_id, i, 4, 0.9))

    results = np.array(ray.get(result_ids))

Some Notes

The example above runs out of the box, but note that I simplified the logic a bit. In particular, I removed the handling of NaNs.

On my laptop with 4 physical cores, this takes about 4 seconds. If you use 20 cores instead and make the data 9000 times bigger, I'd expect it to take about 7200 seconds, which is quite a long time. One possible approach to speeding this up is to use more machines or to process multiple rows in each call to fit_model in order to amortize some of the overhead.

The above example actually passes the entire precip_all matrix into each task. This is fine because each fit_model task only has read access to a copy of the matrix stored in shared memory and so doesn't need to create its own local copy. The call to ray.put(precip_all) places the array in shared memory once up front.

For about the differences between Ray and Python multiprocessing. Note I'm helping develop Ray.

Sign up to request clarification or add additional context in comments.

2 Comments

Thank you for your insight! Do you think something like numpy's apply_along_axis method would be useful to your comment of batching more rows together for a task? precip_all has shape (1405,621,123) so I could have 1405 iterations where all 621 indices are passed to a single task.
Yes, I think that makes sense. You could either use apply_along_axis or manually do a for loop over the 621 indices.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.