
How can I limit the number of concurrent threads in Python?

For example, I have a directory with many files, and I want to process all of them, but only 4 at a time in parallel.

Here is what I have so far:

import glob
import Queue  # 'queue' on Python 3
import threading

def process_file(fname):
    # open the file and do something
    pass

def process_file_thread(queue, fname):
    queue.put(process_file(fname))

def process_all_files(d):
    files = glob.glob(d + '/*')
    q = Queue.Queue()
    threads = []
    for fname in files:
        t = threading.Thread(target=process_file_thread, args=(q, fname))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()  # wait for every worker; q now holds all the results

def main():
    process_all_files('.')
    # Do something after all files have been processed

How can I modify the code so that only 4 threads are run at a time?

Note that I want to wait for all files to be processed and then continue and work on the processed files.

4 Comments
  • Have you tried multiprocessing Pools? On Python 3 you can also use futures. Commented Aug 21, 2013 at 0:56
  • You can use futures in Python 2 also, you just need to install the backport. Commented Aug 21, 2013 at 0:57
  • concurrent.futures is indeed the best way to do it. Commented Aug 21, 2013 at 0:58
  • You could use a multiprocessing.pool.ThreadPool to easily limit the number of threads, as shown in this answer to another question and in the sketch just below. Commented Aug 21, 2013 at 2:17
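For reference, the ThreadPool approach from that last comment might look something like this (a minimal sketch; process_file is the function from the question):

import glob
from multiprocessing.pool import ThreadPool

def process_all_files(d):
    files = glob.glob(d + '/*')
    pool = ThreadPool(4)                     # at most 4 worker threads
    results = pool.map(process_file, files)  # blocks until every file is processed
    pool.close()
    pool.join()
    return results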

2 Answers


For example, I have a directory with many files, and I want to process all of them, but only 4 at a time in parallel.

That's exactly what a thread pool does: You create jobs, and the pool runs 4 at a time in parallel. You can make things even simpler by using an executor, where you just hand it functions (or other callables) and it hands you back futures for the results. You can build all of this yourself, but you don't have to.*

The stdlib's concurrent.futures module is the easiest way to do this. (For Python 3.1 and earlier, see the backport.) In fact, one of the main examples is very close to what you want to do. But let's adapt it to your exact use case:

import concurrent.futures
import glob

def process_all_files(d):
    files = glob.glob(d + '/*')
    # The executor runs at most 4 jobs at a time; the rest wait in its queue.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        concurrent.futures.wait(fs)

If you wanted process_file to return something, that's almost as easy:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        # as_completed yields each future as soon as its job finishes
        for f in concurrent.futures.as_completed(fs):
            do_something(f.result())

And if you want to handle exceptions too… well, just look at the example; it's just a try/except around the call to result().
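For instance (a minimal sketch along the lines of that stdlib example; do_something and the print are placeholders for whatever handling you need):

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = {executor.submit(process_file, file): file for file in files}
        for f in concurrent.futures.as_completed(fs):
            try:
                do_something(f.result())  # result() re-raises anything process_file raised
            except Exception as e:
                print('%r failed: %s' % (fs[f], e))  # placeholder error handling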


* If you want to build them yourself, it's not that hard. The source to multiprocessing.pool is well written and commented, and not that complicated, and most of the hard stuff isn't relevant to threading; the source to concurrent.futures is even simpler.
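For instance, a bare-bones version might look something like this (a sketch only, using sentinel values to shut the workers down; the stdlib pools handle many details this skips, such as results and exceptions):

import glob
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def process_all_files(d, num_workers=4):
    q = queue.Queue()

    def worker():
        while True:
            fname = q.get()
            if fname is None:   # sentinel: no more work
                return
            process_file(fname)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for fname in glob.glob(d + '/*'):
        q.put(fname)
    for _ in threads:
        q.put(None)             # one sentinel per worker
    for t in threads:
        t.join()                # returns once every file has been processed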



I used this technique a few times; I think it's a bit ugly, though:

import threading

def process_something():
    something = list(get_something())  # snapshot the items to process

    def worker():
        while True:
            try:
                obj = something.pop()  # list.pop() is atomic, so no lock is needed
            except IndexError:
                return                 # nothing left to do
            # do something with obj

    threads = [threading.Thread(target=worker) for _ in range(4)]
    [t.start() for t in threads]
    [t.join() for t in threads]

