
I need to save a dask array from a big numpy array. Below is a minimal working example that shows the process. Note that a is created with numpy.random only for this MWE; unfortunately I cannot create the array with dask.

import numpy as np
import dask.array as da
from dask.distributed import Client

a = np.random.randint(0, 2, size=4000000*8000).reshape((4000000, 8000))
# here the conversion and saving
client = Client(n_workers=90, threads_per_worker=20, processes=True)
dask_array = da.from_array(a, chunks=100000)
da.to_npy_stack('my-folder/', dask_array)
client.close()

The problem I am facing is that a takes around 100GB in memory, but when the dask part runs, memory usage keeps climbing until it nearly fills the available RAM, which is more than 300GB. It then does some computing, and after about 10 minutes I get a memory error. I need the array saved by dask because another pipeline (which cannot be connected directly to this one) reads dask arrays, and reading a dask array from disk requires the info file (if there is any other method to dump the array and create the info file, I am open to trying it).
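For reference, here is a sketch of what "dumping the array and creating the info file" by hand could look like, without dask at all. It assumes the on-disk layout that to_npy_stack appears to use (one 0.npy, 1.npy, ... file per chunk along axis 0, plus a pickled info dict with keys 'chunks', 'dtype' and 'axis'); manual_npy_stack is just an illustrative name:

```python
import os
import pickle
import numpy as np

def manual_npy_stack(dirname, array, chunk_rows):
    """Dump `array` as a dask-style npy stack, one chunk at a time.

    Assumes dask's on-disk layout: one i.npy file per chunk along
    axis 0, plus a pickled "info" dict with keys 'chunks', 'dtype'
    and 'axis'.
    """
    os.makedirs(dirname, exist_ok=True)
    n = array.shape[0]
    # Row counts per chunk along axis 0, e.g. (100000, 100000, ..., rest)
    row_chunks = tuple(min(chunk_rows, n - s) for s in range(0, n, chunk_rows))
    # Dask chunk spec: per-axis tuples of block sizes
    chunks = (row_chunks,) + tuple((s,) for s in array.shape[1:])
    with open(os.path.join(dirname, "info"), "wb") as f:
        pickle.dump({"chunks": chunks, "dtype": array.dtype, "axis": 0}, f)
    start = 0
    for i, rows in enumerate(row_chunks):
        np.save(os.path.join(dirname, "%d.npy" % i), array[start:start + rows])
        start += rows
```

Since only one chunk is sliced and written at a time, peak memory stays close to the size of the source array itself. The info format is an internal detail of to_npy_stack, though, so it should be verified against the dask version used by the downstream pipeline before relying on this.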

Any suggestions on how to speed up and solve this task?

  • Instead of creating the array with numpy, can you create it with dask? arr = da.random.randint(0,2,size=4000000*8000).reshape((4000000,8000)) Commented Feb 20, 2020 at 16:23
  • No, unfortunately I cannot create the array with dask. @quasiben Commented Feb 20, 2020 at 16:27
  • How are you reading in the data? Commented Feb 20, 2020 at 16:34
  • The data come from torch, but that's out of scope for this question. I can also accept that it is not possible to use dask this way; I am just looking for answers. Thank you @quasiben Commented Feb 20, 2020 at 17:08
  • Are any of the techniques in examples.dask.org/machine-learning/torch-prediction.html useful for you? Commented Feb 20, 2020 at 18:04

2 Answers


If you are on a single machine, then I recommend using the standard threaded scheduler rather than dask.distributed.Client. That way all of the data stays in the same process, and you remove the need to make copies of your large Numpy array.
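A minimal sketch of that suggestion (save_npy_stack is a hypothetical helper name; without a distributed Client, dask falls back to its default threaded scheduler):

```python
import dask.array as da

def save_npy_stack(arr, dirname, chunk_rows):
    """Write `arr` to a npy stack using the default threaded scheduler.

    No dask.distributed Client is created, so the computation runs in
    this process and the threads share the single in-memory copy of
    `arr`, instead of the array being serialised out to worker
    processes.
    """
    dask_array = da.from_array(arr, chunks=(chunk_rows,) + arr.shape[1:])
    da.to_npy_stack(dirname, dask_array)
```

For the array in the question that would be save_npy_stack(a, 'my-folder/', 100000), keeping peak memory near the 100GB the array itself occupies.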


2 Comments

I will try it. In the meantime, let me ask why dask developers suggest always using the distributed scheduler, even on a single machine. @MRocklin
I personally do not suggest that you always use the distributed scheduler, just that it's usually a robustly good choice. There is no solution that is always the best choice in all situations.

Creating all of your data in the main process and then uploading it to the worker processes is a bad idea! You should always endeavour to load or create the data directly in the workers, which will a) avoid repeated work and data copies and b) keep the data lazy, only materialising it in memory when needed.

In this case, this might look like

arr = da.random.randint(0, 2, size=4000000*8000, chunks=100000).reshape((4000000,8000))

2 Comments

(I wasn't sure about the chunks, whether you wanted that many original pieces or output pieces)
Thanks @mdurant, however, as I also commented in the main question body, I cannot create the array directly with dask. The code is just an example for running it; the data come from another pipeline.
