I am trying to parallelize an operation that generates a very large NumPy array and usually blows up the memory of the machine running it.
What I came up with is the following workflow:
- Use Dask to generate a lazy zero-filled array.
- Use xarray to generate a `DataArray` from the previous lazy zero array, with its appropriate coordinates etc.
- Using `DataArray.map_blocks`, call a function `write_values` that gets a subset of a NumPy array from a separate file and inserts it into the appropriate location in the `xarray.DataArray`.
- Lazily convert to an `xarray.Dataset`, with a name for the `DataArray`.
- Then attempt to store to disk via `to_zarr`.
First: Is this the appropriate way to handle an operation that loops through the blocks of a chunked array?
Second: When I run this program, it executes but blows up my memory. Could this be due to the number of tasks created by Dask? How can I optimize it so that it never hits the memory limit of my machine?
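One knob I have looked at is the chunk size: if I give the lazy array explicit chunks, each Dask task should only ever materialize one block. A minimal sketch (the `(100, 100, 100)` chunk shape is just an illustrative guess, not something from my real setup):

```python
import numpy as np
import dask.array as da

# Explicit chunks bound per-task memory: each task materializes one
# (100, 100, 100) float32 block (~4 MB), however large the full array is.
arr = da.empty(shape=(500, 1000, 100), chunks=(100, 100, 100), dtype=np.float32)

print(arr.chunksize)    # shape of one block
print(arr.npartitions)  # number of blocks / tasks
```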
Third: After this code runs, I get a zarr stored on disk, but it does not seem to actually contain the values I get from the external function. Is this the right way to change values in a disk-stored array?
Problem: My function that writes the .zarr to disk does not write the values from `numpy_returning_volume`. I am thinking that I may need to write the values while inside the `map_blocks` function?
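To illustrate what I suspect: in plain `dask.array`, `map_blocks` does not mutate its input; you have to keep the returned array and compute/store that instead. A stripped-down sketch of the same fill-by-block pattern:

```python
import numpy as np
import dask.array as da

def fill_block(block, block_info=None):
    # block_info[0]["array-location"] gives this block's slice in the full array
    lo, hi = block_info[0]["array-location"][0]
    return np.arange(lo, hi, dtype=block.dtype)

src = da.zeros(10, chunks=5, dtype=np.float64)

# map_blocks returns a NEW lazy array; `src` itself stays all zeros.
filled = src.map_blocks(fill_block, dtype=src.dtype)
result = filled.compute()
```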
Thank you!
Fully working example:
```python
import pathlib

import dask.array as da
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar


class NumpyReturningVolume():
    def __init__(self):
        # self.data = da.random.random_sample([50000, 50000, 50000])
        self.data = np.random.random_sample([500, 1000, 100])

    def num_i(self):
        return self.data.shape[0]

    def num_j(self):
        return self.data.shape[1]

    def num_k(self):
        return self.data.shape[2]

    def get_float(self, start_coordinate, end_coordinate):
        return self.data[
            start_coordinate[0]:end_coordinate[0],
            start_coordinate[1]:end_coordinate[1],
            start_coordinate[2]:end_coordinate[2]
        ]


def write_values(chunk, **kwargs):
    # Derive this chunk's slice of the full volume from its coordinates.
    start_coordinate = (chunk.coords["track"].values[0],
                        chunk.coords["bin"].values[0],
                        chunk.coords["time"].values[0])
    end_coordinate = (chunk.coords["track"].values[-1] + 1,
                      chunk.coords["bin"].values[-1] + 1,
                      chunk.coords["time"].values[-1] + 1)
    volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
    chunk.data = volume_data
    return chunk


seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
    seismic_file_name + "_test.zarr")

numpy_returning_volume = NumpyReturningVolume()

dimensions = ('track', 'bin', 'time')
track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)

empty_arr = da.empty(
    shape=(numpy_returning_volume.num_i(),
           numpy_returning_volume.num_j(),
           numpy_returning_volume.num_k()),
    dtype=np.float32)

xarray_data = xr.DataArray(
    empty_arr, name="seis",
    coords={'track': track_coords, 'bin': bin_coords, 'time': time_coords},
    dims=dimensions)

xarray_data.map_blocks(
    write_values, kwargs={"volume": numpy_returning_volume},
    template=xarray_data).compute()

xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(str(store_path), compute=False)

with ProgressBar():
    delayed_results.compute()
```