Consider the following:

fine = np.random.uniform(0,100,10)
fine[fine<20] = 0 # introduce some intermittency
coarse = np.sum(fine.reshape(-1,2),axis=1)

fine is a timeseries of magnitudes (e.g. volume of rainfall). coarse is the same timeseries but at a halved resolution, so every 2 timesteps in fine are aggregated to a single value in coarse.

I am then interested in the weights that give, for each value of coarse above zero, the proportion of that value contributed by each of its two timesteps in fine.

def w_xx(fine, coarse):
    weights = [] 
    for i, val in enumerate(coarse):
        if val > 0:
            w = fine[i*2:i*2+2]/val # returns both w1 and w2, w1 is 1st element, w2 = 1-w1 is second
            weights.append(w)
    return np.asarray(weights)

So w_xx(fine,coarse) returns an array of shape (n, 2), where n is the number of values in coarse above zero (at most 5 here) and the elements of axis=1 are the weights of fine for one value of coarse.
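As a small worked example with hand-picked values (not the random data above), the middle pair in fine sums to zero, so that value of coarse is skipped and only two rows come back. The input numbers are made up purely for illustration:

```python
import numpy as np

def w_xx(fine, coarse):
    weights = []
    for i, val in enumerate(coarse):
        if val > 0:
            # proportions of coarse[i] carried by its two fine timesteps
            weights.append(fine[i*2:i*2+2] / val)
    return np.asarray(weights)

# Illustrative values only: the middle pair is all zeros
fine = np.array([10.0, 30.0, 0.0, 0.0, 50.0, 50.0])
coarse = np.sum(fine.reshape(-1, 2), axis=1)  # 40, 0 and 100

weights = w_xx(fine, coarse)
# rows are [0.25, 0.75] and [0.5, 0.5]; the zero pair is dropped
print(weights)
```

Each row sums to 1, which is the w2 = 1 - w1 relationship mentioned in the code comment.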

This is all fine for smaller timeseries, but I'm running this analysis on ~60k-sized arrays of fine, plus in a loop of 300+ iterations.

I have been trying to make this run in parallel using the multiprocessing library in Python 2.7, but I've not managed to get far. I need to read both timeseries at the same time in order to get the corresponding values of fine for every value in coarse, and to only work on values above 0, which is what my analysis requires.

I would appreciate suggestions on a better way to do this. I imagine if I can define a mapping function to use with Pool.map in multiprocessing, I should be able to parallelize this? I've only just started out with multiprocessing so I don't know if there is another way?

Thank you.

3 Answers

3

You can achieve the same result in a vectorized form by simply doing:

>>> (fine / np.repeat(coarse, 2)).reshape(-1, 2)

You can then filter out the rows where coarse is zero using np.isfinite, since dividing by a zero value of coarse gives either inf or nan.
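A possible variant, sketched here under the assumption that you would rather not divide by zero at all (NumPy emits a RuntimeWarning when it does), is to select the rows with coarse above zero first and then divide. The name w_masked is hypothetical, and the seeded data is only for illustration:

```python
import numpy as np

def w_masked(fine, coarse):
    """Hypothetical helper: weights for the pairs whose coarse total is > 0."""
    pairs = fine.reshape(-1, 2)   # one row per coarse timestep
    keep = coarse > 0             # boolean mask over coarse timesteps
    # coarse[keep][:, None] has shape (k, 1), so it broadcasts across each pair
    return pairs[keep] / coarse[keep][:, None]

rng = np.random.RandomState(0)    # seeded only so the example is repeatable
fine = rng.uniform(0, 100, 10)
fine[fine < 20] = 0
fine[2:4] = 0                     # force one all-zero pair to exercise the mask
coarse = fine.reshape(-1, 2).sum(axis=1)

w = w_masked(fine, coarse)
print(w.shape)
```

This should give the same rows as the np.isfinite filter, without the intermediate nan values.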


1

In addition to the NumPy expression proposed by @behzad.nouri, you can use the Pythran compiler to reap extra speedups:

$ cat w_xx.py
#pythran export w_xx(float[], float[])
import numpy as np

def w_xx(fine, coarse):
    w = (fine / np.repeat(coarse, 2))
    return w[np.isfinite(w)].reshape(-1, 2)
$ python -m timeit -s 'import numpy as np; fine = np.random.uniform(0, 100, 100000); fine[fine<20] = 0; coarse = np.sum(fine.reshape(-1, 2), axis=1); from w_xx import w_xx' 'w_xx(fine, coarse)'
1000 loops, best of 3: 1.5 msec per loop
$ pythran w_xx.py -fopenmp -march=native # yes, this generates parallel code
$ python -m timeit -s 'import numpy as np; fine = np.random.uniform(0, 100, 100000); fine[fine<20] = 0; coarse = np.sum(fine.reshape(-1, 2), axis=1); from w_xx import w_xx' 'w_xx(fine, coarse)'
1000 loops, best of 3: 867 usec per loop

Disclaimer: I am a Pythran dev.

0

Excellent! I didn't know about np.repeat, thank you very much.

To answer my original question in the form it was presented, I've since also managed to get this working with multiprocessing:

import numpy as np    
from multiprocessing import Pool

fine = np.random.uniform(0,100,100000)
fine[fine<20] = 0
coarse = np.sum(fine.reshape(-1,2),axis=1)

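def wfunc(zipped):
    # zipped is one (fine value, repeated coarse value) pair
    return zipped[0]/zipped[1]

def wpar(zipped, processes):
    p = Pool(processes)
    # map over the pairs passed in, rather than the global fine and coarse
    calc = np.asarray(p.map(wfunc, zipped))

    p.close()
    p.join()

    # drop the nan/inf entries produced where coarse is zero
    return calc[np.isfinite(calc)].reshape(-1,2)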

However, the suggestion by @behzad.nouri is evidently better:

def w_opt(fine, coarse):
    w = (fine / np.repeat(coarse, 2))
    return w[np.isfinite(w)].reshape(-1,2)    

#using some IPython magic
%timeit w_opt(fine,coarse)
1000 loops, best of 3: 1.88 ms per loop

%timeit w_xx(fine,coarse)
1 loops, best of 3: 342 ms per loop

%timeit wpar(zip(fine,np.repeat(coarse,2)),6) #I've 6 cores at my disposal
1 loops, best of 3: 1.76 s per loop
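The Pool version is probably slow because every one of the 100000 divisions is handed over as its own Python-level item, so pickling and inter-process communication dwarf the arithmetic. A minimal sketch of a coarser-grained split follows, where each task is a whole array chunk. The names weights_for_chunk and w_chunked and the map_fn parameter are illustrative assumptions rather than part of the code above, and the sketch is untimed; for arrays of this size the plain NumPy expression may well remain the fastest.

```python
import numpy as np

def weights_for_chunk(chunk):
    """Compute the weights for one (fine_chunk, coarse_chunk) pair."""
    fine_chunk, coarse_chunk = chunk
    keep = coarse_chunk > 0
    return fine_chunk.reshape(-1, 2)[keep] / coarse_chunk[keep][:, None]

def w_chunked(fine, coarse, n_chunks, map_fn=map):
    """Hypothetical helper: split the work into n_chunks array-sized tasks.

    map_fn defaults to the built-in map (serial); a Pool's map method can be
    passed instead to spread the chunks over processes.
    """
    coarse_chunks = np.array_split(coarse, n_chunks)
    # split fine at twice the coarse boundaries so the pairs stay aligned
    bounds = np.cumsum([len(c) for c in coarse_chunks])[:-1] * 2
    fine_chunks = np.split(fine, bounds)
    results = list(map_fn(weights_for_chunk, zip(fine_chunks, coarse_chunks)))
    return np.concatenate(results)

fine = np.random.uniform(0, 100, 100000)
fine[fine < 20] = 0
coarse = np.sum(fine.reshape(-1, 2), axis=1)

w = w_chunked(fine, coarse, 6)    # serial run with the built-in map
# Parallel run (assumed setup, inside an `if __name__ == "__main__":` guard):
#     pool = Pool(6)
#     w = w_chunked(fine, coarse, 6, map_fn=pool.map)
#     pool.close(); pool.join()
```

Keeping the map function as a parameter means the same code can be checked serially before any processes are involved.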

Thanks again!
