
I am trying to use shared_memory with a Pool from Python's multiprocessing.

In the documentation on shared memory, the buf argument (the memory view) is not clear to me, perhaps because I don't understand the concept of a memory view. Is it a pointer?
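(For context: buf is not a pointer in the C sense; it is a Python memoryview, a zero-copy view over the raw bytes of the block. A minimal sketch of reading and writing a byte through it:)

```python
from multiprocessing import shared_memory

# Create a tiny shared block and inspect its buffer
shm = shared_memory.SharedMemory(create=True, size=4)
print(type(shm.buf).__name__)  # memoryview

shm.buf[0] = 255   # write one byte through the view
value = shm.buf[0] # read it back
print(value)       # 255

shm.close()
shm.unlink()
```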
I want to use this shared memory across different processes. Here is my example, based on the documentation:

import numpy as np
from multiprocessing import shared_memory

a = np.array([1, 1, 2, 3, 5, 8])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)

# Do I need to create existing_shm, or can I keep using shm?
existing_shm = shared_memory.SharedMemory(name=shm.name)
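(A quick, self-contained sketch of what this second handle does: within one process both handles view the same underlying block, so a write through one is visible through the other. The arrays are deleted before closing, as the docs do, so the buffers are released cleanly:)

```python
import numpy as np
from multiprocessing import shared_memory

a = np.array([1, 1, 2, 3, 5, 8])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
existing_shm = shared_memory.SharedMemory(name=shm.name)  # second handle, same block

b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
c = np.ndarray(a.shape, dtype=a.dtype, buffer=existing_shm.buf)
b[:] = a     # populate through the first handle
c[1] = 100   # write through the second handle
print(b[1])  # 100 -- both arrays share the same memory

del b, c                 # release the views before closing
existing_shm.close()
shm.close()
shm.unlink()
```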

Now comes my first problem. I define the function that will use the array in the shared memory:

def test_function(Input):
    c = np.ndarray(a.shape, dtype=np.int64, buffer=existing_shm.buf)
    c[1]=100
    print(c)

This is incorrect, but I don't know what it should look like.

Then the main block. Does the if __name__ == '__main__' guard play a role in making this work?

if __name__=='__main__':
    with Pool(os.cpu_count()) as p:
        p.map(test_function, range(12))

It doesn't work. Do I have to define c in every process, or can I define it in the main block and use it across all processes? I assume that c is a Python object and therefore can't be shared between processes because of the GIL?

Thank you very much!

1 Answer


This works, although I don't yet have a clear understanding of all the details.

1- The shared memory object is created:
shm = shared_memory.SharedMemory(create=True, size=10000000*4)

2- A NumPy array is declared on top of that buffer:
b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)

3- The NumPy array is populated by copying data into it:
b[:] = np.random.randint(100, size=10000000, dtype=np.int32)

Then, all the function to be executed on many CPUs needs is the name of the shared memory object; repeating step 2 inside the function maps the same shared block, which was populated earlier.

It's essential that each process close the shared memory object after accessing it, and that the block be unlinked exactly once at the very end.

import numpy as np
from multiprocessing import shared_memory, Pool
import os


def test_function(args):
    Input, shm_name, shape = args
    # Attach to the existing shared memory block by its name
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    # Map a NumPy array onto the shared buffer (no copy is made)
    d = np.ndarray(shape, dtype=np.int32, buffer=existing_shm.buf)
    d[Input] = -20
    del d  # release the view of the buffer before closing
    existing_shm.close()  # close this handle; do not unlink in the workers
    print(Input, 'parent process:', os.getppid())
    print(Input, 'process id:', os.getpid())


if __name__=='__main__':
    
    shm = shared_memory.SharedMemory(create=True, size=10000000*4)
    b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
    b[:] = np.random.randint(100, size=10000000, dtype=np.int32)

    # One (index, shared-memory name, shape) task per worker call
    inputs = [(i, shm.name, b.shape) for i in range(1, 14)]

    with Pool(os.cpu_count()) as p:
        p.map(test_function, inputs)
 
    print(b[:20])
    
    # Clean up in the parent process
    del b  # release the NumPy view of the buffer before closing
    shm.close()
    shm.unlink()  # Free and release the shared memory block at the very end

1 Comment

The docs also give a great way to get the size needed for a particular NumPy array: myndarray.nbytes. There is also a manager (SharedMemoryManager), but it's fairly rudimentary.
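(A small sketch of that sizing approach, using a hypothetical template array rather than computing count * itemsize by hand:)

```python
import numpy as np
from multiprocessing import shared_memory

# Any array whose layout you want to share (hypothetical shape/dtype)
template = np.zeros((1000, 3), dtype=np.float64)
print(template.nbytes)  # 1000 * 3 * 8 = 24000 bytes

# Size the block directly from nbytes
shm = shared_memory.SharedMemory(create=True, size=template.nbytes)
shared = np.ndarray(template.shape, dtype=template.dtype, buffer=shm.buf)
shared[:] = template  # copy the data into shared memory

del shared
shm.close()
shm.unlink()
```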
