Double memory usage when calling `.compute()` on an array

Hello all,

I’ve spent today looking into this issue, and I think there’s a good chance it’s not really solvable, but on the off chance it’s actually a dask bug or someone has another bright idea, here goes.

I have a duck array which opens a file and loads the data. I have a lot of these duck arrays, one per file, and I stack them into a single large dask array. Delaying the IO this way works well.

What a user noticed is that when `.compute()` is called on this array, memory usage peaks (for a while) at roughly twice the total number of bytes in the resulting array. I have been looking into this with memray, and it seems that compute ends up calling `dask.array.core.concatenate3`, which allocates a new output array. This leads to all of the computed duck arrays and the output array being resident in memory at the same time.
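
In rough pseudocode (this is not Dask’s actual implementation, just a sketch of the pattern the memray profile suggests):

import numpy as np

# All of the computed chunks are resident in memory...
chunks = [np.ones((1024, 2048)) for _ in range(4)]
# ...then a brand new output array is allocated...
out = np.empty((len(chunks), 1024, 2048))
# ...and each chunk is copied into its slot, so both copies coexist
# until the chunks are garbage collected.
for i, chunk in enumerate(chunks):
    out[i] = chunk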

I would have expected dask to stack the computed duck arrays directly into the resulting array, rather than creating a new output array and copying the memory in, but maybe that assumption was incorrect?
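
For comparison, here’s a rough sketch of the kind of behaviour I was expecting, using `da.store` to write each computed chunk straight into a pre-allocated output (a hypothetical example with `da.ones` as a stand-in, not what my real code does):

import numpy as np
import dask.array as da

# Pre-allocate the final output and let dask write each computed chunk
# directly into it, so the chunks and a second full-size copy never
# need to coexist.
x = da.ones((4, 1024, 2048), chunks=(1, 1024, 2048))
out = np.empty(x.shape, dtype=x.dtype)
da.store(x, out)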

Here’s a standalone code example which reproduces the memray tree above:

MWE
import dask
dask.config.set(scheduler='synchronous')  # overwrite default with single-threaded scheduler

from functools import partial
import uuid
from time import sleep

import numpy as np
import dask.array as da


class ProxyArray:
    def __init__(self, *, shape, dtype=float):
        self.shape = shape
        self.dtype = dtype
        self.ndim = len(self.shape)
        self.size = np.prod(self.shape)

    def __getitem__(self, item):
        # sleep(0.05)
        return np.ones(self.shape)[item]

    
def _partial_to_array(loader, *, meta):
    # Set the name of the array to something unique within the stacked array
    # (a uuid here; in the real code this is the filename)
    return da.from_array(loader, meta=meta, name=str(uuid.uuid4()))


def proxy_to_dask(loader_array):
    """
    Map a call to `dask.array.from_array` onto all the elements in ``loader_array``.

    This is done so that an explicit ``meta=`` argument can be provided to
    prevent loading data from disk.
    """
    if loader_array.size != 1 and len(loader_array.shape) != 1:
        raise ValueError("Can only be used on one dimensional arrays")

    loader_array = np.atleast_1d(loader_array)

    # The meta argument to from_array is used to determine properties of the
    # array, such as dtype. We explicitly specify it here to prevent dask
    # from trying to automatically calculate it by reading the actual array on disk.
    meta = np.zeros((0,), dtype=loader_array[0].dtype)

    to_array = partial(_partial_to_array, meta=meta)

    return map(to_array, loader_array)


element_shape = (1024, 2048)
stack = 256
arrays = np.array([ProxyArray(shape=element_shape) for _ in range(stack)])

dasks = proxy_to_dask(arrays)
final = da.stack(dasks)
final.compute()
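
(For scale: the final array here is 256 × 1024 × 2048 float64 values, which is about 4 GiB, so a 2x peak corresponds to roughly 8 GiB resident at once.)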

Thanks!

Hi @Cadair, welcome to the Dask community!

I didn’t check the implementation, but I’m not really surprised by this. I don’t think there has been much optimization around streaming the rebuild of a local NumPy array from a Dask one, and it would probably be complicated in a distributed scheduler context.

The question is: do you really need to build this local NumPy array? If you are using Dask just to optimize or simplify your IO, then it might not work in a memory-optimized way. When using Dask, you usually reduce the amount of data, or write results to disk in parallel once your computations are done.
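
For example (a hypothetical sketch, assuming you either only need a reduced result or can write the full result to disk; `x` is just a stand-in dask array, writing to zarr needs the zarr package, and "output.zarr" is a placeholder path):

import dask.array as da

x = da.ones((16, 1024, 2048), chunks=(1, 1024, 2048))

# Reduce the data instead of materialising the full array in memory:
mean_value = x.mean().compute()

# Or stream the full result to disk chunk by chunk:
da.to_zarr(x, "output.zarr", overwrite=True)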

Hi @guillaumeeb thanks a lot for your response.

I am investigating based on this user report: DKIST dataset load memory usage is doubled · Issue #377 · DKISTDC/dkist · GitHub

I don’t think they are really doing “the best” thing, just the obvious thing that someone who didn’t actively “choose” to use dask would do (we use it for loading because the alternative is madness :laughing:). If the answer to this is that we need better docs to teach our users how to do things differently, then that’s good too.

Do you think it’s worth a feature request on GitHub for more efficient concatenation of local arrays?

Well, I think this is worth reporting, at least to get more feedback from the maintainers, but I don’t think there will be active work on this kind of optimisation in the short term.