
I am trying to parallelize a nested loop using dask.distributed that looks like this:

import dask

@dask.delayed
def delayed_a(e):
    a = do_something_with(e)
    return a

@dask.delayed
def delayed_b(element):
    computations = []
    for e in element:
        computations.append(delayed_a(e))

    b = dask.compute(*computations, scheduler='distributed',
                     num_workers=4)
    return b

list = [some thousands of elements here]
computations = []
for element in list:
    computations.append(delayed_b(element))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)

As you can see, I am using the distributed scheduler. First, I build a computations list of lazy delayed_b calls, one per element of list. Then delayed_b creates a new set of computations that call a delayed_a function, and everything is executed on the distributed scheduler. This pseudocode works, but I have found that it is faster if delayed_a is not there. So my question is: what is the correct way of doing distributed parallel for loops?

Ultimately, what I am trying to do is:

list = [some thousands of elements here]
for element in list:
    for e in element:
        do_something_with(e)

I would really appreciate any suggestions about the best way of doing nested loops with dask.distributed.

1 Answer


Simple:

something = dask.delayed(do_something_with_e)
list = [some thousands of elements here]

# this could be written as a one-line comprehension (shown below)
computations = []
for element in list:
    part = []
    computations.append(part)
    for e in element:
        part.append(something(e))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)
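
For reference, the loop above collapses to the one-line comprehension mentioned in the comment; dask.compute traverses the nested lists and finds every delayed object inside them:

computations = [[something(e) for e in element] for element in list]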

You should never call a delayed function, or compute(), from within another delayed function.
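
If each call to do_something_with_e is very cheap, the overhead of scheduling thousands of tiny tasks can dominate. In that case it is fine to loop in plain Python inside a single delayed function, so long as that function does not itself call delayed or compute (see the comments below). A minimal sketch, where process_element is a hypothetical name:

import dask

@dask.delayed
def process_element(element):
    # ordinary Python loop inside one task; no dask.delayed
    # or dask.compute calls in here
    return [do_something_with_e(e) for e in element]

computations = [process_element(element) for element in list]
results = dask.compute(*computations)

This produces one task per outer element instead of one per inner e, trading some parallelism for lower overhead.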

(note that the distributed scheduler would be used by default, so long as you've created a Client)
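
A minimal sketch of creating one (the scheduler address below is a placeholder):

from dask.distributed import Client

client = Client()  # starts a local cluster and makes it the default scheduler
# or connect to an already-running scheduler:
# client = Client('tcp://127.0.0.1:8786')  # placeholder address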


6 Comments

Thanks for your reply, I will test it today. You are right, I read the do's and don'ts of delayed functions yesterday. Is it possible to achieve the same thing you wrote, but using the @delayed decorator?
The call to dask.delayed() does the same thing as the decorator (see the sketch after these comments). If you can access the code for do_something_with_e, you can add the decorator. If you cannot, either do what is here, or write a decorated function as you had originally.
I think in your snippet above there is a computations.append() missing after looping over the element list. I still found that just passing the element list to a delayed function that does the for e in element is faster. Is that normal? As you can see, I am very new to both dask and parallelizing. Thanks in advance.
(edited - thanks) Whether or not it is faster depends on the size of the tasks and how well they parallelise. You can loop within the delayed function, but do not call delayed/compute there.
No they don't, they pass the results of calling delayed functions ("delayed objects") as the input to other delayed functions.
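
To illustrate the decorator equivalence from the comments above, a sketch where plain_fn and decorated_fn are hypothetical names:

import dask

def plain_fn(e):           # a function whose source you cannot edit
    return e               # placeholder for the real work

something = dask.delayed(plain_fn)   # wrapping it this way...

@dask.delayed                        # ...does the same thing as decorating
def decorated_fn(e):
    return e               # placeholder for the real work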