Hello all,
I’ve spent today looking into this issue, and I think there’s a good chance it’s not really solvable, but on the off chance it’s actually a dask bug or someone has another bright idea, here goes.
I have a duck array which opens a file and loads the data. I have a lot of these duck arrays, one for each of many files, and I stack them into a single large dask array. The IO stays delayed and everything works well.
What a user noticed is that when .compute() is called on this array, memory usage peaks (for a while) at roughly twice the total number of bytes in the resulting array. I have been looking into this with memray, and it seems that compute ends up calling dask.array.core.concatenate3, which allocates a new output array. This leads to each of the computed duck arrays and the output array being resident in memory at the same time.
I would have expected dask to stack the computed duck arrays directly into the resulting array, rather than creating a new output array and copying the memory in, but maybe that assumption was incorrect?
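To make the pattern concrete, here is a rough numpy-only sketch of my mental model of what happens at the end of compute (purely an illustration of the memory behaviour, not the actual dask code path), followed by the chunk-by-chunk fill I had been expecting; the sizes are smaller than in the MWE below just to keep the sketch light:

import numpy as np

# Illustration only -- smaller sizes than the real case.
n_chunks, chunk_shape = 16, (1024, 2048)

# All of the computed chunks are held in memory at once...
chunks = [np.ones(chunk_shape) for _ in range(n_chunks)]
# ...then np.stack allocates a fresh output array and copies every chunk in,
# so until `chunks` is released the peak is roughly 2x the final array's size.
result = np.stack(chunks)

# What I had expected is closer to this: fill a pre-allocated output and let
# each chunk be freed as soon as it has been written, keeping the peak near 1x.
out = np.empty((n_chunks, *chunk_shape))
for i in range(n_chunks):
    out[i] = np.ones(chunk_shape)  # stand-in for computing one duck array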
Here’s a standalone code example which generates the memray tree above:
MWE
import dask
dask.config.set(scheduler='synchronous')  # overwrite default with single-threaded scheduler

from functools import partial
import uuid
from time import sleep

import numpy as np
import dask.array as da


class ProxyArray:
    def __init__(self, *, shape, dtype=float):
        self.shape = shape
        self.dtype = dtype
        self.ndim = len(self.shape)
        self.size = np.prod(self.shape)

    def __getitem__(self, item):
        # sleep(0.05)
        return np.ones(self.shape)[item]


def _partial_to_array(loader, *, meta):
    # Name each sub-array uniquely (in the real code this is the filename,
    # which is unique within the stacked array)
    return da.from_array(loader, meta=meta, name=str(uuid.uuid4()))


def proxy_to_dask(loader_array):
    """
    Map a call to `dask.array.from_array` onto all the elements in ``loader_array``.

    This is done so that an explicit ``meta=`` argument can be provided to
    prevent loading data from disk.
    """
    if loader_array.size != 1 and len(loader_array.shape) != 1:
        raise ValueError("Can only be used on one dimensional arrays")

    loader_array = np.atleast_1d(loader_array)

    # The meta argument to from_array is used to determine properties of the
    # array, such as dtype. We explicitly specify it here to prevent dask
    # trying to auto-calculate it by reading from the actual array on disk.
    meta = np.zeros((0,), dtype=loader_array[0].dtype)
    to_array = partial(_partial_to_array, meta=meta)

    return map(to_array, loader_array)


element_shape = (1024, 2048)
stack = 256

arrays = np.array([ProxyArray(shape=element_shape) for _ in range(stack)])
dasks = proxy_to_dask(arrays)
final = da.stack(dasks)
final.compute()
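For what it’s worth, the behaviour I was expecting from compute seems to be roughly what you get by writing the chunks straight into a pre-allocated numpy array with da.store (assuming I’m reading that API right). Continuing from the MWE above, something along these lines appears to avoid holding two full copies at once:

# Continues from the MWE above (`np`, `da` and `final` already defined).
# Hypothetical alternative to final.compute(): have dask write each chunk
# directly into a pre-allocated output array instead of concatenating the
# computed chunks into a brand new one.
out = np.empty(final.shape, dtype=final.dtype)
da.store(final, out)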
Thanks!