I'm running both I/O-bound and CPU-bound tasks. Multiprocessing is used for crawling websites pulled from a multiprocessing.Queue() and parsing their HTML to extract links, while threading is used to read a text file containing a list of subdomains from giant marketplace sites (shopee.com and tokopedia.com).
import multiprocessing
from threading import Thread
from multiprocessing import cpu_count, Process, Queue
# Shared bounded work queue: put() blocks once cpu_count()*10 items are waiting.
tasker = Queue(cpu_count()*10)
def consumer():
    """Worker loop: print items pulled from the shared queue.

    Terminates when it receives the ``None`` sentinel that the feeder
    thread enqueues once per consumer process.
    """
    # iter(callable, sentinel) keeps calling tasker.get() until it returns None.
    for task in iter(tasker.get, None):
        print(task)
def adding_task():
    """Feed every stripped line of file.txt into the shared queue.

    Afterwards posts one ``None`` sentinel per consumer process so each
    worker knows to shut down.
    """
    with open('file.txt', 'r') as source:
        for raw_line in source:
            tasker.put(raw_line.strip())
    # One sentinel per consumer process.
    for _ in range(cpu_count()):
        tasker.put(None)
def producer():
    """Start the feeder thread and one consumer process per core, then wait.

    Joins the consumer processes first; the feeder thread is joined last,
    after the consumers have drained the queue (including its sentinels).
    """
    feeder = Thread(target=adding_task)
    feeder.start()
    workers = [Process(target=consumer) for _ in range(cpu_count())]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # NOTE(review): originally flagged as a suspected problem — joining the
    # feeder after the consumers is fine here, since the consumers drain
    # everything the feeder put, so its final put() calls cannot block forever.
    feeder.join()
# Script entry point: kick off the feeder thread and consumer processes.
producer()
The problem is: the queue is growing faster than the multiprocessing workers can finish the tasks. Currently, I'm using this to check whether the queue is empty:
import multiprocessing
from threading import Thread
from netaddr import IPNetwork
from multiprocessing import cpu_count, Process, Queue
# Bounded shared queue: put() blocks once cpu_count()*10 items are waiting.
tasker = Queue(cpu_count()*10)
# CIDR block to expand into individual per-IP tasks.
ips = '173.245.48.0/20'
def consumer():
    """Worker loop: print queued items until the ``None`` sentinel arrives."""
    # iter(callable, sentinel) keeps calling tasker.get() until it returns None.
    for task in iter(tasker.get, None):
        print(task)
    # Queue state can be probed with tasker.full() / tasker.empty(), but in a
    # multiprocess setting those answers are only advisory (racy by nature).
def check_tasker():
    """Block the calling (feeder) thread until the queue has drained empty.

    NOTE(review): this throttle is redundant — ``tasker`` was created with a
    maxsize, so ``tasker.put()`` already blocks when the queue is full. It is
    kept only to preserve the existing call sites; consider deleting it and
    relying on the blocking ``put()``.
    """
    import time  # local import so this fix stands alone

    # The original version spun in a hot `while True` loop (with an
    # unreachable `pass` after `break`), burning a full CPU core while
    # waiting. Poll with a short sleep instead.
    while not tasker.empty():
        time.sleep(0.01)
def adding_task():
    """Expand the CIDR range ``ips`` into the queue, one string per address.

    Afterwards posts one ``None`` sentinel per consumer process so each
    worker knows to shut down.
    """
    for address in IPNetwork(ips):
        # Wait for the consumers to catch up before enqueueing the next one.
        check_tasker()
        tasker.put(str(address))
    # One sentinel per consumer process.
    for _ in range(cpu_count()):
        tasker.put(None)
def producer():
    """Launch the feeder thread and per-core consumers, wait, then exit.

    Joins the consumer processes first, then the feeder thread, and finally
    terminates the interpreter.
    """
    feeder = Thread(target=adding_task)
    feeder.start()
    workers = [Process(target=consumer) for _ in range(cpu_count())]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    feeder.join()
    # Terminate the interpreter once all work is done (as the original did).
    exit()
# Script entry point.
producer()
Is there a better way to temporarily pause the thread once the multiprocessing.Queue has reached N tasks, and resume it once the count decreases?
`tasker.put` should block when you reach that size, because you created the queue with a maxsize (`Queue(cpu_count()*10)`); no manual polling of `empty()`/`full()` is needed.