Use map_blocks with function that returns a tuple

As shown below in Example 1, I have a function that takes a single input value and returns two values as a tuple. This function is applied to a list of parameters. In Example 2, I use Dask to chunk the parameters and map the function over the chunks. To make this work with Dask, I had to change the mapped function so that it returns a NumPy array instead of a tuple. Because map_blocks computes a single array, I then have to slice and flatten that array to recover the separate a and b results. Is there a way to have map_blocks return a tuple?

Example 1 (no Dask)

import time

def calc_result(p: float) -> tuple[float, float]:
    time.sleep(1)  # simulate an expensive calculation
    a = p + 1
    b = p + 2
    return a, b

def main():
    tic = time.perf_counter()

    params = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    results_a = []
    results_b = []

    for p in params:
        a, b = calc_result(p)
        results_a.append(a)
        results_b.append(b)

    toc = time.perf_counter()

    print(f'elapsed time {toc - tic:.2f} s')
    print(f'params\n{params}')
    print(f'results_a\n{results_a}')
    print(f'results_b\n{results_b}')

if __name__ == '__main__':
    main()

Example 2 (Dask)

import dask.array as da
import numpy as np
import time
from distributed import Client

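def calc_result(p: np.ndarray) -> np.ndarray:
    # p is one chunk of params, i.e. a 1-D array of length 2
    time.sleep(1)
    a = p + 1
    b = p + 2
    # Shape (2, len(p)): row 0 holds the a values, row 1 the b values
    return np.array([a, b])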

def main():
    client = Client(n_workers=8)
    print(client.dashboard_link)

    tic = time.perf_counter()

    ps = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    params = da.from_array(ps, chunks=2)

    ab_lazy = da.map_blocks(calc_result, params, new_axis=1)
    ab = ab_lazy.compute()
    # Blocks are stacked along axis 0, so the rows alternate a, b, a, b, ...
    results_a = ab[::2].flatten()
    results_b = ab[1::2].flatten()

    toc = time.perf_counter()

    print(f'elapsed time {toc - tic:.2f} s')
    print(f'params\n{np.array(ps)}')
    print(f'results_a\n{results_a}')
    print(f'results_b\n{results_b}')

    client.close()

if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()

Is there a way to have map_blocks return a tuple?

Generally, no. map_blocks does a fair bit of work behind the scenes to join all the small per-block results back together into one array, and that doesn’t work well when the function returns a tuple.
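If the goal is simply a function that produces several arrays, dask.array.apply_gufunc (a different function from map_blocks) does accept a function that returns a tuple, provided its signature declares more than one output. The sketch below assumes the elementwise calc_result from the question with the time.sleep dropped; the output_dtypes value is spelled out only to skip dtype inference.

```python
import dask
import dask.array as da
import numpy as np


def calc_result(p):
    # Called on a whole NumPy block; returns a tuple of two arrays of the same shape
    return p + 1, p + 2


params = da.from_array(np.arange(1, 11), chunks=2)

# "()->(),()" means: one scalar input per element, two scalar outputs per element,
# so apply_gufunc hands back a tuple of two dask arrays
results_a, results_b = da.apply_gufunc(
    calc_result,
    "()->(),()",
    params,
    output_dtypes=[params.dtype, params.dtype],
)

# Compute both outputs together so the shared graph is evaluated once
a, b = dask.compute(results_a, results_b)
print(a)
print(b)
```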

It’s not entirely clear to me what your use case is, but I suspect that making good use of the new_axis and/or drop_axis keyword arguments to map_blocks might be a better fit for your problem. If that’s not flexible enough, you could alternatively use Dask Delayed.
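As a rough sketch of the new_axis route with map_blocks itself: each block returns its a and b values as two columns, the chunks argument declares that output layout explicitly, and the columns are sliced apart afterwards instead of flattening interleaved rows. The column layout and the chunks tuple are assumptions of this sketch, not something from the original post.

```python
import dask
import dask.array as da
import numpy as np


def calc_result(p: np.ndarray) -> np.ndarray:
    # p is one block (a 1-D NumPy array); return shape (len(p), 2),
    # with the a values in column 0 and the b values in column 1
    a = p + 1
    b = p + 2
    return np.stack([a, b], axis=1)


params = da.from_array(np.arange(1, 11), chunks=2)

# Declare the output chunks: same row chunks as the input, plus a new
# axis 1 of length 2 held in a single chunk
ab = da.map_blocks(
    calc_result,
    params,
    new_axis=1,
    chunks=(params.chunks[0], (2,)),
    dtype=params.dtype,
)

# Slice the two columns apart and compute them together
results_a, results_b = dask.compute(ab[:, 0], ab[:, 1])
print(results_a)
print(results_b)
```

Declaring chunks up front keeps the dask array’s metadata (shape and chunk sizes) in line with what the function really returns, which makes slicing by column straightforward.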

I’ve written some more on a related topic here: Ragged output, how to handle awkward shaped results

How would you use Dask delayed for this example?

Hi @wigging! Re your question on using Dask delayed, the post @Genevieve linked has a great example. Additionally, it is a bit difficult to understand your use case, and I’d encourage you to check out this example and this example on how best to demonstrate your issue or question.
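For the example in the question, a minimal dask.delayed sketch might look like the following. It keeps the original tuple-returning function and passes nout=2 so that each lazy call can be unpacked into two lazy values, just like the real tuple. The shortened time.sleep(0.1) is only a stand-in for the expensive work.

```python
import time

import dask


def calc_result(p: float) -> tuple[float, float]:
    time.sleep(0.1)  # stand-in for an expensive calculation
    return p + 1, p + 2


params = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# nout=2 tells dask the call returns two values, so the result can be unpacked
calc_lazy = dask.delayed(calc_result, nout=2)

results_a = []
results_b = []
for p in params:
    a, b = calc_lazy(p)  # nothing runs yet; a and b are lazy placeholders
    results_a.append(a)
    results_b.append(b)

# One call evaluates every task; independent calls can run in parallel
results_a, results_b = dask.compute(results_a, results_b)
print(results_a)
print(results_b)
```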

One idea would be to restructure your calc_result function so that it doesn’t return a tuple, and then perform any subsequent operations on the two resulting arrays. Perhaps something like:

import dask.array as da
import numpy as np

def calc_result(p, value):
    return p + value

params = da.from_array(np.arange(1, 11), chunks=2)
# The extra non-array argument (1 or 2) is passed through to calc_result as value
results_a = da.map_blocks(calc_result, params, 1)
results_b = da.map_blocks(calc_result, params, 2)

@scharlottej13 For your example, how would you call compute on the results? Would you call it twice like results_a.compute() and results_b.compute() or can compute be called on both results at once?

You can compute several collections at once using dask.compute, as discussed here in the API docs. If you are using a distributed client, you can also use Client.compute for the same purpose. Computing related collections together is often better because this allows for more graph optimization.
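Applied to the two arrays from the earlier reply, a minimal sketch might look like this. dask.compute returns the computed NumPy arrays in the same order as its arguments, and the params graph they share only needs to be evaluated once.

```python
import dask
import dask.array as da
import numpy as np


def calc_result(p, value):
    return p + value


params = da.from_array(np.arange(1, 11), chunks=2)
results_a = da.map_blocks(calc_result, params, 1)
results_b = da.map_blocks(calc_result, params, 2)

# A single call computes both collections and returns a tuple of NumPy arrays
a, b = dask.compute(results_a, results_b)
print(a)
print(b)
```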