I don't know about Java's FixedThreadPool, but I can fix your code ;-)
You obviously don't want to use res.get(), right? So I'll ignore that part. The problem with .apply_async() is that you're not calling it correctly. I'm surprised no exception was raised! The argument list should be a tuple, not a list (as for the builtin apply() function). And for the keyword-args argument, None doesn't work. If you don't have keyword arguments to pass, either leave it out (as I do below) or pass an empty dictionary ({}).
Other changes here are more cosmetic: introduced an IO lock to keep the terminal output from getting scrambled, and introduced a __name__ == "__main__" check both for clarity and so that the code will run on Windows too:
    import multiprocessing, time

    def getlock(lck):
        global iolock
        iolock = lck

    def printStuff(number):
        with iolock:
            print number
        if number % 2:
            time.sleep(0.5)
        return number*number

    def execute():
        def resultAggregator(n):
            with iolock:
                print 'aggregator called...'
            a.append(n)
        for i in range(34):
            pool.apply_async(printStuff, (i,), callback=resultAggregator)
            with iolock:
                print "called for ", i

    if __name__ == "__main__":
        a = []
        iolock = multiprocessing.Lock()
        pool = multiprocessing.Pool(5, getlock, (iolock,))
        execute()
        pool.close()
        pool.join()
        print a
Later: errors
Turns out an exception actually is raised if you pass None for the keyword arguments - but multiprocessing suppresses it. This is, alas, a common problem with asynchronous gimmicks: there's no good way to raise exceptions! They occur in a context unrelated to what your "main program" happens to be doing at the time.
At least Python 3.3.2's implementation of .apply_async() has an optional error_callback argument too. I don't know when it was introduced. If you supply it, async exceptions are passed to it, so you can decide how to report (or log, or ignore ...) them. Adding this function:
    def ouch(e):
        raise e
and changing the invocation to:
    pool.apply_async(printStuff, (i,), None, resultAggregator, ouch)
yields a traceback ending in ouch() with this exception detail:
    TypeError: printStuff() argument after ** must be a mapping, not NoneType
So, at least, with a recent enough Python you can arrange not to let asynch errors pass invisibly.
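Pulling that together, here's a minimal self-contained sketch of error_callback in action (Python 3 syntax; the names fail() and ouch() are my own, not from your code):

```python
import multiprocessing

errors = []

def fail(n):
    return 1 // n   # raises ZeroDivisionError when n == 0

def ouch(e):
    # called in the main process with the exception raised in the worker
    errors.append(e)

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    pool.apply_async(fail, (0,), error_callback=ouch)
    pool.close()
    pool.join()
    print(errors)   # the ZeroDivisionError from the worker, not silence
```

Without error_callback, the ZeroDivisionError would vanish without a trace, exactly like your None-for-kwargs TypeError did.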
Q&A
Could you explain the "global iolock" declaration within getlock()? I thought it defines a global variable for each subprocess, but changing the name from iolock to iiolock in "main" makes iolock unknown to the worker processes.
Sorry, I can't tell from that exactly what you did. The name iolock is intended to be a global in all processes, main and children. That's because all processes in my code use the name iolock.
If, for example, by "changing the name ..." you mean you merely replaced

    iolock = multiprocessing.Lock()

with

    iiolock = multiprocessing.Lock()
then you would have got an exception:
    Traceback (most recent call last):
    ...
    pool = multiprocessing.Pool(5, getlock, (iolock,))
    NameError: global name 'iolock' is not defined
If you also changed that line (pool = ...) to use iiolock too, then you would have gotten a different exception, when resultAggregator in the main process tried to use iolock:
    Exception in thread Thread-3:
    Traceback (most recent call last):
    ...
    File "mpool.py", line 19, in resultAggregator
        with iolock:
    NameError: global name 'iolock' is not defined
So I have no idea what you really did.
Also declaring printStuff within execute causes a silent error (code does not progress past printing "called for").
That can't work. Functions in Python are not declared - def is an executable statement. The code for printStuff doesn't exist before def printStuff is executed. Because only the main program executes execute(), functions def'ed inside execute() exist only in the main program. It's true that
    pool.apply_async(printStuff, (i,), callback=resultAggregator)
passes printStuff to the child processes, but everything passed works via pickling on the sending end and unpickling on the receiving end, and function objects cannot be pickled. Are you sure you didn't get an error like this?:

    _pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
(I'm using Python 3 here - maybe it's different under Python 2).
In any case, Python isn't Java - don't go crazy with nesting - keep it simple ;-) Every function and class used by a child process should be defined at module level (class is also an executable statement in Python! The only "declarations" in Python are the global and nonlocal statements).
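To make the "module level" rule concrete, here's a minimal working sketch (Python 3 syntax; the worker function square() is my own name, not from your code) - because square is defined at the top of the module, the child processes can locate it by name when they unpickle the task:

```python
import multiprocessing

def square(n):   # defined at module level, so workers can find it by name
    return n * n

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    results = [pool.apply_async(square, (i,)) for i in range(5)]
    pool.close()
    pool.join()
    print([r.get() for r in results])   # [0, 1, 4, 9, 16]
```

Move the def square inside the __main__ block (or inside another function) and the same code fails with a pickling error on platforms that spawn fresh interpreters, because the children can no longer import it.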
More Q&A
You are right about the assumptions. I changed to iiolock in all places except main
Still don't know exactly what you did. For things like this, you really have to post code, not merely describe what you did. I can only guess otherwise - and that's really painful ;-) How about this: if you have a new problem, open a new question?
From what you described here ("in all places except main"), you would have gotten an exception in execute(), because the new iiolock name would not exist in the main process (which is the only process in which execute() runs - and you said you did not change the old iolock in main()). But you didn't mention an exception, so I guess you didn't really do exactly what you said you did ("in all places except main").
and was expecting that the new Process just gets the same lock passed as a param to the initialization function, but each has its own global iiolock variable. How can multiple processes share the same variable anyway (aren't the memory contents different for each process?)?
There are two answers to this ;-) The one most immediately relevant is that iolock (in my original code - I really have no idea what your code looks like now) is an object created by multiprocessing (it's an mp.Lock()) and passed to child processes via mp.Pool() too:
    pool = multiprocessing.Pool(5, getlock, (iolock,))
                                              ^^^^^^
mp is in control of everything here, and does a world of stuff under the covers to ensure that this mp.Lock() has consistent state across processes. It's not just any old variable: it's something mp knows all about, and all of whose behavior mp implements.
The second answer, which doesn't apply to any of the code in this issue so far, is that you can also create some kinds of data in "shared memory" using mp. See the docs for mp.Value and mp.Array and multiprocessing.sharedctypes. Those values are truly (physically) shared across processes.
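A minimal sketch of that shared-memory variant (Python 3 syntax; bump() and counter are my names, not code from this question) - the Value lives in memory physically visible to every process:

```python
import multiprocessing

def bump(counter):
    # the Value carries its own lock; hold it so += isn't a lost update
    with counter.get_lock():
        counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)   # a C int in shared memory
    procs = [multiprocessing.Process(target=bump, args=(counter,))
             for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)   # 4: every process updated the same memory
```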
But other than those (objects implemented by mp, and "shared memory" obtained from mp), you're right: no other values are shared (neither physically nor semantically) across processes. Communicating all other kinds of object values is done by pickling (serializing) and unpickling (reconstructing an object's value from a pickle string) at various mp synchronization points (like when you .put() an object on an mp.Queue, and another process .get()s it).
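For example, putting a dict on an mp.Queue pickles it in the sender and unpickles a brand-new copy in the receiver - mutating the copy never touches the original (a sketch in Python 3 syntax; the names are mine):

```python
import multiprocessing

def worker(inq, outq):
    d = inq.get()          # unpickled: a brand-new dict in this process
    d['who'] = 'child'     # mutates only this process's copy
    outq.put(d)            # pickled again on the way back

if __name__ == "__main__":
    inq = multiprocessing.Queue()
    outq = multiprocessing.Queue()
    original = {'who': 'parent'}
    p = multiprocessing.Process(target=worker, args=(inq, outq))
    p.start()
    inq.put(original)
    answer = outq.get()
    p.join()
    print(original['who'], answer['who'])   # parent child
```

The parent's original still says 'parent' - the child changed a reconstructed copy, and what came back is yet another copy.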