Dask workers killed because of heartbeat failure

Hi everyone,

I’m currently using Dask to parallelize some PDAL pipelines, but I’m facing a problem.

I’m using Dask Distributed to parallelize the work, and on small jobs my code works perfectly. But as soon as I try to process bigger files, my workers are killed. Here is the behaviour of my workers before they are killed:

2022-06-28 16:57:23,145 - distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <WorkerState 'tcp://127.0.0.1:51825', name: 3, status: running, memory: 0, processing: 17>
2022-06-28 16:57:23,145 - distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <WorkerState 'tcp://127.0.0.1:51849', name: 4, status: running, memory: 0, processing: 25>
Traceback (most recent call last):
  File "D:\applications\miniconda3\envs\pdal_env\lib\runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "D:\applications\miniconda3\envs\pdal_env\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "D:\applications\miniconda3\envs\pdal_env\Scripts\pdal-parallelizer.exe\__main__.py", line 7, in <module>
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\click\core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\click\core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\click\core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\click\core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\click\core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "D:\applications\miniconda3\envs\pdal_env\lib\site-packages\pdal_parallelizer\__main__.py", line 42, in process_pipelines
    dask.compute(*delayed)
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\dask\base.py", line 602, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\distributed\client.py", line 3000, in get

    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\distributed\client.py", line 2174, in gather
    return self.sync(
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\distributed\utils.py", line 336, in sync
    return sync(
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\distributed\utils.py", line 403, in sync
    raise exc.with_traceback(tb)
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\distributed\utils.py", line 376, in f
    result = yield future
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\tornado\gen.py", line 762, in run
    value = future.result()
  File "C:\Users\gguidirontani\AppData\Roaming\Python\Python39\site-packages\distributed\client.py", line 2037, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('process-1b7e35e7-e5b6-4f4b-a648-186d64f7f044', <WorkerState 'tcp://127.0.0.1:51849', name: 4, status: closed, memory: 0, processing: 25>)
2022-06-28 16:57:27,304 - distributed.nanny - WARNING - Worker process still alive after 3.9999965667724613 seconds, killing
2022-06-28 16:57:27,304 - distributed.nanny - WARNING - Worker process still alive after 3.9999969482421878 seconds, killing
2022-06-28 16:57:27,306 - distributed.nanny - WARNING - Worker process still alive after 3.9999959945678714 seconds, killing

It seems the worker was killed because it was unable to heartbeat within 300 seconds. I read this documentation and it helped me understand a little better. So, I guess my worker is unable to heartbeat because it is busy with tasks that take more than 300 seconds. Tell me if I’m wrong.

My first idea was to increase the time allowed between heartbeats so my worker can finish its task and then heartbeat. But I guess that will not fix the problem when I process much bigger data.
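For reference, if I understand the warning correctly, that would mean raising the scheduler’s worker-ttl (the 300 seconds in the log) before the cluster is created. Something like this is what I had in mind (not tested, and the LocalCluster part is just a minimal example, not my real setup):

from dask import config as cfg
from dask.distributed import Client, LocalCluster

# Allow workers up to 600 s between heartbeats before the scheduler closes them
cfg.set({'distributed.scheduler.worker-ttl': '600s'})

cluster = LocalCluster()
client = Client(cluster)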

Could someone explain to me the right way to avoid this problem?
Thank you in advance! :smile:
Clément


@ClementAlba I’ll keep looking into this. Would you be able to share a minimal version of your code that I can use to reproduce this locally?

I chatted with @ian, and we think pdal might be holding onto the event loop. The solution here would be to use a thread pool instead of a process pool, and here is a video that might help: Advanced Dask Customization | Customizing Dask Workers | Matt Rocklin - YouTube
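Something along these lines might be worth trying (a minimal sketch assuming a local cluster; the thread count is just a placeholder):

from dask.distributed import Client

# Run a single in-process worker backed by a thread pool
# instead of separate worker processes
client = Client(processes=False, threads_per_worker=4)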

Hi @pavithraes, and thanks for your responses! I finally solved the problem by changing some Dask configuration.

By adding these lines:

from dask import config as cfg

cfg.set({'distributed.scheduler.worker-ttl': None})

You can disable the scheduler’s worker time-to-live check, so workers are no longer closed when they fail to heartbeat in time.
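For anyone finding this later: as far as I can tell, the setting has to be in place before the scheduler/cluster is created, so in a script it looks roughly like this (a simplified sketch, not my exact setup):

from dask import config as cfg
from dask.distributed import Client, LocalCluster

# Disable the worker time-to-live check so long-running tasks
# don't cause the scheduler to close the worker
cfg.set({'distributed.scheduler.worker-ttl': None})

cluster = LocalCluster()
client = Client(cluster)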
