
I found what appears to be a useful post here:

http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/

I've tried the piece of code below, which uses multiprocessing, but it doesn't work for me. The only change I made to the original is out_q = queue.Queue() instead of out_q = Queue().

I believe this code was written for Python 2.x, and I'm using Python 3.4.2.

I've imported all the necessary modules, of course.

def mp_factorizer(nums, nprocs):
    def worker(nums, out_q):
        """ The worker function, invoked in a process. 'nums' is a
            list of numbers to factor. The results are placed in
            a dictionary that's pushed to a queue.
        """
        outdict = {}
        for n in nums:
            outdict[n] = factorize_naive(n)
        out_q.put(outdict)

    # Each process will get 'chunksize' nums and a queue to put its out
    # dict into
    out_q = queue.Queue()
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []

    for i in range(nprocs):
        p = multiprocessing.Process(
                target=worker,
                args=(nums[chunksize * i:chunksize * (i + 1)],
                      out_q))
        procs.append(p)
        p.start()

    # Collect all results into a single result dict. We know how many dicts
    # with results to expect.
    resultdict = {}
    for i in range(nprocs):
        resultdict.update(out_q.get())

    # Wait for all worker processes to finish
    for p in procs:
        p.join()

    return resultdict
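For reference, factorize_naive in the code above is the trial-division factorizer from the linked post; a minimal sketch of what it does (my paraphrase, not necessarily the exact original):

def factorize_naive(n):
    """ Return the prime factors of n by trial division
        (empty list for n < 2). A sketch of the linked post's helper.
    """
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)   # whatever remains is prime
    return factors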

The error I get is

File "E:\....\file.py", line 109, in <module>
    print (mp_factorizer(range(100000),1))
File "E:\....\file.py", line 88, in mp_factorizer
    p.start()
File "E:\...\Python\Install\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
File "E:\...\Python\Install\lib\multiprocessing\context.py", line 212, in _Popen
    return default_context.get_context().Process._Popen(process_obj)
File "E:\...\Python\Install\lib\multiprocessing\context.py", line 313, in _Popen
    return Popen(process_obj)
File "E:\...\Python\Install\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
    reduction.dump(process_obj, to_child)
File "E:\...\Python\Install\lib\multiprocessing\reduction.py", line 59, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
  • I suggest using Pool instead of iterating and starting a Process by hand, then combining it with pool.map to execute the function (see the sketch after these comments). docs.python.org/3.5/library/… docs.python.org/3.5/library/… Commented May 5, 2016 at 21:00
  • I'm not sure how that example code is working. Generally only functions that are defined at the top-level of a module can be executed in a worker process... Commented May 5, 2016 at 21:00
  • @BiRico I believe that's correct. Commented May 5, 2016 at 21:07
  • You don't need a Manager.Queue, and using one would be needlessly inefficient - do out_q = multiprocessing.Queue() instead. A queue.Queue definitely can't work in this context. Commented May 5, 2016 at 21:11
  • Neither multiprocessing.Manager().Queue() nor multiprocessing.Queue() has worked. I have used Pool before, but then I have to use if __name__ == "__main__":, with which I have two problems: first, I don't understand it, and second, I haven't found how it can be used inside a function that can be reused. Commented May 5, 2016 at 21:25
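
For illustration, a minimal sketch of the Pool approach suggested in the first comment (the function name double is made up here; the mapped function still has to live at module top level so it can be pickled):

import multiprocessing as mp

def double(n):
    # Top-level function, so the spawn start method can pickle it.
    return n, 2 * n

if __name__ == "__main__":
    with mp.Pool(processes=3) as pool:
        # pool.map splits the iterable across workers and collects results
        resultdict = dict(pool.map(double, range(100)))
    print(resultdict[10])  # -> 20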

1 Answer


Here's a complete program, run under Python 3.5.1 (but it also works fine under Python 2), so you can see everything that needs to be done. It doesn't actually factorize anything - the guts of the worker were replaced by very simple code, because the computations done at the lowest level are irrelevant to any of the problems you're having. Three changes matter: worker is defined at module top level, so it can be pickled when Windows spawns the child processes; the queue is a multiprocessing.Queue rather than a queue.Queue, whose internal thread lock is exactly what your traceback says can't be pickled; and the process-starting code is guarded by an if __name__ == "__main__" check:

import multiprocessing as mp
import math

def worker(nums, out_q):
    outdict = {}
    for n in nums:
        outdict[n] = 2 * n
    out_q.put(outdict)

def mp_factorizer(nums, nprocs):
    out_q = mp.Queue()
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []

    for i in range(nprocs):
        p = mp.Process(
                target=worker,
                args=(nums[chunksize * i:chunksize * (i + 1)],
                      out_q))
        procs.append(p)
        p.start()

    resultdict = {}
    for i in range(nprocs):
        resultdict.update(out_q.get())

    for p in procs:
        p.join()

    return resultdict

if __name__ == "__main__":
    print(mp_factorizer(range(100), 3))
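
On the if __name__ == "__main__": worry from the comments: the guard is only needed in the script you actually run, because Windows starts each child process by re-importing that script; mp_factorizer itself stays an ordinary, importable function. A sketch, assuming the code above is saved as factor_mod.py (a hypothetical file name):

# main.py - hypothetical entry script; only this file needs the guard
from factor_mod import mp_factorizer

if __name__ == "__main__":
    # Child processes re-import this module on Windows, so anything that
    # starts new processes must sit behind the __main__ check.
    print(mp_factorizer(range(100), 3))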

3 Comments

I've read that I should avoid comments like 'thanks', but thanks anyway :P But I have a question. Without multiprocessing the program runs for about 2.7 s; with multiprocessing it runs for 1.7 s. I've come across examples where the improvements were far better. Should I expect more, or is this usual?
Sorry, but there's no general answer to that - it depends on all the details of the specific application you're running. Note that for integer factorization via brute-force division, the workload distribution here is naive: for a prime p, factorization requires O(sqrt(p)) divisions, so workers stuck with the bigger integers can be expected to take longer than workers blessed with smaller ones. The expected work per element depends on the elements (see the load-balancing sketch after these comments). In any case, with very small integers, the overheads of interprocess communication swamp the useful work required.
BTW, at extremes: I've had applications that were slower overall with multiprocessing. And I've had some that ran N times faster with N cores, with no practical limit on how big N got. The latter were "embarrassingly parallel": long-running tasks with no need for interprocess synchronization or communication except, perhaps, at the start and/or end.
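
To illustrate the load-balancing point above - a sketch (not the answer's code) using a Pool with imap_unordered, which hands out work in small batches so workers that draw cheap numbers simply pull more, instead of being stuck behind one big static slice:

import multiprocessing as mp

def factor_one(n):
    # Trial-division factorization of a single number.
    original, factors, p = n, [], 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)
    return original, factors

if __name__ == "__main__":
    with mp.Pool() as pool:
        # chunksize=64 hands out work in modest batches; fast workers keep
        # pulling new batches instead of idling behind a static split.
        resultdict = dict(pool.imap_unordered(factor_one,
                                              range(2, 100000),
                                              chunksize=64))
    print(resultdict[12])  # -> [2, 2, 3]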
