0

So I thought I'd finally post; what is the proper way to manage Process workers? I've tried to use a Pool, but I noticed I could not get the return value of each completed process. I tried to use a callback but that didn't work as expected either. Should I just be managing them myself with active_children ()?

My Pool code:

from multiprocessing import *                                                                                      
import time
import random

SOME_LIST = []

def myfunc():
    a = random.randint(0,3)
    time.sleep(a)
    return a

def cb(retval):
    SOME_LIST.append(retval)

print("Starting...")

p = Pool(processes=8)
p.apply_async(myfunc, callback=cb)
p.close()
p.join()

print("Stopping...")
print(SOME_LIST)

I expect a list of values; but all I get is the last item in the worker job to complete:

$ python multi.py 
Starting...
Stopping...
[3]

Note: The answer should not use threading module; here is the reason why:

In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing.

4
  • Using a Pool is the correct way. You should be able to get the return value of each completed worker Process, too : return_value = pool.apply(func, args=(arg1, arg2). Can you share the code you're trying to use that isn't working? Commented Oct 21, 2014 at 15:59
  • I'm not sure if this is what you mean, but this works fine, too: return_value = pool.apply(func). Where func is def func(): return 12345. return_value will be assigned to the 12345 returned by func. Commented Oct 21, 2014 at 20:35
  • I'm confused. Why do you expect a list of results from the above example? You're just calling apply_async once, which means you're just calling myfunc a single time in one of the worker processes. What are you expecting to happen? Commented Oct 22, 2014 at 3:47
  • Nope. It just calls it once. To call it for every process in the pool, you'd need to do for _ in range(pool._processes): pool.apply_async(myfunc, callback=cb). Or, if you make myfunc take a single argument (which you could ignore), you could do: SOME_LIST = pool.map(myfunc, range(pool._processes)) Commented Oct 22, 2014 at 3:53

1 Answer 1

6

You're misunderstanding the way apply_async works. It doesn't call the function you pass to it in every process in the Pool. It just calls the function one time, in one of the worker processes. So the results you're seeing are to be expected. You have a couple of options to get the behavior you want:

from multiprocessing import Pool                                                                                   
import time
import random

SOME_LIST = []

def myfunc():
    a = random.randint(0,3)
    time.sleep(a)
    return a

def cb(retval):
    SOME_LIST.append(retval)

print("Starting...")

p = Pool(processes=8)
for _ in range(p._processes):
    p.apply_async(myfunc, callback=cb)
p.close()
p.join()

print("Stopping...")
print(SOME_LIST)

Or

from multiprocessing import Pool                                                                                      
import time
import random


def myfunc():
    a = random.randint(0,3)
    time.sleep(a)
    return a

print("Starting...")

p = Pool(processes=8)
SOME_LIST = p.map(myfunc, range(p._processes))
p.close()
p.join()

print("Stopping...")
print(SOME_LIST)

Note that you could also call apply_async or map for more than the number of processes in the pool. The idea of the Pool is that it guarantees exactly num_processes processes will be running for the entire lifetime of the Pool, no matter how many tasks you submit. So if you create a Pool(8) and call apply_async once, one of your eight workers will get a task, and the other seven will be idle. If you create a Pool(8) and call apply_async 80 times, the 80 tasks will get distributed to your eight workers, with no more than eight of the tasks actually being processed at once.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.