
I am very new to trying to parallelize my Python code. I am trying to perform some analysis on an xarray and then fill in a pandas dataframe with the results. The columns of the dataframe are independent, so I think it should be trivial to parallelize using dask delayed, but I can't work out how. My xarrays are quite big, so this loop takes a while and uses a lot of memory. It could also be chunked by time instead, if that's easier (this might help with memory)!

Here is the un-parallelized version:

from time import sleep

import numpy as np
import pandas as pd
import xarray as xr
import dask.dataframe as dd

data1 = np.random.rand(4, 3, 3)
data2 = np.random.randint(4, size=(3, 3))

locs1 = ["IA", "IL", "IN"]
locs2 = ['a', 'b', 'c']
times = pd.date_range("2000-01-01", periods=4)

xarray1 = xr.DataArray(data1, coords=[times, locs1, locs2], dims=["time", "space1", "space2"])
xarray2 = xr.DataArray(data2, coords=[locs1, locs2], dims=["space1", "space2"])

def delayed_where(xarray1, xarray2, id):
    sleep(1)
    return xarray1.where(xarray2 == id).mean(axis=(1, 2)).to_dataframe(id)

final_df = pd.DataFrame(columns=range(4), index=times)

for column in final_df:
    final_df[column] = delayed_where(xarray1, xarray2, column)

I would like to parallelize the for loop. I have tried:

final_df_delayed = pd.DataFrame(columns=range(4), index=times)

for column in final_df:
    final_df_delayed[column] = delayed(delayed_where)(xarray1, xarray2, column)

final_df.compute()

Or maybe something with a dask dataframe?

final_df_dd = dd.from_pandas(final_df, npartitions=2)

for column in final_df:
    final_df_dd[column] = delayed(delayed_where)(xarray1, xarray2, column)

final_df_dd.compute()

But none of these work. Can anyone help?

1 Answer

You're using delayed correctly, but it's not possible to construct a dask dataframe in the way you specified.

from dask import delayed
import dask

@delayed
def delayed_where(xarray1, xarray2, id):
    sleep(1)
    return xarray1.where(xarray2 == id).mean(axis=(1, 2)).to_dataframe(id)

@delayed
def form_df(list_col_results):
    final_df = pd.DataFrame(columns=range(4), index=times)
    for n, column in enumerate(final_df):
        final_df[column] = list_col_results[n]
    return final_df

delayed_cols = [delayed_where(xarray1, xarray2, col) for col in final_df.columns]

delayed_df = form_df(delayed_cols)

delayed_df.compute()

Note that the enumeration is a clumsy way to keep the columns in the correct order; your actual problem might suggest a better way of specifying this (e.g. by explicitly passing each column as an individual argument).
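As an aside (not part of the original answer), you can also skip the delayed assembly function entirely: run all the column tasks through a single dask.compute call and stitch the results together with plain pandas. A minimal self-contained sketch, using small random inputs mirroring the question's setup:

```python
import dask
import numpy as np
import pandas as pd
import xarray as xr
from dask import delayed

# Small inputs mirroring the question's setup
times = pd.date_range("2000-01-01", periods=4)
locs1 = ["IA", "IL", "IN"]
locs2 = ["a", "b", "c"]
xarray1 = xr.DataArray(np.random.rand(4, 3, 3),
                       coords=[times, locs1, locs2],
                       dims=["time", "space1", "space2"])
xarray2 = xr.DataArray(np.random.randint(4, size=(3, 3)),
                       coords=[locs1, locs2],
                       dims=["space1", "space2"])

@delayed
def delayed_where(x1, x2, id):
    # mask by id, average over both space dims -> one column per id
    return x1.where(x2 == id).mean(dim=("space1", "space2")).to_dataframe(id)

# One compute call runs all four column tasks in parallel;
# pandas then concatenates the already-computed pieces.
pieces = dask.compute(*[delayed_where(xarray1, xarray2, i) for i in range(4)])
final_df = pd.concat(pieces, axis=1)
print(final_df.shape)  # (4, 4)
```

Because `pd.concat` aligns on the shared time index, the column order is just the order of the list, which avoids the enumeration trick above.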


2 Comments

Thank you so much, I think this is exactly what I need. In particular, the bit I hadn't got my head around was also forming the dataframe in a delayed function, which then allows you to compute the whole thing at the end. Thanks also for introducing me to the @ notation, it's much cleaner than what I wrote.
You're welcome! If you use xarray, you might find this video useful, it seems like you are working with a very similar pattern: youtube.com/watch?v=84m0NlqJ_E0
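
On the `@` notation mentioned above: the decorator form is just shorthand for wrapping the function with `delayed` yourself. A tiny illustration (not from the original thread):

```python
from dask import delayed

def f(x):
    return x + 1

# Wrapping explicitly...
lazy_f = delayed(f)

# ...is equivalent to the decorator form:
@delayed
def g(x):
    return x + 1

# Both produce lazy tasks that only run on .compute()
assert lazy_f(1).compute() == g(1).compute() == 2
```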
