
I need to use a process pool in Python. The requirement is the following:

The pool has a fixed size, say 10. I have a number of jobs to submit to the pool (N > 10). In Java one can use a FixedThreadPool for that purpose. The jobs are submitted, and once a thread finishes executing a task, the client can submit the next one. So if 10 tasks are currently running, the client cannot submit an 11th; but as soon as one completes, the client can submit the next task to the now-available thread.

This is the code I use to test some ideas:

import multiprocessing, time


def printStuff(number):
    print number
    if number % 2 : time.sleep(0.5)
    return number*number

pool = multiprocessing.Pool(5, None, None, None)   
a = []

def execute():
    def resultAggregator(n):
        print 'aggregator called...'
        a.append(n)
    for i in range (0, 34):

        # With callback
        #pool.apply_async(printStuff, [i], None, resultAggregator)
        #print "called for ", i

        # Without callback
        res = pool.apply_async(printStuff, [i])
        print "called for" , i, "returned ", res.get()

    pool.close() # disable submitting any more tasks
    pool.join() # wait for all the workers to finish

execute()
print a

res.get() blocks until printStuff returns. The callback variant doesn't even call printStuff. Note that in both cases a is empty at the end.

Any ideas how to implement the behavior described above? Code snippets would be great, but a pointer to an existing library function I'm not aware of, or just some ideas tossed in, would be enough.

1 Answer

I don't know about Java's FixedThreadPool, but I can fix your code ;-)

You obviously don't want to use res.get(), right? So I'll ignore that part. The problem with .apply_async() is that you're not calling it correctly. I'm surprised no exception was raised! The argument list should be a tuple, not a list (as for the builtin apply() function). And for the keyword-args argument, None doesn't work. If you don't have keyword arguments to pass, either leave it out (as I do below) or pass an empty dictionary ({}).

Other changes here are more cosmetic: I introduced an IO lock to keep the terminal output from getting scrambled, and added a __name__ == "__main__" check, both for clarity and so that the code will run on Windows too:

import multiprocessing, time

def getlock(lck):
    global iolock
    iolock = lck

def printStuff(number):
    with iolock:
        print number
    if number % 2:
        time.sleep(0.5)
    return number*number

def execute():
    def resultAggregator(n):
        with iolock:
            print 'aggregator called...'
        a.append(n)

    for i in range(34):
        pool.apply_async(printStuff, (i,), callback=resultAggregator)
        with iolock:
            print "called for ", i

if __name__ == "__main__":
    a = []
    iolock = multiprocessing.Lock()
    pool = multiprocessing.Pool(5, getlock, (iolock,))   
    execute()
    pool.close()
    pool.join()
    print a

Later: errors

Turns out an exception actually is raised if you pass None for the keyword arguments - but multiprocessing suppresses it. This is, alas, a common problem with asynchronous gimmicks: there's no good way to raise exceptions! They occur in a context unrelated to what your "main program" happens to be doing at the time.

At least Python 3.3.2's implementation of .apply_async() has an optional error_callback argument too. I don't know when it was introduced. If you supply it, async exceptions are passed to it, so you can decide how to report (or log, or ignore ...) them. Adding this function:

def ouch(e):
    raise e

and changing the invocation to:

pool.apply_async(printStuff, (i,), None, resultAggregator, ouch)

yields a traceback ending in ouch() with this exception detail:

TypeError: printStuff() argument after ** must be a mapping, not NoneType

So, with a recent enough Python at least, you can arrange not to let async errors pass invisibly.

Q&A

Could you explain the "global iolock" declaration within getLock()? I thought it defines a global variable for each subprocess, but changing the name from iolock to iiolock in "main" makes iolock unknown to the worker processes.

Sorry, I can't tell from that exactly what you did. The name iolock is intended to be a global in all processes, main and children. That's because all processes in my code use the name iolock.

If, for example, "by changing the name ..." you mean you merely replaced

iolock = multiprocessing.Lock()

with

iiolock = multiprocessing.Lock()

then you would have got an exception:

