
I have a sequential producer-consumer model that takes a long time to execute, so I am trying to make the consumer code run concurrently.

Note: objects is a generator.

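import logging

LOG = logging.getLogger(__name__)

def report_object(self, object_type, objects):
    handled_any = False
    for obj in objects:
        handled_any = True  # set as soon as the generator yields anything
        try:
            change_handler(obj, self.config)
        except Exception as e:
            # LOG.exception logs at ERROR level and appends the traceback
            LOG.exception("Error occurred in handling object: %s", e)
    if not handled_any:
        LOG.info("Consumer: no objects reported")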
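A `for ... else` clause cannot detect an empty generator: Python runs the `else` block whenever the loop ends without `break`, including after it has processed every item. The sketch below (the function names are illustrative, not from the question) contrasts that behaviour with an explicit flag:

```python
def log_messages(items):
    """Return the messages a for/else 'nothing reported' check would log."""
    messages = []
    for item in items:
        messages.append(f"handled {item}")
    else:
        # Runs whenever the loop finishes without `break`,
        # whether or not it iterated at all.
        messages.append("no objects reported")
    return messages


def log_messages_fixed(items):
    """Track whether anything was seen instead of relying on for/else."""
    messages = []
    seen = False
    for item in items:
        seen = True
        messages.append(f"handled {item}")
    if not seen:
        messages.append("no objects reported")
    return messages


print(log_messages(iter([1, 2])))        # ['handled 1', 'handled 2', 'no objects reported']
print(log_messages_fixed(iter([1, 2])))  # ['handled 1', 'handled 2']
print(log_messages_fixed(iter([])))      # ['no objects reported']
```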

Threaded implementation of the above function:

import threading

def report_object(self, object_type, objects):
    threads = []
    for obj in objects:
        try:
            t = threading.Thread(target=change_handler, args=(obj, self.config))
            LOG.info(" ***** Number of active threads: %d *****", threading.active_count())
            t.start()
            threads.append(t)
        except Exception as e:
            # Only catches errors from creating/starting the thread; an exception
            # raised inside change_handler happens in the new thread, not here.
            LOG.exception("Error occurred in handling object: %s", e)
    for t in threads:
        t.join()
    if not threads:
        LOG.info("Consumer: no objects reported")
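Whether the one-thread-per-object version has a race condition depends on what `change_handler` does. Assuming each call only reads `self.config` and works on its own independent `obj`, there is nothing shared to race on; if calls update shared state (a counter, a dict, a file), those updates need a lock. A minimal sketch, where `SharedStats` and `handle` are hypothetical stand-ins:

```python
import threading


class SharedStats:
    """Hypothetical shared state updated by many worker threads."""

    def __init__(self):
        self.processed = 0
        self._lock = threading.Lock()

    def record(self):
        # `+= 1` is a read-modify-write sequence; holding the lock prevents
        # two threads from interleaving and losing an update.
        with self._lock:
            self.processed += 1


def handle(obj, stats):
    """Hypothetical stand-in for change_handler that touches shared state."""
    stats.record()


stats = SharedStats()
threads = [threading.Thread(target=handle, args=(n, stats)) for n in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(stats.processed)  # 1000
```

The lock only makes the shared update safe; it does nothing to limit how many threads exist at once.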

If I follow the above approach, I start one thread per object. What will the impact be if the number of objects becomes very large, say 1,000 or 10,000? Will there be a race condition? If so, how can I prevent it? I tried another solution like this:

# Note: `_` is unused, so all 8 threads receive the same `obj`
threads = [threading.Thread(target=change_handler, args=(obj, self.config)) for _ in range(8)]
for thread in threads:
    thread.start()
    LOG.info(thread.name)

for thread in threads:
    thread.join()

The number of active threads is still increasing. What is the best way to limit the number of active threads, and to make the above function run concurrently?
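One common way to cap the number of threads in a producer-consumer design is to start a fixed number of worker threads up front and feed them through a bounded `queue.Queue`. The sketch below shows that pattern under stated assumptions: `process_with_worker_pool`, `slow_handler` and the sentinel are illustrative names, and in the question's code `handle` would be something like `lambda obj: change_handler(obj, self.config)`.

```python
import queue
import threading
import time

_SENTINEL = object()  # tells a worker that no more work is coming


def process_with_worker_pool(objects, handle, num_workers=8):
    """Call handle(obj) for every obj using exactly num_workers threads.

    `objects` may be a generator: it is consumed lazily, and at most
    2 * num_workers items wait in the queue at any time.
    Returns a list of (obj, exception) pairs for calls that failed.
    """
    work = queue.Queue(maxsize=2 * num_workers)
    errors = []
    errors_lock = threading.Lock()

    def worker():
        while True:
            obj = work.get()
            if obj is _SENTINEL:
                return
            try:
                handle(obj)
            except Exception as exc:  # one bad object must not kill the worker
                with errors_lock:
                    errors.append((obj, exc))

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()
    try:
        for obj in objects:  # producer side: put() blocks while the queue is full
            work.put(obj)
    finally:
        for _ in workers:  # one sentinel per worker so every thread exits
            work.put(_SENTINEL)
        for w in workers:
            w.join()
    return errors


def slow_handler(obj):
    """Hypothetical stand-in for change_handler."""
    time.sleep(0.01)
    if obj == 3:
        raise ValueError("bad object")


failures = process_with_worker_pool((n for n in range(20)), slow_handler, num_workers=4)
print([obj for obj, _ in failures])  # [3]
```

Because the queue is bounded, the generator is only advanced as fast as the workers drain it, so neither the thread count nor the number of pending objects grows with the input size.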

  • Sounds like you want to use a ThreadPoolExecutor with the keyword argument max_workers=n. Commented Apr 13, 2020 at 12:03
  • Tried using ThreadPoolExecutor like this: def report_objects(self, object_type, objects): executor = ThreadPoolExecutor(max_workers=8) for obj in objects: try: executor.submit(change_handler(obj,self.config)) except Exception as e: LOG.error("Error occurred in handling object: %s" % e) Commented Apr 13, 2020 at 13:42
  • I tried the executor pool with max_workers set to 8, 2 and 4, and the execution time is quite similar. I tried it on a limited number of objects; maybe running it on a large number of objects will show some improvement. Will give it a try. Commented Apr 13, 2020 at 13:45
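One likely reason the timings in these comments barely changed: in `executor.submit(change_handler(obj,self.config))`, Python evaluates `change_handler(obj, self.config)` first, in the calling thread, and passes only its return value to `submit`. Unless that return value happens to be callable, the pool then fails with a `TypeError` that is stored on the Future and never seen, because nothing calls `result()`. The callable and its arguments have to be passed separately. A small demonstration with a hypothetical `work` function:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def work(x):
    """Hypothetical task: return a value and the name of the thread that ran it."""
    return x * 2, threading.current_thread().name


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="pool") as executor:
    # Wrong: work(21) runs right here in the main thread; submit() then receives
    # its return value (a tuple), which is not callable, so the Future fails.
    wrong = executor.submit(work(21))
    # Right: pass the callable and its arguments separately.
    right = executor.submit(work, 21)

print(type(wrong.exception()).__name__)  # TypeError
value, thread_name = right.result()
print(value, thread_name.startswith("pool"))  # 42 True
```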

1 Answer


The best way to control the number of threads is to use the ThreadPoolExecutor from the concurrent.futures package, and there are several ways of doing this. One way is to use the submit method, which returns a Future object representing the eventual completion of the submitted call. If the call returns a result, you can call the result method on the Future, which blocks until the call is complete and then returns the value returned from the call (there are, of course, many other methods you can call on a Future object). You are not obliged to keep the Future object if the call does not return a value or you do not otherwise need to check for successful completion, but keep in mind that an exception raised by the call is only reported through its Future.
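`result()` also re-raises any exception the submitted call raised, which is one way to keep the question's per-object error logging after moving to a pool. A sketch, where `handle` is a hypothetical stand-in for `change_handler` and `report_objects` is an illustrative name:

```python
import logging
from concurrent.futures import ThreadPoolExecutor

LOG = logging.getLogger(__name__)


def handle(obj):
    """Hypothetical stand-in for change_handler; fails on every fifth object."""
    if obj % 5 == 0:
        raise ValueError(f"cannot handle {obj}")
    return obj


def report_objects(objects, max_workers=8):
    failed = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Pair each Future with the object it was submitted for.
        futures = [(obj, executor.submit(handle, obj)) for obj in objects]
        for obj, future in futures:
            try:
                future.result()  # re-raises whatever handle(obj) raised
            except Exception:
                LOG.exception("Error occurred in handling object %r", obj)
                failed.append(obj)
    return failed


print(report_objects(range(12)))  # [0, 5, 10]
```

The list comprehension submits every object before any result is collected, so with a very large generator all of its items (and their Futures) are held in memory at once, even though only `max_workers` calls run at a time.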

Here is an example of how to use the ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
import time, random

def my_thread(n):
    time.sleep(random.random())
    return n, time.time()

MAX_THREADS = 10

with ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
    futures = [e.submit(my_thread, n) for n in range(15)]
    for f in futures:
        print(f.result())

Prints:

(0, 1586782110.1816075)
(1, 1586782109.4404495)
(2, 1586782109.6663365)
(3, 1586782109.8307955)
(4, 1586782109.6733325)
(5, 1586782109.6103601)
(6, 1586782109.3914738)
(7, 1586782109.6803281)
(8, 1586782109.8587916)
(9, 1586782109.7173235)
(10, 1586782110.3664994)
(11, 1586782110.1816075)
(12, 1586782110.518443)
(13, 1586782110.4524374)
(14, 1586782110.0256832)
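The tuples above come out in submission order even though the timestamps show the calls finishing in a different order, because the loop waits on each Future in turn. `concurrent.futures.as_completed` instead yields Futures as they finish. This variant of the answer's `my_thread` takes a fixed delay (an assumption made so the order is reproducible); with these delays the completion order should be 1, 2, 0:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def my_thread(n, delay):
    time.sleep(delay)
    return n


# Fixed delays instead of random ones, so the finishing order is predictable.
delays = {0: 0.4, 1: 0.05, 2: 0.2}

with ThreadPoolExecutor(max_workers=3) as e:
    futures = [e.submit(my_thread, n, d) for n, d in delays.items()]
    # Waiting on each Future in list order gives submission order.
    submission_order = [f.result() for f in futures]

with ThreadPoolExecutor(max_workers=3) as e:
    futures = [e.submit(my_thread, n, d) for n, d in delays.items()]
    # as_completed yields each Future as soon as it finishes.
    completion_order = [f.result() for f in as_completed(futures)]

print(submission_order)  # [0, 1, 2]
print(completion_order)
```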