Calling dask.array.compute_chunk_sizes() with Asynchronous Client

In the following code:

async def main():
    import dask.array as da
    import numpy as np
    from distributed import Client
    client = Client(threads_per_worker=12, n_workers=1, asynchronous=True)

    depth = (1, 0)
    arr = da.from_array(np.zeros((4, 4), dtype=np.int64), chunks=(2, 2))
    print(arr.chunksize)
    padded = arr.map_overlap(func=lambda x: x, depth=depth)
    print(padded.chunksize)  # wrong chunk sizes
    padded.compute_chunk_sizes()  # crashes here

    await client.close()


if __name__ == '__main__':
    import asyncio
    asyncio.run(main())

The program crashes with the message “‘coroutine’ object is not iterable”. The same code runs fine with a synchronous client, but in my case I need an asynchronous client, so I cannot call this function. How should this be handled?

Any help would be welcome, thanks!

Hi,

I’m able to reproduce the problem, even in a notebook. I’m not an async expert, but I think it comes from the fact that compute_chunk_sizes launches a Dask computation internally. This results in the following stack trace:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[5], line 1
----> 1 padded.compute_chunk_sizes()

File /work/scratch/env/eynardbg/.conda/envs/pangeo2411/lib/python3.13/site-packages/dask/array/core.py:1553, in Array.compute_chunk_sizes(self)
   1548     c.append(tuple(chunk_shapes[s]))
   1550 # `map_blocks` assigns numpy dtypes
   1551 # cast chunk dimensions back to python int before returning
   1552 x._chunks = tuple(
-> 1553     tuple(int(chunk) for chunk in chunks) for chunks in compute(tuple(c))[0]
   1554 )
   1555 return x

File /work/scratch/env/eynardbg/.conda/envs/pangeo2411/lib/python3.13/site-packages/dask/base.py:662, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    659 with shorten_traceback():
    660     results = schedule(dsk, keys, **kwargs)
--> 662 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

TypeError: 'coroutine' object is not iterable

I guess this method is not async compatible. Maybe you could try reimplementing it so that the compute call is awaited.
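As a rough illustration of that idea, here is a minimal sketch that mirrors the logic visible in the traceback but awaits the result instead of calling the blocking compute. The names compute_chunk_sizes_async, _block_shape and demo are hypothetical helpers, not part of Dask, and the sketch writes to the private _chunks attribute just as the original method does, so it may break between Dask versions. The demo uses an in-process cluster (processes=False) and a boolean-mask array only to keep the example small.

```python
import asyncio

import dask.array as da
import numpy as np
from distributed import Client


def _block_shape(block):
    # Shape of one block, with `ndim` leading length-1 axes added so that
    # map_blocks can lay the per-block shapes out on the block grid.
    shape = np.asarray(block.shape, dtype=int)
    return shape[len(shape) * (None,) + (slice(None),)]


async def compute_chunk_sizes_async(arr, client):
    """Hypothetical async counterpart of Array.compute_chunk_sizes()."""
    shapes = arr.map_blocks(
        _block_shape,
        dtype=int,
        chunks=tuple(len(c) * (1,) for c in arr.chunks) + ((arr.ndim,),),
        new_axis=arr.ndim,
    )
    # With an asynchronous client, compute() returns a Future; awaiting it
    # yields the NumPy array of per-block shapes.
    block_shapes = await client.compute(shapes)

    chunks = []
    for axis in range(arr.ndim):
        # Walk along `axis` of the block grid and read the block sizes
        # for that same axis.
        index = arr.ndim * [0] + [axis]
        index[axis] = slice(None)
        # Cast NumPy integers back to Python ints, as Dask does.
        chunks.append(tuple(int(n) for n in block_shapes[tuple(index)]))

    # Assumption: overwriting the private attribute, like the original method.
    arr._chunks = tuple(chunks)
    return arr


async def demo():
    # In-process cluster, chosen here only to keep the sketch lightweight.
    async with Client(
        n_workers=1, threads_per_worker=2, processes=False, asynchronous=True
    ) as client:
        x = da.from_array(np.arange(10), chunks=5)
        y = x[x % 2 == 0]  # boolean indexing leaves the chunk sizes unknown
        y = await compute_chunk_sizes_async(y, client)
        return y.chunks
```

In the coroutine from the question, `padded = await compute_chunk_sizes_async(padded, client)` would take the place of the failing `padded.compute_chunk_sizes()` call.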

cc @jacobtomlinson,

Maybe the best option is to open an issue with your reproducer?

This just looks like an outright bug. Could you open an issue with your reproducer?