0

Hello I am trying to sort the data inside some netCDF4 (.nc) files into bins as efficiently as possible. I am currently trying this with xarray and NumPy's digitize function. Since I want to process a large amount of files I am using xarray.open_mfdataset, but this seems to cause an error when I apply the digitize function to it using xarray.apply_ufunc. My code is below:

import numpy as np
import xarray as xr

precip_bins = np.arange(0, 120, .1) # mm/hr, 0.1mm/hr bins
ds = xr.open_mfdataset(['/home/data/2022_rain.nc',
                        '/home/data/2023_rain.nc'],
                       chunks={'time': -1, 'latitude': 500, 'longitude': 700})
hourly_precip_ds = ds['precipitation']


print(hourly_precip_ds)

# Digitize the data into bins
bin_idxs = xr.apply_ufunc(np.digitize,
                          hourly_precip_ds,
                          precip_bins,
                          input_core_dims=[['time'], []],
                          output_core_dims=[['time']],
                          dask="parallelized",
                          dask_gufunc_kwargs={'allow_rechunk': True}) - 1  # Subtract 1 to make bins 0-indexed

Output:

<xarray.DataArray 'precipitation' (time: 17481, latitude: 500, longitude: 700)> Size: 24GB
dask.array<concatenate, shape=(17481, 500, 700), dtype=float32, chunksize=(8760, 500, 700), chunktype=numpy.ndarray>
Coordinates:
  * time       (time) datetime64[ns] 140kB 2022-01-01 ... 2023-12-31T23:00:00
  * latitude   (latitude) float32 2kB 69.95 69.85 69.75 ... 20.25 20.15 20.05
  * longitude  (longitude) float32 3kB -24.95 -24.85 -24.75 ... 44.85 44.95
Attributes:
    units:    mm/hr

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[9], line 2
      1 # Digitize the data into bins
----> 2 bin_idxs = xr.apply_ufunc(np.digitize,
      3                           hourly_precip_ds,
      4                           precip_bins,
      5                           input_core_dims=[['time'], []],
      6                           output_core_dims=[['time']],
      7                           dask="parallelized",
      8                           dask_gufunc_kwargs={'allow_rechunk': True}) - 1  # Subtract 1 to make bins 0-indexed
      9 print("++++++++++++++++++++++++++++++++++++++++++++++++")
     10 print(bin_idxs)

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:1278, in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, meta, dask_gufunc_kwargs, on_missing_core_dim, *args)
   1276 # feed DataArray apply_variable_ufunc through apply_dataarray_vfunc
   1277 elif any(isinstance(a, DataArray) for a in args):
-> 1278     return apply_dataarray_vfunc(
   1279         variables_vfunc,
   1280         *args,
   1281         signature=signature,
   1282         join=join,
   1283         exclude_dims=exclude_dims,
   1284         keep_attrs=keep_attrs,
   1285     )
   1286 # feed Variables directly through apply_variable_ufunc
   1287 elif any(isinstance(a, Variable) for a in args):

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:320, in apply_dataarray_vfunc(func, signature, join, exclude_dims, keep_attrs, *args)
    315 result_coords, result_indexes = build_output_coords_and_indexes(
    316     args, signature, exclude_dims, combine_attrs=keep_attrs
    317 )
    319 data_vars = [getattr(a, "variable", a) for a in args]
--> 320 result_var = func(*data_vars)
    322 out: tuple[DataArray, ...] | DataArray
    323 if signature.num_outputs > 1:

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:831, in apply_variable_ufunc(func, signature, exclude_dims, dask, output_dtypes, vectorize, keep_attrs, dask_gufunc_kwargs, *args)
    826     if vectorize:
    827         func = _vectorize(
    828             func, signature, output_dtypes=output_dtypes, exclude_dims=exclude_dims
    829         )
--> 831 result_data = func(*input_data)
    833 if signature.num_outputs == 1:
    834     result_data = (result_data,)

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:808, in apply_variable_ufunc.<locals>.func(*arrays)
    807 def func(*arrays):
--> 808     res = chunkmanager.apply_gufunc(
    809         numpy_func,
    810         signature.to_gufunc_string(exclude_dims),
    811         *arrays,
    812         vectorize=vectorize,
    813         output_dtypes=output_dtypes,
    814         **dask_gufunc_kwargs,
    815     )
    817     return res

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/namedarray/daskmanager.py:155, in DaskManager.apply_gufunc(self, func, signature, axes, axis, keepdims, output_dtypes, output_sizes, vectorize, allow_rechunk, meta, *args, **kwargs)
    138 def apply_gufunc(
    139     self,
    140     func: Callable[..., Any],
   (...)
    151     **kwargs: Any,
    152 ) -> Any:
    153     from dask.array.gufunc import apply_gufunc
--> 155     return apply_gufunc(
    156         func,
    157         signature,
    158         *args,
    159         axes=axes,
    160         axis=axis,
    161         keepdims=keepdims,
    162         output_dtypes=output_dtypes,
    163         output_sizes=output_sizes,
    164         vectorize=vectorize,
    165         allow_rechunk=allow_rechunk,
    166         meta=meta,
    167         **kwargs,
    168     )

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/dask/array/gufunc.py:431, in apply_gufunc(func, signature, axes, axis, keepdims, output_dtypes, output_sizes, vectorize, allow_rechunk, meta, *args, **kwargs)
    428 for dim, sizes in dimsizess.items():
    429     #### Check that the arrays have same length for same dimensions or dimension `1`
    430     if set(sizes) | {1} != {1, max(sizes)}:
--> 431         raise ValueError(f"Dimension `'{dim}'` with different lengths in arrays")
    432     if not allow_rechunk:
    433         chunksizes = chunksizess[dim]

ValueError: Dimension `'__loopdim1__'` with different lengths in arrays

1 Answer 1

-1

I am not 100% certain why the error appears but my guess is that the function believes you are trying to broadcast the ufunc over both the hourly_precip_ds xarray and the precip_bins array.

Moving the precip_bins array into the kwargs parameter of apply_ufunc seems to fix the error.

bin_idxs = xr.apply_ufunc(
  np.digitize,
  hourly_precip_ds,
  input_core_dims=[['time']],
  output_core_dims=[['time']],
  kwargs={"bins": precip_bins},
  dask="parallelized",
  dask_gufunc_kwargs={'allow_rechunk': True}
) - 1  # Subtract 1 to make bins 0-indexed
Sign up to request clarification or add additional context in comments.

1 Comment

Thanks for the solution. I'll note that the memory usage increased a ton: print(ds) <xarray.DataArray 'precipitation' (time: 17481, latitude: 500, longitude: 700)> Size: 24GB dask.array<concatenate, shape=(17481, 500, 700), dtype=float32, chunksize=(8760, 500, 700), chunktype=numpy.ndarray> and print(bin_idxs) <xarray.DataArray 'precipitation' (latitude: 500, longitude: 700, time: 17481)> Size: 49GB dask.array<sub, shape=(500, 700, 17481), dtype=int64, chunksize=(500, 700, 17481), chunktype=numpy.ndarray> but if it works it works I guess

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.