I have a sequential producer-consumer model that is taking a long time to execute, so I am trying to make the consumer code run concurrently.
Note: objects is a generator.
    def report_object(self, object_type, objects):
        for obj in objects:
            try:
                change_handler(obj, self.config)
            except Exception as e:
                LOG.error("Error occurred in handling object: %s" % e)
                LOG.exception(e)
        else:
            LOG.info("Consumer: no objects reported")
Threaded implementation of the above function:
import threading
    def report_object(self, object_type, objects):
        threads = []
        for obj in objects:
            try:
                t = threading.Thread(target=change_handler, args=(obj, self.config))
                LOG.info(" ***** Number of active threads: %d *****", threading.activeCount())
                t.start()
                threads.append(t)
            except Exception as e:
                LOG.error("Error occurred in handling object: %s" % e)
                LOG.exception(e)
        for t in threads:
            t.join()
        else:
            LOG.info("Consumer: no objects reported")
With the above mechanism I am running as many threads as there are objects. If the number of objects grows very large, say 1,000 or 10,000, what will be the impact? Will there be a race condition? If so, how can I prevent it? I tried another solution:
    threads = [threading.Thread(target=change_handler, args=(obj, self.config))
               for _ in range(8)]
    for thread in threads:
        thread.start()
        LOG.info(thread.name)
    for thread in threads:
        thread.join()
The number of active threads is still increasing. What is the best way to restrict the number of active threads, and the best way to make the above function run concurrently?
Use ThreadPoolExecutor with the keyword argument max_workers=n:

    from concurrent.futures import ThreadPoolExecutor

    def report_objects(self, object_type, objects):
        executor = ThreadPoolExecutor(max_workers=8)
        for obj in objects:
            try:
                executor.submit(change_handler, obj, self.config)
            except Exception as e:
                LOG.error("Error occurred in handling object: %s" % e)

Note that submit() takes the callable and its arguments separately. Writing executor.submit(change_handler(obj, self.config)) would call change_handler immediately in the calling thread and submit its return value, defeating the purpose of the pool.
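A fuller self-contained sketch along the same lines, with a dummy change_handler standing in for the real one and print in place of LOG (both are assumptions for illustration). It uses as_completed so worker exceptions surface in the consumer instead of being silently swallowed inside the pool:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def change_handler(obj, config):
        # Stand-in for the real handler from the question.
        return obj * 2

    def report_objects(objects, config, max_workers=8):
        """Consume `objects` with a bounded pool of worker threads."""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # submit() only schedules the call; the pool never runs more than
            # max_workers handlers at once, no matter how many objects arrive.
            futures = {executor.submit(change_handler, obj, config): obj
                       for obj in objects}
            for future in as_completed(futures):
                try:
                    # result() re-raises any exception the handler raised.
                    results.append(future.result())
                except Exception as e:
                    print("Error occurred in handling object %s: %s"
                          % (futures[future], e))
        return results

One caveat: submitting every object up front drains the generator and queues all pending calls in memory, but only max_workers threads ever exist, which answers the 1,000/10,000-object concern.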