How to keep GIL-holding tasks from killing your workers?

Hello!

We have a task graph containing a computationally expensive task that holds the GIL. The GIL-holding method is third-party code, which we have no control over. When running with a LocalCluster configuration such as

LocalCluster(n_workers=3, processes=True, threads_per_worker=1)

we have seen workers fail to heartbeat and get closed:

distributed.scheduler - WARNING - Worker failed to heartbeat within 4 seconds. Closing: <WorkerState … >

One workaround has been to simply give the workers an indefinite time-to-live via dask.config.set({"distributed.scheduler.worker-ttl": None}). This works, but seems less than ideal.

As a second approach we attempted to have each worker start a process-pool, as suggested in this video. Here is a minimal example:

import dask
from dask.distributed import (
    LocalCluster,
    Client,
    WorkerPlugin,
    wait,
    performance_report
)
from loky import ProcessPoolExecutor
import ctypes

def test_heartbeat():
    """Temp test."""
    class AddProcessPool(WorkerPlugin):
        def setup(self, worker):
            worker.executors["processes"] = ProcessPoolExecutor(max_workers=2)
    
    @dask.delayed
    def load(x: int):
        libc = ctypes.PyDLL(None)
        libc.sleep(x)
        return x
    hold_gil_sleep = load(16)

    with dask.config.set(
        {"distributed.worker.daemon": False,
         "distributed.client.heartbeat": "0.5s",
         "distributed.scheduler.worker-ttl": "4s"}
    ):
        with LocalCluster(processes=True, n_workers=3, threads_per_worker=1) as cluster:
            with Client(cluster) as c:
                c.register_worker_plugin(AddProcessPool())
                with dask.annotate(executor="processes"):
                    with performance_report(filename="dask-report.html"):
                        data = c.compute(hold_gil_sleep)
                        wait(data)
                        print(f"slept and held GIL for {data.result()} seconds!")

if __name__ == "__main__":
    test_heartbeat()

This example attempts to mimic our use case, in which a Delayed task graph is computed; the PyDLL sleep stands in for the GIL-holding function. The script should succeed as is and fail when the dask.annotate() line is commented out.
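
As a side note on the stand-in itself (this snippet is separate from the example above): the GIL behaviour comes from the choice of ctypes loader. ctypes.CDLL releases the GIL around foreign calls, whereas ctypes.PyDLL keeps it held, which is exactly what starves the heartbeat thread.

import ctypes

# POSIX: passing None loads the symbols of the current process (incl. libc)
releases_gil = ctypes.CDLL(None)   # foreign calls release the GIL
holds_gil = ctypes.PyDLL(None)     # foreign calls keep the GIL held

releases_gil.sleep(1)  # other threads (e.g. the heartbeat) keep running
holds_gil.sleep(1)     # nothing else in this process runs during this call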

However, in our implementation this strategy still results in the above “Worker failed to heartbeat” error, which is puzzling. In our application the cluster is basically set up in one module and called from another.

  • Can anyone think of any contextual factors that would cause this strategy to fail?
  • Are there introspection tools that I can use to verify the workers are heartbeating in a different process than the one running the GIL-holding task? (A PID-comparison sketch of what I have in mind follows this list.)
  • Does anyone have suggestions for an alternative strategy?
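
To make the second question concrete, this is the kind of check I have in mind (purely a sketch; c and the "processes" annotation are the ones from the example above):

import os
import dask

@dask.delayed
def which_pid():
    return os.getpid()

# PIDs of the worker processes themselves
worker_pids = c.run(os.getpid)  # {worker address: worker process PID}

with dask.annotate(executor="processes"):
    task_pid = c.compute(which_pid()).result()

print(task_pid, worker_pids)
# If the "processes" executor is honoured, task_pid should not match any of
# the worker PIDs, because the task ran in a loky subprocess instead.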

Thanks!


Hi @elleryames, welcome to the Dask community!

Thanks a lot for this detailed post and the reproducible example. I can confirm that it works as is but fails without the annotate.

However, I don’t have much to offer to help… In this GitHub issue, it is said that it is unlikely that a computation holds the GIL for more than 5 minutes, the default worker-ttl. I understand that you have no control over this code, but you should really see whether anything can be done about it. And do you know for how long a computation can hold the GIL?
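
If nothing can be changed on that side, one intermediate option (the value below is only an example) would be to raise worker-ttl rather than disabling it entirely, so that a genuinely dead worker is still eventually removed:

import dask

# Give workers a longer grace period instead of turning the check off
dask.config.set({"distributed.scheduler.worker-ttl": "15 minutes"})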

Also, I have no idea why this code is failing in your real use case. In the GitHub issue above, the tool py-spy is mentioned. You could try using it!

Thanks for the reply @guillaumeeb, as well as for linking the GitHub issue (a good discussion) and the tip to try py-spy.

I will see if I can get py-spy running. I suppose I could also make use of the GIL contention metric introduced in v2023.3.2 to better understand whether the issue is truly a GIL-holding one or something like the convoy effect.
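
For reference, this is the kind of py-spy check I have in mind (assuming py-spy is installed where the workers run, and c is the client from the example; on some platforms py-spy needs elevated privileges to attach):

import os
import subprocess

# Collect the worker PIDs up front, before the GIL-holding task is submitted,
# since c.run() also needs the workers to respond.
worker_pids = c.run(os.getpid)  # {worker address: worker process PID}

# While the suspect task is running, dump each worker's thread stacks.
for addr, pid in worker_pids.items():
    print(f"--- {addr} (pid {pid}) ---")
    subprocess.run(["py-spy", "dump", "--pid", str(pid)], check=False)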


Just to follow up on why adding a ProcessPoolExecutor was not working in our application. Credit to my colleague for finding that, within dask.distributed.Worker.execute(), the worker first checks whether the task it is about to execute is a coroutine before considering the requested executor. Indeed, we were wrapping each of our tasks as a coroutine.

I have not grokked this distributed.Worker class, but this seems to be a reasonable explanation and is consistent with my testing.
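
As a rough paraphrase of that dispatch logic (this is not the actual Worker.execute source, just the behaviour we observed):

import asyncio

# Coroutine tasks are awaited on the worker's own event loop, so the
# annotated executor is never consulted for them.
def uses_requested_executor(func) -> bool:
    return not asyncio.iscoroutinefunction(func)

async def load_async(x): ...
def load_plain(x): ...

assert uses_requested_executor(load_plain)      # routed to the "processes" pool
assert not uses_requested_executor(load_async)  # runs inside the worker process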

As a test one can make load() in the above example into a coroutine:

import asyncio

async def test_heartbeat():
    ...
    async def load(x: int):
        ...

asyncio.run(test_heartbeat())

and the example should fail with “Worker failed to heartbeat within 4s”.

Nice catch! Is this enough for you to find a workaround for your problem?

Yes, thanks @guillaumeeb; this allows us to move forward.
