I have been trying to use the multiprocessing module from Python to achieve parallelism on a task that is computationally expensive.
I'm able to execute my code, but it doesn't run in parallel. I have been reading multiprocessing's manual page and forums to find out why it isn't working, and I haven't figured it out yet.
I think the problem may be related to some kind of lock on executing the other modules that I created and imported.
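For reference, the basic spawn/join pattern I am following looks like this when reduced to a minimal, self-contained sketch (the worker body and the loop bound here are placeholders, not my real code):

import time
import multiprocessing as mp

def worker(worker_id):
    ## stand-in for the real work: just burn some CPU
    total = 0
    for i in xrange(10 ** 7):
        total += i
    print "Worker", worker_id, "done"

if __name__ == '__main__':
    start_t = time.time()
    workers = [mp.Process(target=worker, args=(i,)) for i in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print time.time() - start_t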
Here is my code:
main.py:
import time

##import my modules
import prepare_data
import filter_part
import wrapper_part
import utils
from myClasses import ML_set
from myClasses import data_instance

n_proc = 5

def main():
    if __name__ == '__main__':
        ##only main process should run this
        data = prepare_data.import_data() ##read data from file
        data = prepare_data.remove_and_correct_outliers(data)
        data = prepare_data.normalize_data_range(data)
        features = filter_part.filter_features(data)
        start_t = time.time()
        ##parallelism will be used on this part
        best_subset = wrapper_part.wrapper(n_proc, data, features)
        print time.time() - start_t

main()
wrapper_part.py:
import multiprocessing as mp

##my modules
from myClasses import ML_set
from myClasses import data_instance
import utils

def wrapper(n_proc, data, features):
    p_work_list = utils.divide_features(n_proc-1, features)
    n_train, n_test = utils.divide_data(data)
    workers = []
    for i in range(0, n_proc-1):
        print "sending process:", i
        p = mp.Process(target=worker_classification, args=(i, p_work_list[i], data, features, n_train, n_test))
        workers.append(p)
        p.start()
    for worker in workers:
        print "waiting for join from worker"
        worker.join()
    return
def worker_classification(id, work_list, data, features, n_train, n_test):
    print "Worker ", id, " starting..."
    best_acc = 0
    best_subset = []
    while work_list != []:
        test_subset = work_list[0]
        del(work_list[0])
        train_set, test_set = utils.cut_dataset(n_train, n_test, data, test_subset)
        _, acc = classification_decision_tree(train_set, test_set)
        if acc > best_acc:
            best_acc = acc
            best_subset = test_subset
    print id, " found best subset -> ", best_subset, " with accuracy: ", best_acc
All the other modules don't use the multiprocessing module and work fine. At this stage I'm just testing the parallelism, not even trying to get the results back, so there isn't any communication between processes nor any shared-memory variables. Some variables are used by every process, but they are defined before the processes are spawned, so as far as I know each process gets its own copy of them.
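That matches what I see in a small test like the one below (a sketch, not part of my project): mutating an argument inside a child process does not affect the parent's copy, because the args are copied to each child.

import multiprocessing as mp

def child(a_list):
    ## this mutation only affects the child's own copy of the list
    a_list.append("changed in child")
    print "child sees:", a_list

if __name__ == '__main__':
    data = ["original"]
    p = mp.Process(target=child, args=(data,))
    p.start()
    p.join()
    ## the parent's list is unchanged
    print "parent sees:", data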
As output for 5 processes I get this:
importing data from file...
sending process: 0
sending process: 1
Worker 0 starting...
0 found best subset -> [2313] with accuracy: 60.41
sending process: 2
Worker 1 starting...
1 found best subset -> [3055] with accuracy: 60.75
sending process: 3
Worker 2 starting...
2 found best subset -> [3977] with accuracy: 62.8
waiting for join from worker
waiting for join from worker
waiting for join from worker
waiting for join from worker
Worker 3 starting...
3 found best subset -> [5770] with accuracy: 60.07
55.4430000782
It took around 55 seconds for the 4 worker processes to execute the parallel part. Testing this with only 1 worker process, the execution time is 16 seconds:
importing data from file...
sending process: 0
waiting for join from worker
Worker 0 starting...
0 found best subset -> [5870] with accuracy: 63.32
16.4409999847
I'm running this on Python 2.7 and Windows 8.
EDIT
I tested my code on Ubuntu and it worked, so I guess it's something to do with Windows 8 and Python. Here is the output on Ubuntu:
importing data from file...
size trainset: 792 size testset: 302
sending process: 0
sending process: 1
Worker 0 starting...
sending process: 2
Worker 1 starting...
sending process: 3
Worker 2 starting...
waiting for join from worker
Worker 3 starting...
2 found best subset -> [5199] with accuracy: 60.93
1 found best subset -> [3198] with accuracy: 60.93
0 found best subset -> [1657] with accuracy: 61.26
waiting for join from worker
waiting for join from worker
waiting for join from worker
3 found best subset -> [5985] with accuracy: 62.25
6.1428809166
I'll use Ubuntu for testing from now on; however, I would still like to know why the code doesn't run in parallel on Windows.
With mp.Process, how large is the data passed as args? Not sure if that matters. Also, does each worker have to partition the data in some way (utils.cut_dataset)? If so, is that the computationally expensive part of the job? And if so, maybe you could split the data before delegating the work to the workers (that would be a more common pattern, in my experience). Finally, del(work_list[0]) caught my eye. Is that list large? If so, removing items from the front might be expensive. An alternative approach is to reverse it first and then pop off the end.
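For what it's worth, the reverse-and-pop alternative mentioned in that comment would look something like this sketch (the work items here are hypothetical, just to show the pattern):

## deleting from the front of a list is O(n), while popping from the end is O(1),
## so for a long work list it is cheaper to reverse once and then pop
work_list = [[2313], [3055], [3977], [5770]]  ## hypothetical work items
work_list.reverse()
while work_list:
    test_subset = work_list.pop()
    print "processing", test_subset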