
I have a very large list of strings (originally from a text file) that I need to process using python. Eventually I am trying to go for a map-reduce style of parallel processing.

I have written a "mapper" function and fed it to multiprocessing.Pool.map(), but it takes the same amount of time as simply calling the mapper function with the full set of data. I must be doing something wrong.

I have tried multiple approaches, all with similar results.

from multiprocessing import Pool

def initial_map(lines):
    results = []
    for line in lines:
        processed = line  # stand-in for the real per-line processing (an O(1) operation)
        results.append(processed)
    return results

def chunks(l, n):
    # yield successive n-sized sublists of l
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

if __name__ == "__main__":
    lines = list(open("../../log.txt", 'r'))
    pool = Pool(processes=8)
    partitions = chunks(lines, len(lines) // 8)
    results = pool.map(initial_map, partitions, 1)

So the chunks function yields sublists of the original set of lines to hand to pool.map(), which should give these 8 sublists to 8 different processes and run each through the mapper function. When I run this, I can see all 8 of my cores peak at 100%, yet it still takes 22-24 seconds.
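A quick sanity check (purely illustrative) can confirm the split; note that the integer division means a small extra chunk may hold any remainder lines:

# illustrative sanity check: how many chunks, and how big are they?
sizes = [len(c) for c in chunks(lines, len(lines) // 8)]
print("%d chunks, sizes: %r ... %r" % (len(sizes), sizes[:3], sizes[-1]))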

When I simply run this (single process/thread):

lines = list(open("../../log.txt", 'r'))
results = initial_map(lines)

It takes about the same amount of time. ~24 seconds. I only see one process getting to 100% CPU.
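(For reference, the timings above are rough wall-clock measurements; a minimal helper along these lines, with timed as an illustrative name, is enough to make the comparison:)

import time

def timed(label, fn):
    # crude wall-clock timing of a single run
    start = time.time()
    result = fn()
    print("%s: %.1f s" % (label, time.time() - start))
    return result

# e.g. timed("serial", lambda: initial_map(lines))
#      timed("pool", lambda: pool.map(initial_map, partitions, 1))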

I have also tried letting the pool split up the lines itself and have the mapper function only handle one line at a time, with similar results.

from multiprocessing import Pool

def initial_map(line):
    processed = line  # stand-in for the real per-line processing (an O(1) operation)
    return processed

if __name__ == "__main__":
    lines = list(open("../../log.txt", 'r'))
    pool = Pool(processes=8)
    results = pool.map(initial_map, lines)

~22 seconds.

Why is this happening? Parallelizing this should make it run faster, shouldn't it?

  • How large is the data you are passing around? My guess is that your CPU is saturated mostly by serialising and deserialising data. Commented Apr 29, 2014 at 20:11
  • Your processing is rather I/O bound, so it is not CPU limited: it mostly waits for data to arrive. With multiple processes you sometimes add overhead, which can even slow things down. Commented Apr 29, 2014 at 20:13
  • Just a heads up: the third argument to pool.map is chunksize, removing the need to write your own chunks function. Commented Apr 29, 2014 at 20:14

1 Answer


If the amount of work done in one iteration is very small, you're spending a big proportion of the time just communicating with your subprocesses, which is expensive. Instead, try to pass bigger slices of your data to the processing function. Something like the following:

slices = (data[i:i+100] for i in range(0, len(data), 100))

def process_slice(data):
    return [initial_map(x) for x in data]

pool.map(process_slice, slices)

# and then itertools.chain the output to flatten it
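That flattening step would look roughly like this (same caveat: untested, and nested is just an illustrative name):

import itertools

nested = pool.map(process_slice, slices)
results = list(itertools.chain.from_iterable(nested))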

(I don't have my computer with me, so I can't give you a full working solution or verify what I said.)

Edit: or see the 3rd comment on your question by @ubomb.
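The chunksize route from that comment would look roughly like this; 1000 is only an illustrative value and worth tuning for your data:

from multiprocessing import Pool

if __name__ == "__main__":
    lines = list(open("../../log.txt", 'r'))
    pool = Pool(processes=8)
    # the third argument is chunksize: lines are shipped to the workers in
    # batches of ~1000 instead of one at a time, cutting the IPC overhead
    results = pool.map(initial_map, lines, 1000)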


2 Comments

I believe that is what my approach in the first block of code is doing. chunks gets a list of "slices" to give to map and the actual mapper function loops through them.
Well, then increase the chunk size; that's all I'm saying. From what I can tell from the code snippets you posted, you're processing a single line at a time.
