
I am trying to exit a multiprocessing script when an error is thrown by the target function, but instead of quitting, the parent process just hangs.

This is the test script I use to replicate the problem:

#!/usr/bin/python3.5

import time, multiprocessing as mp

def myWait(wait, resultQueue):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    name = mp.current_process().name
    resultQueue.put((name, wait, startedAt, endedAt))

# queue initialisation
resultQueue = mp.Queue()

# process creation, args: (sleep time, queue)
proc = [
    mp.Process(target=myWait, name=' _One_', args=(2, resultQueue)),
    mp.Process(target=myWait, name=' _Two_', args=(2, resultQueue)),
    ]

# starting processes
for p in proc:
    p.start()

for p in proc:
    p.join()

# print results
results = {}
for p in proc:
    name, wait, startedAt, endedAt = resultQueue.get()
    print('Process %s started at %s wait %s ended at %s' % (name, startedAt, wait, endedAt))

This works perfectly: I can see the parent script spawning two child processes in htop. But when I try to force the parent script to exit if an error is thrown in the myWait target function, the parent process just hangs and doesn't even spawn any child processes. I have to Ctrl-C to kill it.

def myWait(wait, resultQueue):
    try:
        1 / 0  # do something wrong
    except Exception:
        raise SystemExit

I have tried every way to exit the function (e.g. exit(), sys.exit(), os._exit()...) to no avail.


2 Answers


Firstly, your code has a major issue: you're joining the processes before flushing the contents of the queue, which can result in a deadlock. See the section titled 'Joining processes that use queues' here: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming

Secondly, the call to resultQueue.get() blocks until it receives some data, which never happens if an exception is raised in the myWait function before anything has been pushed into the queue. So make the call non-blocking and poll the queue in a loop until it either receives something or detects that the process has died.

Here's a quick'n'dirty fix to give you the idea:

#!/usr/bin/python3.5

import multiprocessing as mp
import queue
import time

def myWait(wait, resultQueue):
    raise Exception("error!")

# queue initialisation
resultQueue = mp.Queue()

# process creation, args: (sleep time, queue)
proc = [
    mp.Process(target=myWait, name=' _One_', args=(2, resultQueue)),
    mp.Process(target=myWait, name=' _Two_', args=(2, resultQueue)),
    ]

# starting processes
for p in proc:
    p.start()

# print results
results = {}
for p in proc:
    while True:
        try:
            # non-blocking read: drain the queue before checking liveness,
            # so a result from a fast-exiting child isn't lost
            name, wait, startedAt, endedAt = resultQueue.get(block=False)
            print('Process %s started at %s wait %s ended at %s'
                  % (name, startedAt, wait, endedAt))
            break
        except queue.Empty:
            # only give up on this process once the queue is empty
            # AND the child is no longer alive
            if not p.is_alive():
                break

for p in proc:
    p.join()

The function myWait will throw an exception but both processes will still join and the program will exit nicely.
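As a side note (not part of the original snippet): once a child has been joined, its Process object exposes exitcode, which tells you whether it died from an unhandled exception, handy if you don't want to rely on the queue alone. A minimal sketch, with a made-up boom function standing in for a failing target:

```python
import multiprocessing as mp

def boom(resultQueue):
    # unhandled exception: the child dies with a non-zero exit code
    raise Exception("error!")

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=boom, args=(q,))
    p.start()
    p.join()
    # exitcode is 0 on success, 1 after an unhandled exception,
    # and -N if the child was killed by signal N
    print(p.exitcode)
```

Checking exitcode after join lets the parent decide to bail out even when the queue stayed empty.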


8 Comments

get(block=False) did the trick. Your quick'n'dirty fix throws other errors from the multiprocessing lib though, but it's definitely getting there. Thanks.
Truth be told, I've been running this snippet under Python 3.4, but there doesn't seem to be any impactful change in 3.5 that would alter its behaviour. On my side I don't get any error except the one voluntarily thrown by the raise Exception("error!") line. What does the error log look like in your case?
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap self.run() and File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs)
Yes, as mentioned in my previous comment, that's to be expected... you only copy/pasted the first 2 entries of the traceback, but it then continues onto the line raise Exception("error!"), which is the real source of the error. I only put this exception here to demonstrate that even if your code goes wrong, as you mentioned in your question, your processes now join as they should and the program exits nicely. So just remove this exception and replace it with the actual content of the myWait function.
I have edited my answer to mention the expected behaviour.

You should use multiprocessing.Pool to manage your processes for you, then use Pool.imap_unordered to iterate over the results in the order they complete. As soon as you get the first exception, you can stop the pool and its child processes (this happens automatically when you exit the with Pool() as pool block), e.g.:

from multiprocessing import Pool
import time

def my_wait(args):
    name, wait = args
    if wait == 2:
        raise ValueError("error!")
    else:
        startedAt = time.strftime("%H:%M:%S", time.localtime())
        time.sleep(wait)
        endedAt = time.strftime("%H:%M:%S", time.localtime())
        return name, wait, startedAt, endedAt

if __name__ == "__main__":
    try:
        with Pool() as pool:
            args = [["_One_", 2], ["_Two_", 3]]
            for name, wait, startedAt, endedAt in pool.imap_unordered(my_wait, args):     
                print('Task %s started at %s wait %s ended at %s' % (name,
                    startedAt, wait, endedAt))
    except ValueError as e:
        print(e)

One caveat: the pool only runs as many tasks in parallel as it has worker processes (though that number is something you can set), so this method is less suitable for a large batch of long-running, low-workload tasks. It's also not great if you need to run different functions.
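For that last point, a hedged sketch: if you do need different functions on one pool, Pool.apply_async lets you dispatch each callable separately and collect the AsyncResult objects yourself. The builtins pow and abs below are just stand-ins for your own worker functions:

```python
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(2) as pool:
        # dispatch two *different* callables to the same pool;
        # pow and abs stand in for real worker functions
        jobs = [pool.apply_async(pow, (2, 5)),
                pool.apply_async(abs, (-7,))]
        # get() blocks for each result and re-raises any
        # exception thrown in the child
        results = [j.get() for j in jobs]
    print(results)  # [32, 7]
```

Since get() re-raises child exceptions, you can wrap the collection loop in a try/except just like the imap_unordered example above.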

2 Comments

Ah, I didn't expect to see Pool here since it's a solution to a different set of problems. Thanks for taking the time nonetheless!
Looks promising and terse compared to my attempt. As for the real thing (this was just a test bench), I am only looking to send 2 HTTP requests simultaneously and get their results: a single target function and short-lived processes. Your method seems perfectly suitable. Thanks.
