
I am having problems with code in the following format and assume that the error has to do with how I am trying to access the elements in each tuple.

from numberer import numberify
from sys import argv
infile=argv[1]
from multiprocessing import Pool
pool=Pool(15)
import os

def chunker(fob):
    chunkbegin=0
    filesize=os.stat(fob.name).st_size
    while chunkbegin < filesize:
        chunkend=chunkbegin+100000
        fob.seek(chunkend)
        fob.readline()
        chunkend=fob.tell()
        yield (chunkbegin,chunkend)
        chunkbegin=chunkend

def run(tup, fob):
    fob.seek(tup[0])
    length=int(tup[1])-int(tup[0])
    lines=fob.readlines(length)
    for line in lines:
        print(line)

fob=open(infile)
chunks=[x for x in chunker(fob)]
pool.map(run, (chunks, fob))

The exact error is:

Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'run' on <module '__main__' from 'pretonumber.py'>

1) When the map function maps the tuples to the function, I assume these elements should be accessed as ordinary tuples, i.e. with a single index?

2) The chunks argument that I am passing to the function run is a list of tuples, chunks=[(0,100000),(100000,200000), ...], as created by the generator chunker.

Thank you.

1 Answer


The map method takes an iterable of arguments. Each element of the iterable is passed to one call of run. Since your iterable is the tuple (chunks, fob), this is going to run two tasks, calling run(chunks) in one task and run(fob) in the other.
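
As a quick illustration of how map hands out work (a tiny, hypothetical example, not your code):

from multiprocessing import Pool

def double(x):
    return x * 2

if __name__ == '__main__':
    with Pool(2) as pool:
        # one call per element of the iterable: double(1), double(2), double(3)
        print(pool.map(double, [1, 2, 3]))   # [2, 4, 6]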


What I think you want to do is to run one task for each chunk in chunks, calling run(chunk, fob).

So, first, you need an iterable that yields (chunk, fob) once per chunk, e.g., ((chunk, fob) for chunk in chunks).


But this still isn't going to work, because it's going to call run with a single argument, the 2-tuple (chunk, fob), not with two arguments. You can fix this by rewriting or wrapping run to take a single 2-tuple instead of two separate arguments, or you can just use starmap instead of map, which does that wrapping for you.
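
Either fix would look something like this (a sketch; run_star is just an illustrative helper name):

def run_star(arg_tuple):
    # unpack the (chunk, fob) pair ourselves and forward it as two arguments
    return run(*arg_tuple)

pool.map(run_star, args)     # wrap run so it takes a single 2-tuple
# ... or skip the wrapper entirely:
pool.starmap(run, args)      # starmap unpacks each tuple into run's arguments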


But this still isn't going to work. You're trying to pass an open file object between processes, and multiprocessing can't do that.

Since you're using the fork method, you can sometimes get away with inheriting the file object from the parent rather than passing it, but the details are complicated, and you really need to read the Programming guidelines for multiprocessing and understand how file descriptor inheritance works on Unix.

Since you want each child to have its own independent copy of the file object so they can all seek around in it, the easiest solution is to just pass the filename and have them open it themselves:

def run(tup, path):
    with open(path) as fob:
        fob.seek(tup[0])
        length=int(tup[1])-int(tup[0])
        lines=fob.readlines(length)
        for line in lines:
            print(line)

fob = open(infile)
chunks = [x for x in chunker(fob)]
args = ((chunk, infile) for chunk in chunks)
pool.starmap(run, args)

Meanwhile, now that we're sure we're not relying on fork behavior, it's probably a good idea to write the code to work with any start method. This means putting the top-level code into a __main__ block. And, while we're at it, let's make sure we close the file once we're done with it:

# imports
# function definitions
if __name__ == '__main__':
    infile = argv[1]
    pool = Pool(15)
    with open(infile) as fob:
        chunks = [x for x in chunker(fob)]
    args = ((chunk, infile) for chunk in chunks)
    pool.starmap(run, args)
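
Spelled out in full, the whole script would look something like this (a sketch that reuses your chunker; I've left out the numberer import since nothing in the snippet uses it):

from sys import argv
from multiprocessing import Pool
import os

def chunker(fob):
    # yield (start, end) byte offsets, extending each chunk to a line boundary
    chunkbegin = 0
    filesize = os.stat(fob.name).st_size
    while chunkbegin < filesize:
        fob.seek(chunkbegin + 100000)
        fob.readline()
        chunkend = fob.tell()
        yield (chunkbegin, chunkend)
        chunkbegin = chunkend

def run(tup, path):
    # each worker opens its own handle on the file and reads only its chunk
    with open(path) as fob:
        fob.seek(tup[0])
        length = tup[1] - tup[0]
        for line in fob.readlines(length):
            print(line)

if __name__ == '__main__':
    infile = argv[1]
    with open(infile) as fob:
        chunks = [x for x in chunker(fob)]
    args = ((chunk, infile) for chunk in chunks)
    with Pool(15) as pool:
        pool.starmap(run, args)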

You may still have other errors in your code, but I think this exhausts the multiprocessing ones.


4 Comments

Wow, I'm just going through this, but you sir/madam are a genius, and at the speed of light too...
Perfect, spot on. One other query I had was about how to return results to a globally shared list contributed to by each process, e.g. is it best to simply declare globals and then append in each run, or can mp not handle that?
@LewisMacLachlan The ideal solution to that, when possible, is to just have the child tasks return the results, or pass them over a queue, and have the parent build up the list. If not, if you can use something lower-level than a list of arbitrary Python objects, like an array of ints or a complex numpy array or a ctypes structure, use the shared memory and a lock. If not, you usually use a Manager object, which is inefficient, but at least it’s dead easy to write (unless you run into the uncommon but not too rare subtle synchronization issues).
@LewisMacLachlan If you read through the multiprocessing module docs (I know it’s huge, but the way it’s organized you pretty much have to read at least the first half straight through, then search the second half for relevant reference stuff, at least the first time…), it covers all of the options and the tradeoffs between them.
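
For what it's worth, the first option mentioned in the comment above (have the children return results and let the parent collect them) might look like this sketch: run returns its lines instead of printing, and starmap hands back one return value per chunk:

def run(tup, path):
    with open(path) as fob:
        fob.seek(tup[0])
        return fob.readlines(tup[1] - tup[0])   # return the chunk's lines instead of printing

# in the __main__ block:
#     results = pool.starmap(run, args)                       # one list of lines per chunk, in order
#     all_lines = [line for chunk in results for line in chunk]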
