
I have one queue which is accessed by two multiprocessing functions. Both of these processes are consuming the same item from the queue and then clearing it. I want each one to take a unique value only. What am I doing wrong?

import time
import queue
import multiprocessing
import threading

q = queue.Queue(maxsize=0)
run_1 = 1
run_2 = 1

def multi_one():
    while run_1 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

def multi_two():
    while run_2 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

p1 = multiprocessing.Process(target=multi_one)
p2 = multiprocessing.Process(target=multi_two)

for item in range(10):
    q.put(item)

p1.start()
p2.start()

Output I am getting is:

0
0
1
1
2
2
...

Output I am looking for is:

0
1
2
3
4
5
6
7
8
9

2 Answers


Your code contains several errors, which I will describe by quoting from the documentation:

  • You should protect the “entry point” of the program by using if __name__ == '__main__'

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).

  • You should pass the queue object as an argument to the constructor.

On Unix using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

  • You should use multiprocessing.Queue, or multiprocessing.JoinableQueue if you want to use JoinableQueue.task_done(), because queue.Queue is intended only for a multi-threading context where both the producer and the consumer are in the same process.

Considering the above notes, your code can be modified in this way (although it is still far from ideal):

import time
import multiprocessing

q = multiprocessing.JoinableQueue(maxsize=0)
run_1 = 1
run_2 = 1

def multi_one(q):
    while run_1 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

def multi_two(q):
    while run_2 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=multi_one, args=(q, ))
    p2 = multiprocessing.Process(target=multi_two, args=(q, ))

    for item in range(10):
        q.put(item)

    p1.start()
    p2.start()

Output:

0
1
2
3
...
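
To address the "far from ideal" caveat, here is one possible refinement (a sketch based on my own assumptions, not taken from the answer): instead of the run_1/run_2 flags, each worker stops when it receives a sentinel value, and the main process joins the queue and the workers before exiting. The single worker function and the SENTINEL constant are illustrative names.

import time
import multiprocessing

SENTINEL = None  # assumption: None never appears as a real work item

def worker(q):
    while True:
        item = q.get()
        if item is SENTINEL:   # poison pill: stop this worker
            q.task_done()
            break
        time.sleep(2)
        print(item)
        q.task_done()

if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()

    p1 = multiprocessing.Process(target=worker, args=(q, ))
    p2 = multiprocessing.Process(target=worker, args=(q, ))
    p1.start()
    p2.start()

    for item in range(10):
        q.put(item)
    q.put(SENTINEL)            # one sentinel per worker
    q.put(SENTINEL)

    q.join()                   # wait until every item has been task_done()
    p1.join()
    p2.join()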

2 Comments

thank you for the detailed explanation. It really helped. I was using Jupyter notebooks so far and didn't think __name__ == '__main__' would be relevant, but as I am shifting to executing .py files directly, this will definitely add value. Is it robust/safe to pass arguments back from the child process to the main one using a separate queue? I have a Pandas DataFrame which needs to be updated. I have created 2 queues: one for sending out data to the worker processes (1-7), and a second queue which gets filled by these (1-7) processes and is read by process #8 to write to a csv.
It is safe to use different queues for input and output.
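
For readers with the same question, here is a minimal sketch of that two-queue pattern (the names worker, writer, results.csv and the item * item "processing" are placeholders, not part of the comment): several worker processes read from an input queue and push results to an output queue, and a single dedicated process drains the output queue and writes the rows to a CSV (or into a DataFrame).

import csv
import multiprocessing

SENTINEL = None

def worker(in_q, out_q):
    # Pull raw items from the input queue, process them, push results out.
    while True:
        item = in_q.get()
        if item is SENTINEL:
            break
        out_q.put((item, item * item))   # placeholder for real work

def writer(out_q, path):
    # A single process owns the output file, so writes never interleave.
    with open(path, "w", newline="") as f:
        w = csv.writer(f)
        while True:
            row = out_q.get()
            if row is SENTINEL:
                break
            w.writerow(row)

if __name__ == "__main__":
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()

    workers = [multiprocessing.Process(target=worker, args=(in_q, out_q))
               for _ in range(7)]
    wr = multiprocessing.Process(target=writer, args=(out_q, "results.csv"))

    for p in workers:
        p.start()
    wr.start()

    for item in range(100):
        in_q.put(item)
    for _ in workers:
        in_q.put(SENTINEL)      # stop each worker

    for p in workers:
        p.join()
    out_q.put(SENTINEL)         # all results are queued; stop the writer
    wr.join()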

You're using the wrong type of queue; try changing it to multiprocessing.JoinableQueue:

import time
import multiprocessing

q = multiprocessing.JoinableQueue(maxsize=0)
run_1 = 1
run_2 = 1

def multi_one(q):
    while run_1 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

def multi_two(q):
    while run_2 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

for item in range(10):
    q.put(item)

p1 = multiprocessing.Process(target=multi_one, args=(q, ))
p2 = multiprocessing.Process(target=multi_two, args=(q, ))

p1.start()
p2.start()

Prints:

0
1
2
3
4
5
6
7
8
9

5 Comments

Probably best to mention why queue.Queue doesn't work: multiprocessing is forking (or spawning, but behavior indicates fork) brand new processes that have their own copy of the memory state of the original program, including their own copy of the state of the queue, which, after forking, is no longer connected to the copies in the parent or the other worker process. A JoinableQueue, by contrast, exists as a singleton across many processes; the underlying data isn't duplicated in a fork, so they all pull from the same queue. queue.Queue is for threads, JoinableQueue for processes.
@ShadowRanger noted. What if I have a common pandas dataframe that needs to be updated by multiple processes, i.e. sharing a df across different processes? Is there a way to make a variable global across different processes, or any other way to achieve this?
@Andrej Kesely: Thank you!
@Neel: Read the full multiprocessing docs; there are a few ways to do it, but aside from actual shared memory, the majority involve fairly expensive synchronization or IPC. If the dataframe is numpy backed, you may want to stick to threads, as numpy will release the GIL for large operations, allowing threads to work in parallel on CPython (they normally don't, the exceptions being I/O and third-party C extensions that explicitly release the GIL).
@ShadowRanger: Noted. I did briefly go through them and found a way. I created one more queue to which my worker processes push data; the data is then read from that new queue and entered in the dataframe by a separate process.
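
To make the point in the comments above concrete, here is a small demonstration (assuming a Unix system where the fork start method is available): each child process gets its own copy of a queue.Queue, so both children receive the "same" item and the parent's copy is left untouched.

import multiprocessing
import queue

plain_q = queue.Queue()
plain_q.put("only item")

def consume(name):
    # After fork, this reads from the child's private copy of plain_q.
    print(name, "got:", plain_q.get())

if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")   # assumption: Unix only
    procs = [ctx.Process(target=consume, args=(name, )) for name in ("p1", "p2")]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # The parent's copy still holds the item both children "consumed".
    print("parent qsize:", plain_q.qsize())

Both children print the same item, and the parent still reports one item left: three independent queues rather than one shared queue, which is exactly why the original code printed every value twice.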
