I found what appears to be a useful post here:
http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
I've tried the piece of code from it, which uses multiprocessing, but it doesn't work for me. The only change I made to the original is out_q = queue.Queue() instead of out_q = Queue(). I believe this code was written for Python 2.x, and I'm using Python 3.4.2.
I've imported all the necessary modules, of course.
    def mp_factorizer(nums, nprocs):
        def worker(nums, out_q):
            """ The worker function, invoked in a process. 'nums' is a
                list of numbers to factor. The results are placed in
                a dictionary that's pushed to a queue.
            """
            outdict = {}
            for n in nums:
                outdict[n] = factorize_naive(n)
            out_q.put(outdict)

        # Each process will get 'chunksize' nums and a queue to put its out
        # dict into
        out_q = queue.Queue()
        chunksize = int(math.ceil(len(nums) / float(nprocs)))
        procs = []

        for i in range(nprocs):
            p = multiprocessing.Process(
                    target=worker,
                    args=(nums[chunksize * i:chunksize * (i + 1)],
                          out_q))
            procs.append(p)
            p.start()

        # Collect all results into a single result dict. We know how many dicts
        # with results to expect.
        resultdict = {}
        for i in range(nprocs):
            resultdict.update(out_q.get())

        # Wait for all worker processes to finish
        for p in procs:
            p.join()

        return resultdict
The error I get is:

      File "E:\....\file.py", line 109, in <module>
        print (mp_factorizer(range(100000),1))
      File "E:\....\file.py", line 88, in mp_factorizer
        p.start()
      File "E:\...\Python\Install\lib\multiprocessing\process.py", line 105, in start
        self._popen = self._Popen(self)
      File "E:\...\Python\Install\lib\multiprocessing\context.py", line 212, in _Popen
        return default_context.get_context().Process._Popen(process_obj)
      File "E:\...\Python\Install\lib\multiprocessing\context.py", line 313, in _Popen
        return Popen(process_obj)
      File "E:\...\Python\Install\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
        reduction.dump(process_obj, to_child)
      File "E:\...\Python\Install\lib\multiprocessing\reduction.py", line 59, in dump
        ForkingPickler(file, protocol).dump(obj)
    _pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
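For context, the last line of the traceback points at the root cause: a plain queue.Queue wraps thread locks internally, and on Windows (the spawn start method) multiprocessing pickles every argument passed to a Process, which fails on those locks. A minimal check, independent of multiprocessing, shows the same failure:

```python
import pickle
import queue

# queue.Queue holds _thread.lock objects internally, so it cannot be
# pickled. On Windows, multiprocessing's spawn start method pickles every
# Process argument, which is why passing out_q = queue.Queue() blows up.
try:
    pickle.dumps(queue.Queue())
    print("pickled fine")  # never reached
except (pickle.PicklingError, TypeError, AttributeError) as exc:
    print("unpicklable:", exc)
```

(The exact exception class differs between Python versions: 3.4 raises _pickle.PicklingError, newer versions raise TypeError, but the cause is the same.)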
I've been told that a queue.Queue definitely can't work in this context and that I should use out_q = multiprocessing.Queue() instead (a Manager.Queue would also work, but using one would be needlessly inefficient). However, neither multiprocessing.Manager().Queue() nor multiprocessing.Queue() has worked for me. I have used Pool before, but then I have to use if __name__ == "__main__":, with which I have two problems. First, I don't understand it, and second, I haven't found how it can be used inside a function that can be reused.
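For reference, here is one way this is commonly restructured so it can work under Windows' spawn start method: worker moved to module level (spawn must be able to pickle the target by name, so a nested function won't do), queue.Queue replaced with multiprocessing.Queue, and the top-level call placed under the __main__ guard, while mp_factorizer itself stays an ordinary reusable function. This is a sketch, not the original poster's code; in particular factorize_naive below is a stand-in for the helper from the linked post, which isn't shown in the question:

```python
import math
import multiprocessing

def factorize_naive(n):
    """Stand-in trial-division factorizer (the original helper isn't shown)."""
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)
    return factors

def worker(nums, out_q):
    """Module-level so spawn can pickle it by name; nested functions can't be."""
    outdict = {n: factorize_naive(n) for n in nums}
    out_q.put(outdict)

def mp_factorizer(nums, nprocs):
    """Reusable function: no __main__ guard needed inside it."""
    out_q = multiprocessing.Queue()  # process-safe, unlike queue.Queue
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []
    for i in range(nprocs):
        p = multiprocessing.Process(
                target=worker,
                args=(nums[chunksize * i:chunksize * (i + 1)], out_q))
        procs.append(p)
        p.start()
    # Drain the queue before joining, so workers aren't blocked on a full pipe.
    resultdict = {}
    for _ in range(nprocs):
        resultdict.update(out_q.get())
    for p in procs:
        p.join()
    return resultdict

if __name__ == "__main__":
    # The guard keeps spawned children, which re-import this module, from
    # recursively launching more processes; only the parent runs this block.
    print(mp_factorizer(range(1, 1000), 2)[12])  # 12 = 2 * 2 * 3
```

The guard doesn't have to wrap the function itself, only the code that actually kicks off the work, so mp_factorizer remains importable and reusable from other modules.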