1

I have a function called run_3_processes, which spawns 3 processes (duh) using multiprocessing.pool.apply, waits for their results and processes these results and returns a single result.

I have another function called run_3_processes_3_times, which should run run_3_processes 3 times in parallel, wait for all of them to return and then process all their results.

things I tried:

  1. using a process pool for run_3_processes_3_times - turns out this is complicated because of Python Process Pool non-daemonic?
  2. rewriting the entire applicative code to spawn 9 processes with the same pool - this really convoluted my code and breaks encapsulation
  3. using a threadpool.apply for run_3_processes_3_times - for some reason this makes it runs serially, not in parallel - is it because the apply in run_3_processes blocks the GIL?

I'm sure there's a one-liner solution I'm missing... thanks!

2
  • I'm confused: are you using both processes and threads at the same time? Are the threads spawning new processes, or are the processes first forking and then spawning threads? If run_3_processes starts 3 new processes using fork (or some other call that eventually calls fork) then those processes can truly run in parallel with no GIL problems. But I'm not sure what run_3_processes_3_times is actually doing. Can you post the code? Commented Mar 22, 2020 at 14:58
  • @Z4-tier - run_3_processes_3_times just needs to call run_3_processes and wait. That's why I thought lightweight threads are fitting here. run_3_processes actually spawns heavy-lifting processes (with multiprocessing.pool.apply, but I guess you're right and it calls fork down the road). But for some reason it won't give up the GIL, unless I use apply_async like below Commented Mar 22, 2020 at 15:03

2 Answers 2

1

ok, found a hacky answer, would love to hear if there's something better:


def run_3_processes_3_times():
        pool = ThreadPool(3)
        candidates = [pool.apply_async(run_3_processes,
                                 args=(c)) for c in range(3)]
        candidates = [x.get() for x in candidates]
        pool.close()

def run_3_processes(c):
        pool = mp.Pool(3)
        solutions = [pool.apply_async(do_it,
                                      args=(i) for i in range(3)]
        solutions = [x.get() for x in solutions]
        pool.close()

Sign up to request clarification or add additional context in comments.

Comments

1

Since you're using a combination of true threads and sub-processes, you will "sort of" run into the GIL, but the way it's structured makes it seem unlikely that it will be a problem. The ThreadPool will be subject to context switches to give concurrency between the threads, but since it's only purpose is to spawn the child processes, it's not doing anything CPU intensive. I'm not sure why it's even necessary to use multiple threads; I would probably just have a single threaded parent process spawn and wait on the child processes directly.

In both functions, it's probably more idiomatic to use the map() method instead of apply_async(), although both will work. Usually that would look a bit like this:

process_count = 3

def pre_process(input_data):
    input_subsets = [[]] * process_count
    for idx, data_point in enumerate(input_data):
        <do any input validation on data_point>
        input_subsets[idx % process_count].append(data_point)
    return input_subsets

def process_data(input_data):
    return_val = []
    for val in input_data:
        <do some processing work>
        return_val.append(<result of processing>) 
    return return_val

data_subsets = pre_process(raw_data)
pool = mp.Pool(process_count)
result_list = pool.map(process_data, data_subsets)
<check result_list>

3 Comments

thanks! but that's a code rewrite that's breaking encapsulation. data_subsets would now have 9 elements, and the post processing check result_list would have to discern which element belongs in which of the 3 contexts...
(1) data_subsets will have 3 elements, not 9. (2) Why would check_result_list need to know which worker process produced a particular member of the result set? If anything is breaking encapsulation, it's that. If that is the case, you could replace the map() call with map_async() and use the optional parameter callback=[callable] to implement check_result.
Maybe I am missing something, but I don't understand why multiple threads are needed to start the child worker processes. That can easily be done single threaded, and multi-threading will run into the GIL anyway, so not really much benefit that I can see. Multi-threading might actually make it slower, owing to the cost of the context switching between the threads on a single CPU.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.