
The following code errors out once in a while. It works fine if I start just one process, but as I increase the number of processes to, say, 11, it starts throwing an error.

import sys
import itertools
from multiprocessing import Manager, Process

try:
    num_workers = int(sys.argv[1])
except (IndexError, ValueError):
    num_workers = 1

someval = 10
def do_work(in_queue,x):
    i = 0
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line == None:
            if i > 0 :
                work.put(i,)
            # work.put(i)
            return
        else:
            print "value from work " + line.rstrip('\n')
            i = i + 1

if __name__ == "__main__":

    manager = Manager()
    work = manager.Queue(num_workers)
    someval = 20
    print " Number of workers is " + str(num_workers)
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work,someval))
        p.start()
        pool.append(p)
    with open("/home/jay/scripts/a.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    x = 0
    for p in pool:
        p.join()

The file /home/jay/scripts/a.txt has 10 lines.

If I do

./x.py 7
 Number of workers is 7
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
 x is 0
 all done

./x.py 11
 Number of workers is 11
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
Process Process-11:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "./x.py", line 18, in do_work
    line_no, line = item
TypeError: 'int' object is not iterable
 x is 0
 all done 
  • Add print(repr(item)) before line 18 so you can see what the value is. – Commented May 17, 2016 at 17:23

2 Answers


The offending line is work.put(i,) in do_work. You put an int into the queue, and that int gets read and unpacked by another worker.
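The unpacking failure can be reproduced in miniature without any processes; a plain queue stands in for the manager queue (the names and values here are illustrative, not from the original code):

```python
from queue import Queue

q = Queue()
q.put((3, "a line\n"))  # what the main loop puts: a (line_no, line) tuple
q.put(3)                # what work.put(i,) actually puts: a bare int

line_no, line = q.get()  # the tuple unpacks fine

failed = False
try:
    line_no, line = q.get()  # the bare int cannot be unpacked
except TypeError as e:
    failed = True
    print("unpack failed:", e)
```

This is the same TypeError the traceback shows: only the tuple items survive unpacking into line_no, line.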

Also, I agree with dano that using multiprocessing.Pool is easier and shorter.

import multiprocessing

if __name__ == "__main__":
    pool = multiprocessing.Pool(num_workers)
    with open("/home/jay/scripts/a.txt") as f:
        mapped = pool.map(do_work, f)

If you need that i from the worker, just return it and it will be stored in mapped.


1 Comment

My real file is huge - more than 100 GB. As per another thread, "the map is going to consume your file all at once before dishing out work. " So I decided to take this approach. stackoverflow.com/questions/11196367/…

The problem is that work.put(i,) doesn't do what you think it does. You're intending to put the 1-tuple (i,) into the queue, but you're actually just putting i into the queue. If you change that line to work.put((i,)) you'll see the behavior you expect.

There's a race condition with large values of num_workers that allows one of your subprocesses to add i to the queue before the for loop in the main process finishes loading the queue up with the None sentinel values. With smaller values of num_workers, you get through the for loop before any of the worker processes add i to the queue.

Also, have you considered using multiprocessing.Pool rather than manually creating a pool of workers with Process and Queue? It would simplify your code quite a bit.

