7

Hi, I'm trying to use multiprocessing to speed up my code, but apply_async doesn't work for me. I tried a simple example:

from multiprocessing.pool import Pool
t = [0, 1, 2, 3, 4, 5]
def cube(x):
    t[x] = x**3
pool = Pool(processes=4)
for i in range(6):
    pool.apply_async(cube, args=(i, ))
for x in t:
    print(x)

It does not really change t as I would expect.

My real code is like:

from multiprocessing.pool import Pool
def func(a, b, c, d):
    #some calculations
    #save result to files
    #no return value
lt = #list of possible value of a
#set values to b, c, d
p = Pool()
for i in lt:
    p.apply_async(func, args=(i, b, c, d, ))

Where are the problems here?

Thank you!


Update: Thanks to the comments and answers, I now understand why my simple example won't work. However, I'm still having trouble with my real code. I have checked that my func does not rely on any global variable, so it does not seem to be the same problem as in my example code.

As suggested, I added a return value to my func. Now my code is:

f = Flux("reactor")
d = Detector("Ge")
mv = arange(-6, 1.5, 0.5)
p = Pool()
lt = ["uee", "dee"]
for i in lt:
    re = p.apply_async(res, args=(i, d, f, mv, ))
    print(re.get())
p.close()
p.join()

Now I get the following error:

Traceback (most recent call last):
  File "/Users/Shu/Documents/Programming/Python/Research/debug.py", line 35, in <module>
    print(re.get())
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 608, in get
    raise self._value
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 385, in _handle_tasks
    put(task)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'Flux.__init__.<locals>.<lambda>'

3 Comments
  • Is func() not creating the files as expected, or are you just not seeing any speed benefits? Commented Jul 4, 2017 at 3:38
  • @JohnGordon func doesn't do anything, just as cube is not executed in my first example. Commented Jul 4, 2017 at 3:56
  • You are assuming that every process shares the same global t, which is by definition incorrect. You will have to pass t as a parameter so that t exists and is shared by all the processes. Commented Jul 4, 2017 at 16:51

2 Answers

8

EDIT: the first example you provided will not work for a simple reason: processes do not share memory. Each worker process modifies its own copy of t, so the change t[x] = x**3 is never applied in the parent process, and the list t there stays unchanged.

You need to return the value from the computation and build a new list from the returned values.

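from multiprocessing import Pool

def cube(x):
    return x**3

if __name__ == "__main__":
    t = [0, 1, 2, 3, 4, 5]

    with Pool() as p:
        # map sends each item to a worker and returns the results in input order
        t = p.map(cube, t)

    print(t)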
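
If you prefer to keep apply_async, as in the question, the same idea could be sketched roughly like this. The helper name cubes_async is made up for illustration; the key point is that every call to apply_async returns an AsyncResult, and the values only reach the parent when you call get() on it.

```python
from multiprocessing import Pool

def cube(x):
    return x ** 3

def cubes_async(values, processes=4):
    # cubes_async is a hypothetical helper name, not part of multiprocessing
    with Pool(processes=processes) as pool:
        # Submit every task first so the workers can run them in parallel
        pending = [pool.apply_async(cube, args=(v,)) for v in values]
        # get() blocks until the corresponding task has finished
        return [r.get() for r in pending]

if __name__ == "__main__":
    print(cubes_async([0, 1, 2, 3, 4, 5]))
```

Submitting all tasks before calling get() matters: calling get() right after each apply_async waits for that task before the next one is scheduled.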

If, as in your second example, the results are not meant to be returned but written to files independently, and the files do not appear, the function itself may be raising an exception that you never see. An exception raised inside a worker is stored in the result object and only re-raised in the parent when you call get().

I'd recommend retrieving the actual results to see what happens:

p = Pool()
for i in lt:
    res = p.apply_async(func, args=(i, b, c, d, ))
    print(res.get())  # this will raise an exception if it happens within func

p.close()  # do not accept any more tasks
p.join()  # wait for the completion of all scheduled jobs
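
Note that calling get() inside the loop waits for each task before the next one is submitted. One possible alternative, sketched below under assumptions, is the error_callback parameter of apply_async, which reports failures without blocking the loop. Here func is only a stand-in for your real function, and report_error and the "bad" input are invented for the demonstration.

```python
from multiprocessing import Pool

def func(a, b, c, d):
    # Hypothetical stand-in for the real work; it fails on purpose for one input
    if a == "bad":
        raise ValueError("cannot process %r" % (a,))
    return a

def report_error(exc):
    # Called in the parent process with the exception raised by a task
    print("task failed:", repr(exc))

if __name__ == "__main__":
    lt = ["uee", "bad", "dee"]  # assumed inputs for the demonstration
    b = c = d = None
    p = Pool()
    pending = [
        p.apply_async(func, args=(i, b, c, d), error_callback=report_error)
        for i in lt
    ]
    p.close()  # do not accept any more tasks
    p.join()   # wait for the completion of all scheduled jobs
    print([r.successful() for r in pending])
```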

4 Comments

Edited the answer.
Thanks for your patience! Now I got an error, please see my updated question.
According to the traceback, the function you are trying to schedule is not picklable. If it's a lambda function as the traceback suggests, try to convert it to a normal function.
Thanks, the res.get() displayed the exception that was not visible otherwise
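
To illustrate the pickling comment above, here is a minimal sketch. The internals of Flux are not shown in the question, so the two classes below are invented stand-ins: one stores a lambda created inside __init__, the other stores a module-level function. Arguments passed to apply_async are pickled before being sent to a worker, which is where the lambda fails.

```python
import pickle

class FluxWithLambda:
    # Hypothetical stand-in: stores a lambda defined inside __init__
    def __init__(self):
        self.spectrum = lambda energy: 2.0 * energy

def double_energy(energy):
    # Module-level function: pickle can refer to it by name
    return 2.0 * energy

class FluxWithFunction:
    # Same behaviour, but the attribute is a normal function
    def __init__(self):
        self.spectrum = double_energy

def is_picklable(obj):
    # The exception type raised for a local lambda depends on the Python version
    try:
        pickle.dumps(obj)
    except (AttributeError, pickle.PicklingError):
        return False
    return True

if __name__ == "__main__":
    print(is_picklable(FluxWithLambda()))    # the local lambda cannot be pickled
    print(is_picklable(FluxWithFunction()))  # the module-level function can
```

If the lambda needs to capture values, functools.partial applied to a module-level function is another picklable option.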
-2

The function quits too soon. Try adding this code at the end of your script:

import time
time.sleep(3)
