
I have a lot of tasks (independent of each other, represented by some code in Python) that need to be executed. Their execution time varies. I also have limited resources so at most N tasks can be running at the same time. The goal is to finish executing the whole stack of tasks as fast as possible.

It seems that I am looking for some kind of manager that starts new tasks when the resource gets available and collects finished tasks.

  • Are there any already-made solutions or should I code it myself?
  • Are there any caveats that I should keep in mind?

2 Answers


As far as I can tell, your main would just become:

import multiprocessing
from time import sleep  # the worker from your example

POOL_SIZE = 4

def main():
    tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with multiprocessing.Pool(POOL_SIZE) as pool:
        pool.map(sleep, tasks)

i.e. you've just reimplemented a pool, but less efficiently (Pool reuses processes where possible) and less safely; Pool goes to a lot of effort to clean up properly when exceptions occur.
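Since Python 3.2, the same pattern is also available through concurrent.futures.ProcessPoolExecutor, whose future-based API makes it easy to collect results as tasks finish. A minimal sketch, assuming a sleep-based worker and a short illustrative task list (neither taken from the question):

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

POOL_SIZE = 4

def sleep(seconds: int) -> int:
    time.sleep(seconds)
    return seconds

def main():
    tasks = [3, 1, 2]  # illustrative durations in seconds
    with ProcessPoolExecutor(max_workers=POOL_SIZE) as executor:
        futures = [executor.submit(sleep, t) for t in tasks]
        # as_completed yields futures in completion order, so results
        # from short tasks are available before long ones finish.
        for future in as_completed(futures):
            print(f'task {future.result()} finished')

if __name__ == '__main__':
    main()
```

Executor.map works just as well when you only need the results at the end; as_completed is handy when you want to react to each task as it completes.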


2 Comments

Well, that's embarrassing. Parallel programming is a big mental step - your code literally couldn't be simpler, and I still have a hard time imagining what's actually happening behind those one or two lines. Still, it helped me a lot, thank you.
@Jeyekomon The joy of open-source code is that it's all there if you want to find out :)

Here is a simple code snippet that should fit the requirements:

import multiprocessing
import time

POOL_SIZE = 4
STEP = 1


def sleep(seconds: int):
    time.sleep(seconds)


def main():
    tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    pool = [None] * POOL_SIZE

    while tasks or any(item is not None for item in pool):
        for i in range(len(pool)):
            if pool[i] is not None and not pool[i].is_alive():
                # Finished task. Join the process and clear the resource.
                pool[i].join()
                pool[i] = None

            if pool[i] is None:
                # Free resource. Start new task if any are left.
                if tasks:
                    task = tasks.pop(0)
                    pool[i] = multiprocessing.Process(target=sleep, args=(task,))
                    pool[i].start()

        time.sleep(STEP)


if __name__ == '__main__':
    main()

The manager has a tasks list of arbitrary length; here, for simplicity, the tasks are represented by integers that are passed as arguments to a sleep function. It also has a pool list of fixed size POOL_SIZE, initially filled with None, representing the available resource slots.

The manager periodically visits all currently running processes and checks whether they have finished. It also starts new processes when a slot becomes available. The whole cycle repeats until there are no tasks and no running processes left. The STEP value is there to save computing power - you generally don't need to poll the running processes every millisecond.

As for the caveats, the programming guidelines in the multiprocessing documentation are worth keeping in mind.
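One of those guidelines is worth singling out: on platforms that use the spawn start method (Windows, and macOS since Python 3.8), worker functions must be defined at module level so they can be pickled, and the entry point must sit behind an if __name__ == '__main__': guard. A minimal sketch of the safe layout (the work function here is a hypothetical worker, not code from the answer above):

```python
import multiprocessing
import time

# Must be a module-level function: the "spawn" start method pickles
# the callable and re-imports this module in each child process.
def work(seconds: int) -> int:
    time.sleep(seconds)
    return seconds

def main():
    with multiprocessing.Pool(2) as pool:
        print(pool.map(work, [0, 0, 0]))

# Without this guard, each spawned child would re-execute the module's
# top-level code and try to create its own pool of children.
if __name__ == '__main__':
    main()
```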

2 Comments

why aren't you using the suggested Pool().map API?
@SamMason I just started learning the multiprocessing module, and all those Pipe, Queue, Event, Barrier, Semaphore, Manager, ... classes are still a bit too much for me to take in. I managed to implement the functionality from scratch, but I was still interested in whether (and how) this can be done using the multiprocessing classes. That's why I posted this question.
