I am very new to to trying to parallelize my python code. I am trying to perform some analysis on an xarray, then fill in a pandas dataframe with the results. The columns of the dataframe are independent, so I think it should be trivial to parallelise using dask delayed, but can't work out how. My xarrays are quite big, so this loop takes a while, and is big in memory. It could also be chunked by time, instead, if that's easier (this might help with memory)!
Here is the un-parallelized version:
from time import sleep
import time
import pandas as pd
import dask.dataframe as dd
data1 = np.random.rand(4, 3,3)
data2=np.random.randint(4,size=(3,3))
locs1 = ["IA", "IL", "IN"]
locs2 = ['a', 'b', 'c']
times = pd.date_range("2000-01-01", periods=4)
xarray1 = xr.DataArray(data1, coords=[times, locs1, locs2], dims=["time", "space1", "space2"])
xarray2= xr.DataArray(data2, coords=[locs1, locs2], dims=[ "space1", "space2"])
def delayed_where(xarray1,xarray2,id):
sleep(1)
return xarray1.where(xarray2==id).mean(axis=(1,2)).to_dataframe(id)
final_df=pd.DataFrame(columns=range(4),index=times)
for column in final_df:
final_df[column]=delayed_where(xarray1,xarray2,column)
I would like to parallelize the for loop, but have tried:
final_df_delayed=pd.DataFrame(columns=range(4),index=times)
for column in final_df:
final_df_delayed[column]=delayed(delayed_where)(xarray1,xarray2,column)
final_df.compute()
Or maybe something with dask delayed?
final_df_dd=dd.from_pandas(final_df, npartitions=2)
for column in final_df:
final_df_dd[column]=delayed(delayed_where)(xarray1,xarray2,column)
final_df_dd.compute()
But none of these work. Can anyone help?