
I'm running both I/O-bound and CPU-bound tasks: multiprocessing is used to crawl websites pulled from a multiprocessing.Queue() and parse their HTML to extract links, while threading is used to read a text file containing a list of subdomains of giant marketplace sites (shopee.com and tokopedia.com).

from threading import Thread
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count() * 10)  # bounded: at most 10 tasks per CPU may be queued

def consumer():
    while True:
        task = tasker.get()
        if task is None:  # sentinel: no more work
            break
        print(task)

def adding_task():
    with open('file.txt', 'r') as f:
        for line in f:
            tasker.put(line.strip())
    for i in range(cpu_count()):
        tasker.put(None)  # one sentinel per consumer process

def producer():
    add_task = Thread(target=adding_task)
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target=consumer)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

    # Suspected problem.
    add_task.join()

producer()

The problem is that the queue grows faster than the worker processes can finish the tasks. Currently, I'm using this to check whether the queue is empty:

from threading import Thread
from netaddr import IPNetwork
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count() * 10)
ips = '173.245.48.0/20'

def consumer():
    while True:
        task = tasker.get()
        if task is None:  # sentinel: no more work
            break
        print(task)

# Busy-wait until the queue is empty (tasker.full() would check the opposite).
def check_tasker():
    while True:
        if tasker.empty():
            break

def adding_task():
    for ip in IPNetwork(ips):
        check_tasker()
        tasker.put(str(ip))
    for i in range(cpu_count()):
        tasker.put(None)  # one sentinel per consumer process

def producer():
    add_task = Thread(target=adding_task)
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target=consumer)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

    add_task.join()
    exit()

producer()

Is there a better way to temporarily pause the thread once the multiprocessing.Queue reaches N tasks, and resume once the count has decreased?

You're setting the size to 10 x the CPU count. tasker.put should block when you reach that size. – Commented Jan 24, 2023 at 20:12

1 Answer


The queue is already bounded:

tasker = Queue(cpu_count()*10)

That's the only limit you need: put() blocks until a slot frees up whenever the queue is at capacity, so the feeding thread pauses automatically and resumes as the workers drain items. (OS-level limits on the size of the underlying pipe may prevent items from being added even before the queue reaches its nominal capacity.)
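For illustration, here is a minimal sketch of the second snippet with check_tasker removed, relying entirely on the blocking put(). The range(1000) / 'task-{i}' workload is a placeholder for the IPNetwork iteration, and the queue is passed to the workers as an argument (plus a __main__ guard) so the sketch also runs under the spawn start method on Windows/macOS:

from multiprocessing import cpu_count, Process, Queue
from threading import Thread

NUM_WORKERS = cpu_count()

def consumer(queue):
    while True:
        task = queue.get()
        if task is None:  # sentinel: no more work
            break
        print(task)

def adding_task(queue):
    # put() blocks whenever the queue holds NUM_WORKERS * 10 items,
    # pausing this thread until a worker frees a slot.
    for i in range(1000):  # placeholder for: for ip in IPNetwork(ips)
        queue.put(f'task-{i}')
    for _ in range(NUM_WORKERS):
        queue.put(None)  # one sentinel per worker

def producer():
    tasker = Queue(NUM_WORKERS * 10)  # the bounded queue is the only throttle
    feeder = Thread(target=adding_task, args=(tasker,))
    feeder.start()
    workers = [Process(target=consumer, args=(tasker,)) for _ in range(NUM_WORKERS)]
    for w in workers:
        w.start()
    feeder.join()
    for w in workers:
        w.join()

if __name__ == '__main__':
    producer()

The feeder thread throttles itself: it sleeps inside put() while the queue is full and wakes as soon as a worker frees a slot, which is exactly the pause-and-resume behaviour the question asks about.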


1 Comment

Thanks for confirming. At first, I thought I needed extra checks on the Queue.
