10

I am making a process pool, and each process needs to write to different parts of a matrix that exists in the main program. There is no fear of overwriting information, as each process will work with different rows of the matrix. How can I make the matrix writable from within the processes?

The program is a matrix multiplier a professor assigned me, and it has to be multiprocessed. It will create a process for every core the computer has. The main program will send different parts of the matrix to the processes, which will compute them and then return the results in a way that lets me identify which response corresponds to which row.

5
  • Are you sure you need this? At first glance, the point of multiprocessing is to distribute calculations, not writing to arrays. Why can't your subroutines just return the appropriate results for further handling by the main program? Commented Mar 16, 2012 at 18:43
  • Yes, you are right. Then what I would need to ask is: how can I manage the responses of what may be multiple processes (one for each core)? Commented Mar 16, 2012 at 18:56
  • The multiprocessing module allows you to collect the results of asynchronously run functions, no matter what they return. Say, they can return 1D arrays corresponding to rows of your matrix. If your case is more complex, please edit your post to provide more details. Commented Mar 16, 2012 at 19:02
  • I added more info, thanks for all your help. Commented Mar 16, 2012 at 19:23
  • Here is an example of how to write to a numpy array from multiple processes. Commented Mar 16, 2012 at 20:21

4 Answers

11

Have you tried using the multiprocessing.Array class to set up some shared memory?

See also the example from the docs:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

Just extend this to a matrix of size h*w with i*w+j-style indexing. Then, add multiple processes using a Process Pool.


3 Comments

+1 for the shared array. Synchronization is not needed in this case, so multiprocessing.RawArray can be used.
Could you please provide a short example using a process Pool and Array? Using pool.map, it does not seem possible to pass arr and num at the same time.
@YPOC There are quite a few Q/As on that already. Did you check any of those (e.g., this or that)? Otherwise, I'd recommend posting a new question for your issue specifically.
6

The cost of creating new processes, or of copying matrices between them if processes are reused, overshadows the cost of the matrix multiplication itself. In any case, numpy.dot() can utilize different CPU cores by itself.

Matrix multiplication can be distributed between processes by computing different rows of the result in different processes, e.g., given input matrices a and b then the result (i,j) element is:

out[i,j] = sum(a[i,:] * b[:,j])

So the i-th row can be computed as:

import numpy as np

def dot_slice(a, b, out, i):
    t = np.empty_like(a[i,:])
    for j in range(b.shape[1]):
        # out[i,j] = sum(a[i,:] * b[:,j])
        np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])

A numpy array accepts a slice as an index; e.g., a[1:3,:] returns the 2nd and 3rd rows.
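As a quick illustration (the values here are my own, hypothetical), such a slice object behaves exactly like the a[1:3,:] notation and yields a view, which is why dot_slice can write its results straight into out:

```python
import numpy as np

a = np.arange(12.0).reshape(4, 3)
i = slice(1, 3)          # the same kind of object slices() below produces
view = a[i, :]           # 2nd and 3rd rows: a view into a, no copy
view[:] = 0              # writing through the view modifies a itself
print(a[1, 0], a[3, 2])  # 0.0 11.0
```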

a and b are read-only, so they can be inherited as-is by the child processes (exploiting copy-on-write on Linux); the result is computed in a shared array. Only the indexes are copied during computations:

import ctypes
import multiprocessing as mp

def dot(a, b, nprocesses=mp.cpu_count()):
    """Perform matrix multiplication using multiple processes."""
    if (a.shape[1] != b.shape[0]):
        raise ValueError("wrong shape")

    # create shared array
    mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])

    # start processes
    np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
    pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)

    # perform multiplication
    for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
        print("done %s" % (i,))
    pool.close()
    pool.join()

    # return result
    return tonumpyarray(*np_args)

Where:

def mpdot_slice(i):
    dot_slice(ga, gb, gout, i)
    return i

def init(a, b, *np_args):
    """Called on each child process initialization."""
    global ga, gb, gout
    ga, gb = a, b
    gout = tonumpyarray(*np_args)

def tonumpyarray(mp_arr, shape, dtype):
    """Convert shared multiprocessing array to numpy array.

    no data copying
    """
    return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)

def slices(nitems, mslices):
    """Split nitems on mslices pieces.

    >>> list(slices(10, 3))
    [slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
    >>> list(slices(1, 3))
    [slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
    """
    step = nitems // mslices + 1
    for i in range(mslices):
        yield slice(i*step, min(nitems, (i+1)*step))

To test it:

def test():
    n = 100000
    a = np.random.rand(50, n)
    b = np.random.rand(n, 60)
    assert np.allclose(np.dot(a,b), dot(a,b, nprocesses=2))

On Linux this multiprocessing version has the same performance as a solution that uses threads and releases the GIL (in a C extension) during computations:

$ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
100 loops, best of 3: 9.05 msec per loop

$ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)' 
10 loops, best of 3: 88.8 msec per loop

$ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
done slice(49, 50, None)
..[snip]..
done slice(35, 42, None)
10 loops, best of 3: 82.3 msec per loop

Note: the test was changed to use np.float64 everywhere.


2

Matrix multiplication means each element of the resulting matrix is calculated separately. That seems like a job for Pool. Since it's homework (and to follow SO conventions on homework questions), I will only illustrate the use of the Pool itself, not the whole solution.

So, you have to write a routine to calculate the (i, j)-th element of the resulting matrix:

def getProductElement(m1, m2, i, j):
    # some calculations
    return element

Then you initialize the Pool:

from multiprocessing import Pool, cpu_count
pool = Pool(processes=cpu_count())

Then you need to submit the jobs. You could organize them in a matrix too, but why bother; let's just make a list.

results = []
# here you need to iterate through the rows of the first and the columns of
# the second matrix. How you do it depends on the implementation (how you
# store the matrices). Also, make sure you check that the dimensions agree.
# The simplest case is if you have a list of rows:

N = len(m1)
M = len(m2[0])
for i in range(N):
    for j in range(M):
        results.append(pool.apply_async(getProductElement, (m1, m2, i, j)))

Then fill the resulting matrix with the results:

m = []
count = 0
for i in range(N):
    row = []
    for j in range(M):
        row.append(results[count].get())
        count += 1
    m.append(row)

Again, the exact shape of the code depends on how you represent the matrices.

9 Comments

Copying the whole matrices to compute a single element is grossly inefficient.
@J.F.Sebastian, I genuinely believed they were passed by reference. Was I wrong the whole time? At least in this example, the matrices are lists.
It is easy to demonstrate: just try to change the values. See @moooeeeep's answer on how to share a value between multiple processes.
Yep, so if I have something like def change(l): l.append('new'), and then I do l = [] and call change(l), print l gives me ['new']. That's exactly what I mean.
Try: pool.apply(change, (l,)); print(l)
-4

You don't.

Either they return their edits in a format you can use in the main program, or you use some kind of interprocess communication to have them send their edits over, or you use some kind of shared storage, such as a database or a data-structure server like Redis.

