FutureCancelledError: scheduler-connection-lost due to high load?

Hello,

I'm using Dask Distributed version 2024.12.0 to distribute work across multiple processes on a single Linux machine.
The machine has 8 cores.
The worker function is rather long-running (extracting text from large PDFs and running several regexes over it) - that can take seconds or even minutes for larger files.

I decided to use submit() and futures which looks (simplified) as follows:

from typing import cast

from dask.distributed import Client, Future, as_completed

with Client(processes=True, n_workers=8, threads_per_worker=1) as client:

    futures: list[Future] = []
    for submit_task_data in tasks:
        futures.append(
            client.submit(
                my_function,
                submit_task_data,
                pure=False,
                key=submit_task_data.my_id,
                retries=None,
            )
        )

    # Handle each result as soon as its future finishes.
    ac = as_completed(futures)
    for future in ac:
        f = cast(Future, future)
        print(f"status: {f.status}")

        try:
            my_result = future.result()
        except Exception as e:
            print(e)

When my workers get really busy (CPU load on all cores ~100%), Dask sometimes cancels my futures with multiple exceptions like this:

distributed.client.FutureCancelledError: ... cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.

Afterwards, the “cluster” did not heal itself;
I had to kill the client process and start over.

Some time ago I had similar problems, where I suspected that the worker processes were so busy
that they did not have time to respond to heartbeats from the Scheduler (Nanny?).
So I sprinkled some calls to time.sleep(0.05) between longer-running code chunks on the worker,
which seemed to help.
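Roughly, that workaround looked like this (simplified; the loop body is just a placeholder for the real PDF/regex work):

import time

def my_function(submit_task_data):
    partial_results = []
    for stage in range(10):            # stands in for the long-running extraction stages
        partial_results.append(stage)  # the heavy PDF/regex work would happen here
        time.sleep(0.05)               # briefly yield the CPU between stages
    return partial_results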

That kind of inoculation obviously did not help to prevent the problem above.
I’m not sure between which participants the connection losses occur.
Is it the connection between the Client and the Scheduler?

How could I prevent this problem?
Should I reduce the worker count by 1 so that the client process gets enough CPU cycles to keep the connection to the Scheduler alive?
Should I try to throttle the load on each Worker so that CPU utilization stays below 100%?

Is there a chance to “repair” the connection when I get such FutureCancelledErrors?
Would client.restart() help?

Best regards,

Bernd

Hi @bhofner, welcome to the Dask Discourse forum!

It does look like it from your logs, and yes, this is probably a resource saturation problem. A complete stack trace of the error would help to be sure where it is raised from.

I think this would be appropriate to try. The Scheduler and Client also need CPU cycles to behave correctly, so if your workers saturate all resources, they might not be able to communicate properly.

You can also try to tweak some communication timeouts, but that might not be a proper solution.
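For instance, something like this before creating the Client; a minimal sketch, where the values are arbitrary examples and distributed.comm.timeouts.connect / distributed.comm.timeouts.tcp are the keys I would look at first (check the Configuration reference for your version):

import dask

# Example values only; raise the comm timeouts before creating the Client.
dask.config.set({
    "distributed.comm.timeouts.connect": "60s",
    "distributed.comm.timeouts.tcp": "60s",
})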

Hello Guillaume,

thanks for the reply!

The stacktrace is quite short:

job_regex_extraktion.py", line 170, in extract
     Optional[Police]] = future.result()

File "/root/.local/lib/python3.10/site-packages/distributed/client.py", line 402, in result
     return self.client.sync(self._result, callback_timeout=timeout)
File "/root/.local/lib/python3.10/site-packages/distributed/client.py", line 418, in _result
     raise exception

distributed.client.FutureCancelledError: 4dcd7611-229c-4ccb-8c7c-f5e8c07fc5f0#ed1cee63-5074-4a89-96f3-a169dc2354cd cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.

Regarding the timeout settings:
I also had that reflex, but was not sure which setting would be the right one.

What kind of data are you passing as an argument (i.e., what is submit_task_data)? You define the key as the argument’s ID, so is it some sort of database object, or just a path reference to the PDF files?

Hello Raphael,

The PDF files are opened in the worker; only pathnames and some metadata are passed as a parameter, using a Python dataclass.

The same goes for the result: it’s a Python dataclass with the extracted attributes, so it shouldn’t be very big.
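Roughly like this (simplified; the field names are just for illustration):

from dataclasses import dataclass, field

@dataclass
class SubmitTaskData:
    my_id: str                                      # used as the future's key
    path: str                                       # path to the PDF on local disk
    metadata: dict = field(default_factory=dict)    # a few small values

@dataclass
class ExtractionResult:
    my_id: str
    attributes: dict = field(default_factory=dict)  # the extracted attributes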

Why are you asking? Could the error also be caused by too-large payloads transferred to/from the Worker? Could excessive logging also be a cause?

Best regards,

Bernd

I was asking because I found that SQLAlchemy objects, despite being light, can induce unexpectedly high load on the scheduler. I received errors similar to yours while dealing with that issue. It seems they are slow to pickle, which becomes a problem with hundreds of cores, as in my case. However, if you’re just using plain Python classes to hold the data, it should work fine. You could still check how long Dask takes to pickle your payloads; it might be helpful.
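A quick check could look like this (plain pickle as a rough proxy; submit_task_data stands for one of your payload instances):

import pickle
import time

def time_pickle(obj, repeats=1000):
    # Rough measurement of how long one payload takes to serialize.
    start = time.perf_counter()
    for _ in range(repeats):
        data = pickle.dumps(obj)
    elapsed = (time.perf_counter() - start) / repeats
    print(f"pickled size: {len(data)} bytes, avg time: {elapsed * 1e6:.1f} µs")

# time_pickle(submit_task_data)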

Best of luck!

Thanks for that hint, Raphael.
I’ll check whether I can lighten the pickle load.
It might be a good idea anyway to store the extracted data in the DB
inside the worker function instead of passing it back via the scheduler.
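Something along these lines; just a sketch, with sqlite3 standing in for the real DB and made-up result fields:

import json
import sqlite3

def my_function(submit_task_data):
    # Heavy PDF/regex work happens here (omitted); made-up result dict.
    extracted = {"id": submit_task_data.my_id, "attributes": {"pages": 12}}

    # Persist the full result from inside the worker process ...
    with sqlite3.connect("results.db") as con:
        con.execute("CREATE TABLE IF NOT EXISTS results (id TEXT, payload TEXT)")
        con.execute(
            "INSERT INTO results VALUES (?, ?)",
            (extracted["id"], json.dumps(extracted["attributes"])),
        )

    # ... and return only a tiny status object to the client.
    return {"id": submit_task_data.my_id, "ok": True}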

Do you have anything above that in the stack trace?

I’m not sure either, but maybe browsing Configuration — Dask documentation will give some insights.

Did you try leaving one core free for Client and Scheduler?

Hi Guillaume,

There’s nothing Dask-specific in the stack trace above that.
I’ll go with reducing the number of workers by one, to #vCPUs - 1, to give the process running the client and the scheduler more room to breathe.
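Concretely, something like this (a sketch, assuming os.cpu_count() reports the 8 cores mentioned above):

import os

from dask.distributed import Client

# Leave one core free for the client and the scheduler.
n_workers = max(1, (os.cpu_count() or 2) - 1)

with Client(processes=True, n_workers=n_workers, threads_per_worker=1) as client:
    ...  # submit the extraction tasks as before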