
I am trying to parallelize an operation that generates a very large NumPy array and usually exhausts the memory of the machine running it.

What I came up with is the following workflow:

  1. Use Dask to generate a lazy, zero-filled array.
  2. Use xarray to wrap the lazy zero array in a DataArray with the appropriate coordinates, etc. (steps 1-2 are sketched right after this list).
  3. Via DataArray.map_blocks, call a function write_values that fetches a subset of a NumPy array from a separate file and inserts it into the appropriate location in the xarray.DataArray.
  4. Lazily convert to an xarray.Dataset with a name for the DataArray.
  5. Attempt to store it to disk via to_zarr.
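For context, here is a minimal sketch of steps 1-2 in isolation. The shape, chunk sizes, and coordinate values are made up for illustration; only the pattern (lazy zeros wrapped in a coordinate-labelled DataArray) matters.

import dask.array as da
import numpy as np
import xarray as xr

# Step 1: a lazy, zero-filled Dask array with explicit chunks.
lazy_zeros = da.zeros((500, 1000, 100), chunks=(100, 200, 100), dtype=np.float32)

# Step 2: wrap it in a DataArray with integer coordinates per dimension.
template = xr.DataArray(
    lazy_zeros,
    dims=("track", "bin", "time"),
    coords={
        "track": np.arange(500, dtype=np.uint32),
        "bin": np.arange(1000, dtype=np.uint32),
        "time": np.arange(100, dtype=np.uint32),
    },
    name="seis",
)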

First: Is this an appropriate way to handle an operation that loops through the blocks of a chunked array?

Second: When I run this program, it executes but blows up my memory. Could this be due to the number of tasks Dask creates? How can I optimize it so it never hits my machine's memory limit?
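One thing worth checking while debugging the memory blow-up: da.empty with no chunks argument may produce very large chunks, and each task materializes a whole chunk in memory. A sketch of passing explicit chunks, with illustrative (untuned) numbers:

import dask.array as da
import numpy as np

# Each (100, 200, 100) float32 chunk is 100*200*100*4 bytes = 8 MB,
# so even several workers in flight stay well under the machine's limit.
empty_arr = da.empty(
    shape=(500, 1000, 100),
    chunks=(100, 200, 100),  # explicit, small chunks bound per-task memory
    dtype=np.float32,
)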

Third: After this code runs, I get a Zarr store on disk, but it does not actually contain the values I get from the external function. Is this the right way to change values in a disk-backed array?

Problem: My function that writes the .zarr to disk does not write the values from numpy_returning_volume. Could it be that I need to write the values while still inside the map_blocks function?
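To test that hypothesis, one alternative is to bypass the template array entirely and write each block straight into a pre-created Zarr array. A sketch, assuming the zarr package and reusing numpy_returning_volume from the example below; names, shapes, and the store path are illustrative:

import zarr
import numpy as np

# Create the on-disk array up front, then let each block write its slab
# directly into the right region.
store = zarr.open(
    "direct_test.zarr", mode="w",
    shape=(500, 1000, 100), chunks=(100, 200, 100), dtype=np.float32,
)

def write_block_direct(start, end):
    slab = numpy_returning_volume.get_float(start, end)
    store[start[0]:end[0], start[1]:end[1], start[2]:end[2]] = slab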

Thank you!

Fully working example:

import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar

class NumpyReturningVolume:
    def __init__(self):
        # self.data = da.random.random_sample([50000, 50000, 50000])
        self.data = np.random.random_sample([500, 1000, 100])

    def num_i(self):
        return self.data.shape[0]

    def num_j(self):
        return self.data.shape[1]

    def num_k(self):
        return self.data.shape[2]
    
    def get_float(self, start_coordinate, end_coordinate):
        return self.data[
            start_coordinate[0]:end_coordinate[0],
            start_coordinate[1]:end_coordinate[1],
            start_coordinate[2]:end_coordinate[2]
            ]


def write_values(chunk, **kwargs):

    start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
    end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)

    volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
    chunk.data = volume_data

    return chunk


seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
    seismic_file_name + "_test.zarr")

numpy_returning_volume = NumpyReturningVolume()

dimensions = ('track', 'bin', 'time')

track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)

empty_arr = da.empty(shape=(
    numpy_returning_volume.num_i(),
    numpy_returning_volume.num_j(),
    numpy_returning_volume.num_k()),
    dtype=np.float32)

xarray_data = xr.DataArray(empty_arr, name="seis", coords={
    'track': track_coords,
    'bin': bin_coords, 'time': time_coords},
    dims=dimensions)

xarray_data.map_blocks(write_values, kwargs={
                       "volume": numpy_returning_volume}, template=xarray_data).compute()
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(str(store_path), compute=False)

with ProgressBar():
    delayed_results.compute()

1 Answer

OMG! I just realized that my problem was the simplest thing in the world! I just needed to assign a variable to the result of map_blocks and everything works. Here is the complete working script if anyone is interested. Be warned that it generates a 6 GB dataset.

import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar

class NumpyReturningVolume:
    def __init__(self):
        self.data = da.random.random_sample([1000, 2000, 1000])
        # self.data = np.random.random_sample([500, 1000, 100])

    def num_i(self):
        return self.data.shape[0]

    def num_j(self):
        return self.data.shape[1]

    def num_k(self):
        return self.data.shape[2]
    
    def get_float(self, start_coordinate, end_coordinate):
        return self.data[
            start_coordinate[0]:end_coordinate[0],
            start_coordinate[1]:end_coordinate[1],
            start_coordinate[2]:end_coordinate[2]
            ].compute()


def write_values(chunk, **kwargs):

    start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
    end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)

    volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
    chunk.data = volume_data

    return chunk


seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
    seismic_file_name + "_test.zarr")

numpy_returning_volume = NumpyReturningVolume()

dimensions = ('track', 'bin', 'time')

track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)

empty_arr = da.empty(shape=(
    numpy_returning_volume.num_i(),
    numpy_returning_volume.num_j(),
    numpy_returning_volume.num_k()),
    dtype=np.float32)

xarray_data = xr.DataArray(empty_arr, name="seis", coords={
    'track': track_coords,
    'bin': bin_coords, 'time': time_coords},
    dims=dimensions)

# This xarray_data = is what I was missing!!
xarray_data = xarray_data.map_blocks(write_values, kwargs={
                       "volume": numpy_returning_volume}, template=xarray_data)
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(str(store_path), compute=False)

with ProgressBar():
    delayed_results.compute()
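For anyone who hits the same thing: xarray, like Dask, never mutates arrays in place. map_blocks builds and returns a new lazy object, so if you discard the return value you discard the work. A tiny illustration of that behaviour (toy data, not from the script above):

import numpy as np
import xarray as xr

arr = xr.DataArray(np.zeros(4), dims="x").chunk(2)

def fill_ones(block):
    return block + 1

arr.map_blocks(fill_ones)             # return value discarded: arr is unchanged
result = arr.map_blocks(fill_ones)    # keep the new lazy object instead
print(arr.sum().compute().item())     # 0.0
print(result.sum().compute().item())  # 4.0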