
While implementing a parallel operation in Python with the multiprocessing library, I noticed that some processes fail to terminate in a non-intuitive manner.

My program consists of:

  • a queue, used for data transfer between processes
  • a user process, which calculates something using data received via the queue
  • two maker processes, which generate data and push to the queue

Below is a simplified example. make_data generates random numbers and pushes them to the queue, and use_data receives the data and computes a running average. In total, 2*1000 = 2000 numbers are generated, and all of them are consumed. This code runs as expected: in the end, all processes terminate and no data is left in the queue.

import random
from multiprocessing import Process, Queue

q = Queue(maxsize=10000)
def make_data(q):
    for i in range(1000):
        x = random.random()
        q.put(x)
    print("final line of make data")

def use_data(q):
    i = 0
    res = 0.0
    while i < 2000:
        if q.empty():
            continue
        i += 1
        x = q.get()
        res = res*(i-1)/i + x/i
    print("iter %6d, avg = %.5f" % (i, res))

u = Process(target=use_data, args=(q,))
u.start()

p1 = Process(target=make_data, args=(q,))
p1.start()
p2 = Process(target=make_data, args=(q,))
p2.start()


u.join(timeout=10)
p1.join(timeout=10)
p2.join(timeout=10)
print(u.is_alive(), p1.is_alive(), p2.is_alive(), q.qsize())

Outcome:

final line of make data
final line of make data
iter   2000, avg = 0.49655
False False False 0

Things change when I let the makers generate more data than necessary. The code below differs from the one above only in that each maker generates 5000 items, so not all of the data is consumed. When it runs, it prints the final-line messages, but the maker processes never terminate (Ctrl-C is needed to stop them).

import random
from multiprocessing import Process, Queue

q = Queue(maxsize=10000)
def make_data(q):
    for i in range(5000):
        x = random.random()
        q.put(x)
    print("final line of make data")

def use_data(q):
    i = 0
    res = 0.0
    while i < 2000:
        if q.empty():
            continue
        i += 1
        x = q.get()
        res = res*(i-1)/i + x/i
    print("iter %6d, avg = %.5f" % (i, res))

u = Process(target=use_data, args=(q,))
u.start()

p1 = Process(target=make_data, args=(q,))
p1.start()
p2 = Process(target=make_data, args=(q,))
p2.start()


u.join(timeout=10)
p1.join(timeout=10)
p2.join(timeout=10)
print(u.is_alive(), p1.is_alive(), p2.is_alive(), q.qsize())

Outcome:

final line of make data
final line of make data
iter   2000, avg = 0.49388
False True True 8000
# and never finish

It looks to me that all the processes run to completion, so I wonder why they stay alive. Can someone help me understand this phenomenon?

I ran this program on Python 3.6.6 from the Miniconda distribution.

  • The official documentation has a hint: docs.python.org/2/library/… "Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.) This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined." Commented Mar 9, 2019 at 22:52
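To see the documented behavior concretely, here is a minimal sketch of cancel_join_thread() (my own illustration, not code from this thread; it assumes a Unix fork start method): the producer can exit without waiting for its buffered items to be flushed, at the price of losing them.

import random
from multiprocessing import Process, Queue

def make_data_noblock(q):
    for _ in range(5000):
        q.put(random.random())
    q.cancel_join_thread()  # do not wait for the feeder thread on exit

if __name__ == "__main__":
    q = Queue(maxsize=10000)
    p = Process(target=make_data_noblock, args=(q,))
    p.start()
    p.join()             # returns even though the queue was never drained
    print(p.is_alive())  # False: the process exited; undelivered items are lost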

1 Answer


The child processes that put items into the queue are stuck trying to actually deliver those objects through the queue's underlying channel.

A normal, non-multiprocessing Queue object is implemented entirely in the address space of a single process. In that case maxsize is the number of items that can be enqueued before a put() call blocks. But a multiprocessing Queue object is implemented using an IPC mechanism, typically a pipe, and an OS pipe can buffer only a finite number of bytes (a typical limit is 8 KB). So when your use_data() process terminates after dequeuing just 2000 items, the make_data() processes block on exit: their feeder threads cannot flush the locally buffered items into the full IPC channel. This means they never actually exit, and thus the attempt to join() those processes blocks indefinitely.
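You can observe this directly with a small standalone sketch (my own, not the poster's code; the exact byte threshold is OS-dependent). All the put() calls return immediately because they only fill a local buffer, but the child cannot exit until the parent drains the pipe:

from multiprocessing import Process, Queue

N = 10000

def fill(q):
    for _ in range(N):
        q.put("x" * 10)  # returns at once; a feeder thread flushes to the pipe

if __name__ == "__main__":
    q = Queue()
    p = Process(target=fill, args=(q,))
    p.start()
    p.join(timeout=2)
    print(p.is_alive())  # True: the child is stuck flushing into the full pipe
    for _ in range(N):
        q.get()          # drain everything the child produced
    p.join()             # now returns promptly: the feeder thread finished
    print(p.is_alive())  # False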

In effect you've created a deadlock. The exact threshold at which that occurs depends on how much data the IPC channel can buffer. For example, on one of my Linux servers your second example works reliably with this inserted between the u.join() and the p1.join():

for _ in range(4000):
    q.get()

Reducing that range slightly (e.g., to 3990) produces intermittent hangs. Reducing the range more (e.g., to 3500) will always hang because at least one of the processes stuffing data into the queue blocks while flushing its items into the IPC channel.

The lesson of this story? Always fully drain a multiprocessing queue before attempting to wait for the processes to terminate.


2 Comments

P.S., in a real-world situation you typically don't know how many items will be stuffed into the queue. The way to handle that is to have each child process put a sentinel value, one that can be distinguished from the actual data, into the queue when it exits. The reader then checks for the sentinel and, when it sees one, joins that process and forgets about it. Alternatively, call the terminate() method on p1 and p2 after you've called u.join(). (A sketch of the sentinel approach follows below these comments.)
Thank you for your explanation. It clears up my case and guides me to a deeper understanding.
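For reference, here is one way the sentinel approach from the first comment could look (a sketch only; the names DONE and n_makers are mine, and the consumer blocks on q.get() instead of spinning on q.empty()):

import random
from multiprocessing import Process, Queue

DONE = None  # sentinel; must be distinguishable from real data

def make_data(q, n):
    for _ in range(n):
        q.put(random.random())
    q.put(DONE)  # announce that this producer is finished

def use_data(q, n_makers):
    i, res, finished = 0, 0.0, 0
    while finished < n_makers:
        x = q.get()  # blocks until data arrives; no busy-waiting
        if x is DONE:
            finished += 1
            continue
        i += 1
        res = res * (i - 1) / i + x / i
    print("iter %6d, avg = %.5f" % (i, res))

if __name__ == "__main__":
    q = Queue(maxsize=10000)
    makers = [Process(target=make_data, args=(q, 5000)) for _ in range(2)]
    consumer = Process(target=use_data, args=(q, len(makers)))
    consumer.start()
    for p in makers:
        p.start()
    consumer.join()  # the consumer drains everything, sentinels included
    for p in makers:
        p.join()     # returns promptly: nothing is left to flush
    print(consumer.is_alive(), [p.is_alive() for p in makers], q.qsize())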
