Cancelling dask task and handling of BaseException(asyncio.CancelledError) in a task

Coming from an issue on GitHub, as a reference.

I am building a computational cluster using dask as a backend for use with docker containers (see here for reference).

In principle we have 1 dask-scheduler and N dask-workers. The workers are able to run/monitor docker containers for computations. These containers are not controlled by us and might take a fair amount of time to run, so it is desirable to be able to stop them.

The dask-worker (when running a task) continuously checks whether it should cancel that task using the dask Pub/Sub mechanism. If the client wants to cancel the task, the container is stopped and the task raises asyncio.CancelledError. Calling the cancel() method on the dask future would not work, since the task has already started.
I found out that this exception is not propagated back to the client. This seems to be expected behaviour, but it surprised me a bit.
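For illustration, the check-and-cancel loop described here can be sketched as follows. A `queue.Queue` stands in for the dask Pub/Sub subscription, and `container` is a hypothetical object with `start`/`is_running`/`stop`/`result` methods; neither is part of any real API, so this is a minimal sketch rather than the actual implementation.

```python
import asyncio
import queue


def run_and_watch(container, cancel_channel):
    """Run a container and periodically poll for a cancellation request.

    `cancel_channel` (a queue.Queue) stands in for the dask Pub/Sub
    subscription the worker polls; `container` is a hypothetical helper
    object wrapping the docker container.
    """
    container.start()
    while container.is_running():
        try:
            msg = cancel_channel.get(timeout=1.0)  # poll for a cancel message
        except queue.Empty:
            continue  # no cancellation requested; keep monitoring
        if msg == "cancel":
            container.stop()
            # Mirroring the question: this exception does not make it back
            # to the dask client, because dask treats CancelledError as an
            # internal asyncio cancellation.
            raise asyncio.CancelledError()
    return container.result()
```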

Is it really what is expected, and am I correctly understanding how to cancel a dask task?

Hi @sanderegg and welcome to discourse! Thanks for opening up your question again here. Would you mind explaining a bit more about how you’re using Dask and why you need to communicate a cancellation? For instance, if you’re using Dask just to monitor the docker containers, then you don’t necessarily need Dask at all, and using asyncio would be the recommended approach.

Hi @scharlottej13, and thank you for answering me.
I will try to explain in a few words what we are doing.

We provide a computational platform (made of several computers in a cluster) where computational services (both as docker containers and python code, thanks to dask) are put together in pipelines. Our architecture is micro-services based. We have a dask-scheduler and multiple dask-workers running in their respective docker containers. The “dask client” is also running in its own container and asks the dask backend to run the tasks from the pipeline. Since these services are typically long running, it is desirable to be able to cancel these tasks.


Yes. asyncio.CancelledError is a special exception type used internally in asyncio tasks. Since dask is also asyncio based, we have no method to determine where the CancelledError came from and treat it as an internal cancellation (rather than just your task raising a CancelledError). I recommend not reraising this error inside your task and instead using a different mechanism (a different error class, returning None, etc…) to propagate this info back to the client.
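This recommendation can be sketched without any Dask involved: raise a dedicated exception type from the task and catch that type on the client side. `TaskCancelledError` and `monitored_task` are illustrative names, not part of Dask's API; a ThreadPoolExecutor stands in for the cluster.

```python
from concurrent.futures import ThreadPoolExecutor


class TaskCancelledError(Exception):
    """App-level cancellation. Unlike asyncio.CancelledError, this is a
    plain Exception subclass, so it propagates back to the client like
    any other task error instead of being swallowed as an internal
    asyncio cancellation."""


def monitored_task(cancel_requested):
    # `cancel_requested` is an illustrative stand-in for whatever signal
    # the task polls (e.g. a Pub/Sub message).
    if cancel_requested:
        raise TaskCancelledError("container was stopped on request")
    return "job succeeded"


# With dask this would be client.submit(...); a ThreadPoolExecutor shows
# the same raise-and-catch pattern without needing a cluster.
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(monitored_task, True)
    try:
        outcome = future.result()
    except TaskCancelledError as exc:
        outcome = f"cancelled: {exc}"
```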

What @scharlottej13 is asking is: what is actually running on your dask workers? Is it computational work using e.g. numpy/pandas/etc… (where dask is a good fit), or is it administrative work monitoring the state of an external docker container (something like the following pseudocode)?

import time

def run_docker_container(image):
    container_id = start_docker_container(image)
    while container_is_running(container_id):
        if job_is_not_ok(container_id):
            return "job errored"
        time.sleep(1)  # idle poll; the worker thread stays blocked throughout
    return "job succeeded"

client.submit(run_docker_container, "my-example-image")

While dask can run this, it’s not the best fit for dask (long running task with lots of idle sleeping while still consuming a thread). You could change it to an async def function which would remove the need to consume a thread on the worker, but this kind of code may be better run elsewhere without dask entirely.
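The async variant mentioned here could look roughly like the sketch below. The `container_is_running` / `job_is_ok` async callables are injected stand-ins for real Docker API calls (e.g. via a library like aiodocker), so this is a self-contained illustration rather than production code.

```python
import asyncio


async def run_docker_container(image, container_is_running, job_is_ok,
                               poll_interval=1.0):
    """Async version of the monitoring loop above.

    Between polls the coroutine awaits instead of blocking, so no worker
    thread sits idle. `image` is kept only to mirror the pseudocode above.
    """
    while await container_is_running():
        if not await job_is_ok():
            return "job errored"
        await asyncio.sleep(poll_interval)  # yields the event loop between polls
    return "job succeeded"
```

As far as I know, dask workers can execute coroutine functions submitted via `client.submit` directly, so the submission side would not need to change.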

That said, I’m only guessing at what you’re running on dask based on your description above. We’d likely need a better description to be able to recommend a solution.


Hello @jcristharif, thank you very much for the explanation. That makes sense. In the end I created my own TaskCancelledError exception, and this now works properly. As you said, I was getting some weird behaviours when raising asyncio.CancelledError. I originally raised that exception because I preload some async functions in the dask-worker, so it seemed like the logical thing to do.

As to the second question, I’ll try to better explain what we do (here is a link to our repo).

  1. On one side, yes, that is in essence what we do. We use the dask-worker in a sidecar pattern: it starts, monitors, and recovers the results of a computation that runs inside a docker container (kind of what you wrote, but using async libraries; an example of how we use it).
  2. We use the resource system of dask to prevent having too many of these containers on the same machine.
  3. We want to mix these container “tasks” with python code, and that is where we thought dask would be a good candidate for our toolset.
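For point 2, dask's resource mechanism works by having each worker advertise abstract resources at startup, which tasks then consume when submitted. A rough sketch, where the resource name `CONTAINER`, the counts, and the scheduler address are all illustrative rather than taken from the project:

```
# Worker startup (shell): advertise capacity for two concurrent container tasks
dask-worker tcp://scheduler-host:8786 --resources "CONTAINER=2"

# Client side (python): each such task claims one slot, so at most two of
# these container tasks run on that worker at the same time
future = client.submit(run_docker_container, "my-example-image",
                       resources={"CONTAINER": 1})
```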

In the past we used Celery for this, but we ran into some other problems.
