I want to monitor the progress of distributed computations by following the distributed example in the “Progress Bar” section of the Dask documentation (I can't include a link here because of the restriction on new users).
I fixed it myself and initially considered it a bug in CPython rather than in Dask, so I reported it as an asyncio bug to the CPython team at:
However, the CPython team considers this a Dask implementation issue. I don't know enough about Dask to judge whether the problem lies in Dask's progress() or in asyncio.
Could you help check what the root cause is and whether it can be patched within Dask?
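Regarding whether this could be patched within Dask: in the traceback from the example, the `CancelledError` passes through a `wrapper` in `distributed/utils.py` and is reported by `distributed.core` at ERROR level. The sketch below is a hypothetical illustration, not Dask's actual code or fix; `log_errors_sketch`, `periodic_feed` and `failing` are made-up names. It assumes the desired behaviour is a decorator that still logs genuine failures but lets cancellation, the normal way asyncio stops a task during shutdown, propagate without an ERROR record.

```python
import asyncio
import functools
import logging

logger = logging.getLogger("log_errors_sketch")


def log_errors_sketch(func):
    """Hypothetical decorator (not Dask's): log unexpected exceptions from a
    coroutine function, but treat task cancellation as a normal shutdown path."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            # Cancellation is how asyncio stops tasks at shutdown:
            # re-raise it without emitting an ERROR-level log record.
            raise
        except Exception:
            # Anything else is a genuine failure: log it, then re-raise.
            logger.exception("unexpected error in %s", func.__name__)
            raise

    return wrapper


@log_errors_sketch
async def periodic_feed(interval):
    # Hypothetical stand-in for a periodic coroutine such as the scheduler's
    # `feed` in the traceback: it sleeps between iterations until cancelled.
    while True:
        await asyncio.sleep(interval)


@log_errors_sketch
async def failing():
    # Hypothetical coroutine that fails for real, so the ERROR path is exercised.
    raise ValueError("boom")


async def main():
    task = asyncio.create_task(periodic_feed(0.01))
    await asyncio.sleep(0.05)  # let the periodic task run a few iterations
    task.cancel()              # simulate the cluster shutting down
    try:
        await task
    except asyncio.CancelledError:
        pass
    try:
        await failing()
    except ValueError:
        pass
    return task.cancelled()


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    print("feed task cancelled:", asyncio.run(main()))
```

Placing the `except asyncio.CancelledError` clause before `except Exception` keeps the behaviour the same on Python 3.7, where `CancelledError` still subclassed `Exception`; from Python 3.8 onward it derives from `BaseException`.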
I was able to reproduce the error, but only on an HPC cluster (in my case using SLURMCluster).
When trying your code with only a LocalCluster, I didn't run into the issue. Can you confirm this?
I'm not sure what in dask-jobqueue causes this error to appear at the end of the script execution. I'm one of the maintainers of the library, but unfortunately I don't have much time to investigate this.
Great that this is reproducible. Yes, I only encounter this when a cluster backend is used. The example performs a basic task that should work according to the Dask documentation. I hope this can be looked into, as it makes distributed mode on a cluster difficult to use.
```python
import numpy as np
import zarr
from dask.distributed import Client, LocalCluster
from dask import array as da
from dask.distributed import progress


def same(x):
    # Identity function applied to every block
    return x


x = np.arange(100000)
with LocalCluster(processes=False, n_workers=1, threads_per_worker=2) as cluster, Client(cluster) as client:
    da_x = da.from_array(x, chunks=(2000,))
    da_y = da_x.map_blocks(same)
    # persist() starts the computation on the cluster; the result is backed by futures
    futures = client.persist(da_y)
    # With notebook=False, progress() prints a text progress bar and blocks until done
    progress(futures, notebook=False)
    y = da.compute(futures)[0]
```
It will produce:
```
[########################################] | 100% Completed | 0.3s
2024-04-15 18:37:18,521 - distributed.core - ERROR -
Traceback (most recent call last):
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/utils.py", line 803, in wrapper
    return await func(*args, **kwargs)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/scheduler.py", line 7203, in feed
    await asyncio.sleep(interval)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/asyncio/tasks.py", line 605, in sleep
    return await future
asyncio.exceptions.CancelledError
```
But when I change the cluster to LocalCluster(processes=True, n_workers=1, threads_per_worker=2), it works without any problem. I don't know whether that is related.
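On why the error only appears at the end of the script: one general asyncio mechanism worth knowing is that `asyncio.run()` cancels every task still pending when the main coroutine returns, and the `CancelledError` is raised inside each task at whatever `await` it is suspended on, which matches the `await asyncio.sleep(interval)` frame in the traceback. The stdlib-only sketch below illustrates this with a hypothetical `periodic_feed` coroutine; it is not Dask code, and whether Dask's shutdown path with `processes=False` follows the same route is an assumption that has not been verified here.

```python
import asyncio

events = []


async def periodic_feed(interval):
    # Hypothetical periodic coroutine, loosely modelled on the `feed` frame
    # in the traceback: it sleeps between iterations forever.
    try:
        while True:
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        # asyncio.run() cancels tasks that are still pending when main()
        # returns; the exception is raised at the `await` above.
        events.append("cancelled at shutdown")
        raise


async def main():
    # Start the background task and return without stopping it,
    # as if the script simply reached its end.
    asyncio.create_task(periodic_feed(0.01))
    await asyncio.sleep(0.03)
    events.append("main finished")


asyncio.run(main())
print(events)
```

Because the task re-raises the `CancelledError`, it ends up in the cancelled state and `asyncio.run()` does not report it as an error; the noise only appears when some layer in between logs the cancellation as a failure.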