Client connection to scheduler fails after 45s

I have had dask-gateway deployed in Kubernetes for the past few months. It’s been used by quite a few teams, but one of my users came back with a concern that the client was exiting during usage, either with no error at all or with a client connection failed error. I have been able to reproduce it, but I’m scratching my head as to where it’s coming from. I’ve tried increasing the load balancer (LB) timeout and checking versions, and all the versions match across the Dask cluster. Anyone have any ideas?
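For context, clients connect through dask-gateway roughly like this; the config lines show where the standard distributed comm timeouts could be bumped on the client side if that turns out to be relevant (just a sketch, and the 60s values are arbitrary, not a recommendation):

```python
import dask
from dask_gateway import Gateway

# Optional: raise the client-side comm timeouts (standard dask/distributed
# config keys). The 60s values here are arbitrary.
dask.config.set({
    "distributed.comm.timeouts.connect": "60s",
    "distributed.comm.timeouts.tcp": "60s",
})

# Gateway address and auth come from our deployment's dask config.
gateway = Gateway()
cluster = gateway.new_cluster()
client = cluster.get_client()
```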

Here are our versions
{'scheduler': {'host': {'python': '3.7.13.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '5.4.209-116.367.amzn2.x86_64', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'en_US.UTF-8', 'LANG': 'en_US.UTF-8'},
               'packages': {'python': '3.7.13.final.0', 'dask': '2022.02.0', 'distributed': '2022.02.0', 'msgpack': '1.0.4', 'cloudpickle': '2.2.0', 'tornado': '6.2', 'toolz': '0.12.0', 'numpy': '1.19.5', 'pandas': '1.1.5', 'lz4': None, 'blosc': None}},
 'workers': {},
 'client': {'host': {'python': '3.7.13.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '5.4.0-1092-aws', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'en_US.UTF-8', 'LANG': 'C.UTF-8'},
            'packages': {'python': '3.7.13.final.0', 'dask': '2022.02.0', 'distributed': '2022.02.0', 'msgpack': '1.0.4', 'cloudpickle': '2.2.0', 'tornado': '6.2', 'toolz': '0.12.0', 'numpy': '1.19.5', 'pandas': '1.1.5', 'lz4': None, 'blosc': None}}}

One thing I noted in the scheduler logs was this, but I’m not sure why it happens:

('INFO',
 'distributed.scheduler - INFO - Remove client Client-7626f7ae-8166-11ed-bc46-126628f3adf1'),
('DEBUG',
 'distributed.scheduler - DEBUG - Client Client-7626f7ae-8166-11ed-bc46-126628f3adf1 releases keys: '),
('DEBUG',
 'distributed.scheduler - DEBUG - Finished handling client Client-7626f7ae-8166-11ed-bc46-126628f3adf1'),
('INFO',
 'distributed.scheduler - INFO - Close client connection: Client-7626f7ae-8166-11ed-bc46-126628f3adf1'),
('INFO',
 'distributed.scheduler - INFO - Receive client connection: Client-7626f7ae-8166-11ed-bc46-126628f3adf1'),
('DEBUG',

@guillaumeeb Do you have any idea on the above?

The dask, distributed, Python, and other library versions are quite old. Is there a chance you can upgrade the packages and see if the problem persists?

There is a recent version of dask-gateway as well.

cc @jacobtomlinson, maybe you have something else to add?


Unfortunately, we are a bit stuck because we want to keep the same Python version (3.7) across our monorepo.

Hi @amcnair,

Could you give more input here? Do you use DaskHub or only dask-gateway? What is your client setup? What is your reproducer for the problem, and which error do you see when one occurs? And what do you mean, concretely, by the client exiting?

@guillaumeeb
We only use dask-gateway directly. We have three ways to run clients: a Kubeflow instance that runs Jupyter notebooks, Jupyter notebooks run directly on a laptop, or Python scripts. For the reproducer we are using JupyterLab on a laptop to run the script. The error we get is this:


CancelledError                            Traceback (most recent call last)
/tmp/ipykernel_27596/1083132163.py in <module>
     13 # Runs function on local or remote Dask.
     14 df = pm.run_function_with_custom_callables(
---> 15 dataset.parent_dataset, simple_function, tuple()
     16 )
     17

~/.cache/bazel/_bazel_amcnair/38ccd7eed5f9a8a018612d0d8493e710/execroot/main/bazel-out/k8-fastbuild/bin/experimental/xyan/dask/debug_dask.runfiles/main/planning/ml/ranker/analysis/parallelization/run_function_on_dataset_rows.py in run_function_with_custom_callables(self, dataset, function, scriptable_callables, subset_of_rows_to_run_on)
     273
     274 return self._run_internal(
---> 275 dataset, subset_of_rows_to_run_on, read_rows_in_range_and_run_callables
     276 )

~/.cache/bazel/_bazel_amcnair/38ccd7eed5f9a8a018612d0d8493e710/execroot/main/bazel-out/k8-fastbuild/bin/experimental/xyan/dask/debug_dask.runfiles/main/planning/ml/ranker/analysis/parallelization/run_function_on_dataset_rows.py in _run_internal(self, dataset, subset_of_rows_to_run_on, row_iteration_function)
     135 print("before try")
     136 try:
---> 137 output_dicts = self.client.compute(tasks, sync=True)
     138 except TypeError as e:
     139 if "pickle _thread.lock" in str(e):

~/.cache/bazel/_bazel_amcnair/38ccd7eed5f9a8a018612d0d8493e710/execroot/main/bazel-out/k8-fastbuild/bin/experimental/xyan/dask/debug_dask.runfiles/internal_pip_dependency_distributed_3_7/pypi__distributed/distributed/client.py in compute(self, collections, sync, optimize_graph, workers, allow_other_workers, resources, retries, priority, fifo_timeout, actors, traverse, **kwargs)
     3207
     3208 if sync:
---> 3209 result = self.gather(futures)
     3210 else:
     3211 result = futures

~/.cache/bazel/_bazel_amcnair/38ccd7eed5f9a8a018612d0d8493e710/execroot/main/bazel-out/k8-fastbuild/bin/experimental/xyan/dask/debug_dask.runfiles/internal_pip_dependency_distributed_3_7/pypi__distributed/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
     2150 direct=direct,
     2151 local_worker=local_worker,
---> 2152 asynchronous=asynchronous,
     2153 )
     2154

~/.cache/bazel/_bazel_amcnair/38ccd7eed5f9a8a018612d0d8493e710/execroot/main/bazel-out/k8-fastbuild/bin/experimental/xyan/dask/debug_dask.runfiles/internal_pip_dependency_distributed_3_7/pypi__distributed/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
     308 else:
     309 return sync(
---> 310 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
     311 )
     312

~/.cache/bazel/_bazel_amcnair/38ccd7eed5f9a8a018612d0d8493e710/execroot/main/bazel-out/k8-fastbuild/bin/experimental/xyan/dask/debug_dask.runfiles/internal_pip_dependency_distributed_3_7/pypi__distributed/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
     374 if error:
     375 typ, exc, tb = error
---> 376 raise exc.with_traceback(tb)
     377 else:
     378 return result

~/.cache/bazel/_bazel_amcnair/38ccd7eed5f9a8a018612d0d8493e710/execroot/main/bazel-out/k8-fastbuild/bin/experimental/xyan/dask/debug_dask.runfiles/internal_pip_dependency_distributed_3_7/pypi__distributed/distributed/utils.py in f()
     347 future = asyncio.wait_for(future, callback_timeout)
     348 future = asyncio.ensure_future(future)
---> 349 result = yield future
     350 except Exception:
     351 error = sys.exc_info()

~/.cache/bazel/_bazel_amcnair/38ccd7eed5f9a8a018612d0d8493e710/execroot/main/bazel-out/k8-fastbuild/bin/experimental/xyan/dask/debug_dask.runfiles/internal_pip_dependency_tornado_3_7/pypi__tornado/tornado/gen.py in run(self)
     767
     768 try:
---> 769 value = future.result()
     770 except Exception:
     771 exc_info = sys.exc_info()

CancelledError:

So in this case it looks like the client is failing to find a connection to the scheduler, and the computation is therefore cancelled. We are using a homegrown parallelization manager for this, along with some in-house custom classes for the data, so I don’t know how straightforward it will be to extract a representative sample.
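In rough terms, the failing path boils down to something like this (a heavily simplified sketch; process_rows and the chunking are placeholders for what our parallelization manager actually builds):

```python
import dask
from dask_gateway import Gateway

def process_rows(rows):
    # Placeholder for the per-chunk work our custom callables do.
    return {"n_rows": len(rows)}

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(4)
client = cluster.get_client()

# One delayed task per chunk of rows, then block on the results; the
# sync=True compute call is the one that raises CancelledError for us.
chunks = [list(range(i, i + 1000)) for i in range(0, 10_000, 1000)]
tasks = [dask.delayed(process_rows)(chunk) for chunk in chunks]
results = client.compute(tasks, sync=True)
```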

When reading your answer, I’m not sure about something: do you manage to submit computations to the Dask cluster with the client on your local laptop? Just a simple example, as in the Custom Workloads with Futures — Dask Examples documentation?

I mean, does it work at some point or does it always fail?
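Something along these lines would be enough (a minimal sketch, assuming the client is created through dask-gateway):

```python
from dask_gateway import Gateway

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(2)
client = cluster.get_client()

def inc(x):
    return x + 1

# Submit a handful of trivial tasks and gather the results.
futures = client.map(inc, range(10))
print(client.gather(futures))
```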

We are able to submit workloads. It even works for smaller workloads doing the same thing as the failing case.

It’s a bit hard to tell without a concrete example.

Do you have more output after the CancelledError: string?

Did you see What causes dask job failure with CancelledError exception - Stack Overflow? I know it’s not the same setup, but maybe the same kind of mechanism is happening.

Maybe you could check whether you get the same error when you submit a task that just sleeps?
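Something like this, for instance (a rough sketch, assuming client is the Gateway client from your reproducer; pick a sleep well past the ~45 s mark where the connection seems to drop):

```python
import time

def sleep_task(seconds):
    time.sleep(seconds)
    return seconds

# Sleep longer than the ~45 s window where the client connection drops.
future = client.submit(sleep_task, 120)
print(future.result())
```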