Hi, I am using Dask to create a small SGE cluster and run jobs on it. Occasionally I see the following error when closing the client and the cluster. Does anyone have any idea what's going on here? Thanks.
File "dask_utils.py", line 62, in dask_del
cluster.close()
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 110, in close
return self.sync(self._close, callback_timeout=timeout)
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 189, in sync
return sync(self.loop, func, *args, **kwargs)
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/utils.py", line 351, in sync
raise exc.with_traceback(tb)
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/utils.py", line 334, in f
result[0] = yield future
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/deploy/spec.py", line 418, in _close
assert w.status == Status.closed, w.status
AssertionError: Status.running
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/deploy/spec.py", line 652, in close_clusters
cluster.close(timeout=10)
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 110, in close
return self.sync(self._close, callback_timeout=timeout)
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 189, in sync
return sync(self.loop, func, *args, **kwargs)
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/site-packages/distributed/utils.py", line 348, in sync
e.wait(10)
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/threading.py", line 558, in wait
signaled = self._cond.wait(timeout)
File "/hpc/apps/pyhpc/dist/conda/x86_64/envs/cuda-11.0/lib/python3.8/threading.py", line 306, in wait
gotit = waiter.acquire(True, timeout)
This is my handy cleanup routine:
def dask_del(cluster, client, odask):
    """Close a Dask client and cluster, then delete everything inside *odask*.

    The contents of *odask* are removed even when closing the client or the
    cluster raises. An example is the ``AssertionError: Status.running`` that
    ``cluster.close()`` can raise when a worker has not reached the closed
    state. After cleanup, the original exception is re-raised, so callers
    still see the failure. The cluster is closed even if ``client.close()``
    raises.

    Parameters
    ----------
    cluster
        Object with a ``close()`` method, e.g. the SGE cluster object.
    client
        Object with a ``close()`` method, e.g. the Dask client.
    odask : str
        Directory whose entries (files, symlinks, sub-directories) are
        deleted. The directory itself is kept.

    Failures to delete an individual entry are printed and skipped, so one
    stubborn entry does not stop the rest of the cleanup.
    """
    try:
        try:
            # Close the client before the cluster it is connected to.
            client.close()
        finally:
            cluster.close()
    finally:
        # Runs even if either close() raised, so no stale files are left.
        for entry in os.listdir(odask):
            entry_path = os.path.join(odask, entry)
            try:
                # Check symlinks first: a link to a directory passes isdir(),
                # but shutil.rmtree refuses to operate on symlinks. A dangling
                # link passes neither isfile() nor isdir().
                if os.path.islink(entry_path) or os.path.isfile(entry_path):
                    os.unlink(entry_path)
                elif os.path.isdir(entry_path):
                    shutil.rmtree(entry_path)
            except OSError as err:
                # Best-effort cleanup: report and continue with other entries.
                print(f"could not remove {entry_path}: {err}")