Error in monitoring progress of distributed work - related to asyncio

I am trying to monitor the progress of distributed work, following the distributed example under the “Progress Bar” section (I cannot put a link here due to the restriction on new users).
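For context, the pattern from that section that I am following looks roughly like this (a minimal sketch from memory, not the exact snippet from the docs; inc is just a placeholder task):

from dask.distributed import Client, progress

def inc(x):
    # placeholder task; anything cheap works for the demonstration
    return x + 1

client = Client()                       # or a client backed by an HPC cluster
futures = client.map(inc, range(1000))  # submit many small tasks
progress(futures)                       # display a progress bar while they run
client.gather(futures)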

My code and the exception I got are described at: asyncio exception when monitoring distributed compute · Issue #8382 · dask/distributed · GitHub

I fixed it myself and initially thought it was a bug in CPython rather than in Dask. I then reported the asyncio bug to the CPython team at:

However, the CPython team considers this a Dask implementation issue. I don’t know enough about Dask to judge whether this is an issue with Dask’s progress() or with asyncio.

Could you help check what the root cause is and whether it is something that can be patched within Dask?

Thank you!

The example I refer to is on this page: Diagnostics (distributed) — Dask documentation

Hi @data2code, welcome to the Dask community,

I was able to reproduce the error, but only on an HPC cluster (in my case using SLURMCluster).
When trying your code with only a LocalCluster, I didn’t run into the issue. Can you confirm it?

I’m not sure what in dask-jobqueue causes this error to arise at the end of the script execution. I’m one of the maintainers of the lib, but unfortunately I don’t have a lot of time to investigate this.
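For reference, running this kind of script against a SLURM-backed cluster looks roughly like the sketch below (the resource values are placeholders to adapt to your site; it is not the exact reproduction script):

from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress

def inc(x):
    return x + 1

# placeholder resources; adjust cores/memory/walltime (and queue/account) for your system
cluster = SLURMCluster(cores=1, memory="2GB", walltime="00:10:00")
cluster.scale(jobs=1)  # request workers from one SLURM job

with Client(cluster) as client:
    futures = client.map(inc, range(1000))
    progress(futures, notebook=False)
    client.gather(futures)
# the CancelledError appears when the client and cluster shut down at the end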

Great that this is reproducible. Yes, I only encounter this when a cluster backend is used. The example does a basic task that should work according to the Dask documentation; I hope this can be looked into, as it makes the cluster distributed mode difficult to use.

Thank you!

I have a similar problem:

import numpy as np
import zarr
from dask.distributed import Client, LocalCluster
from dask import array as da
from dask.distributed import progress

def same(x):
    return x

x = np.arange(100000)

with LocalCluster(processes=False, n_workers=1, threads_per_worker=2) as cluster, Client(cluster) as client:
    da_x = da.from_array(x, chunks=(2000,))
    da_y = da_x.map_blocks(same)
    futures = client.persist(da_y)       # start the computation on the cluster
    progress(futures, notebook=False)    # text progress bar
    y = da.compute(futures)[0]

It will produce:

[########################################] | 100% Completed |  0.3s
2024-04-15 18:37:18,521 - distributed.core - ERROR - 
Traceback (most recent call last):
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/utils.py", line 803, in wrapper
    return await func(*args, **kwargs)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/scheduler.py", line 7203, in feed
    await asyncio.sleep(interval)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/asyncio/tasks.py", line 605, in sleep
    return await future
asyncio.exceptions.CancelledError

But when I change the cluster to LocalCluster(processes=True, n_workers=1, threads_per_worker=2), it just works without any problem. I don’t know if it is related.
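To make the difference explicit, the only change between the failing run and the working one is the processes flag on LocalCluster:

# fails at shutdown with asyncio.exceptions.CancelledError in the scheduler's feed task
with LocalCluster(processes=False, n_workers=1, threads_per_worker=2) as cluster, Client(cluster) as client:
    ...

# identical script otherwise, but no error is raised
with LocalCluster(processes=True, n_workers=1, threads_per_worker=2) as cluster, Client(cluster) as client:
    ...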