Running a cluster on an unreliable network

I have a somewhat peculiar issue: I’m running a Dask distributed cluster on an HPC system (using dask-jobqueue/SLURM), and the network there is unreliable. Depending on what other users are doing, the network seems to slow down so much that I get timeout errors:

  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://10.80.11.32:41021 remote=tcp://10.80.11.32:45114> already closed.
2024-07-03 19:37:57,669 - distributed.scheduler - CRITICAL - Closed comm <BatchedSend: closed> while trying to write [{'op': 'event', 'topic': 'forwarded-log-record-forward-logging-<root>', 'event': (1720049877.6694033, {'name': 'goldrush', 'msg': 'Optimized TS for scan c83fd981_F-8-37_F-12-25 obtained (with lig)', 'args': None, 'levelname': 'INFO', 'levelno': 20, 'pathname': '/home/robidasr/GoldRush/goldrush/procedures.py', 'filename': 'procedures.py', 'module': 'procedures', 'exc_info': None, 'exc_text': None, 'stack_info': None, 'lineno': 423, 'funcName': 'xtb_opt_lig_ts', 'created': 1720049872.487778, 'msecs': 487.0, 'relativeCreated': 17211067.803382874, 'thread': 22615137691328, 'threadName': 'B3-190d30d3', 'processName': 'Dask Worker process (from Nanny)', 'process': 276519, 'label': 'C8-xtb_reopt_ts-7155d941-6b42-4f4e-9697-e743a482f9f6', 'asctime': '2024-07-03 19:37:52', 'worker': 'tcp://10.80.85.29:33159'})}]
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 6079, in send_all
    c.send(*msgs)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://10.80.11.32:41021 remote=tcp://10.80.11.32:45114> already closed.
2024-07-03 19:37:58,571 - distributed.scheduler - CRITICAL - Closed comm <BatchedSend: closed> while trying to write [{'op': 'event', 'topic': 'forwarded-log-record-forward-logging-<root>', 'event': (1720049878.5717142, {'name': 'goldrush', 'msg': 'Job XtbOptJob of TS_80ea76a4_taut1, calc_activations=False', 'args': None, 'levelname': 'INFO', 'levelno': 20, 'pathname': '/home/robidasr/GoldRush/goldrush/jobs/xtb_jobs.py', 'filename': 'xtb_jobs.py', 'module': 'xtb_jobs', 'exc_info': None, 'exc_text': None, 'stack_info': None, 'lineno': 106, 'funcName': '_run', 'created': 1720049874.9193227, 'msecs': 919.0, 'relativeCreated': 5010268.465280533, 'thread': 23329196271296, 'threadName': 'B3-TS_80ea76a4_taut1', 'processName': 'Dask Worker process (from Nanny)', 'process': 2972237, 'label': 'B3-TS_80ea76a4_taut1-d65ed72a-4a8f-419a-888a-cf4fcda3694b', 'asctime': '2024-07-03 19:37:54', 'worker': 'tcp://10.80.85.46:40939'})}]
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 6079, in send_all
    c.send(*msgs)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://10.80.11.32:41021 remote=tcp://10.80.11.32:45114> already closed.
2024-07-03 19:37:58,583 - distributed.scheduler - CRITICAL - Closed comm <BatchedSend: closed> while trying to write [{'op': 'event', 'topic': 'forwarded-log-record-forward-logging-<root>', 'event': (1720049878.583157, {'name': 'goldrush', 'msg': 'Intermediate TS_80ea76a4_taut1 has 1 imaginary frequencies', 'args': None, 'levelname': 'WARNING', 'levelno': 30, 'pathname': '/home/robidasr/GoldRush/goldrush/jobs/xtb_jobs.py', 'filename': 'xtb_jobs.py', 'module': 'xtb_jobs', 'exc_info': None, 'exc_text': None, 'stack_info': None, 'lineno': 121, 'funcName': '_run', 'created': 1720049874.919539, 'msecs': 919.0, 'relativeCreated': 5010268.681526184, 'thread': 23329196271296, 'threadName': 'B3-TS_80ea76a4_taut1', 'processName': 'Dask Worker process (from Nanny)', 'process': 2972237, 'label': 'B3-TS_80ea76a4_taut1-d65ed72a-4a8f-419a-888a-cf4fcda3694b', 'asctime': '2024-07-03 19:37:54', 'worker': 'tcp://10.80.85.46:40939'})}]
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 6079, in send_all
    c.send(*msgs)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://10.80.11.32:41021 remote=tcp://10.80.11.32:45114> already closed.
2024-07-03 19:37:58,583 - distributed.scheduler - CRITICAL - Closed comm <BatchedSend: closed> while trying to write [{'op': 'event', 'topic': 'forwarded-log-record-forward-logging-<root>', 'event': (1720049878.5834103, {'name': 'goldrush', 'msg': 'Finishing XtbOptJob of TS_80ea76a4_taut1', 'args': None, 'levelname': 'INFO', 'levelno': 20, 'pathname': '/home/robidasr/GoldRush/goldrush/jobs/xtb_jobs.py', 'filename': 'xtb_jobs.py', 'module': 'xtb_jobs', 'exc_info': None, 'exc_text': None, 'stack_info': None, 'lineno': 158, 'funcName': '_run', 'created': 1720049874.9196095, 'msecs': 919.0, 'relativeCreated': 5010268.7520980835, 'thread': 23329196271296, 'threadName': 'B3-TS_80ea76a4_taut1', 'processName': 'Dask Worker process (from Nanny)', 'process': 2972237, 'label': 'B3-TS_80ea76a4_taut1-d65ed72a-4a8f-419a-888a-cf4fcda3694b', 'asctime': '2024-07-03 19:37:54', 'worker': 'tcp://10.80.85.46:40939'})}]
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 6079, in send_all
    c.send(*msgs)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://10.80.11.32:41021 remote=tcp://10.80.11.32:45114> already closed.
2024-07-03 19:37:58,583 - distributed.scheduler - CRITICAL - Closed comm <BatchedSend: closed> while trying to write [{'op': 'event', 'topic': 'forwarded-log-record-forward-logging-<root>', 'event': (1720049878.5839353, {'name': 'goldrush', 'msg': 'Parsing the opt log of int s1_03fded2b_grad_xtb_opt', 'args': None, 'levelname': 'INFO', 'levelno': 20, 'pathname': '/home/robidasr/GoldRush/goldrush/procedures.py', 'filename': 'procedures.py', 'module': 'procedures', 'exc_info': None, 'exc_text': None, 'stack_info': None, 'lineno': 77, 'funcName': 'parse_xtb_opt', 'created': 1720049876.4668548, 'msecs': 466.0, 'relativeCreated': 6446509.414196014, 'thread': 22990455502528, 'threadName': 'B3-s1_03fded2b_grad', 'processName': 'Dask Worker process (from Nanny)', 'process': 815363, 'label': 'B3-s1_03fded2b_grad-b06fb020-57f1-4c70-8472-0c625beef4d1', 'asctime': '2024-07-03 19:37:56', 'worker': 'tcp://10.80.84.46:45341'})}]
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 6079, in send_all
    c.send(*msgs)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://10.80.11.32:41021 remote=tcp://10.80.11.32:45114> already closed.
2024-07-03 19:38:00,139 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 5725, in add_client
    del self.client_comms[client]
        ~~~~~~~~~~~~~~~~~^^^^^^^^
KeyError: 'Client-worker-f5e10c9f-3987-11ef-8096-b8cb29b1b2dd'
Task exception was never retrieved
future: <Task finished name='Task-1856924' coro=<Server._handle_comm() done, defined at /home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/core.py:876> exception=KeyError('Client-worker-f5e10c9f-3987-11ef-8096-b8cb29b1b2dd')>
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 5725, in add_client
    del self.client_comms[client]
        ~~~~~~~~~~~~~~~~~^^^^^^^^
KeyError: 'Client-worker-f5e10c9f-3987-11ef-8096-b8cb29b1b2dd'
2024-07-03 19:38:07,018 - distributed.client - ERROR - 
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/tornado/iostream.py", line 861, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/tornado/iostream.py", line 1116, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TimeoutError: [Errno 110] Connection timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/client.py", line 1556, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/comm/tcp.py", line 236, in read
    convert_stream_closed_error(self, e)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/comm/tcp.py", line 140, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://10.80.11.32:45114 remote=tcp://10.80.11.32:41021>: TimeoutError: [Errno 110] Connection timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/client.py", line 1331, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/client.py", line 1388, in _ensure_connected
    assert len(msg) == 1
           ^^^^^^^^^^^^^
AssertionError
2024-07-03 19:38:07,043 - distributed.client - ERROR - 
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/tornado/iostream.py", line 861, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/tornado/iostream.py", line 1116, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TimeoutError: [Errno 110] Connection timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/client.py", line 1556, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/comm/tcp.py", line 236, in read
    convert_stream_closed_error(self, e)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/comm/tcp.py", line 140, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://10.80.11.32:45114 remote=tcp://10.80.11.32:41021>: TimeoutError: [Errno 110] Connection timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/client.py", line 1572, in _handle_report
    await self._reconnect()
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/client.py", line 1331, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/client.py", line 1388, in _ensure_connected
    assert len(msg) == 1
           ^^^^^^^^^^^^^
AssertionError
2024-07-03 19:38:13,126 - distributed.scheduler - CRITICAL - Closed comm <BatchedSend: closed> while trying to write [{'op': 'key-in-memory', 'key': 'C8-SP_TS_c83fd981_F-7-37_F-25-37-41dea610-54cd-41cd-9c67-13d44e858efe', 'type': b"\x80\x05\x95'\x00\x00\x00\x00\x00\x00\x00\x8c\x16goldrush.jobs.dft_jobs\x94\x8c\x08DftSpJob\x94\x93\x94."}]
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 6079, in send_all
    c.send(*msgs)
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://10.80.11.32:41021 remote=tcp://10.80.11.32:49294> already closed.
2024-07-03 19:38:13,130 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 5725, in add_client
    del self.client_comms[client]
        ~~~~~~~~~~~~~~~~~^^^^^^^^
KeyError: 'Client-f057586e-394d-11ef-83fb-b8cb29ca4dc3'
Task exception was never retrieved
future: <Task finished name='Task-16' coro=<Server._handle_comm() done, defined at /home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/core.py:876> exception=KeyError('Client-f057586e-394d-11ef-83fb-b8cb29ca4dc3')>
Traceback (most recent call last):
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/home/robidasr/goldenv_3.11/lib/python3.11/site-packages/distributed/scheduler.py", line 5725, in add_client
    del self.client_comms[client]
        ~~~~~~~~~~~~~~~~~^^^^^^^^
KeyError: 'Client-f057586e-394d-11ef-83fb-b8cb29ca4dc3'

At this point, I’m desperate for any hack or duct-tape fix that will keep this from crashing the whole cluster. I set distributed.comm.retry to 10 and distributed.comm.timeouts.connect to 30, but that isn’t enough. Is there a way to make Dask block until the connection recovers? Or is there a list of all the timeouts that can be increased in the configuration file?
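For reference, here is roughly how I’m applying those settings from Python. It’s just a sketch, and I’m assuming the distributed.comm.retry.count spelling of the retry key; the same values can also go in ~/.config/dask/distributed.yaml so the scheduler and workers pick them up.

    import dask
    import distributed  # noqa: F401  -- registers the "distributed" config defaults

    # Raise comm retries and connection timeouts before any Client/Cluster
    # is created in this process.
    dask.config.set({
        "distributed.comm.retry.count": 10,
        "distributed.comm.timeouts.connect": "30s",
        "distributed.comm.timeouts.tcp": "60s",
    })

    # Check the effective values.
    print(dask.config.get("distributed.comm.timeouts"))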

Thanks in advance for any suggestion!

Hi @RaphaelRobidas,

I remember an attempt to run Dask at large scale on Summit some years ago, which led to network communication issues. The post here describes some configuration changes, and other parts of the thread might be useful too.

You can also search the distributed configuration reference for the various timeouts that can be configured.
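If it helps, one quick way to see what is available without leaving Python is to walk the loaded configuration and print the timeout-like keys. This is just a sketch built on dask.config, not an official API:

    import dask
    import distributed  # noqa: F401  -- loads the "distributed" config section

    def find_timeout_keys(node, prefix=""):
        """Recursively collect config entries whose key looks like a timeout."""
        found = []
        if isinstance(node, dict):
            for key, value in node.items():
                path = f"{prefix}.{key}" if prefix else key
                if "timeout" in key or "ttl" in key:
                    found.append((path, value))
                found.extend(find_timeout_keys(value, path))
        return found

    for path, value in find_timeout_keys(dask.config.config.get("distributed", {}), "distributed"):
        print(f"{path} = {value}")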


Thanks a lot @guillaumeeb, very helpful! I’m trying this out and it does seem to help avoid some exceptions.

I still got cancelled futures with little helpful logging, leaving the cluster running idle without any task or exception in the dashboard. I got a full cluster dump using client.dump_cluster_state(), but I’m not sure what to look for to figure out the source of the issue. Can you suggest which parts of the dump would be most revealing about the cause of this problem?
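In case it matters, this is roughly how I produced and loaded the dump. The key names in the comments are just what I see in my own dump and may differ between distributed versions:

    import gzip
    from collections import Counter

    import msgpack

    # Produced earlier with client.dump_cluster_state("dask-cluster-dump",
    # format="msgpack"), which writes dask-cluster-dump.msgpack.gz.
    with gzip.open("dask-cluster-dump.msgpack.gz", "rb") as f:
        state = msgpack.unpack(f, raw=False)

    # In my dump the top level has "scheduler" and "workers" sections;
    # counting scheduler-side task states helps spot tasks stuck in
    # "processing" or "no-worker" without a matching worker entry.
    tasks = state["scheduler"]["tasks"]
    print(Counter(task.get("state") for task in tasks.values()))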

I’m not familiar with this kind of analysis. cc @crusaderky maybe.

Just to tie up this loose end: I managed to resolve the issue in the end. I set the timeouts to larger values as suggested above, switched to the InfiniBand network interface for more reliable networking, and, most importantly, split my code into multiple processes on the scheduling node. It seems I was running too many threads in a single process, which slowed it down enough to drop connections from time to time, depending on the load.
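On the networking side, pointing dask-jobqueue at InfiniBand only took the interface keyword. A minimal sketch; the "ib0" name and the resource values are specific to my setup and will differ elsewhere:

    from dask_jobqueue import SLURMCluster

    # "ib0" is the InfiniBand interface on our compute nodes; check `ip link`
    # on a node to find the right name on your cluster.
    cluster = SLURMCluster(
        cores=8,
        memory="32GB",
        walltime="12:00:00",
        interface="ib0",                         # workers bind to InfiniBand
        scheduler_options={"interface": "ib0"},  # and so does the scheduler
    )
    cluster.scale(jobs=4)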

To split the work, I divided my main process into multiple processes using multiprocessing. With Dask, it was important to call multiprocessing.set_start_method("spawn") and to pass cluster.scheduler_address to each process so that each one creates its own independent Client object (see the sketch below). The thread overload could also explain the weird deadlock state I was observing. I also made sure not to use Python 3.11.9 because of its reported deadlock issue.
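Roughly, the pattern ended up looking like this. run_batch and do_work are stand-ins for my actual workload, and the cluster parameters are placeholders:

    import multiprocessing as mp

    from dask.distributed import Client
    from dask_jobqueue import SLURMCluster

    def do_work(x):
        return x * x  # placeholder for the real computation

    def run_batch(scheduler_address, batch):
        # Each child process builds its own Client from the address string;
        # Client/Cluster objects themselves should not be shared across processes.
        with Client(scheduler_address) as client:
            futures = client.map(do_work, batch)
            return client.gather(futures)

    if __name__ == "__main__":
        # "spawn" gives each child a clean interpreter instead of forking a
        # process that already holds Dask/Tornado state.
        mp.set_start_method("spawn")

        cluster = SLURMCluster(cores=8, memory="32GB")  # placeholder resources
        cluster.scale(jobs=4)
        address = cluster.scheduler_address

        batches = [list(range(0, 100)), list(range(100, 200))]
        with mp.Pool(processes=len(batches)) as pool:
            results = pool.starmap(run_batch, [(address, b) for b in batches])
        print(sum(len(r) for r in results), "results gathered")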

Thanks for the help!
