TimeoutError in distributed.nanny causing gRPC server crash after prolonged analysis

Description
We are experiencing TimeoutError exceptions in our Dask-based gRPC server, which eventually cause the server to crash. The issue appears after running an analysis for around 5 hours, followed by a test case that starts and then cancels an analysis.

distributed.nanny - WARNING - Worker process still alive after 4.0 seconds, killing

distributed.nanny - WARNING - Worker process still alive after 4.0 seconds, killing

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x0000011908B03760>>, <Task finished name='Task-4944025' coro=<SpecCluster._correct_state_internal() done, defined at site-packages\distributed\deploy\spec.py:346> exception=TimeoutError()>)

Traceback (most recent call last):
File "lib\asyncio\tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
...
Traceback (most recent call last):
File "lib\asyncio\tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
...
Traceback (most recent call last):
File "lib\runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "lib\runpy.py", line 86, in _run_code
exec(code, run_globals)
File "server.py", line 369, in <module>
sys.exit(main(sys.argv))
File "server.py", line 343, in main
with serve(conf) as context:
File "lib\contextlib.py", line 142, in __exit__
next(self.gen)
File "server.py", line 233, in serve
File "lib\site-packages\distributed\deploy\cluster.py", line 537, in __exit__
aw = self.close()
File "lib\site-packages\distributed\deploy\spec.py", line 293, in close
aw = super().close(timeout)
File "lib\site-packages\distributed\deploy\cluster.py", line 223, in close
return self.sync(self._close, callback_timeout=timeout)
File "lib\site-packages\distributed\utils.py", line 358, in sync
return sync(
File "lib\site-packages\distributed\utils.py", line 434, in sync
raise error
File "lib\site-packages\distributed\utils.py", line 408, in f
result = yield future
File "lib\site-packages\tornado\gen.py", line 767, in run
value = future.result()
File "lib\site-packages\distributed\deploy\spec.py", line 448, in _close
await self._correct_state()
File "lib\site-packages\distributed\deploy\spec.py", line 359, in _correct_state_internal
await asyncio.gather(*tasks)
File "lib\site-packages\distributed\nanny.py", line 623, in close
await self.kill(timeout=timeout, reason=reason)
File "lib\site-packages\distributed\nanny.py", line 400, in kill
await self.process.kill(reason=reason, timeout=timeout)
File "lib\site-packages\distributed\nanny.py", line 879, in kill
await process.join(max(0, deadline - time()))
File "lib\site-packages\distributed\process.py", line 330, in join
await wait_for(asyncio.shield(self._exit_future), timeout)
File "lib\site-packages\distributed\utils.py", line 1961, in wait_for
return await asyncio.wait_for(fut, timeout)
File "lib\asyncio\tasks.py", line 458, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

Steps Taken to Troubleshoot:
1. Increased nanny timeouts - Set the following Dask configuration options: dask.config.set({ 'distributed.nanny.timeouts.startup': '60s', 'distributed.nanny.timeouts.connect': '60s', 'distributed.nanny.timeouts.terminate': '60s', 'distributed.worker.memory.terminate': False, })

2. Resource Monitoring - At the time of the crash, about 40% of RAM and 50% of CPU were in use.

3. Logging - Added worker logs to the cluster:

# Register the per-worker logging plugin on the cluster; `self` here is
# presumably the cluster object this snippet was taken from — confirm.
# NOTE(review): this Client is never closed in the snippet shown; unless it is
# closed elsewhere it stays connected to the scheduler for the process lifetime.
client = distributed.Client(self)
plugin = WorkerLoggingPlugin(log_dir)
client.register_plugin(plugin, name='worker-logging-plugin')

class WorkerLoggingPlugin(WorkerPlugin):
    """Worker plugin that mirrors task state transitions into a per-worker log file.

    ``setup`` attaches a ``logging.FileHandler`` writing to
    ``<log_dir>/worker-<sanitized address>.log`` to the logger named
    ``distributed.worker.<address>``. ``transition`` records every task state
    change on that logger. ``teardown`` detaches and closes its handlers.

    Args:
        log_dir: Directory in which the per-worker log files are created.
    """

    def __init__(self, log_dir):
        self.log_dir = log_dir
        # Populated by setup(); stays None if setup() never ran on this worker.
        self.logger = None

    def setup(self, worker):
        """Create the log directory and attach a file handler for *worker*'s logger."""
        os.makedirs(self.log_dir, exist_ok=True)
        safe_worker_address = worker.address.replace(':', '_').replace('/', '_')

        # Set up logging for each worker
        log_file = os.path.join(self.log_dir, f"worker-{safe_worker_address}.log")
        logger = logging.getLogger(f"distributed.worker.{worker.address}")

        # getLogger returns the same object on every call, so a repeated setup()
        # in this process would otherwise stack a second handler on the same file
        # and every record would be written twice.
        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(existing, logging.FileHandler) and existing.baseFilename == target
            for existing in logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        logger.setLevel(logging.INFO)

        self.logger = logger
        self.logger.info("Worker setup complete")

    def transition(self, key, start, finish, *args, **kwargs):
        """Log a task state change; transitions into ``error`` are also logged at ERROR level."""
        if self.logger is None:
            # setup() has not run (or failed) on this worker; nothing to log to.
            return
        # Lazy %-style arguments: this runs for every task transition, so avoid
        # formatting the message unless the record is actually emitted.
        self.logger.info("Task %s transitioned from %s to %s", key, start, finish)
        if finish == "error":
            self.logger.error("Task %s resulted in an error", key)

    def teardown(self, worker):
        """Detach and close every handler attached to *worker*'s logger."""
        logger = logging.getLogger(f"distributed.worker.{worker.address}")
        logger.info("Worker teardown initiated")
        # Iterate over a snapshot: removing from logger.handlers while iterating
        # it directly skips every other handler and leaves those files open.
        for handler in list(logger.handlers):
            logger.removeHandler(handler)
            handler.close()

Nothing useful appears in any of the worker logs during that period. About 5 seconds before the crash, the 3 workers logged:

Worker 1: 2024-07-16 20:40:21,046 - distributed.worker.tcp://127.0.0.1:54297 - INFO - Task _run-594fb4b6-7bcc-4d79-b05b-4b8d6a9f762b transitioned from long-running to released
2024-07-16 20:40:21,046 - distributed.worker.tcp://127.0.0.1:54297 - INFO - Task ('run_protocol_on_fov-d30689b1c999798cdcf5f4dbafb4fb7d', 0) transitioned from executing to released
2024-07-16 20:40:21,046 - distributed.worker.tcp://127.0.0.1:54297 - INFO - Task ('from_sequence-44f325bf1aee8a5855fda4f6e3e600c3', 0) transitioned from memory to released
2024-07-16 20:40:21,053 - distributed.worker.tcp://127.0.0.1:54297 - INFO - Worker teardown initiated

Worker 2: 2024-07-16 20:40:21,046 - distributed.worker.tcp://127.0.0.1:54292 - INFO - Task ('from_sequence-44f325bf1aee8a5855fda4f6e3e600c3', 15) transitioned from memory to released
2024-07-16 20:40:21,046 - distributed.worker.tcp://127.0.0.1:54292 - INFO - Task ('run_protocol_on_fov-d30689b1c999798cdcf5f4dbafb4fb7d', 12) transitioned from executing to released
2024-07-16 20:40:21,046 - distributed.worker.tcp://127.0.0.1:54292 - INFO - Task ('from_sequence-44f325bf1aee8a5855fda4f6e3e600c3', 12) transitioned from memory to released
2024-07-16 20:40:21,055 - distributed.worker.tcp://127.0.0.1:54292 - INFO - Worker teardown initiated

Worker 3: 2024-07-16 20:40:21,047 - distributed.worker.tcp://127.0.0.1:54284 - INFO - Task ('run_protocol_on_fov-d30689b1c999798cdcf5f4dbafb4fb7d', 10) transitioned from executing to released
2024-07-16 20:40:21,047 - distributed.worker.tcp://127.0.0.1:54284 - INFO - Task ('from_sequence-44f325bf1aee8a5855fda4f6e3e600c3', 10) transitioned from memory to released
2024-07-16 20:40:21,053 - distributed.worker.tcp://127.0.0.1:54284 - INFO - Worker teardown initiated

4. Shorter Analysis Duration - If we run the analysis for only 30 minutes before the test case that starts and then cancels an analysis, the server log shows only the following warnings and the server stays up:
distributed.nanny - WARNING - Worker process still alive after 4.0 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 4.0 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 4.0 seconds, killing

Potential Cause
The problem seems related to how distributed.nanny handles worker processes, potentially compounded by Tornado’s IOLoop handling of task cancellations and timeouts. It seems as if the nanny is trying to bring the cluster back to a healthy state and times out while doing so.

Request for Assistance
Is this issue related to Tornado or Dask Distributed?
Are there any known issues or bug fixes related to TimeoutError in the nanny process?
Any additional configuration or debugging steps that can be recommended?

Additional Information
Cluster configuration:

def __init__(self, cluster_config: config.ClusterConfig, security_config: config.SecurityConfig):
        """
        Create a local Scheduler and Workers.

        This creates a "cluster" of a scheduler and Nanny-managed workers running on the local machine:
        ``cluster_config.n_workers - 1`` CPU workers (``cpu0``, ``cpu1``, ...), one ``listener0`` worker and,
        when ``nvml.device_get_count()`` reports at least one device, a worker named ``gpu0`` with
        dedicated access to the GPU.

        Side effects: creates the Dask temporary/log directories under ``tempfile.gettempdir()``, sets global
        ``dask.config`` values, and registers a ``WorkerLoggingPlugin`` on the scheduler.

        Args:
            cluster_config: cluster configuration used to initialize this cluster's worker configuration
            security_config: security configuration used to initialize TLS if configured
        """
        security = _get_security(security_config)
        protocol = "tls://" if security.require_encryption else 'tcp://'
        host = "127.0.0.1"
        level = logging.WARNING  # same value as the deprecated alias logging.WARN
        self.status = None
        self.processes = True

        # Dask's temporary directory; per-worker log files live in its 'logs' subdirectory.
        temp_dir = os.path.join(tempfile.gettempdir(), 'logging', 'Dask')
        log_dir = os.path.join(temp_dir, 'logs')
        os.makedirs(log_dir, exist_ok=True)  # creates temp_dir as well

        worker_kwargs = {
            "host": host,
            "nthreads": 18,
            "services": {},
            "dashboard_address": None,
            "dashboard": False,
            "interface": None,
            "protocol": protocol,
            "security": security,
            "silence_logs": level,
            "memory_limit": 0,
            "env": {
                'DASK_WORKER_LOG_DIRECTORY': log_dir,
            },
        }

        scheduler = {
            "cls": distributed.Scheduler,
            "options": dict(
                host=host,
                services={},
                service_kwargs=None,
                security=security,
                port=0,
                interface=None,
                protocol=protocol,
                dashboard=True,
                dashboard_address=":8787",
                blocked_handlers=None,
            ),
        }

        cpu_worker = {"cls": distributed.Nanny, "options": dict(resources=dict(cpu=1), **worker_kwargs)}
        listener_worker = {"cls": distributed.Nanny, "options": worker_kwargs}

        # Insertion order (cpu*, then gpu0, then listener0) matches the worker spec order.
        workers = {f'cpu{i}': cpu_worker for i in range(cluster_config.n_workers - 1)}
        if nvml.device_get_count() > 0:
            gpu_worker = copy.deepcopy(cpu_worker)
            gpu_worker['options']['resources']['gpu'] = 1
            workers['gpu0'] = gpu_worker
        workers['listener0'] = listener_worker

        warnings.warn('disabling dask fuse optimization that breaks annotations due to '
                      'https://github.com/dask/dask/issues/7036')
        warnings.warn('disabling daemonic workers since cpu workers are spawning subprocesses')

        dask.config.set({
            'optimization.fuse.active': False,
            'distributed.scheduler.allowed-failures': 0,
            # Non-daemonic so CPU workers may spawn subprocesses (see the warning above).
            'distributed.worker.daemon': False,
            'temporary-directory': temp_dir,
        })

        super().__init__(
            name=None,
            scheduler=scheduler,
            workers=workers,
            worker=None,
            loop=None,
            asynchronous=False,
            silence_logs=level,
            security=security,
            scheduler_sync_interval=1,
        )

        # Register the WorkerLoggingPlugin through a short-lived client. Previously the
        # client was never closed, so it stayed connected to the scheduler for the whole
        # life of the process, including cluster shutdown. It is not made the default
        # client, so global client state is left untouched.
        # NOTE(review): relies on the scheduler retaining the registered plugin after this
        # client closes (distributed stores worker plugins on the scheduler) — confirm for
        # the pinned distributed version.
        with distributed.Client(self, set_as_default=False) as client:
            client.register_plugin(WorkerLoggingPlugin(log_dir), name='worker-logging-plugin')

Environment:
Dask version: 2024.5.1
Distributed version: 2024.5.1
Tornado version: 6.4
Python version: 3.10
Operating System: Windows 11
Install method: pip

Hi @Defqon7, welcome to Dask community!

Thanks for this detailed post and for investigating potential solutions. Unfortunately, this kind of edge-case behavior is really hard to debug. I admit I don’t have any suggestions for how to fix it. Maybe @fjetter or @crusaderky?