Use map_blocks with a function that returns a tuple

As shown below in Example 1, I have a function that takes a single input value and returns two values as a tuple. This function is applied to a list of parameters. In Example 2, I use Dask to chunk the parameters and map the function over the chunks. To make this work with Dask, I had to change the mapped function to return a NumPy array instead of a tuple. Because map_blocks produces a single array, I then have to slice and flatten the computed result to get the separate a and b results. Is there a way to have map_blocks return a tuple?

Example 1 (no Dask)

import time

def calc_result(p: float) -> tuple[float, float]:
    time.sleep(1)
    a = p + 1
    b = p + 2
    return a, b

def main():
    tic = time.perf_counter()

    params = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    results_a = []
    results_b = []

    for p in params:
        a, b = calc_result(p)
        results_a.append(a)
        results_b.append(b)

    toc = time.perf_counter()

    print(f'elapsed time {toc - tic:.2f} s')
    print(f'params\n{params}')
    print(f'results_a\n{results_a}')
    print(f'results_b\n{results_b}')

if __name__ == '__main__':
    main()

Example 2

import dask.array as da
import numpy as np
import time
from distributed import Client

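def calc_result(p: np.ndarray) -> np.ndarray:
    # p is one chunk of params (a 1-D NumPy array), not a single float
    time.sleep(1)
    a = p + 1
    b = p + 2
    # shape (2, len(p)): row 0 holds the a values, row 1 the b values
    return np.array([a, b])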

def main():
    client = Client(n_workers=8)
    print(client.dashboard_link)

    tic = time.perf_counter()

    ps = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    params = da.from_array(ps, chunks=2)

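    # map_blocks returns a lazy Dask array (not futures); new_axis=1 declares
    # the extra axis that each block gains
    futures = da.map_blocks(calc_result, params, new_axis=1)
    ab = futures.compute()
    # Rows of ab alternate between a values and b values, chunk by chunk
    results_a = ab[::2].flatten()
    results_b = ab[1::2].flatten()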

    toc = time.perf_counter()

    print(f'elapsed time {toc - tic:.2f} s')
    print(f'params\n{np.array(params)}')
    print(f'results_a\n{results_a}')
    print(f'results_b\n{results_b}')

    client.close()

if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()
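The slicing at the end of Example 2 works because of how compute() stitches the blocks together. The NumPy-only sketch below mimics that step, assuming five equal chunks of 2: each block is a (2, 2) array with the a values in row 0 and the b values in row 1, and concatenating the blocks along axis 0 interleaves them. It does not use Dask itself.

```python
import numpy as np

params = np.arange(1, 11)

# Mimic map_blocks: split params into 5 chunks of 2 and apply the function to each
blocks = [np.array([chunk + 1, chunk + 2]) for chunk in np.split(params, 5)]

# compute() concatenates the blocks along axis 0 -> shape (10, 2)
ab = np.concatenate(blocks, axis=0)

# Rows alternate: a values of chunk 0, b values of chunk 0, a values of chunk 1, ...
results_a = ab[::2].flatten()
results_b = ab[1::2].flatten()
print(results_a)  # [ 2  3  4  5  6  7  8  9 10 11]
print(results_b)  # [ 3  4  5  6  7  8  9 10 11 12]
```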

Is there a way to have map_blocks return a tuple?

Generally, no. map_blocks does a fair bit of work behind the scenes to join all the small results back together into one array, and that doesn’t work well when the function returns a tuple.

It’s not entirely clear to me what your use case is, but I suspect that making good use of the new_axis and/or drop_axis keyword arguments of map_blocks (which map_overlap also accepts) might be a better fit for your problem. If that’s not flexible enough, you could use Dask delayed instead.
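As a sketch of the new_axis route (my own illustration, not code from this thread), calc_result can stack a and b along a new trailing axis so that every block is a regular (chunk_size, 2) array. The chunks argument tells map_blocks the real block shapes, so the metadata matches what is computed, and a and b become plain column slices of one Dask array.

```python
import dask
import dask.array as da
import numpy as np


def calc_result(p):
    # p is a 1-D NumPy chunk of params; return a and b side by side
    a = p + 1
    b = p + 2
    return np.stack([a, b], axis=-1)  # shape (len(p), 2)


params = da.from_array(np.arange(1, 11), chunks=2)

# new_axis=1 declares the extra output axis; chunks gives the real block shapes:
# the same row chunks as params, and one chunk of size 2 along the new axis
ab = params.map_blocks(
    calc_result,
    new_axis=1,
    chunks=(params.chunks[0], (2,)),
    dtype=params.dtype,
)

# Column 0 holds a, column 1 holds b; compute both in one pass
results_a, results_b = dask.compute(ab[:, 0], ab[:, 1])
print(results_a)  # [ 2  3  4  5  6  7  8  9 10 11]
print(results_b)  # [ 3  4  5  6  7  8  9 10 11 12]
```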

I’ve written some more on a related topic here: Ragged output, how to handle awkward shaped results

How would you use Dask delayed for this example?

Hi @wigging! Re your question on using Dask delayed, the post @Genevieve linked has a great example. Also, your use case is a bit difficult to understand, and I’d encourage you to check out this example and this example on how best to demonstrate your issue/question.
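For reference, here is a minimal sketch of a Dask delayed version of Example 1 (my own illustration; the linked post may do it differently). Wrapping calc_result with dask.delayed(..., nout=2) lets each delayed call be unpacked into two Delayed objects, one per tuple element, and dask.compute evaluates both lists together. The time.sleep from the original is omitted so the sketch runs instantly.

```python
import dask


def calc_result(p):
    # Same as Example 1 without the sleep; returns a tuple
    a = p + 1
    b = p + 2
    return a, b


params = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# nout=2 tells Dask the call returns two values, so each result can be unpacked
delayed_calc = dask.delayed(calc_result, nout=2)
pairs = [delayed_calc(p) for p in params]

lazy_a = [a for a, _ in pairs]
lazy_b = [b for _, b in pairs]

# One compute call for both lists, so each calc_result task runs only once
results_a, results_b = dask.compute(lazy_a, lazy_b)
print(results_a)  # [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
print(results_b)  # [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
```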

One idea would be to restructure your calc_result function so that it doesn’t return a tuple, and then perform any subsequent operations on the two resulting arrays. Perhaps something like:

import dask.array as da
import numpy as np

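def calc_result(p, value):
    # Return a single array instead of a tuple
    return p + value

params = da.from_array(np.arange(1, 11), chunks=2)
# One map_blocks call per output; the extra argument (1 or 2) is passed to every block
results_a = da.map_blocks(calc_result, params, 1)
results_b = da.map_blocks(calc_result, params, 2)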

@scharlottej13 For your example, how would you call compute on the results? Would you call it twice like results_a.compute() and results_b.compute() or can compute be called on both results at once?

You can compute several collections at once using dask.compute, as discussed here in the API docs. If you are using a distributed client, you can also use Client.compute for the same purpose. Computing related collections together is often better because this allows for more graph optimization.
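Applied to the example above, a single dask.compute call could look like the sketch below. Both results depend on the same params chunks, so computing them together lets Dask share those tasks instead of building them twice.

```python
import dask
import dask.array as da
import numpy as np


def calc_result(p, value):
    return p + value


params = da.from_array(np.arange(1, 11), chunks=2)
results_a = da.map_blocks(calc_result, params, 1)
results_b = da.map_blocks(calc_result, params, 2)

# dask.compute takes several collections and returns a tuple of results
a, b = dask.compute(results_a, results_b)
print(a)  # [ 2  3  4  5  6  7  8  9 10 11]
print(b)  # [ 3  4  5  6  7  8  9 10 11 12]
```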

In principle, Dask always delays a chunk-function’s execution, so Dask doesn’t know what it returns until graph-execution time. This means that, technically, a chunk-function can return anything you like. However, map_blocks() is nosy and tries to find out what the function returns already at graph-construction time. You can prevent it from doing so by providing both the meta and dtype keywords.
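The sketch below makes this visible by counting calls to a hypothetical chunk-function, add_one. Without meta, map_blocks calls it on a zero-size dummy block while the graph is being built; with meta and dtype supplied, it only runs when the array is computed, once per block.

```python
import dask.array as da
import numpy as np

calls = []


def add_one(x):
    calls.append(x.shape)  # record every invocation
    return x + 1


x = da.ones(4, chunks=2)  # two blocks of shape (2,)

# 1) No meta: map_blocks probes add_one while building the graph
inferred = x.map_blocks(add_one, dtype=x.dtype)
n_probe_calls = len(calls)

# 2) meta and dtype given: add_one is not called at graph-construction time
calls.clear()
declared = x.map_blocks(add_one, dtype=x.dtype, meta=np.array([], dtype=x.dtype))
n_construction_calls = len(calls)

# 3) Computing runs add_one once per block
declared.compute()
n_compute_calls = len(calls)
print(n_probe_calls >= 1, n_construction_calls, n_compute_calls)
```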

meta need not exactly match the type or form of the chunks the function actually returns; its value is only nominal. In fact, it can be used to spoof Dask into expecting something other than what the chunk-function actually returns. The one essential requirement is that meta has a .ndim attribute.

A tuple does not have a .ndim attribute, so it cannot be used as meta; use an array instead. In fact, the meta array does not even need to have the right dimensionality.
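To see the .ndim point concretely, this short check (an illustration, not from the original post) compares a tuple with a zero-size NumPy array built the same way as the meta in the snippet later in this post. The array reports three dimensions while holding no data at all.

```python
import numpy as np

# A tuple has no .ndim attribute, so it cannot serve as meta
print(hasattr((np.zeros(1), np.zeros(1)), "ndim"))  # False

# Start from an empty 1-D array and prepend ndim - 1 new axes:
# the result is 3-D but contains no elements
ndim = 3
meta = np.array([], dtype=np.int64)[(np.newaxis,) * (ndim - 1)]
print(meta.shape, meta.ndim, meta.size)  # (1, 1, 0) 3 0
```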

Now, when I say that a chunk-function can return anything, be aware that not everything it returns will lead to a successful computation: compute() eventually tries to concatenate the computed chunks, and that only succeeds if the chunks have the right type and shape. These constraints do not apply to persist(), however, because it keeps the computed chunks separate. So never call compute() on a Dask array whose chunk-function returns, for instance, a tuple; call persist() instead.

The following code snippet is an example to help one deal with chunk-functions which return tuples (that is, more than one output if you will):

import numpy as np
import dask.array as da


# define your chunk function:
def make_tuple(x):
    # input `x` is expected to be a chunk of the same dimensionality
    # (3D) as its dask array `da_x` defined below: 
    subarrays = np.split(x, 2, axis=2)
    return subarrays[0], subarrays[1]  # return tuple

def get_1st(x):
    return x[0]

def get_2nd(x):
    return x[1]


np_x = np.arange(8).reshape(2, 2, 2)
da_x = da.from_array(np_x, chunks=(1, 2, 2))

# Create a `meta`:
# `meta` is just a zero-size array which is used to inform `map_blocks()`
# of what chunk type and dimensionality to expect to be returned from
# your chunk-function.  It saves/prevents dask from having to determine
# these itself and saves you a lot of hassle!
meta = np.array([], dtype=da_x.dtype)[(np.newaxis,)*(da_x.ndim - 1)]

tuple_array = da_x.map_blocks(make_tuple, dtype=da_x.dtype, meta=meta)

# You may persist `tuple_array`, but never compute it, as its chunks are
# tuples!  It's good to persist here to invoke `make_tuple()` only once
# per chunk, thus improving performance, otherwise later, `_1st_element`
# and `_2nd_element` will each invoke `make_tuple()` whenever they are
# computed!
tuple_array = tuple_array.persist()

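# Each element is half of the original chunk along axis 2, so pass `chunks`
# to record the true block shape (1, 2, 1) instead of inheriting (1, 2, 2):
_1st_element = tuple_array.map_blocks(get_1st, chunks=(1, 2, 1), dtype=da_x.dtype, meta=meta)
_2nd_element = tuple_array.map_blocks(get_2nd, chunks=(1, 2, 1), dtype=da_x.dtype, meta=meta)

# _1st_element or _2nd_element may be safely computed:
print(_1st_element.compute())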