5

I'm using queues from the multiprocessing library for sharing data between processes.

I have 2 queues, both are limited to 10 objects, the first queue has one process that "puts" objects into it and many processes "get" from it.

The second queue has many processes that "put" objects into it, and only one process "gets" from it.

The system works perfectly for a while and then starts behaving strangly: only the process that "puts" objects into the first queue continues to work while the processes that read from the first queue apparently are not behaving/working anymore (even though the processes are alive). It seems that there's a deadlock here but I'm not sure, here is my code:

UPDATED

import multiprocessing
import logging
from multiprocessing import Process

logger = logging.get_logger(__name__)

# Processes 2, 3 ,4:

class Processes_234(Process):
    def __init__(self, message_queue_1, message_queue_2):
        Process.__init__(self)
        self.message_queue_1 = message_queue_1
        self.message_queue_2 = message_queue_2

    def run(self):
        while True:
            try:
                # get from queue
                el1, el2, el3 = self.message_queue_1.get()
                logger.debug('Processes234: get from queue')
            except Exception as exp:
                logger.debug("message_queue_1: queue empty, Exception message: " + str(exp))

            # do some stuff with el1, el2, el3...

            try:
                # put into second queue
                self.message_queue_2.put_nowait((el1, el2, el3))
                logger.debug('Processes234: put into queue')
            except Exception as excpt:
                logger.debug(excpt)
                logger.debug("message_queue_2: queue is full")
                # the queue is full so replace the old element with the new one
                try:
                    self.message_queue_2.get_nowait()
                    self.message_queue_2.put_nowait((el1, el2, el3))
                    # in case other process already fill the queue - ignore
                except:
                    pass


# process 5:
class Process5(Process):
    def __init__(self, message_queue_2):
        Process.__init__(self)
        self.message_queue_2 = message_queue_2

    def run(self):
        while True:
            try:
                # get from queue
                el1, el2, el = self.message_queue_2.get()
                print('Process5: get from queue')

            except Exception as exp:
                print("message_queue_2: queue empty, Exception message: " + str(exp))


def start_process_1():
    # init queues
    message_queue_1 = multiprocessing.Queue(maxsize=10)
    message_queue_2 = multiprocessing.Queue(maxsize=10)

    processes_234 = [Processes_234(message_queue_1, message_queue_2)
                     for _ in range(3)]

    for proc in processes_234:
        proc.start()

    process5 = Process5(message_queue_2)
    process5.start()
    counter = 1

    while True:
        el1 = counter + 1
        el2 = counter + counter
        el3 = "some string " * ((counter ** 2) % 60000)
        counter += 1
        # start passing data
        try:

            # put into queue
            message_queue_1.put_nowait((el1, el2, el3))
            logger.debug('Process1: put into queue')
        except Exception as excpt:
            logger.debug(excpt)
            logger.debug("message_queue_1: queue is full")
            # the queue is full so replace the old element with the new one
            try:
                message_queue_1.get_nowait()
                message_queue_1.put_nowait((el1, el2, el3))
                # in case other process already fill the queue - ignore
            except:
                pass


if __name__ == '__main__':
    start_process_1()

does anyone know what my problem is?

I'm using python 3.6.5

2
  • Can't see any immediate issue with what you posted, but then again, it's hard to try to reproduce your problem because we have to make assumptions on how the code is executed (what do your Process* objects and your __main__ look like). Please create an MCVE and also explain how exactly you determine that your Process_234 objects stopped working. Commented Jul 26, 2018 at 11:42
  • thanks, I solved and I updated the code at your request to one who reproduce the case. Commented Aug 7, 2018 at 18:12

1 Answer 1

4

Finally I was able to solve the problem, it was the logger! According to logging library the logger is thread safe but not multi-process safe.

I changed the code so that each process has its own logger and it solved the issue.

Sign up to request clarification or add additional context in comments.

2 Comments

Good catch and thanks for updating your question. This way, it might actually help others with a similar problem
I am not sure how long it would have taken me to solve this problem if I hadn't found this, thank you!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.