
My question may be basic, but I just started learning dask.distributed. Any help is appreciated.

I have code like the following:


    import operator

    import dask
    from dask.distributed import Client

    @dask.delayed
    def do_something(date):
        # ... compute x and y for this date ...
        return x, y

    get_item0 = dask.delayed(operator.itemgetter(0))
    get_item1 = dask.delayed(operator.itemgetter(1))

    def handle_x(list_x):
        # do something
        print(len(list_x))

    def handle_y(list_y):
        # do something
        print(len(list_y))

    def do_tasks():
        list_x, list_y = [], []
        dates = [20210101, 20210102, 20210103, 20210104, 20210105]
        for date in dates:
            result = do_something(date)
            x = get_item0(result)
            y = get_item1(result)
            list_x.append(x)
            list_y.append(y)
        return list_x, list_y

    with Client(cluster) as dask_client:  # cluster is created elsewhere
        tasks = do_tasks()
        list_x = get_item0(tasks)
        list_y = get_item1(tasks)

        # I want to print 5, the number of dates, but this prints 2
        print(len(tasks))

        # I want to pass list_x and list_y to handle_x and handle_y separately,
        # but the following code computes do_tasks twice. How do I fix that?
        dask_client.compute(dask.delayed(handle_x)(list_x)).result()
        dask_client.compute(dask.delayed(handle_y)(list_y)).result()


  1. How can I print out 5 (the number of dates)? print(len(tasks)) prints 2 (the length of the tuple (list_x, list_y)) instead of 5.
  2. I want to pass list_x and list_y to handle_x and handle_y separately, but my code computes do_tasks twice.

How do I fix them?

1 Answer

A minor point: if you know how many items a function returns, you can pass nout to dask.delayed, which lets you unpack the delayed result directly:

    import dask

    @dask.delayed(nout=2)
    def do_something(date):
        return x, y

    # this simplifies do_tasks a bit: the result is unpacked directly
    def do_tasks():
        list_x, list_y = [], []
        dates = [20210101, 20210102, 20210103, 20210104, 20210105]
        for date in dates:
            x, y = do_something(date)
            list_x.append(x)
            list_y.append(y)
        return list_x, list_y
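A minimal, self-contained sketch of what nout changes. It assumes a made-up body for do_something (it just returns date and -date), passes dates in as a parameter, and uses the local synchronous scheduler instead of a distributed cluster. With nout=2 the Delayed result can be unpacked, while without it unpacking raises TypeError. It also shows that do_tasks returns a plain tuple of two lists, which is why len(tasks) is 2 while each list has 5 entries.

```python
import dask


@dask.delayed(nout=2)
def do_something(date):
    # hypothetical body: any function returning two values per date
    return date, -date


@dask.delayed
def do_something_no_nout(date):
    # same function, but dask does not know how many values it returns
    return date, -date


def do_tasks(dates):
    list_x, list_y = [], []
    for date in dates:
        x, y = do_something(date)  # unpacking works because nout=2
        list_x.append(x)
        list_y.append(y)
    return list_x, list_y


dates = [20210101, 20210102, 20210103, 20210104, 20210105]
tasks = do_tasks(dates)
print(len(tasks))   # 2: a plain Python tuple (list_x, list_y)
list_x, list_y = tasks
print(len(list_x))  # 5: one Delayed per date

# without nout, a Delayed has no known length and cannot be unpacked
try:
    a, b = do_something_no_nout(20210101)
except TypeError as exc:
    print("TypeError:", exc)

# nothing has executed yet; compute the x values with the local sync scheduler
(xs,) = dask.compute(list_x, scheduler="sync")
print(xs)
```

Building the lists only constructs the task graph; the functions run when dask.compute (or a client's compute) is called.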

Specifically about your questions:

  1. This should print 5. Your code printed len(tasks), and tasks is the tuple (list_x, list_y) returned by do_tasks, so its length is 2; what you want is the length of one of the lists.

    # note: a dask Client is not needed at this stage,
    # since everything is still delayed
    list_x, list_y = do_tasks()

    # this will print 5
    print(len(list_x))

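  2. To avoid running the shared work twice, submit both computations in a single call. Dask then sees one graph in which handle_x and handle_y depend on the same do_something tasks, so each of those runs only once:

    from dask.distributed import Client

    with Client(cluster) as dask_client:
        # handle_x/handle_y must be wrapped in dask.delayed (or decorated
        # with it); calling the plain functions would run them immediately
        futures = dask_client.compute([dask.delayed(handle_x)(list_x),
                                       dask.delayed(handle_y)(list_y)])
        # wait for both results; the shared do_something tasks run once
        results = dask_client.gather(futures)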
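To check that one combined compute runs the shared tasks only once, here is a small sketch that counts how often do_something actually executes. It assumes toy bodies for all three functions (handle_x/handle_y return lengths instead of printing) and uses dask.compute with the local synchronous scheduler rather than a distributed Client, so the counter lives in a single process. The graph-sharing behavior should carry over when the same tasks are sent to a cluster through dask_client.compute.

```python
import dask

calls = []  # records every real execution of do_something


@dask.delayed(nout=2)
def do_something(date):
    calls.append(date)
    return date, -date


@dask.delayed
def handle_x(list_x):
    return len(list_x)


@dask.delayed
def handle_y(list_y):
    return len(list_y)


dates = [20210101, 20210102, 20210103, 20210104, 20210105]
list_x, list_y = [], []
for date in dates:
    x, y = do_something(date)
    list_x.append(x)
    list_y.append(y)

# two separate compute calls: each one re-runs all five do_something tasks
dask.compute(handle_x(list_x), scheduler="sync")
dask.compute(handle_y(list_y), scheduler="sync")
separate_calls = len(calls)
print(separate_calls)  # 10

# one compute call over both outputs: the shared tasks run once
calls.clear()
n_x, n_y = dask.compute(handle_x(list_x), handle_y(list_y), scheduler="sync")
print(len(calls), n_x, n_y)  # 5 5 5
```

Dask does not cache results between separate compute calls by default, which is why the first pattern doubles the work.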