# Use map_blocks with function that returns a tuple

As shown below in Example 1, I have a function that takes a single input value and returns two values as a tuple. This function is applied to a list of parameters. In Example 2, I use Dask to chunk the parameters and map the function over the chunks. I had to change the return type of the mapped function from a tuple to a NumPy array to make it work with Dask. Since the computed output from `map_blocks` is a single array, I have to slice the computed results apart into separate `a` and `b` results. Is there a way to have `map_blocks` return a tuple?

## Example 1

```python
import time

def calc_result(p: float) -> tuple[float, float]:
    time.sleep(1)
    a = p + 1
    b = p + 2
    return a, b

def main():
    tic = time.perf_counter()

    params = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    results_a = []
    results_b = []

    for p in params:
        a, b = calc_result(p)
        results_a.append(a)
        results_b.append(b)

    toc = time.perf_counter()

    print(f'elapsed time {toc - tic:.2f} s')
    print(f'params\n{params}')
    print(f'results_a\n{results_a}')
    print(f'results_b\n{results_b}')

if __name__ == '__main__':
    main()
```

## Example 2

```python
import dask.array as da
import numpy as np
import time
from distributed import Client

def calc_result(p: float) -> np.ndarray:
    time.sleep(1)
    a = p + 1
    b = p + 2
    return np.array([a, b])

def main():
    client = Client(n_workers=8)

    tic = time.perf_counter()

    ps = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    params = da.from_array(ps, chunks=2)

    futures = da.map_blocks(calc_result, params, new_axis=1)
    ab = futures.compute()
    results_a = ab[::2].flatten()
    results_b = ab[1::2].flatten()

    toc = time.perf_counter()

    print(f'elapsed time {toc - tic:.2f} s')
    print(f'params\n{np.array(params)}')
    print(f'results_a\n{results_a}')
    print(f'results_b\n{results_b}')

    client.close()

if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()
```

Is there a way to have map_blocks return a tuple?

Generally no. `map_blocks` does a fair bit of work behind the scenes to join all the small results back together, and that doesn't work well with tuple output from the function.

It's not entirely clear to me what your use case is, but I suspect making good use of the `new_axis` and/or `drop_axis` keyword arguments in map_overlap might be a better fit for your problem. If that's not flexible enough, you could alternatively use Dask delayed.
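For instance, here is a sketch of how `new_axis` could restructure the original example: instead of returning an interleaved 1-D array, each block returns `a` and `b` as the two columns of a 2-D block, so no post-compute slicing gymnastics are needed (the `chunks` argument is an assumption on my part to tell Dask the per-block output shape):

```python
import dask.array as da
import numpy as np

def calc_result(p):
    # Return a (n, 2) block: column 0 holds a, column 1 holds b.
    return np.stack([p + 1, p + 2], axis=1)

params = da.from_array(np.arange(1, 11), chunks=2)

# new_axis=1 declares the extra column axis; chunks=(2, 2) gives the
# shape of each output block (2 params wide, 2 result columns).
ab = da.map_blocks(calc_result, params, new_axis=1, chunks=(2, 2))

results = ab.compute()
results_a = results[:, 0]
results_b = results[:, 1]
```

With this layout the computed array has shape `(10, 2)`, and the two result vectors fall out of simple column indexing.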

I've written some more on a related topic here: Ragged output, how to handle awkward shaped results

How would you use Dask delayed for this example?

Hi @wigging! Re your question on using Dask delayed, the post @Genevieve linked has a great example. Additionally, it is a bit difficult to understand your use case, and I'd encourage you to check out this example and this example on how to best demonstrate your issue/question.
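For what it's worth, here is a minimal sketch of the delayed approach applied to the original `calc_result`, using the `nout=2` argument so each delayed call unpacks into two `Delayed` objects:

```python
import dask
from dask import delayed

def calc_result(p):
    a = p + 1
    b = p + 2
    return a, b

# nout=2 lets each Delayed result be unpacked into two Delayed objects.
delayed_calc = delayed(calc_result, nout=2)

params = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
pairs = [delayed_calc(p) for p in params]
results_a, results_b = zip(*pairs)

# One compute call evaluates all the delayed tasks together.
results_a, results_b = dask.compute(results_a, results_b)
```

This keeps the tuple return intact, since delayed places no constraints on what the function returns.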

One idea would be to restructure your `calc_result` function so that it doesn't return a tuple, and then perform any subsequent operations on the two resulting arrays. Perhaps something like:

```python
import dask.array as da
import numpy as np

def calc_result(p, value):
    return p + value

params = da.from_array(np.arange(1, 11), chunks=2)
results_a = da.map_blocks(calc_result, params, 1)
results_b = da.map_blocks(calc_result, params, 2)
```

@scharlottej13 For your example, how would you call `compute` on the results? Would you call it twice like `results_a.compute()` and `results_b.compute()` or can compute be called on both results at once?

You can compute several collections at once using `dask.compute`, as discussed here in the API docs. If you are using a distributed client, you can also use `Client.compute` for the same purpose. Computing related collections together is often better because this allows for more graph optimization.
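Building on the earlier snippet, computing both collections in one call might look like this (a sketch, reusing the `calc_result` and `params` definitions from above):

```python
import dask
import dask.array as da
import numpy as np

def calc_result(p, value):
    return p + value

params = da.from_array(np.arange(1, 11), chunks=2)
results_a = da.map_blocks(calc_result, params, 1)
results_b = da.map_blocks(calc_result, params, 2)

# A single dask.compute call evaluates both graphs together, letting
# Dask share the common `params` chunks between them.
a, b = dask.compute(results_a, results_b)
```

Calling `results_a.compute()` and `results_b.compute()` separately would also work, but each call would rebuild and re-execute the shared portion of the graph.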

In principle, Dask always delays a chunk function's execution, so it doesn't know what the function returns until graph-execution time. This means that, technically, a chunk function can return anything you like. However, `map_blocks()` is nosey and tries to find out what it returns even at graph-construction time. You can prevent it from doing so by providing both the `meta` and `dtype` keywords.

`meta` need not be precisely the same type or form as the chunk type actually returned; its value is only nominal. In fact, it can be used to spoof Dask into expecting something other than what the chunk function actually returns. The one essential thing is that `meta` must have a `.ndim` attribute.

A tuple does not have a `.ndim` attribute, so it cannot be used as a `meta`; it is better to use an array. In fact, the `meta` array does not even need to be of the right dimensionality.

Now, when I say that a chunk function can return anything, be aware that not everything it returns will lead to a successful computation, because `compute()` will eventually attempt to concatenate the computed chunks, and its success depends on whether those chunks are of the right type and shape. These constraints do not apply to `persist()`, however. So never call `compute()` on a Dask array whose chunk function returns a tuple, for instance; call `persist()` instead.

The following code snippet is an example of how to deal with chunk functions that return tuples (that is, more than one output, if you will):

```python
import dask.array as da
import numpy as np

def make_tuple(x):
    # input `x` is expected to be a chunk of the same dimensionality
    # (3D) as its dask array `da_x` defined below:
    subarrays = np.split(x, 2, axis=2)
    return subarrays[0], subarrays[1]  # return tuple

def get_1st(x):
    return x[0]

def get_2nd(x):
    return x[1]

np_x = np.arange(8).reshape(2, 2, 2)
da_x = da.from_array(np_x, chunks=(1, 2, 2))

# Create a `meta`:
# `meta` is just a zero-size array which informs `map_blocks()` of what
# chunk type and dimensionality to expect from the chunk function,
# saving you a lot of hassle!
meta = np.array([], dtype=da_x.dtype)[(np.newaxis,)*(da_x.ndim - 1)]

tuple_array = da_x.map_blocks(make_tuple, dtype=da_x.dtype, meta=meta)

# You may persist `tuple_array`, but never compute it, as its chunks are
# tuples!  It's good to persist here to invoke `make_tuple()` only once
# per chunk, thus improving performance; otherwise, `_1st_element` and
# `_2nd_element` will each invoke `make_tuple()` whenever they are
# computed!
tuple_array = tuple_array.persist()

_1st_element = tuple_array.map_blocks(get_1st, dtype=da_x.dtype, meta=meta)
_2nd_element = tuple_array.map_blocks(get_2nd, dtype=da_x.dtype, meta=meta)

# _1st_element or _2nd_element may be safely computed:
_1st_element.compute()
```