As shown below in Example 1, I have a function that takes a single input value and returns two values as a tuple. This function is applied to each value in a list of parameters. In Example 2, I use Dask to chunk the parameters and map the function over the chunks. To make this work with Dask, I had to change the mapped function's return type from a tuple to a NumPy array. Because `map_blocks` computes a single array, I then have to split and flatten that array to recover the separate `a` and `b` results. Is there a way to have `map_blocks` return a tuple?
Example 1 (no Dask)
import time
def calc_result(p: float, delay: float = 1.0) -> "tuple[float, float]":
    """Return the pair ``(p + 1, p + 2)`` after sleeping for ``delay`` seconds.

    The sleep appears to stand in for slow work (it is what makes the
    serial loop in ``main`` take roughly one second per parameter).

    Args:
        p: Input value.
        delay: Seconds to sleep before computing; defaults to 1.0, which
            matches the original hard-coded behaviour.

    Returns:
        A 2-tuple ``(a, b)`` with ``a = p + 1`` and ``b = p + 2``.
    """
    time.sleep(delay)
    a = p + 1
    b = p + 2
    return a, b
def main():
    """Evaluate calc_result serially over a fixed parameter list.

    Splits each returned (a, b) pair into two result lists, then prints the
    elapsed wall-clock time, the parameters and both result lists.
    """
    start = time.perf_counter()
    params = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    # One call per parameter, in order; each call yields an (a, b) pair.
    pairs = [calc_result(value) for value in params]
    results_a = [first for first, _ in pairs]
    results_b = [second for _, second in pairs]

    stop = time.perf_counter()
    print(f'elapsed time {stop - start:.2f} s')
    print(f'params\n{params}')
    print(f'results_a\n{results_a}')
    print(f'results_b\n{results_b}')


if __name__ == '__main__':
    main()
Example 2 (with Dask)
import dask.array as da
import numpy as np
import time
from distributed import Client
def calc_result(p: "np.ndarray | float", delay: float = 1.0) -> np.ndarray:
    """Return ``np.array([p + 1, p + 2])`` after sleeping for ``delay`` seconds.

    When used with ``da.map_blocks``, ``p`` is one block of the parameter
    array (a 1-D ndarray of length k), not a single float.

    Args:
        p: A scalar or an ndarray block of parameters.
        delay: Seconds to sleep before computing; defaults to 1.0, which
            matches the original hard-coded behaviour.

    Returns:
        An array of shape ``(2, *np.shape(p))``. Row 0 holds the ``a``
        results (``p + 1``) and row 1 holds the ``b`` results (``p + 2``).
    """
    time.sleep(delay)
    a = p + 1
    b = p + 2
    # Stacks a and b along a new leading axis of length 2.
    return np.array([a, b])
def main():
    """Compute calc_result over a parameter list in parallel with Dask.

    ``map_blocks`` always yields a single dask array, so it cannot return a
    tuple directly. Instead, each block is laid out as a ``(k, 2)`` array:
    column 0 holds the ``a`` results and column 1 the ``b`` results. The
    declared ``chunks`` describe exactly that shape, so the computed array can
    be split by column for any chunk size. (Interleaving rows and slicing with
    ``[::2]`` / ``[1::2]`` only worked because ``chunks=2`` happened to equal
    the number of outputs.)
    """
    # The context manager closes the local cluster even if something fails.
    with Client(n_workers=8) as client:
        print(client.dashboard_link)

        tic = time.perf_counter()
        ps = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        params = da.from_array(ps, chunks=2)

        def _as_columns(block):
            # calc_result returns shape (2, k); transpose to (k, 2) so that
            # each input element maps to one output row.
            return calc_result(block).T

        ab_lazy = da.map_blocks(
            _as_columns,
            params,
            new_axis=1,
            # Row chunks follow the input; the new axis has exactly 2 columns.
            chunks=(params.chunks[0], 2),
            # Supplying meta stops dask from calling calc_result on dummy
            # data (which would sleep on the client) to infer the output type.
            meta=np.empty((0, 0), dtype=params.dtype),
        )
        ab = ab_lazy.compute()
        results_a = ab[:, 0]
        results_b = ab[:, 1]
        toc = time.perf_counter()

        print(f'elapsed time {toc - tic:.2f} s')
        # Use the local list; np.array(params) would trigger another compute.
        print(f'params\n{np.asarray(ps)}')
        print(f'results_a\n{results_a}')
        print(f'results_b\n{results_b}')


if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()