
I am trying to parallelize operations on a big array. I summarized my approach in the code snippet below. Since the operations on the big array are costly, I want to run only 4 of the 100 processes (i.e. n_cpus) in parallel at each iteration. After an iteration finishes, some garbage collection is done and the next iteration should start. Instead, the main loop completes only the first iteration and then terminates. I would be glad if a parallel-processing expert could point out how to correct my code to achieve the desired task.

from multiprocessing import Process

def train_model(model, big_array, i):
    model = do_operations_on(big_array)

# edit: this part is within a class
n_processes = 100
n_cpus = 4
models = [None for _ in range(n_processes)]
n_iterations = n_processes // n_cpus
for it in range(n_iterations):
    procs = [Process(target=train_model,
                     args=(models[it*n_cpus+i], big_array, i))
             for i in range(n_cpus)]

    for p in procs: p.start()
    for p in procs: p.join()
  • I'm not sure what's wrong with your code. It is the simplest way to use multiple processes, and it is correct. However, communication between processes should be done with queues instead of writing to global variables, like models in your case. Do you have different operations, e.g. different training models, that should be applied to your array? Commented May 24, 2015 at 7:00
  • You are right. I checked it out and the above snippet works. However, in my original application, train_model is outside whereas the rest of the code is within a class. There, the main loop completes only the first iteration and terminates. I will update if I have made a silly mistake or if there is something else. Commented May 24, 2015 at 7:21
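For reference, here is a minimal sketch of the queue-based result passing suggested in the first comment; train_model, do_operations_on, big_array, n_cpus, and models are the names from the snippet above, and the rest is illustrative:

from multiprocessing import Process, Queue

def train_model(big_array, i, result_queue):
    # Build the model and send it back through the queue instead of
    # writing to a shared 'models' list (child processes can't mutate
    # the parent's list).
    model = do_operations_on(big_array)
    result_queue.put((i, model))

queue = Queue()
procs = [Process(target=train_model, args=(big_array, i, queue))
         for i in range(n_cpus)]
for p in procs:
    p.start()
# Drain the queue before joining: a child process does not exit until
# its queued data has been consumed, so joining first can deadlock.
results = [queue.get() for _ in procs]
for p in procs:
    p.join()
for i, model in results:
    models[i] = model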

1 Answer


Your idea seems basically OK, except for a few problems:

  • As RaJa pointed out, you should probably pass data between processes using queues (as in the sketch above) rather than relying on shared state

  • I think your use of multiprocessing.Process is unnecessarily low-level here; you should be using multiprocessing.Pool, which would also be more efficient, as you can reuse the processes instead of repeatedly setting them up and tearing them down.

  • Your code has a mix-up: train_model ignores its model and i arguments, and simply rebinds the local name model, so the result never makes it back into models.

So, in the following code, I assume you have something like

def train_model(specs, big_array):
    return ...

that takes a specifics entry and the data, and returns a model built for those specifics.

I also assume in the following that you have some array specifics containing all the specifics you want to try (and that its length is divisible by the number of CPUs, an assumption that is not difficult to remove).

Finally, I assume the point is to build a list models of all the models.

Your code becomes:

from functools import partial
from multiprocessing import Pool

n_cpus = 4
n_iterations = len(specifics) // n_cpus
models = []
p = Pool(n_cpus)
# Pool.map pickles the callable it is given; a lambda can't be pickled,
# so bind big_array with functools.partial instead.
worker = partial(train_model, big_array=big_array)
for it in range(n_iterations):
    cur_specs = specifics[it * n_cpus: (it + 1) * n_cpus]
    cur_models = p.map(worker, cur_specs)
    models.extend(cur_models)
    # Cleanup here
p.close()
p.join()
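As an aside, if the per-iteration cleanup is not essential, the chunking can be left to the pool itself. A sketch under that assumption, reusing the same train_model, specifics, and big_array:

from functools import partial
from multiprocessing import Pool

with Pool(n_cpus) as p:
    # chunksize controls how many specs each worker takes per dispatch
    models = p.map(partial(train_model, big_array=big_array),
                   specifics, chunksize=1)

Note that big_array travels to the workers by pickling; on platforms that fork (e.g. Linux), keeping big_array as a module-level global lets child processes inherit it without that copy.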

Comments

Could you explain what specifics[it * n_cpus: (it + 1) * n_cpu] means or does?
@martineau Edited above so that the sub-expressions have more descriptive names. Does this address your comment? Also, in the course of doing so, found a typo (corrected). Thanks.
Well, not so much... I had hoped you would explain that you were taking a slice of the values in the specifics array for each iteration, which might compensate for the somewhat vague description of its contents given earlier in the answer.
@AmiTavory Many thanks for the detailed answer and suggestions. With queues, I keep getting "can't pickle" or "object is not callable" type of errors. I will invest some more time to see if I can get everything to work though.
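As an aside on the "can't pickle" errors: multiprocessing serializes whatever it sends to workers with pickle, and that fails for lambdas and (on Python 2, current when this exchange was written) for bound methods. A minimal illustration; the Trainer class here is made up:

from multiprocessing import Pool

def module_level(x):
    # Picklable: defined at module top level, so workers can re-import it.
    return x * 2

class Trainer:
    def method(self, x):
        # A bound method: not picklable on Python 2; picklable on Python 3
        # only if the instance itself pickles.
        return x * 2

if __name__ == '__main__':
    with Pool(2) as pool:
        print(pool.map(module_level, [1, 2, 3]))   # works
        # pool.map(lambda x: x * 2, [1, 2, 3])     # PicklingError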
The least troublesome solution when working with classes seems to be using joblib (still based on multiprocessing's Pool). Here is a one-liner that works (a bit slow, though), expanded below: for it in range(n_iters): models.extend( Parallel(n_jobs=n_cpus)(delayed(train_model)(specs, big_array) for i in range(n_cpus)) )
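Expanded for readability, that joblib one-liner might look like the sketch below; reading the per-iteration specs as a slice of specifics, matching the answer's chunking, is my assumption:

from joblib import Parallel, delayed

models = []
for it in range(n_iterations):
    cur_specs = specifics[it * n_cpus: (it + 1) * n_cpus]
    # delayed(train_model) wraps the call as a (function, args) task
    # that Parallel dispatches across n_cpus worker processes.
    models.extend(
        Parallel(n_jobs=n_cpus)(
            delayed(train_model)(specs, big_array) for specs in cur_specs
        )
    )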