Traceback (most recent call last):
  ...
    pool = multiprocessing.Pool(5, getlock, (iolock,))
NameError: global name 'iolock' is not defined

If you also changed that line (pool = ...) to use iiolock too, then you would have gotten a different exception, when resultAggregator in the main process tried to use iolock:

Exception in thread Thread-3:
Traceback (most recent call last):
  ...
  File "mpool.py", line 19, in resultAggregator
    with iolock:
NameError: global name 'iolock' is not defined

So I have no idea what you really did.

Also declaring printStuff within execute causes a silent error (code does not progress past printing "called for")

That can't work. Functions in Python are not declared - def is an executable statement. The code for printStuff doesn't exist before its def statement is executed. Because only the main program executes execute(), functions def'ed inside execute() only exist in the main program. It's true that

    pool.apply_async(printStuff, (i,), callback=resultAggregator)

passes printStuff to the child processes, but everything passed works via pickling on the sending end and unpickling on the receiving end, and function objects cannot be pickled. Are you sure you didn't get an error like this?:

_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed

(I'm using Python 3 here - maybe it's different under Python 2).

In any case, Python isn't Java - don't go crazy with nesting - keep it simple ;-) Every function and class used by a child process should be defined at module level (class is also an executable statement in Python! The only "declarations" in Python are the global and nonlocal statements).

More Q&A

You are right about the assumptions. I changed to iiolock in all places except main

Still don't know exactly what you did. For things like this, you really have to post code, not merely describe what you did. I can only guess otherwise - and that's really painful ;-) How about this: if you have a new problem, open a new question?

From what you described here ("in all places except main"), you would have gotten an exception in execute(), because the new iiolock name would not exist in the main process (which is the only process in which execute() runs - and you said you did not change the old iolock in main()). But you didn't mention an exception, so I guess you didn't really do exactly what you said you did ("in all places except main").

and was expecting that the new Process just get the same lock passed as a param to initialization function but each has its own global iiolock variable. How can multiple processes share the same variable anyway (aren't the memory contents different for each process???).

There are two answers to this ;-) The one most immediately relevant is that iolock (in my original code - I really have no idea what your code looks like now) is an object created by multiprocessing (it's an mp.Lock()) and passed to child processes via mp.Pool() too:

pool = multiprocessing.Pool(5, getlock, (iolock,)) 
                                         ^^^^^^

mp is in control of everything here, and does a world of stuff under the covers to ensure that this mp.Lock() has consistent state across processes. It's not just any old variable, it's something mp knows all about and all of whose behavior mp implements.

The second answer, which doesn't apply to any of the code in this issue so far, is that you can also create some kinds of data in "shared memory" using mp. See the docs for mp.Value and mp.Array and multiprocessing.sharedctypes. Those values are truly (physically) shared across processes.

But other than those (objects implemented by mp, and "shared memory" obtained from mp), you're right: no other values are shared (neither physically nor semantically) across processes. Communicating all other kinds of object values is done by pickling (serializing) and unpickling (reconstructing an object's value from a pickle string) at various mp synchronization points (like when you .put() an object on an mp.Queue, and another process .get()s it).


Comments

Perfect answer ! It seems to do exactly what I want.
@Eugen, you were very close already! Passing args in a list (instead of a tuple) actually works, but since that's undocumented (except that it appears in some examples) better to stick with the tuple spelling. The only "real" problem in your code was passing None :-)
Could you explain the "global iolock" declaration within getLock() ? I thought it defines a global variable for each subprocess, but changing the name from iolock to iiolock in "main" makes iolock unknown to the worker processes. Also declaring printStuff within execute causes a silent error (code does not progress past printing "called for")
@Eugen, please see my edit just now - couldn't fit in a comment.
Progress ;-) printStuff() doesn't have to be passed, because it's defined at module level - every process sees it automatically. On Linux-y systems, because it was already defined when mp forks, and on Windows because each child process imports the "main program" as a module (so re-executes all the code in the module). Likewise execute() is known to all processes, but only the main program actually calls it.
