Search for deadlock cause | freeze upon data read; distributed; other?

Hi,

I am experiencing deadlocks in a Python application that uses, among others, dask.distributed, xarray, zarr and SMB storage.

The application runs continuously, with a PC reboot every ~24h. It ran without problems for several months with dask 2023.6.0.
In 07/2024 I updated dask, xarray and distributed to the latest stable releases. Since then I have been experiencing deadlocks, and I cannot trace them to a particular change.

After I had suspected many components of the application, a deadlock recently occurred during a rare period when the system was doing almost nothing: checking the content of a text file on SMB storage every 30s (using a proper context manager, with path.open("r")). This deadlock happened several hours after the main application had performed its usual tasks, in the same Python session, so I cannot rule out that something went wrong at that time and led to the deadlock later. I still think that this occurrence narrows down the issue a lot.

My only way to break the deadlocks is to Ctrl-C the console running the python main, which always gives this traceback (and stops the application):

Traceback
ERROR 2024-08-17T07:44:21.081  | 11972 | Dask Worker process (from Nanny) | MainThread | 11968 | utils | distributed.worker | __exit__ | 862 | (<class 'asyncio.exceptions.CancelledError'>, CancelledError(), <traceback object at 0x0000020C6073F640>) | None
Traceback (most recent call last):
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\site-packages\distributed\compatibility.py", line 204, in asyncio_run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\asyncio\base_events.py", line 691, in run_until_complete
    self.run_forever()
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\asyncio\base_events.py", line 658, in run_forever
    self._run_once()
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\asyncio\base_events.py", line 2070, in _run_once
    event_list = self._selector.select(timeout)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\selectors.py", line 323, in select
    r, w, _ = self._select(self._readers, self._writers, [], timeout)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\selectors.py", line 314, in _select
    r, w, x = select.select(r, w, w, timeout)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\asyncio\runners.py", line 157, in _on_sigint
    raise KeyboardInterrupt()
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\site-packages\distributed\utils.py", line 837, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\site-packages\distributed\worker.py", line 1558, in close
    await r.close_gracefully(reason=reason)
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\site-packages\distributed\core.py", line 1251, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\site-packages\distributed\core.py", line 1479, in connect
    return await self._connect(addr=addr, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\site-packages\distributed\core.py", line 1423, in _connect
    comm = await connect(
           ^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\site-packages\distributed\comm\core.py", line 377, in connect
    handshake = await comm.read()
                ^^^^^^^^^^^^^^^^^
  File "C:\Users\myuser\AppData\Local\anaconda3\Lib\site-packages\distributed\comm\tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
asyncio.exceptions.CancelledError
The other 7 worker processes logged the same traceback, with identical frames and line numbers, under these headers:

ERROR 2024-08-17T07:44:21.080  | 11724 | Dask Worker process (from Nanny) | MainThread | 11728 | utils | distributed.worker | __exit__ | 862 | (<class 'asyncio.exceptions.CancelledError'>, CancelledError(), <traceback object at 0x00000290266AF100>) | None
ERROR 2024-08-17T07:44:21.081  | 2228 | Dask Worker process (from Nanny) | MainThread | 2148 | utils | distributed.worker | __exit__ | 862 | (<class 'asyncio.exceptions.CancelledError'>, CancelledError(), <traceback object at 0x000001AE2FB7E0C0>) | None
ERROR 2024-08-17T07:44:21.082  | 9744 | Dask Worker process (from Nanny) | MainThread | 1372 | utils | distributed.worker | __exit__ | 862 | (<class 'asyncio.exceptions.CancelledError'>, CancelledError(), <traceback object at 0x0000027015E72940>) | None
ERROR 2024-08-17T07:44:21.083  | 1672 | Dask Worker process (from Nanny) | MainThread | 1668 | utils | distributed.worker | __exit__ | 862 | (<class 'asyncio.exceptions.CancelledError'>, CancelledError(), <traceback object at 0x0000026ECE076800>) | None
ERROR 2024-08-17T07:44:21.084  | 6832 | Dask Worker process (from Nanny) | MainThread | 1612 | utils | distributed.worker | __exit__ | 862 | (<class 'asyncio.exceptions.CancelledError'>, CancelledError(), <traceback object at 0x0000016CDB987940>) | None
ERROR 2024-08-17T07:44:21.090  | 11880 | Dask Worker process (from Nanny) | MainThread | 11872 | utils | distributed.worker | __exit__ | 862 | (<class 'asyncio.exceptions.CancelledError'>, CancelledError(), <traceback object at 0x0000015867EB5240>) | None
ERROR 2024-08-17T07:44:21.092  | 11592 | Dask Worker process (from Nanny) | MainThread | 11636 | utils | distributed.worker | __exit__ | 862 | (<class 'asyncio.exceptions.CancelledError'>, CancelledError(), <traceback object at 0x000001BABFD1B480>) | None

  • this simple text file reading task was not running when any of the previous deadlocks occurred
  • in the application there is always a distributed process-based client running (8 processes with 4 threads each, 32 core PC)
  • the deadlocks typically occur after 2-10 hours of the main application running but still seem random: yesterday it ran for 24 hours without deadlock, today 3 deadlocks already occurred.
  • after activating all possible distributed logging on the client and workers, I only found one warning that apparently is not responsible for the deadlock, see here.
xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.11.5 | packaged by Anaconda, Inc. | (main, Sep 11 2023, 13:26:23) [MSC v.1916 64 bit (AMD64)]
python-bits: 64
OS: Windows
OS-release: 10
machine: AMD64
processor: Intel64 Family 6 Model 85 Stepping 7, GenuineIntel
byteorder: little
LC_ALL: None
LANG: None
LOCALE: ('English_United States', '1252')
libhdf5: 1.12.1
libnetcdf: None

xarray: 2024.6.0
pandas: 2.0.3
numpy: 1.24.3
scipy: 1.13.0
netCDF4: None
pydap: None
h5netcdf: None
h5py: 3.9.0
zarr: 2.18.2
cftime: None
nc_time_axis: None
iris: None
bottleneck: 1.3.5
dask: 2024.7.1
distributed: 2024.7.1
matplotlib: 3.8.2
cartopy: None
seaborn: 0.12.2
numbagg: None
fsspec: 2024.6.1
cupy: None
pint: None
sparse: None
flox: None
numpy_groupies: None
setuptools: 68.0.0
pip: 24.1.2
conda: 23.9.0
pytest: 7.4.0
mypy: None
IPython: 8.15.0
sphinx: 5.0.2

The main application runs about 100k tasks per hour: mostly analyzing images saved on SMB storage and storing numeric results in an xarray dataset saved in .zarr format on SMB storage.

Fewer than 10 times per hour, the message "Worker failed to heartbeat for 83s; attempting restart" appears. To inspect the tasks of the dying worker, I added a log statement of worker.processing in check_worker_ttl here in the Scheduler code. There are usually about 4-6 tasks. The only common feature I could find among the hanging tasks is that they read data from SMB storage. I added many try/except blocks to these tasks, but they do not catch anything. My current guess is that something freezes upon entering a with path.open(): block or a PIL.Image.open(path) call for files stored on SMB storage.
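A try/except cannot catch a call that never returns, so one way to make such a freeze visible is to run the read in a helper thread and give up waiting after a timeout. The following is only a minimal sketch under that assumption; read_bytes_with_timeout and its timeout_s parameter are hypothetical names, not part of dask or PIL.

```python
import queue
import threading
from pathlib import Path


def read_bytes_with_timeout(path, timeout_s=30.0):
    """Read a file in a helper thread; raise TimeoutError if it does not finish in time."""
    result = queue.Queue(maxsize=1)

    def _reader():
        try:
            result.put(("ok", Path(path).read_bytes()))
        except Exception as exc:  # forward any read error to the caller
            result.put(("error", exc))

    # daemon=True so that a reader stuck in a hung call cannot block interpreter exit
    threading.Thread(target=_reader, daemon=True).start()
    try:
        status, payload = result.get(timeout=timeout_s)
    except queue.Empty:
        raise TimeoutError(f"reading {path} took longer than {timeout_s}s") from None
    if status == "error":
        raise payload
    return payload
```

This does not unblock the stuck helper thread; it only turns a silent freeze of the task into an exception that can be logged together with the file path.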

My current 2 hypotheses are:

  1. something freezes upon file read on SMB storage (I now added a logger.debug message before and after any file read, waiting for the next deadlock).
  2. distributed alone ends up deadlocking after some time
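For hypothesis 1, the before/after logging can be wrapped in a small context manager so that every file read is traced the same way. This is a sketch only: the logger name io_trace, the helper traced_io and the slow_after_s threshold are hypothetical choices.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("io_trace")  # hypothetical logger name


@contextmanager
def traced_io(label, slow_after_s=5.0):
    """Log the start and end of an I/O block; warn if the block was slow."""
    start = time.monotonic()
    logger.debug("start %s", label)
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        level = logging.WARNING if elapsed > slow_after_s else logging.DEBUG
        logger.log(level, "end %s after %.3fs", label, elapsed)
```

Wrapping a read as with traced_io(f"open {path}"): followed by the usual with path.open("r") as f: block means that a "start" line without a matching "end" line in the logs points to a freeze inside that read.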

During a deadlock:

  • the client dashboard is not available any more. I also cannot access the client from another python process when specifying the port address.
  • the frequent heartbeat debug messages logged by distributed.core stop
  • I once added some logging in a root asyncio or distributed loop (I think the one shown in the traceback), and I could see that the loop was alive during a deadlock when everything else was frozen.

One observation: most of my tasks are bound methods of agents originally defined using dataclasses. I have seen this older comment and this recent PR that maybe hint at some instability with dataclasses.

Questions

  1. does the deadlock traceback upon KeyboardInterrupt tell something that I missed?
  2. are the traceback and symptoms consistent with the hypothesis that my system sometimes freezes when reading data?
  3. is it otherwise possible that distributed simply deadlocks out of nowhere after several hours, and I am wrongly blaming interaction with storage?
  4. what can I do to locate the deadlock? should I register plugins on workers to log some specific events?
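Regarding question 4, one option that needs no dask-specific API is to record what every thread of a process is executing at the moment of the freeze. The sketch below uses only the standard library; format_all_thread_stacks is a hypothetical helper, and calling it from a daemon watchdog thread when some heartbeat timestamp goes stale is an assumption about how it could be wired in, not something tested on this application.

```python
import sys
import threading
import traceback


def format_all_thread_stacks():
    """Return the current Python stack of every thread in this process as text."""
    names = {t.ident: t.name for t in threading.enumerate()}
    chunks = []
    for ident, frame in sys._current_frames().items():
        header = f"Thread {names.get(ident, 'unknown')} (ident={ident})"
        chunks.append(header + "\n" + "".join(traceback.format_stack(frame)))
    return "\n".join(chunks)
```

Writing this text to the log in the client process and in each worker process should show whether threads are sitting in a file read, in a lock, or in the event loop. The standard library's faulthandler.dump_traceback_later is another option for a similar dump after a fixed delay.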

Thanks for any help.

I used to have the default worker-ttl: "600s" in the distributed config. When these worker failures occurred, the dashboard showed a freeze near the end of a batch of tasks, such as in the screenshot below, with 86/91 completed. Everything froze for 600s, the worker was restarted (by check_worker_ttl), then things continued.

This freeze always occurred at about 95% completion, with only a few tasks left.

I see that this symptom of a deadlock at the very end of a task batch has been reported in at least two other instances: here, and here.

I now have worker-ttl: "60s". When worker failures occur now, I believe that a single worker failure no longer causes a general freeze (the true deadlocks still occur as reported in the original post).

  1. Could the deadlock be caused by dask/distributed/#8616? If yes, then I could maybe simply upgrade to py3.12 instead of my current py3.11.

Correction: the dask version that ran without problems was 2023.10.0, not 2023.6.0.

Another deadlock occurred, and it did not happen around a data read. I think that this weakens the file-read hypothesis.

I am pasting below the last few seconds of all client-side distributed logs (32k character post limit). I can share the full logs if needed.

The logs show that completed tasks reach the released state but never reach the forgotten state. I believe that this is not normal. Can it be that my local cluster is up for 24 hours, runs more than 1e6 tasks, and no task ever gets forgotten?

Some notes maybe related to that:

  1. most of my tasks are launched from other tasks. I always use a worker_client context manager with the recommended scheme:
from distributed import worker_client

with worker_client(separate_thread=True) as client:
    future = client.submit(task)  # submit the child task from within the running task
    client.gather(future)  # block until the child task has finished
  2. I am still experiencing this issue. Already gathered tasks get randomly re-run in large batches several hours later, unless I use this workaround: cancelling root tasks upon gathering them. By "root" tasks I mean the original ones in the context of launching tasks from tasks. Is this cancel-upon-gathering workaround a problem?

  3. Almost all my root tasks are bound methods of dataclass instances.

  4. Many of the tasks read data from a zarr-formatted xarray dataset. It can happen that several tasks read the same data. Could this cause a dependency issue among tasks that prevents them from reaching the forgotten state?
    In one of the tasks that gets run 20x/minute, I switched to using chunks=None to avoid dask in xarray.open_zarr. But I cannot do this in other tasks where I need dask to load more data in many chunks.

Last few seconds of distributed logs before deadlock
DEBUG 2024-08-20T22:13:09.879 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:09.928 | distributed.core | Message from 'tcp://127.0.0.1:52831': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:51874', 'now': 1724206389.9142122, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'latency': 0.00785517692565918, 'tick-duration': 0.5060315132141113}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 239, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 6347}, 'event_loop_interval': 0.020789613326390583, 'cpu': 0.0, 'memory': 770019328, 'time': 1724206389.3883238, 'host_net_io': {'read_bps': 187185.69774082265, 'write_bps': 5075574.697301888}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 100856.97151691602}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:09.930 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:09.951 | distributed.core | Message from 'tcp://127.0.0.1:52993': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52968', 'now': 1724206389.9348505, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.5160410404205322, 'latency': 0.029094219207763672}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 189, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 3306}, 'event_loop_interval': 0.020760964374152983, 'cpu': 0.0, 'memory': 673681408, 'time': 1724206389.4199903, 'host_net_io': {'read_bps': 169312.66657718315, 'write_bps': 5147930.481073159}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 101469.09946771116}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:09.955 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:09.979 | distributed.core | Message from 'tcp://127.0.0.1:52032': {'op': 'identity', 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:09.980 | distributed.core | Calling into handler identity | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.013 | distributed.core | Message from 'tcp://127.0.0.1:50707': {'op': 'identity', 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.015 | distributed.core | Calling into handler identity | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.028 | distributed.core | Message from 'tcp://127.0.0.1:54122': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:54089', 'now': 1724206390.0254462, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'latency': 0.020717382431030273, 'tick-duration': 0.5045604705810547}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 59}, 'event_loop_interval': 0.020172195434570314, 'cpu': 9.1, 'memory': 421269504, 'time': 1724206389.522894, 'host_net_io': {'read_bps': 171754.8490986988, 'write_bps': 5202476.753975689}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.030 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.079 | distributed.core | Message from 'tcp://127.0.0.1:52804': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52800', 'now': 1724206390.054684, 'metrics': {'task_counts': {'executing': 1}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'profile-duration': 0.007178544998168945, 'latency': 0.016718626022338867, 'tick-duration': 0.4435253143310547}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 258, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 3616}, 'event_loop_interval': 0.02072794096810477, 'cpu': 11.7, 'memory': 657149952, 'time': 1724206389.5803127, 'host_net_io': {'read_bps': 167238.9418620977, 'write_bps': 5098746.718282465}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {'_run-e3e39d0c8564d22315971f56ffbedd69': 1.25016188621521}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.081 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.106 | distributed.core | Message from 'tcp://127.0.0.1:53547': {'op': 'identity', 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.106 | distributed.core | Calling into handler identity | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.218 | distributed.core | Message from 'tcp://127.0.0.1:53526': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:53523', 'now': 1724206390.204076, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.5164482593536377, 'latency': 0.02159428596496582}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 33, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 1574}, 'event_loop_interval': 0.020813201864560444, 'cpu': 0.0, 'memory': 698822656, 'time': 1724206390.194567, 'host_net_io': {'read_bps': 140642.99157572613, 'write_bps': 4414243.372120053}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.218 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.221 | distributed.core | Message from 'tcp://127.0.0.1:54123': {'op': 'identity', 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.221 | distributed.core | Calling into handler identity | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.262 | distributed.core | Message from 'tcp://127.0.0.1:52041': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52029', 'now': 1724206390.2301447, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.4809408187866211, 'latency': 0.04036593437194824}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 366, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 5920}, 'event_loop_interval': 0.02025573861365225, 'cpu': 6.2, 'memory': 794079232, 'time': 1724206389.7502222, 'host_net_io': {'read_bps': 170728.9686614519, 'write_bps': 5207211.566705583}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.262 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.361 | distributed.core | Message from 'tcp://127.0.0.1:52452': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52449', 'now': 1724206390.34587, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.5130734443664551, 'latency': 0.007166147232055664}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 350, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 4496}, 'event_loop_interval': 0.019679331311992572, 'cpu': 3.2, 'memory': 710520832, 'time': 1724206390.3271322, 'host_net_io': {'read_bps': 144173.60316119067, 'write_bps': 4599621.29790883}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.362 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.378 | distributed.core | Message from 'tcp://127.0.0.1:52542': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52539', 'now': 1724206390.3692362, 'metrics': {'task_counts': {'long-running': 1}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.49875473976135254, 'profile-duration': 0.0038979053497314453, 'latency': 0.021382808685302734}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 343, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 4284}, 'event_loop_interval': 0.021115181293893368, 'cpu': 39.4, 'memory': 2138636288, 'time': 1724206389.8718908, 'host_net_io': {'read_bps': 164541.58734773673, 'write_bps': 5049403.500779633}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {'_run-4afda04ff3bdaa754bd5b62dbef06233': 215.26723909378052}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.379 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.432 | distributed.core | Message from 'tcp://127.0.0.1:51918': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:51874', 'now': 1724206390.4176645, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.48685312271118164, 'latency': 0.01943182945251465}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 239, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 6347}, 'event_loop_interval': 0.020789613326390583, 'cpu': 3.0, 'memory': 770019328, 'time': 1724206389.9151478, 'host_net_io': {'read_bps': 165026.49364236387, 'write_bps': 5118670.449071058}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.434 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.461 | distributed.core | Message from 'tcp://127.0.0.1:54092': {'op': 'identity', 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.463 | distributed.core | Calling into handler identity | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.468 | distributed.core | Message from 'tcp://127.0.0.1:54106': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52968', 'now': 1724206390.4482503, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'latency': 0.01152801513671875, 'tick-duration': 0.4828603267669678}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 189, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 3306}, 'event_loop_interval': 0.020760964374152983, 'cpu': 3.0, 'memory': 673681408, 'time': 1724206389.9359024, 'host_net_io': {'read_bps': 154627.35551608173, 'write_bps': 4815533.21277468}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.469 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.510 | distributed.core | Message from 'tcp://127.0.0.1:54122': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:54089', 'now': 1724206390.5073721, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.5123684406280518, 'latency': 0.011069536209106445}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 59}, 'event_loop_interval': 0.019717864990234375, 'cpu': 0.0, 'memory': 421269504, 'time': 1724206390.0262668, 'host_net_io': {'read_bps': 154490.14933141595, 'write_bps': 4831854.936692853}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.511 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.592 | distributed.core | Message from 'tcp://127.0.0.1:52848': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52800', 'now': 1724206390.5690987, 'metrics': {'task_counts': {'executing': 1}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.5274598598480225, 'latency': 0.020902633666992188, 'profile-duration': 0.007411956787109375}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 258, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 3616}, 'event_loop_interval': 0.020236516485408862, 'cpu': 3.1, 'memory': 657149952, 'time': 1724206390.5497952, 'host_net_io': {'read_bps': 144144.83365835727, 'write_bps': 4573182.1889181845}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 3695757.637621699}}, 'executing': {'_run-e3e39d0c8564d22315971f56ffbedd69': 1.7645766735076904}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.592 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.603 | distributed.core | Message from 'tcp://127.0.0.1:52971': {'op': 'identity', 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.606 | distributed.core | Calling into handler identity | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.610 | distributed.core | Message from 'tcp://127.0.0.1:50649': {'op': 'set_metadata', 'keys': ('cluster-manager-info',), 'value': {'name': 'process', 'type': 'distributed.deploy.local.LocalCluster'}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.612 | distributed.core | Calling into handler set_metadata | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.657 | distributed.core | Message from 'tcp://127.0.0.1:52478': {'op': 'identity', 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.660 | distributed.core | Calling into handler identity | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.724 | distributed.core | Message from 'tcp://127.0.0.1:53547': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:53523', 'now': 1724206390.711933, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.4953775405883789, 'latency': 0.006109476089477539}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 33, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 1574}, 'event_loop_interval': 0.02002980662327187, 'cpu': 3.1, 'memory': 698822656, 'time': 1724206390.6897569, 'host_net_io': {'read_bps': 164298.5520704387, 'write_bps': 5124608.736025628}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 3686019.507652876}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.725 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.797 | distributed.core | Message from 'tcp://127.0.0.1:52476': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52029', 'now': 1724206390.7602613, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.47981858253479004, 'latency': 0.0326840877532959}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 366, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 5920}, 'event_loop_interval': 0.02045477197525349, 'cpu': 0.0, 'memory': 794079232, 'time': 1724206390.7401502, 'host_net_io': {'read_bps': 160104.912050841, 'write_bps': 4938728.885129144}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 3588715.662984565}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.797 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.842 | distributed.core | Message from 'tcp://127.0.0.1:52477': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52449', 'now': 1724206390.8379126, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.492218017578125, 'latency': 0.015117645263671875}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 350, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 4496}, 'event_loop_interval': 0.019679331311992572, 'cpu': 0.0, 'memory': 710520832, 'time': 1724206390.8192666, 'host_net_io': {'read_bps': 160738.64390191913, 'write_bps': 4944908.330893248}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 3708906.288367296}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.844 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.863 | distributed.core | Message from 'tcp://127.0.0.1:52596': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52539', 'now': 1724206390.8530564, 'metrics': {'task_counts': {'long-running': 1}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.5076727867126465, 'profile-duration': 0.0048067569732666016, 'latency': 0.009158134460449219}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 343, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 4284}, 'event_loop_interval': 0.021115181293893368, 'cpu': 38.7, 'memory': 2139447296, 'time': 1724206390.8461154, 'host_net_io': {'read_bps': 159124.6745864264, 'write_bps': 4883870.184333095}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 189306.7882544334}}, 'executing': {'_run-4afda04ff3bdaa754bd5b62dbef06233': 215.75105929374695}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.865 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.886 | distributed.core | Message from 'tcp://127.0.0.1:51877': {'op': 'identity', 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.887 | distributed.core | Calling into handler identity | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.918 | distributed.core | Message from 'tcp://127.0.0.1:52831': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:51874', 'now': 1724206390.9042845, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.523423433303833, 'latency': 0.021486997604370117}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 239, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 6347}, 'event_loop_interval': 0.02021289825439453, 'cpu': 0.0, 'memory': 770019328, 'time': 1724206390.4189193, 'host_net_io': {'read_bps': 151288.7683047524, 'write_bps': 4695936.671300749}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 3623228.537837676}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.921 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:10.979 | distributed.core | Message from 'tcp://127.0.0.1:52993': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:52968', 'now': 1724206390.955338, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.48592066764831543, 'latency': 0.01643824577331543}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 189, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 3306}, 'event_loop_interval': 0.02009161313374837, 'cpu': 6.2, 'memory': 673681408, 'time': 1724206390.934552, 'host_net_io': {'read_bps': 159527.44216730804, 'write_bps': 4941169.436311122}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:10.979 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822
DEBUG 2024-08-20T22:13:11.014 | distributed.core | Message from 'tcp://127.0.0.1:54092': {'op': 'heartbeat_worker', 'address': 'tcp://127.0.0.1:54089', 'now': 1724206391.0100653, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'latency': 0.012102603912353516, 'tick-duration': 0.5027740001678467}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 59}, 'event_loop_interval': 0.019717864990234375, 'cpu': 0.0, 'memory': 421269504, 'time': 1724206390.5082853, 'host_net_io': {'read_bps': 142699.76909649736, 'write_bps': 4534095.129038394}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 3786738.697029174}}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True} | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 760
DEBUG 2024-08-20T22:13:11.017 | distributed.core | Calling into handler heartbeat_worker | 2260 | MainProcess | IO loop | 13880 | core | _handle_comm | 822

Hi @templiert,

Thanks for this detailed post and the follow-up messages. There is a lot to take in, and I have to admit it is hard to give you an answer.

I’ve never heard of that. There is probably a cause, and the access to the network storage is indeed a good lead. This kind of storage (like NFS) often freezes and can deadlock many applications.

So it means the Scheduler process is deadlocked, which is really unusual. How many tasks are you submitting at that time?

I don’t see anything; it is blocked on a communication. Maybe more of a network issue than a storage one? But the Workers and Scheduler/Client are on the same machine, if I understood correctly…

Unfortunately, I don’t know.

I just spotted something I missed in your dashboard screenshot: the total available memory and the memory per worker look very low! And why are the bars blue if they completely fill the dashboard? How did you set up your LocalCluster? Worker freezing often means memory problems… Combine that with tasks from tasks…

Thank you @guillaumeeb.

That was my impression too from reading online.

Thanks, that is useful to hear.

Here is an 8-min screencast of a few dashboards during nominal operation.
Every ~5 seconds I start a task that in turn launches a batch of about 200x tasks.
Every ~5 minutes there are a few batches of about 2000x tasks each.
There used to be many more tasks every 5 minutes but I deactivated non-critical tasks due to the deadlocks.

Correct: it is a local distributed process-based client on a single machine.

You read that from the “bytes stored per worker” panel I assume. Does this panel not scale over time depending on the load?
Ah, I see in the docs example that the axes limits are constant there. Mine must scale because I use memory_limit=0. The docs say here: “If None or 0, no limit is applied.” I believe it is still OK?

from dask.distributed import Client, LocalCluster

# cfg is the application's own configuration object (not shown here).
PARAMS_PROCESS = dict(
    name="process",
    n_workers=8,
    threads_per_worker=4,
    memory_limit=0,  # 0 (or None): no per-worker memory limit
    processes=True,
    host=cfg.localhost,
    scheduler_port=cfg.pc.workstation.dask.process.scheduler_port,
    dashboard_address=cfg.pc.workstation.dask.process.dashboard_address,
)
with (
    LocalCluster(**PARAMS_PROCESS) as cluster_process,
    Client(cluster_process, name="process", set_as_default=True),
):
    ...

OK, good to know. Nevertheless, I have never seen the total memory used at more than ~60% of the available 128GB on the machine.

Today by chance I caught another of these benign worker deaths, characterized by this ~1 min long deadlock at 86/91 task completion, see dashboard picture below, similar to the one in the original post. These often are mild: the worker dies and the lost tasks are re-run without issue. I believe nevertheless that they are linked to the complete deadlock, because these did not occur when the complete deadlock was not present.

Here is another milder example of the symptom that batches of tasks hang at 95% completion. In this 2-min dashboard screencast, I see batches of tasks hang at 86/91 for a few seconds during the first minute. In the second minute, this does not happen any more.

I have since also tried:

  • deactivate work stealing
  • flag all tasks with submit/map with pure=False

but I am not hopeful as I still see the frequent benign worker deaths. I am probably heading for another deadlock soon, the search continues…

Thanks again Guillaume, very useful to bounce ideas here.

Looking a bit at your screencast, I observe two things:

  • Most critical: your tasks are really short! Do you really need all these tasks? Do you really need Dask? Wouldn’t it be possible to batch some of those tasks? They terminate in a few milliseconds, which is really short for Dask.
  • Do you really need to launch tasks from tasks? It makes the Dashboard misbehave a bit.

OK, so yes, this should be OK. I would still recommend setting a limit, but you probably have enough memory.

Yes, it seems one of your Workers is blocked for some reason for a minute or so. This might be due to a tasks-from-tasks deadlock…


Thanks a lot again @guillaumeeb.

TL;DR: I think I got rid of the deadlocks (deadlock-free for the last 5 days): I batched many of the small tasks into larger ones.

You are right. Over time I optimized my tasks, and I lost track of this obvious constraint, mentioned in the docs:

For optimal performance, task durations should be greater than 10-100ms
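As a rough aid for applying that guideline, the sketch below estimates how many items to group into one task so that each task lasts about a target duration. The function name `suggest_partition_size` and the 100 ms default (the upper end of the quoted range) are assumptions made for illustration, not something taken from the dask docs.

```python
import math


def suggest_partition_size(ms_per_item, target_task_ms=100):
    """Items per task so that one task lasts roughly target_task_ms."""
    if ms_per_item <= 0:
        raise ValueError("ms_per_item must be positive")
    return max(1, math.ceil(target_task_ms / ms_per_item))


# Items that take about 2 ms each would be grouped by 50.
print(suggest_partition_size(2))  # prints 50
```

The per-item duration has to be measured on the real workload; the result is only a starting point for tuning.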

Yes: I need to control asynchronicity with Futures, and dask offers much more than the stdlib concurrent.futures (clients, diagnostics, priorities, scalability).
The screencasts showed the application in safe mode, with reduced computations to try to avoid deadlocks. In normal mode, more computations are run.

It currently is a crucial point in my architecture: the main thread performs time-critical operations, and asynchronously submits agents to a process-based client to perform tasks. Agents may perform many tasks themselves.

I cannot find the reference, but I remember a dask team member mentioning that launching tasks from tasks makes the task dashboard expand vertically, and that it is a known, benign issue.

Yes it is. I made two crucial changes:

  1. specify chunks in xr.open_zarr(..., chunks=xxx)

A few months ago I re-chunked my zarr-formatted xarray into smaller chunks, and I overlooked the consequence that it led to many more smaller dask tasks.
For some small data loads I simply got rid of dask with chunks=None.
For larger data loads I fine-tuned the chunking with batching along some dimensions, e.g. chunks={"my_dimension": -1}.
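To see why smaller chunks multiply the number of tasks, here is a back-of-the-envelope sketch that counts the chunks of one variable for a given chunking, with -1 meaning a single chunk along that dimension. The helper `count_chunks`, the dimension names, and the sizes are all hypothetical, and the real task graph contains more than one task per chunk, so this is only a lower bound on the trend.

```python
import math


def count_chunks(dim_sizes, chunks):
    """Number of chunks of one variable.

    dim_sizes: dict of dimension name -> length
    chunks: dict of dimension name -> chunk length (-1 = whole dimension);
            every dimension must be listed.
    """
    total = 1
    for dim, size in dim_sizes.items():
        chunk = chunks[dim]
        if chunk == -1 or chunk >= size:
            continue  # a single chunk along this dimension
        total *= math.ceil(size / chunk)
    return total


# Hypothetical dataset shape and chunkings, for illustration only.
sizes = {"time": 10_000, "channel": 64}
print(count_chunks(sizes, {"time": 100, "channel": 8}))     # prints 800
print(count_chunks(sizes, {"time": 1_000, "channel": -1}))  # prints 10
```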

  2. I bagged my futures whenever possible

I changed small future tasks created by

results = client.gather(client.map(func, items, priority=priority))

into

results = (
    bag.from_sequence(items, partition_size=xxx)
    .map(func)
    .compute(scheduler=client, priority=priority)
)

It led to longer tasks. The performance also often increased, likely due to much less scheduling overhead.
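Where a bag does not fit, the same idea can be sketched by hand: group the items into batches and submit one task per batch. This is a generic illustration, not the approach used above. The helper names are hypothetical, and a `ThreadPoolExecutor` stands in for the cluster so the snippet runs on its own. A `distributed.Client` also offers `submit` and futures with `result()`, so it could presumably be passed instead, but that has not been checked against this application.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(items, batch_size):
    """Split a sequence into consecutive lists of at most batch_size items."""
    items = list(items)
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def apply_to_batch(func, batch):
    """Run func on every item of one batch; this is the work of one task."""
    return [func(item) for item in batch]


def map_in_batches(executor, func, items, batch_size):
    """Submit one task per batch instead of one per item, then flatten."""
    futures = [
        executor.submit(apply_to_batch, func, batch)
        for batch in chunked(items, batch_size)
    ]
    return [result for future in futures for result in future.result()]


# Stand-in executor so the sketch runs without a cluster.
with ThreadPoolExecutor(max_workers=2) as pool:
    print(map_in_batches(pool, abs, [-3, -2, -1, 0, 1], batch_size=2))
```

Results come back in the original item order because the futures are collected in submission order.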

I believe it is these two changes that got rid of the deadlocks.
