Use map_blocks with a function that returns a tuple

In principle, dask always delays a chunk function's execution, so it doesn't know what the function returns until graph-execution time. Technically, this means a chunk function can return anything you like. However, map_blocks() is nosey and tries to find out what the function returns at graph-construction time. You can prevent it from doing so by providing both the meta and dtype keywords.

meta need not be precisely the same type or form as the chunks the function returns; its value is only nominal. In fact, it can be used to spoof dask into expecting something other than what the chunk function actually returns. The one essential thing is that meta has a .ndim attribute.

A tuple does not have a .ndim attribute, so it cannot be used as meta. Use an array for meta instead. In fact, the meta array does not even need to be of the right dimensionality.
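
As a quick illustration of the point about .ndim, here is a minimal NumPy-only sketch. The helper name `make_meta` is hypothetical (it is not part of dask); it just builds a zero-size array with a requested dimensionality and dtype.

```python
import numpy as np


def make_meta(ndim, dtype):
    # Hypothetical helper (not a dask API): a zero-size array that
    # carries only a dimensionality and a dtype, which is all that a
    # nominal `meta` needs to convey.
    return np.empty((0,) * ndim, dtype=dtype)


# what a tuple-returning chunk function would hand back for one chunk
chunk_result = (np.zeros((1, 2, 1)), np.zeros((1, 2, 1)))
meta = make_meta(3, np.int64)

print(hasattr(chunk_result, "ndim"))  # does the tuple expose .ndim?
print(meta.ndim, meta.size)           # the array does, and it holds no data
```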

Now, when I say that a chunk function can return anything, be aware that not everything it returns will compute successfully: compute() eventually attempts to concatenate the computed chunks, and that only succeeds if the chunks are of the right type and shape. These constraints do not apply to persist(), however. So never call compute() on a dask array whose chunk function returns a tuple, for instance; call persist() instead.

The following code snippet shows one way to deal with chunk functions that return tuples (that is, more than one output, if you will):

import numpy as np
import dask.array as da


# define your chunk function:
def make_tuple(x):
    # input `x` is expected to be a chunk of the same dimensionality
    # (3D) as its dask array `da_x` defined below:
    subarrays = np.split(x, 2, axis=2)
    return subarrays[0], subarrays[1]  # return tuple

def get_1st(x):
    return x[0]

def get_2nd(x):
    return x[1]


np_x = np.arange(8).reshape(2, 2, 2)
da_x = da.from_array(np_x, chunks=(1, 2, 2))

# Create a `meta`:
# `meta` is just a zero-size array which is used to inform `map_blocks()`
# of what chunk type and dimensionality to expect to be returned from
# your chunk-function.  It saves/prevents dask from having to determine
# these itself and saves you a lot of hassle!
meta = np.array([], dtype=da_x.dtype)[(np.newaxis,)*(da_x.ndim - 1)]

tuple_array = da_x.map_blocks(make_tuple, dtype=da_x.dtype, meta=meta)

# You may persist `tuple_array`, but never compute it, because its chunks
# are tuples!  Persisting here invokes `make_tuple()` only once per chunk,
# which improves performance; otherwise `_1st_element` and `_2nd_element`
# would each invoke `make_tuple()` again whenever they are computed.
tuple_array = tuple_array.persist()

_1st_element = tuple_array.map_blocks(get_1st, dtype=da_x.dtype, meta=meta)
_2nd_element = tuple_array.map_blocks(get_2nd, dtype=da_x.dtype, meta=meta)

# _1st_element or _2nd_element may be safely computed:
_1st_element.compute()
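
To check that the two outputs are what you expect, you can compare them against a plain NumPy split of the original array. The sketch below repeats the setup from the snippet so that it runs on its own, and it assumes a dask version in which that snippet behaves as described. Note that the lazily reported `.shape` of the element arrays is inherited from `da_x`, so it may not match the shape of the computed result.

```python
import numpy as np
import dask.array as da


def make_tuple(x):
    # split a 3D chunk in half along its last axis and return both halves
    first_half, second_half = np.split(x, 2, axis=2)
    return first_half, second_half


def get_1st(x):
    return x[0]


def get_2nd(x):
    return x[1]


np_x = np.arange(8).reshape(2, 2, 2)
da_x = da.from_array(np_x, chunks=(1, 2, 2))
meta = np.array([], dtype=da_x.dtype)[(np.newaxis,) * (da_x.ndim - 1)]

# persist (never compute) the array whose chunks are tuples
tuple_array = da_x.map_blocks(make_tuple, dtype=da_x.dtype, meta=meta).persist()

first = tuple_array.map_blocks(get_1st, dtype=da_x.dtype, meta=meta).compute()
second = tuple_array.map_blocks(get_2nd, dtype=da_x.dtype, meta=meta).compute()

# reference result computed eagerly with NumPy alone
expected_first, expected_second = np.split(np_x, 2, axis=2)
print(np.array_equal(first, expected_first))
print(np.array_equal(second, expected_second))
```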