Hello!
We have a task graph containing a computationally expensive task that holds the GIL. The GIL-holding method is third-party code that we have no control over. While running on a `LocalCluster` configured as

```python
LocalCluster(n_workers=3, processes=True, threads_per_worker=1)
```

this has led to workers failing to heartbeat and being closed:

```
distributed.scheduler - WARNING - Worker failed to heartbeat within 4 seconds. Closing: <WorkerState … >
```
One solution has been to simply give the workers an indefinite time to live via `dask.config.set({"distributed.scheduler.worker-ttl": None})`. This works, but seems less than ideal.
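A minimal sketch of that workaround, with the config set before the cluster is created (the exact placement in our application may differ):

```python
import dask
from dask.distributed import LocalCluster, Client

# Disable the scheduler's worker TTL check so workers are never closed for
# missing heartbeats. The downside is that a genuinely stuck worker is no
# longer reaped by this check, which is why it feels less than ideal.
dask.config.set({"distributed.scheduler.worker-ttl": None})

cluster = LocalCluster(n_workers=3, processes=True, threads_per_worker=1)
client = Client(cluster)
```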
As a second approach, we attempted to have each worker start a process pool, as suggested in this video. Here is a minimal example:
```python
import dask
from dask.distributed import (
    LocalCluster,
    Client,
    WorkerPlugin,
    wait,
    performance_report
)
from loky import ProcessPoolExecutor
import ctypes


def test_heartbeat():
    """Temp test."""

    class AddProcessPool(WorkerPlugin):
        # Give every worker an extra "processes" executor backed by a process pool.
        def setup(self, worker):
            worker.executors["processes"] = ProcessPoolExecutor(max_workers=2)

    @dask.delayed
    def load(x: int):
        # PyDLL (unlike CDLL) does not release the GIL around the foreign call,
        # so this sleep holds the GIL for x seconds.
        libc = ctypes.PyDLL(None)
        libc.sleep(x)
        return x

    hold_gil_sleep = load(16)

    with dask.config.set(
        {"distributed.worker.daemon": False,
         "distributed.client.heartbeat": "0.5s",
         "distributed.scheduler.worker-ttl": "4s"}
    ):
        with LocalCluster(processes=True, n_workers=3, threads_per_worker=1) as cluster:
            with Client(cluster) as c:
                c.register_worker_plugin(AddProcessPool())
                # Route the GIL-holding task to the workers' "processes" executor.
                with dask.annotate(executor="processes"):
                    with performance_report(filename="dask-report.html"):
                        data = c.compute(hold_gil_sleep)
                        wait(data)
                        print(f"slept and held GIL for {data.result()} seconds!")


if __name__ == "__main__":
    test_heartbeat()
```
This example attempts to mimic our use case, in which a `Delayed` task graph is computed; the `PyDLL` sleep stands in for the GIL-holding function. The script should succeed as-is and fail when the `dask.annotate()` line is commented out.
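One way to confirm that the plugin actually registered the extra executor on each worker might be to list the workers' `executors` dicts from the client. A minimal sketch, reusing `c` from the example above (`Client.run` passes the worker instance to functions that accept a `dask_worker` argument):

```python
def list_executors(dask_worker):
    # Worker.executors maps executor names to executor objects.
    return sorted(dask_worker.executors)

# Expected to include "processes" on every worker once the plugin has run.
print(c.run(list_executors))
```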
However, in our implementation this strategy still results in the above “Worker failed to heartbeat” error, which is puzzling. In our application, the cluster is set up in one module and used from another.
- Can anyone think of any contextual factors that would cause this strategy to fail?
- Are there introspection tools that I can use to verify the workers are heartbeating on a different process than the one running the GIL-holding task? (The sketch after this list shows the kind of check we have in mind.)
- Does anyone have suggestions for an alternative strategy?
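On the second question, the check we have in mind is comparing process IDs: the worker processes' PIDs (where the event loop and heartbeat live) versus the PID observed inside the GIL-holding task, which should differ if the task really runs on the process pool. A minimal sketch, assuming the same cluster, client `c`, and plugin as in the example above:

```python
import os
import dask

@dask.delayed
def report_pid():
    # PID of whichever process actually executes the task.
    return os.getpid()

# PIDs of the worker processes themselves (where heartbeats are sent from).
worker_pids = set(c.run(os.getpid).values())

with dask.annotate(executor="processes"):
    task_pid = c.compute(report_pid()).result()

# If the task ran on the loky process pool, its PID should not be a worker PID.
print(task_pid, worker_pids, task_pid not in worker_pids)
```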
Thanks!