
I have been trying to use Python's multiprocessing module to achieve parallelism on a computationally expensive task.

I'm able to execute my code, but it doesn't run in parallel. I have been reading multiprocessing's manual page and forums to find out why it isn't working, and I haven't figured it out yet.

I think the problem may be related to some kind of lock on executing other modules that I created and imported.

Here is my code:

main.py:

import time

##import my modules
import prepare_data
import filter_part
import wrapper_part
import utils
from myClasses import ML_set
from myClasses import data_instance

n_proc = 5

def main():
    if __name__ == '__main__':
        ##only main process should run this
        data = prepare_data.import_data() ##read data from file  
        data = prepare_data.remove_and_correct_outliers(data)
        data = prepare_data.normalize_data_range(data)
        features = filter_part.filter_features(data)

        start_t = time.time()
        ##parallelism will be used on this part
        best_subset = wrapper_part.wrapper(n_proc, data, features)

        print time.time() - start_t


main()

wrapper_part.py:

import multiprocessing as mp

##my modules
from myClasses import ML_set
from myClasses import data_instance
import utils

def wrapper(n_proc, data, features):

    ##split the feature list into one chunk of work per worker process
    p_work_list = utils.divide_features(n_proc-1, features)
    n_train, n_test = utils.divide_data(data)

    workers = []

    for i in range(0,n_proc-1):
        print "sending process:", i
        p = mp.Process(target=worker_classification, args=(i, p_work_list[i], data, features, n_train, n_test))
        workers.append(p)
        p.start()

    for worker in workers:
        print "waiting for join from worker"
        worker.join()


    return


def worker_classification(id, work_list, data, features, n_train, n_test):
    print "Worker ", id, " starting..."
    best_acc = 0
    best_subset = []
    while (work_list != []):
        test_subset = work_list[0]
        del(work_list[0])
        train_set, test_set = utils.cut_dataset(n_train, n_test, data, test_subset)
        _, acc = classification_decision_tree(train_set, test_set)
        if acc > best_acc:
            best_acc = acc
            best_subset = test_subset
    print id, " found best subset ->  ", best_subset, " with accuracy: ", best_acc

All the other modules don't use the multiprocessing module and work fine. At this stage I'm just testing parallelism, not even trying to get the results back, so there isn't any communication between processes nor any shared-memory variables. Some variables are used by every process, but they are defined before the processes are spawned, so as far as I know each process gets its own copy of them.
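For what it's worth, a minimal sketch (hypothetical names, not the modules above) of why each process ends up with its own copy of such variables: arguments handed to mp.Process are copied into the child, so mutations there never reach the parent.

import multiprocessing as mp

def worker(work_list):
    ##the child mutates its own copy only
    work_list.append("changed in child")
    print "child sees:", work_list

if __name__ == '__main__':
    work_list = ["original"]
    p = mp.Process(target=worker, args=(work_list,))
    p.start()
    p.join()
    ##the parent's list is unchanged
    print "parent sees:", work_list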

As output for 5 processes I get this:

importing data from file...
sending process: 0
sending process: 1
Worker  0  starting...
0  found best subset ->   [2313]  with accuracy:  60.41
sending process: 2
Worker  1  starting...
1  found best subset ->   [3055]  with accuracy:  60.75
sending process: 3
Worker  2  starting...
2  found best subset ->   [3977]  with accuracy:  62.8
waiting for join from worker
waiting for join from worker
waiting for join from worker
waiting for join from worker
Worker  3  starting...
3  found best subset ->   [5770]  with accuracy:  60.07
55.4430000782

It took around 55 seconds for the 4 worker processes to execute the parallel part. Testing this with only 1 worker process, the execution time is 16 seconds:

importing data from file...
sending process: 0
waiting for join from worker
Worker  0  starting...
0  found best subset ->   [5870]  with accuracy:  63.32
16.4409999847

I'm running this on Python 2.7 and Windows 8.

EDIT

I tested my code on Ubuntu and it worked; I guess something is wrong with Windows 8 and Python. Here is the output on Ubuntu:

importing data from file...
size trainset:  792  size testset:  302
sending process: 0
sending process: 1
Worker  0  starting...
sending process: 2
Worker  1  starting...
sending process: 3
Worker  2  starting...
waiting for join from worker
Worker  3  starting...
2  found best subset ->   [5199]  with accuracy:  60.93
1  found best subset ->   [3198]  with accuracy:  60.93
0  found best subset ->   [1657]  with accuracy:  61.26
waiting for join from worker
waiting for join from worker
waiting for join from worker
3  found best subset ->   [5985]  with accuracy:  62.25
6.1428809166

I'll start using Ubuntu to test from now on; however, I would like to know why the code doesn't work on Windows.

2 Comments

  • When you spawn each process with mp.Process, how large is the data passed as args? Not sure if that matters. Also, does each worker have to partition the data in some way (utils.cut_dataset)? If so, is that the computationally expensive part of the job? And if so, maybe you could split the data before delegating the work to the workers (that would be a more common pattern, in my experience). Finally, del(work_list[0]) caught my eye. Is that list large? If so, removing items from the front can be expensive. An alternative approach is to reverse the list first and then pop off the end (see the sketch after these comments). Commented Feb 26, 2015 at 14:01
  • Data can be pictured as a quite large matrix, something like 1100 * N floats, where N is around 300 to 400. Every process needs all of the data. In this example, and just to test parallelism, I only perform the first steps of the problem. However, when I get everything working, the idea is that every useful subset each worker finds inside the while (work_list != []) loop will be combined with all the other possibilities, and that is the computationally expensive part. Thanks for the tip on the way I'm deleting the job from the work_list. Commented Feb 26, 2015 at 15:42
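As a side note on that comment exchange, here is a minimal sketch of the reverse-then-pop suggestion, using a hypothetical work list rather than the output of utils.divide_features: reversing once up front lets each iteration remove an item in O(1) instead of shifting the whole list with del work_list[0].

work_list = [[2313], [3055], [3977], [5770]]  ##hypothetical feature subsets
work_list.reverse()                           ##reverse once, O(n)
while work_list:
    test_subset = work_list.pop()             ##pop from the end, O(1)
    print "processing subset:", test_subset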

1 Answer


Make sure to read the Windows guidelines in the multiprocessing manual: https://docs.python.org/2/library/multiprocessing.html#windows

Especially "Safe importing of main module":

Instead one should protect the “entry point” of the program by using if __name__ == '__main__':

You violated this rule within the first code snippet shown above, so I did not look further than this. Hopefully the solution to the problems you observe is as simple as including this protection.
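For concreteness, a sketch of how that protection is usually applied to a script shaped like the main.py above (the OP's module names are kept, everything else unchanged): the guard sits at module level, so a child that re-imports the main module never calls main().

##import my modules
import time
import prepare_data
import filter_part
import wrapper_part
import utils
from myClasses import ML_set
from myClasses import data_instance

n_proc = 5

def main():
    data = prepare_data.import_data()  ##read data from file
    data = prepare_data.remove_and_correct_outliers(data)
    data = prepare_data.normalize_data_range(data)
    features = filter_part.filter_features(data)

    start_t = time.time()
    ##parallelism will be used on this part
    best_subset = wrapper_part.wrapper(n_proc, data, features)
    print time.time() - start_t

if __name__ == '__main__':
    ##only the parent process executes this; children re-importing the
    ##module on Windows skip it
    main()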

The reason why this is important: on Unix-like systems, child processes are created by forking. In this case, the operating system creates an exact copy of the process that creates the fork. That is, all state is inherited from the parent by the child. For instance, this means that all functions and classes are defined.

On Windows, there is no such system call. Python needs to perform the quite heavy task of creating a fresh Python interpreter session in the child and re-creating, step by step, the state of the parent. For instance, all functions and classes need to be defined again. That is why heavy import machinery is going on under the hood of a Python multiprocessing child on Windows. This machinery starts when the child imports the main module. In your case, this implies a call to main() in the child! You certainly do not want that.
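A minimal, self-contained illustration of that machinery (a hypothetical standalone script, not the OP's code): the unguarded top-level print runs once per child on Windows, because each child re-imports the main module, whereas on a forking Unix it runs only once.

import multiprocessing as mp

print "top-level statement executed"  ##runs again in every Windows child

def work(i):
    print "worker", i, "running"

if __name__ == '__main__':
    procs = [mp.Process(target=work, args=(i,)) for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()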

You might find this tedious. I find it impressive that the multiprocessing module manages to provide an interface to the same functionality on two such different platforms. Really, with respect to process handling, POSIX-compliant operating systems and Windows are so different that it is inherently difficult to come up with an abstraction that works on both.


4 Comments

Thanks for the reply. I'm using if __name__ == '__main__': — it is defined in the main part. I tried to use it just before the spawn of the processes, inside the wrapper function; however, if the protection is there, then all processes start to execute the functions called in main.py. I was aware that forking doesn't exist on Windows, but I thought it wouldn't need to recreate all the steps. Since it is so expensive to create other processes on Windows, I'll use Unix systems from now on.
I did not quite get it: did protecting the call to main() solve your issue on Windows? You should note that while process creation may indeed be a complex thing, it is of course worth it in many situations. The work performed by the process must be much more complex than its creation; then it pays off. On the other hand, you do not want to outsource a task to another process if the task is trivial.
Protecting the call to main() didn't solve the problem; my code still doesn't run in parallel on Windows. However, using the protection there made it possible for the code in main() to be executed by just one process. The main part of my code takes about 30 seconds to execute because the amount of data is quite large. If, on Windows, every created process needs to repeat those 30 seconds before it can do its work, then it will take more time than using the Unix fork.
If your child processes need 30 seconds before the worker function starts doing its work, then you have badly designed your architecture. Either there is far too much data being transferred from parent to child via pipes, or you still have too much work triggered by pure imports. Imports can be expensive, yes, but we are talking milliseconds in common cases. The most extreme case I have observed is importing a scipy/numpy/matplotlib stack through NFS, which involves hundreds or thousands of file reads; even on a slow network that finishes in under 10 seconds.
