
I'm not able to run this block of code on the distributed cluster.

import pandas as pd
from dask import dataframe as dd
import dask

df = pd.DataFrame({'reid_encod': [[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10]]})
dask_df = dd.from_pandas(df, npartitions=3)
save_val = []

def add(dask_df):
    for _, outer_row in dask_df.iterrows():
        for _, inner_row in dask_df.iterrows():
            for base_encod in outer_row['reid_encod']:
                for compare_encod in inner_row['reid_encod']:
                    val = base_encod + compare_encod
                    save_val.append(val)
    return save_val

from dask.distributed import Client

client = Client(...)
dask_compute = dask.delayed(add)(dask_df)
dask_compute.compute()

Also I have a few queries:

  1. Does dask.delayed use the available cluster to do the computation?

  2. Can I parallelize the for-loop iteration of this pandas DataFrame using delayed, and use the multiple machines present in the cluster to do the computation?

  3. Does dask.distributed work on pandas DataFrames?

  4. Can we use dask.delayed with dask.distributed?

  5. If the above programming approach is wrong, can you guide me on whether to choose delayed or a Dask DataFrame for the above scenario?

  • I recommend providing an MCVE. See also matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports Commented Nov 30, 2019 at 15:58
  • @MRocklin Added MCVE. Can you please guide me on running this code on a distributed cluster? Commented Dec 2, 2019 at 8:15
  • @Naren What is the problem, exactly? Commented Dec 3, 2019 at 10:29
  • @gparis I just want to know how to parallelize the for loop in the above code, assuming there are multiple records in the dataframe. Commented Dec 4, 2019 at 6:58

2 Answers


For the record, some answers, although I would also point you to my earlier general comments on this question.

Does dask.delayed use the available cluster to do the computation?

If you have created a client to a distributed cluster, dask will use it for computation unless you specify otherwise.
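To illustrate this default-scheduler behavior, here is a minimal sketch (using an in-process local cluster for convenience; with a real deployment you would pass a scheduler address to Client):

```python
import dask
from dask.distributed import Client

# Creating a Client registers the distributed scheduler as the default,
# so subsequent .compute() calls run on the cluster's workers
# unless you explicitly request another scheduler.
client = Client(processes=False)  # in-process cluster, for illustration only

total = dask.delayed(sum)([1, 2, 3])
print(total.compute())  # executed via the distributed scheduler
```

You can still override this per call, e.g. `total.compute(scheduler="threads")` bypasses the cluster.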

Can I parallelize the for-loop iteration of this pandas DataFrame using delayed, and use the multiple machines present in the cluster to do the computation?

Yes, you can in general use delayed with pandas dataframes for parallelism if you wish. However, your dataframe only has one row, so it is not obvious in this case how - it depends on what you really want to achieve.
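As a general pattern (not specific to the question's data), one way to parallelize per-row or per-chunk work over a pandas frame with delayed is to split it into plain-pandas chunks first; the chunk count and the process function below are illustrative:

```python
import numpy as np
import pandas as pd
import dask

df = pd.DataFrame({'x': range(8)})

def process(chunk):
    # Plain-pandas work on one chunk; each call becomes one task.
    return chunk['x'].sum()

# Split the pandas frame into chunks and wrap each call in delayed,
# so the chunks can be processed in parallel (across cluster workers
# if a distributed Client has been created).
chunks = np.array_split(df, 4)
tasks = [dask.delayed(process)(c) for c in chunks]
results = dask.compute(*tasks)
print(sum(results))  # 28
```

Note that each task here receives a plain pandas object, not a Dask collection.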

Does dask.distributed work on pandas DataFrames?

Yes, you can do anything with distributed that Python can do, since it is just Python processes executing code. Whether it brings you the performance you are after is a separate question.

Can we use dask.delayed with dask.distributed?

Yes, distributed can execute anything that dask in general can, including delayed functions/objects.

If the above programming approach is wrong, can you guide me on whether to choose delayed or a Dask DataFrame for the above scenario?

Not easily: it is not clear to me that this is a dataframe operation at all. It seems more like an array operation; but, again, I note that your function does not actually return anything useful.

In the tutorial: passing pandas dataframes to delayed; the same applies with the dataframe API.


3 Comments

I have edited the program; as you said, I'm not able to run the above code snippet on multiple machines. Can you suggest what might be the problem stopping the code from running in a distributed manner?
Or should I convert the list in the dataframe to a dask array to perform multiprocessing?
You are passing a dask dataframe to a delayed function. Don't! Either use a higher-level collection API (array, dataframe) or use non-dask objects. Without knowing what you want to do, we cannot advise you. Also, what multiple clusters? Dask uses a single cluster.

The main problem with your code is sketched in this section of the best practices: don't pass Dask collections to delayed functions. This means you should use either the delayed API or the dataframe API. While you can convert between dataframes and delayed objects, simply passing one to the other like this is not recommended.

Furthermore,

  • you only have one row in your dataframe, so you only get one partition and no parallelism whatsoever. You can only slow things down like this.
  • this appears to be an everything-to-everything (N^2) operation, so if you had many rows (the normal case for Dask), it would presumably take extremely long no matter how many cores you used
  • passing lists in a pandas row is not a great idea; perhaps you wanted to use an array?
  • the function doesn't return anything useful, so it's not at all clear what you are trying to achieve. Under the description of MCVE, you will see references to "expected outcome" and "what went wrong". To get more help, please be more precise.
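If the intent really is every pairwise sum of two encoding vectors, that is a plain array operation, and NumPy broadcasting computes it in one vectorized step rather than nested Python loops; a minimal sketch (the names and values are illustrative):

```python
import numpy as np

base = np.array([1, 2, 3])
compare = np.array([10, 20, 30])

# Broadcasting computes every pairwise sum at once:
# pairwise[i, j] = base[i] + compare[j], an N x N result.
pairwise = base[:, None] + compare[None, :]
print(pairwise.shape)  # (3, 3)
```

The same idea scales out with dask.array if the vectors are too large for one machine, while keeping the N^2 cost of the operation in view.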

