Dask array, twice delayed

Question:
Does anyone foresee any problems (in particular, scheduling issues) with delaying the creation of a dask array until execution time?

import dask.array as da
import numpy as np

def create_array(sz):
   return da.arange(sz)


indices = da.from_array([2, 0, 5])
size = da.max(indices) + 1
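# size is a lazy 0-d dask array; its value is unknown at graph-construction time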

kwargs = {'dtype': np.int32, 'meta': np.array(0)}
_2x_dlayd_array = size.map_blocks(create_array, **kwargs)
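# the first .compute() yields the inner dask array; the second yields the numpy result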
_2x_dlayd_array.compute().compute()

Premise:
There are many situations (especially in sparse-matrix settings) where one needs to create a dask array whose shape/size is unknown at graph-construction time and will only be determined at execution time.

As of now, dask does not directly allow the creation of arrays of delayed shape/size.

One could, of course, simply call compute() to obtain the shape/size during graph construction, but that is likely to be computationally expensive and inconvenient; one would prefer to defer the computation to the very end, when it is more convenient.
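Concretely, that eager workaround might look like this (a minimal sketch; the variable names are just illustrative):

import dask.array as da

indices = da.from_array([2, 0, 5])
size = da.max(indices) + 1

# Eager workaround (not preferred): force the size to be computed
# during graph construction rather than deferring it to execution time.
size_value = int(size.compute())     # triggers computation immediately
eager_array = da.arange(size_value)  # ordinary dask array with a known shape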

With the code snippet at the top of this post, deferring that computation becomes possible.

Naturally, this complicates how one handles the “twice-delayed” array _2x_dlayd_array, since every subsequent result derived from this oddity is also twice-delayed. For example:

from functools import partial

do_something_to = partial(da.outer, indices)
_2x_dlayd_array = da.map_blocks(do_something_to, _2x_dlayd_array, **kwargs)
_2x_dlayd_array.compute().compute()

Any ideas, cautionary notes, etc., would be greatly appreciated!

Hi @ParticularMiner, welcome to Discourse and thanks for this question! To start, we do not recommend the “twice delayed” approach: map_blocks is designed to map a function across each block of a Dask array, so using it this way is a bit unpredictable.

Instead, we recommend a different approach, and (if I’m understanding correctly) a similar question has been answered on Stack Overflow. As noted here, though not all operations on arrays with unknown chunk sizes are supported, map_blocks will work. You could therefore do something like:

import random
import numpy as np
import dask.array as da

def my_func(x):
    return x * 2

x = da.from_array(np.random.randn(100), chunks=20)
x += 0.1
# don't know how many values are greater than 0 ahead of time
y = x[x > 0]
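# y now has unknown (nan) chunk sizes, but map_blocks still works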
y.map_blocks(my_func).compute()

Additionally, since you mention sparse matrices, you may find these docs helpful.
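For instance, the pattern those docs describe looks roughly like this (a minimal sketch, assuming the pydata sparse library is installed):

import sparse
import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
x[x < 0.95] = 0                # zero out most entries so chunks are sparse-friendly
s = x.map_blocks(sparse.COO)   # store each chunk as a sparse.COO array
s.sum(axis=0).compute()        # downstream operations now use sparse chunks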


Hi @scharlottej13 ,

Thank you very much for your welcoming reply and informative links!

I did have a feeling that my approach was not recommended. But it was the only way I could come up with to continue supporting certain operations in my code, such as slicing, which the recommended approach admittedly does not support.
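For example, something like the following raises an error about unknown chunk sizes (a minimal sketch; the exact message may vary by Dask version):

import numpy as np
import dask.array as da

x = da.from_array(np.random.randn(100), chunks=20)
y = x[x > 0]      # boolean indexing produces unknown chunk sizes
try:
    y[:3]         # slicing requires concrete chunk sizes
except ValueError as err:
    print(err)    # typically suggests calling y.compute_chunk_sizes()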

… so using it this way is a bit unpredictable.

If you don’t mind, could you expand on the unpredictability of this usage? So far, I’ve gotten away with it with little to no blowback. I don’t mind if you get more technical about the details.

Thanks again.

Of course! I will track down a good answer for you :).

It’s not so much that it’s “unpredictable”; it’s more that it’s unexpected (and less performant).

  • You have to know that your code runs dask within dask, and keep track of how many chained calls to .compute() are needed for it to work.
  • Dask array code expects each chunk to be a concrete (non-dask) array. Things might work the way you’re doing them, but they also might break if you later try a different operation; it’s not a supported usage pattern (see the sketch after this list).
  • If you’re running with the distributed scheduler, you’ll run into inefficiencies as the graph moves from the client to the scheduler, back to the client (after the first compute), and then back to the scheduler again (for the second compute), leading to excessive data transfer and suboptimal scheduling.
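To illustrate the second point, here is a minimal sketch of what dask.array normally expects a block to be:

import dask.array as da

x = da.arange(6, chunks=3)
# Each block of an ordinary dask array materialises as a concrete numpy array:
block = x.blocks[0].compute()
print(type(block), block)   # <class 'numpy.ndarray'> [0 1 2]
# In the "twice delayed" pattern, a block would instead materialise as another
# dask array, which downstream dask.array code does not expect.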

It’s just not good practice to nest dask objects.

Rather, I’d recommend using arrays with unknown dimensions (we use np.nan to indicate this). See Chunks — Dask documentation for more information. This lets the array work with many common dask array operations, and a nice error should be raised for those where unknown chunk sizes are unsupported.

In your case, the following code should work:

In [1]: import dask.array as da

In [2]: import numpy as np

In [3]: indices = da.from_array([2, 0, 5])

In [4]: size = da.max(indices) + 1

In [5]: res = size.map_blocks(np.arange, dtype=np.int32, chunks=((np.nan,),))

In [6]: res
Out[6]: dask.array<arange, shape=(nan,), dtype=int32, chunksize=(nan,), chunktype=numpy.ndarray>

In [7]: res.compute()
Out[7]: array([0, 1, 2, 3, 4, 5])

Many thanks, @jcristharif

Your arguments are eye-opening and quite compelling, particularly the last point regarding the distributed scheduler. I suppose I’ll have to weigh my options more carefully now.

Thanks @scharlottej13 for tracking this down for me!