
I am trying to run a function in parallel on multiple files, and I want all of the calls to finish before a certain point.

For example, there is a loop:

def main():
    for item in list:
        function_x(item)

    function_y(list)

I want function_x to run in parallel for all items, but it must have finished for every item before function_y is called.

I am planning to use Celery for this, but I can't work out how to do it.

Note that list is a built-in type name, so you had better use a different variable name for your list. (Commented Sep 17, 2014 at 11:38)

5 Answers


All I needed to do was use the multiprocessing library. Here is my final test code:

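from multiprocessing import Process
from time import sleep

def function_x(i):
    # Simulate slow work: print the item five times, 3 seconds apart
    for _ in range(5):
        sleep(3)
        print(i)

def function_y():
    print("done")

def main():
    processes = []
    for i in range(3):
        print("Process started")
        p = Process(target=function_x, args=(i,))
        processes.append(p)
        p.start()

    # Block until all the processes finish (i.e. until all function_x calls finish)
    for p in processes:
        p.join()

    function_y()

# Required when child processes are started with "spawn" (the default on
# Windows and, since Python 3.8, on macOS): without this guard each child
# would run main() again while importing this module and fail with RuntimeError.
if __name__ == '__main__':
    main()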

1 Comment

This is giving me the error below:
RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module:
    if __name__ == '__main__':
        freeze_support()
The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.

You can use threads for this. Thread.join() is the method you need: it blocks until the thread has finished.
You can do it like this:

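import threading

def function_x(item):
    print("processing", item)  # placeholder for the real per-item work

def function_y(items):
    print("all done:", items)

def main(items):
    threads = []
    for item in items:
        t = threading.Thread(target=function_x, args=(item,))
        threads.append(t)
        t.start()

    # Block until all the threads finish (i.e. until all function_x calls finish)
    for t in threads:
        t.join()

    function_y(items)

if __name__ == '__main__':
    main([1, 2, 3])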

2 Comments

Thanks, Elisha, for the approach. But I wanted to run these processes in parallel, and two threads cannot run the same code simultaneously. I found the solution: I just needed to use multiprocessing instead of threading, and most things are the same. Read docs.python.org/2/library/multiprocessing.html for a better understanding.
Threads do run simultaneously; that is not the difference between threads and processes.

You can do this elegantly with Ray, which is a library for writing parallel and distributed Python.

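Simply decorate function_x with @ray.remote; it can then be executed in parallel by calling function_x.remote(item), and the results can be retrieved with ray.get. Because ray.get blocks until all the tasks have finished, function_y only runs after every function_x call is done.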

import time

import ray

ray.init()

@ray.remote
def function_x(item):
    time.sleep(1)
    return item

def function_y(items):
    pass

items = [1, 2, 3, 4]  # avoid shadowing the built-in name `list`

# Process the items in parallel; ray.get() blocks until every task has finished.
results = ray.get([function_x.remote(item) for item in items])

function_y(items)

View the Ray documentation.


The Celery documentation on groups describes what I think you want. Use AsyncResult.get() rather than AsyncResult.ready() to block: ready() only reports whether the tasks have finished, while get() waits for them.

#!/usr/bin/env python3

import concurrent.futures

def function_x(item):
    return item * item


def function_y(lst):
    return [x * x for x in lst]


a_list = range(10)


if __name__ == '__main__':

    with concurrent.futures.ThreadPoolExecutor(10) as tp:

        future_to_function_x = {
            tp.submit(function_x, item): item
            for item in a_list
        }


    results = {}

    for future in concurrent.futures.as_completed(future_to_function_x):

        item = future_to_function_x[future]

        try:
            res = future.result()
        except Exception as e:
            print('Exception when processing item "%s": %s' % (item, e))
        else:
            results[item] = res


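    # as_completed() yields futures in completion order, so sort by item
    # to make the printed output deterministic
    results = dict(sorted(results.items()))

    print('results:', results)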

    after = function_y(results.values())

    print('after:', after)

Output:

results: {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
after: [0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]
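If you don't need per-item error handling, Executor.map is a shorter way to get the same wait-for-all behavior. This is a minimal sketch with placeholder bodies for function_x and function_y (assumptions, not the code above): leaving the with block calls shutdown(wait=True), so function_y only runs after every function_x call has finished, and map() returns results in input order, so no sorting is needed.

```python
from concurrent.futures import ThreadPoolExecutor


def function_x(item):
    # Placeholder per-item work (hypothetical): square the item
    return item * item


def function_y(results):
    # Placeholder follow-up step (hypothetical) that needs every result
    return sum(results)


def main(items):
    # Exiting the with block calls shutdown(wait=True), so every submitted
    # function_x call has finished before function_y runs.
    with ThreadPoolExecutor(max_workers=4) as executor:
        # map() yields results in input order, not completion order;
        # an exception raised in function_x is re-raised here.
        results = list(executor.map(function_x, items))
    return function_y(results)


if __name__ == '__main__':
    print(main(range(10)))  # prints 285
```

For CPU-bound work, ProcessPoolExecutor offers the same interface, but the functions must then be picklable (defined at module level) and the `if __name__ == '__main__':` guard is mandatory.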
