
In my code I need to have multiple worker thread instances running in a Python program. I initially create a few worker thread instances (say 10) and then add them to a pool. Whenever a client requests a service, a thread should be invoked and reserved for that client. After completing the task, the thread should go back into the pool.

I have written the following code so far. But I'm not sure how to keep the threads running forever in the pool (they should be sleeping while in the pool), invoke one whenever a service is needed, and return it to the pool (where it should sleep again) after processing. Any help would be appreciated.

import threading
import time
from queue import Queue

PRED = Queue(10)

class Worker(threading.Thread):
    def __init__(self, threadID, name):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name

    def run(self):
        print("starting " + self.name + " thread")
        while True:
            ??
        print("Exiting " + self.name + " thread")


def work():
    print("working")
    time.sleep(3)
  • Let's say the worker threads are in the PRED queue.
  • work() is the method I should call to serve a client.

2 Answers


Here's something I derived from the Python documentation

read more: https://docs.python.org/3/library/queue.html#queue.Queue.join

Make sure you have a good read of it; there are some useful options, like creating a priority queue, or choosing between first-in-first-out and last-in-first-out behaviour.
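For instance, here's a quick sketch of those two variants, using the standard library's `queue.PriorityQueue` and `queue.LifoQueue` classes:

```python
import queue

# PriorityQueue hands back the lowest-valued entries first,
# regardless of insertion order
pq = queue.PriorityQueue()
pq.put((2, "low priority"))
pq.put((1, "high priority"))
print(pq.get())  # (1, 'high priority')

# LifoQueue behaves like a stack: last in, first out
lq = queue.LifoQueue()
lq.put("first")
lq.put("second")
print(lq.get())  # second
```

Both share the same `put`/`get`/`task_done`/`join` interface as the plain `queue.Queue` used below, so the worker code doesn't need to change.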

import queue
import threading
import time


# The queue for tasks
q = queue.Queue()


# Worker, handles each task
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        print("Working on", item)
        time.sleep(1)
        q.task_done()


def start_workers(worker_pool=1000):
    threads = []
    for i in range(worker_pool):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    return threads


def stop_workers(threads):
    # stop workers
    for i in threads:
        q.put(None)
    for t in threads:
        t.join()


def create_queue(task_items):
    for item in task_items:
        q.put(item)


if __name__ == "__main__":
    # Dummy tasks
    tasks = list(range(1000))

    # Start up your workers
    workers = start_workers(worker_pool=10)
    create_queue(tasks)

    # Blocks until all tasks are complete
    q.join()

    stop_workers(workers)

1 Comment

Nice example! You should consider making them daemons (t.daemon = True) in case they do not get shut down properly. In my experience it also helps to name the workers for debugging.

Use a task queue to send tasks to your workers. Make your workers listen to the task queue and wait if it's empty. When a worker gets a task from the queue it should execute it and then go back to polling the queue. Pretty standard worker pattern.

When I say task, I mean you can put an actual method in the queue. The worker can just pick it up and execute it.
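A minimal sketch of that idea: the queued items are callables (here a lambda, just for illustration), so the worker simply calls whatever it pulls off the queue, and a None sentinel tells it to exit.

```python
import queue
import threading

task_q = queue.Queue()
results = []  # collects output so we can see the task actually ran

def worker():
    while True:
        task = task_q.get()   # blocks (sleeps) until a task arrives
        if task is None:      # sentinel: tells the worker to exit
            break
        task()                # the queued item is a callable; just run it
        task_q.task_done()

t = threading.Thread(target=worker)
t.start()

# Enqueue an actual method instead of plain data
task_q.put(lambda: results.append("served a client"))

task_q.join()     # block until the queued task has been processed
task_q.put(None)  # shut the worker down
t.join()
```

The blocking `get()` is what does the "listening": the thread sleeps inside `get()` whenever the queue is empty, which is exactly the idle-in-the-pool behaviour the question asks for.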

2 Comments

Yeah, that pattern is OK. But can you provide a little code on how to achieve it? I'm not clear on how to do the listening part in Python.
That's great. Thank you
