
I'm trying to read and modify each row of a number of files using Python. Each file has thousands to hundreds of thousands of rows, so each file is processed only after the previous one has finished. I'm trying to read the files like:

import csv

csvReader = csv.reader(open("file", "r"))
for row in csvReader:
    handleRow(row)

I want to use multithreading to read each of the files in a different thread, in parallel, in order to save time. Can anyone point out whether that would be useful, and how to implement it?

  • As a side note: you're not closing your files anywhere, which is going to be a serious problem if you're opening hundreds of files. Commented Oct 19, 2013 at 21:25
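
A minimal sketch of what that comment is pointing at, using a with block so the file is closed automatically (handleRow is the asker's placeholder):

import csv

with open("file", "r") as f:    # closed automatically when the block ends
    for row in csv.reader(f):
        handleRow(row)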

2 Answers


It may or may not be useful--if all the files are on the same drive, and you're already pushing the drive as fast as it can go, multiplexing can only slow things down. But if you're not maxing out your I/O it'll speed things up.

As far as how to do it, that's trivial. Wrap your code up in a function that takes a pathname, then use a concurrent.futures.ThreadPoolExecutor or a multiprocessing.dummy.Pool and it's one line of code to map your function over your whole iterable of pathnames:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(4) as executor:
    executor.map(func, paths)
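
The multiprocessing.dummy.Pool variant is just as short; a minimal sketch, assuming func and paths are the same names as above:

from multiprocessing.dummy import Pool  # thread-backed Pool with the multiprocessing.Pool API

with Pool(4) as pool:
    pool.map(func, paths)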

One more thing: if the reason you can't max out the I/O is that you're doing too much CPU work on each line, threads won't help in Python (because of the GIL), but you can just use processes--the exact same code, but with ProcessPoolExecutor.
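
A minimal sketch of that swap (func has to be a module-level, picklable callable for a process pool; func and paths are the same assumed names as above):

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor(4) as executor:    # separate processes, so CPU work isn't serialized by the GIL
    executor.map(func, paths)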


Your bottleneck is probably the I/O, so multithreading may not help; anyway, it's easy to try: the following code processes all the files in the current directory, one thread per file, by applying a given string function to each row and writing the new file to a given path.

from threading import Thread
from os import listdir
from os.path import basename, join, isfile

class FileChanger(Thread):
    def __init__(self, sourcefilename, rowfunc, tgpath):
        Thread.__init__(self)
        self.rowfunc = rowfunc
        self.sfname = sourcefilename
        self.tgpath = tgpath

    def run(self):
        # Apply rowfunc to every row and write the result to the target directory
        with open(self.sfname) as src, \
             open(join(self.tgpath, basename(self.sfname)), 'w') as tgf:
            for r in src:
                tgf.write(self.rowfunc(r))

# main #
workers = [FileChanger(f, str.upper, '/tmp/tg')
           for f in listdir('.') if isfile(f)]
for w in workers:
    w.start()
for w in workers:
    w.join()

7 Comments

About CPU and threads: it's true that CPython can't really use multiprocessor power with threads because of the GIL, but if you don't want to pay the price of full processes you could still try your code on a Python implementation without a GIL, like Jython.
Unless he only has a handful of files, using one file per thread is probably going to cost more than it saves. A small pool of threads should be enough to get whatever benefit there is to get.
@abarnert you're right, the above was just an example. Anyway, it would be easy to evolve the code to use the 'workers' list as a queue with a capped size: every time we join a thread we create and start another one (see the sketch after these comments)...
@dbra Two questions: 1) why is w.join used? 2) will the thread end itself when finished?
1) I assumed the program would do something else once all the work is done, or at least report it to someone; of course you could skip the wait and just leave the threads alone. 2) Creating a thread is not that heavy, just a matter of milliseconds, so I found it cleaner to define a simple one-shot object (like Go's goroutines) and leave the distribution logic outside; this also makes coordination trivial.
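
A minimal sketch of that capped-concurrency idea, swapping in concurrent.futures.ThreadPoolExecutor instead of hand-managing the worker list (the pool size of 4, the change_file helper, and the /tmp/tg target are illustrative assumptions):

from concurrent.futures import ThreadPoolExecutor
from os import listdir
from os.path import basename, join, isfile

def change_file(sourcefilename, rowfunc=str.upper, tgpath='/tmp/tg'):
    # Same per-file work as FileChanger.run, but as a plain function
    with open(sourcefilename) as src, \
         open(join(tgpath, basename(sourcefilename)), 'w') as tgf:
        for r in src:
            tgf.write(rowfunc(r))

# At most 4 files are processed at a time, no matter how many files there are
with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(change_file, [f for f in listdir('.') if isfile(f)])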