Hello,
I'm using Dask Distributed version 2024.12.0 to distribute work across multiple processes on a single Linux machine.
The machine has 8 cores.
The worker function is long-running: it extracts text from large PDFs and runs several regex passes over it, which can take seconds or even minutes for larger files.
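For context, it does roughly this (heavily simplified; the pypdf usage and the task_data attributes are just stand-ins for the real code):

import re
from pypdf import PdfReader

def my_function(task_data):
    # Extract the full text; this alone can be slow for large PDFs.
    reader = PdfReader(task_data.pdf_path)
    text = "\n".join(page.extract_text() or "" for page in reader.pages)
    # Several regex passes over the whole text.
    matches = []
    for pattern in task_data.patterns:
        matches.extend(m.group(0) for m in re.finditer(pattern, text))
    return matches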
I decided to use submit() and futures, which looks (simplified) as follows:
from typing import cast

from dask.distributed import Client, Future, as_completed

with Client(processes=True, n_workers=8, threads_per_worker=1) as client:
    futures: list[Future] = []
    for submit_task_data in tasks:
        futures.append(
            client.submit(
                my_function,
                submit_task_data,
                pure=False,
                key=submit_task_data.my_id,
                retries=None,
            )
        )
    # Process results in completion order.
    ac = as_completed(futures)
    for future in ac:
        f = cast(Future, future)
        print(f"status: {f.status}")
        try:
            my_result = future.result()
        except Exception as e:
            print(e)
When my workers get really busy (CPU load on all cores ~100%), Dask sometimes cancels my futures with multiple exceptions like this:
distributed.client.FutureCancelledError: ... cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.
Afterwards, the “cluster” does not heal itself.
I had to kill the client process and start again.
Some time ago I had similar problems, where I suspected that the worker processes were so busy that they did not have time to respond to heartbeats from the Scheduler (or is it the Nanny?).
So I sprinkled some calls to time.sleep(0.05) between longer-running code chunks on the worker, which seemed to help.
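Concretely, that workaround looked something like this (simplified; extract_text and run_pattern are just placeholders for the real steps):

import time

def my_function(task_data):
    text = extract_text(task_data)       # long-running PDF text extraction
    results = []
    for pattern in task_data.patterns:   # several long regex passes
        results.extend(run_pattern(pattern, text))
        # Release the GIL for a moment so the worker's communication
        # thread can (hopefully) answer heartbeats in time.
        time.sleep(0.05)
    return results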
That kind of inoculation obviously did not prevent the problem above.
I'm not sure between which participants the connection losses occur.
Is it the connection between the Client and the Scheduler?
How could I prevent this problem?
Should I reduce the worker count by 1 so that the client process gets enough CPU cycles to keep the connection to the Scheduler alive?
Should I try to throttle the load on each Worker so that the CPU utilization stays below 100%?
Is there a chance to “repair” the connection when I get such FutureCancelledErrors?
Would client.restart() help?
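To make the last two questions concrete, this is the kind of retry wrapper I have in mind (untested sketch; run_with_retry and max_attempts are made up for illustration):

from distributed.client import FutureCancelledError

def run_with_retry(client, tasks, max_attempts=3):
    for attempt in range(max_attempts):
        futures = [
            client.submit(my_function, t, pure=False, key=t.my_id)
            for t in tasks
        ]
        try:
            return client.gather(futures)
        except FutureCancelledError:
            print(f"attempt {attempt + 1}: lost the scheduler, retrying")
            # Does this actually bring the workers back after a
            # scheduler-connection loss, or do I need a new Client?
            client.restart()
    raise RuntimeError("gave up after repeated scheduler-connection losses")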
Best regards,
Bernd