Adaptive scaling without rerunning non-idempotent tasks

Hi all – I have a fairly large data pipeline that processes data from a series of radio telescopes. I have used Prefect to write the actual workflow, but under the hood it uses Dask to manage the computation. That is to say, the Dask scheduler and workers are used to actually execute the tasks that Prefect constructs.

Some of these are short-lived tasks (~20 seconds), others are much longer (>3 hours). I am running them on a SLURM-based high-performance computing cluster. This cluster is pretty oversubscribed, so I am hoping / relying on Dask's adaptive scaling mode to allow some level of dynamic scaling.

I am not using any Dask array / dataframe objects. I am using functions decorated with something akin to @delayed - in the Prefect world it is @task. No large data is being passed around between functions - it is mostly Path objects, or specific named tuples of Paths / strings.

My problem is a simple one - it appears as though Dask is rescheduling tasks that have already been completed. This tends to happen when the SLURM cluster is congested and I do not get a block of workers instantly. Functions which are not idempotent will fail when they are executed again. That is, they complete successfully, some hours later they might be re-executed, and then they fall over, e.g. trying to access a file that does not exist because it has already been zipped.

I am pretty sure this has something to do with either work stealing, or the Dask scheduler deciding that it would be quicker to recompute work rather than transfer it. I was under the impression that work stealing is transactional, i.e. that it would not repeat work that has been verified as completed (ignoring node / network outages).

Are there any suggestions here? Are there any logs I can access that give me more information from the scheduler about why it is repeating completed tasks? Can I outright tell it not to? Are there any other reasons why Dask would want to redo work that has already been completed? Ideally, my workflow would have 36 workers, and these errors seem to happen when I am only able to get a fraction of these initially, with more workers joining the cluster later in the run once SLURM allocates them resources.

I am using dask_jobqueue to manage communicating with SLURM and establishing workers.

I have increased the stability a little with settings like the following:

export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=0.01
export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True
export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING_INTERVAL="120s"
export DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="3600s"
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=100
export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL="10000ms"
export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE="1000000ms"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="300s"
export DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG="16384"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="300s"
export DASK_DISTRIBUTED__COMM__RETRY__COUNT=12

and I am using these options when calling adapt:

adapt_kwargs:
    minimum: 1
    maximum: 36
    wait_count: 10
    target_duration: "300s"
    interval: "30s"
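
For context, this is roughly how those settings end up being used - a minimal sketch with dask_jobqueue directly rather than through Prefect, and with placeholder queue / resource values rather than my real ones:

from dask_jobqueue import SLURMCluster
from distributed import Client

# Placeholder partition / resource values, not my actual job configuration.
cluster = SLURMCluster(
    queue="workq",
    cores=8,
    memory="32GB",
    walltime="12:00:00",
)

# Same values as the adapt_kwargs block above.
cluster.adapt(
    minimum=1,
    maximum=36,
    wait_count=10,
    target_duration="300s",
    interval="30s",
)

client = Client(cluster)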

An alternative to this problem is to try to make my tasks idempotent, for example with a guard like the sketch below. This might be possible without too much work. But these errors highlight behaviour of Dask that I want to actually understand.
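
The sort of guard I have in mind for a side-effecting task (zip_directory here is just a placeholder, not my real task):

from pathlib import Path
import shutil

def zip_directory(directory: Path) -> Path:
    """Zip a directory, but become a no-op if the archive already exists."""
    archive = directory.with_suffix(".zip")
    if archive.exists():
        # A previous successful execution already produced the archive,
        # so a rerun returns early instead of crashing on missing inputs.
        return archive
    shutil.make_archive(str(directory), "zip", root_dir=directory)
    return archive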

It is almost as though keys are not being moved / copied from an existing worker when a fresh worker joins the cluster (as in, SLURM allocates a new compute node) in the work-stealing environment.

I see some references to maximum incoming and outgoing connections that might inhibit keys from being copied in the Worker.get_data method. But beyond that I am not sure why tasks would be executed again when a fresh worker joins.

Hi @tjgalvin, welcome to Dask community!

Well yes, it should be, according to the docs.

And as you say:

However, concurrent or repeated execution of the same task is still possible in the event of worker death or a disrupted network connection.

The only reason I can think of is that one of your Workers ended (SLURM job terminated), and some results must be recomputed.

Is there some relation between your tasks? Is some task result an input of the next one?

You should have some information in the Scheduler logs about tasks, maybe by changing its logging level. Ultimately, you could try to implement a SchedulerPlugin to try to identify replayed tasks and the reason why.
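
As a rough, untested sketch of that idea (the class and logger names here are made up), a plugin could watch for tasks that leave the "memory" state, which is one way a completed task ends up being recomputed:

import logging

from distributed.diagnostics.plugin import SchedulerPlugin

logger = logging.getLogger("replay-watcher")

class ReplayWatcher(SchedulerPlugin):
    def transition(self, key, start, finish, *args, **kwargs):
        # A task that was in "memory" and moves anywhere other than
        # "forgotten" is a candidate for being recomputed.
        if start == "memory" and finish != "forgotten":
            logger.warning(
                "Task %s left memory: %s -> %s (stimulus_id=%s)",
                key, start, finish, kwargs.get("stimulus_id"),
            )

# Registered on a running cluster with, e.g.:
# client.register_scheduler_plugin(ReplayWatcher())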

There might well be some complex mechanism under the hood that I don't know of.

cc @fjetter @crusaderky.

The only reason I can think of is that one of your Workers ended (SLURM job terminated), and some results must be recomputed.

Yes, this is my thinking as well. But I am having a pretty hard time confirming whether this is actually happening and why. My typical grep on the output SLURM logs does not indicate any type of OOM situation. I did see some moments where a node was taken offline in the middle of processing, but I am pretty sure this was a one-off and not the real root cause.

I did lower my imaging properties so that the basic workflow completes faster (around 2 times quicker, with a similar reduction in memory footprint) and the problem happens far less. This implies that the issue is still related to congestion (the system gets through work quicker, so the queue is smaller for instances of the workflow started last), or something about SLURM monitoring and killing jobs that exceed my compute requests.

Is there some relation between your tasks? Is some task result an input of the next one?

Yes, that is right. My basic flow will process 36 data sets in parallel, converting the raw data into images. At three points throughout the workflow the images produced in these 36 streams are combined into a single larger field image. All of this happens in isolation though, e.g. the act of adding them together is an isolated branch of the DAG.

What is the correct way of increasing the logging level in the Dask distributed environment? Is it:

export DASK_DISTRIBUTED__LOGGING=DEBUG
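
Or should I be doing something like the following from inside the scheduler / worker processes instead (just the standard logging module; I have not verified this is the intended route):

import logging

# Raise the verbosity of the usual distributed loggers. This needs to run
# in the process whose logs I care about (e.g. via a scheduler/worker preload).
for name in ("distributed.scheduler", "distributed.worker", "distributed.core"):
    logging.getLogger(name).setLevel(logging.DEBUG)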

It is a little confusing why this is happening. I did go reading through the scheduler and worker code and I saw these options in the distributed.worker.get_data method about limiting the number of incoming and outgoing connections when exchanging key → data mappings. I bumped those numbers up thinking, why not try:

export DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING=2048
export DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING=2048

I was wondering whether I could just broadcast all key → value results of tasks run among all workers in a cluster. As I understand the data mapping component of a worker, it is simply storing the result of a task as the value, with the unique task identifier as the key. My data are super small. I would have a laugh if this 'everyone gets everything' approach would help me out when a worker is recovering from something nasty (or joining the cluster once some resources from SLURM are allocated).
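
Roughly what I had in mind, assuming I could get hold of the futures that Prefect normally manages for me (the 'outputs' list here is hypothetical):

from distributed import Client

client = Client(cluster)            # 'cluster' as set up elsewhere
futures = client.compute(outputs)   # hypothetical list of delayed outputs
client.replicate(futures)           # copy each small result onto every worker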

A closed worker is the only reason I can come up with that would explain the recomputation of tasks. That could also be a worker that attempted to retire/close gracefully but couldn’t for whatever reason. Particularly in adaptive scenarios we are trying to retire workers gracefully when the adaptive algorithm wants to scale down.
A common cause for a worker not being able to retire in time (it typically gets 30s before we terminate the process) is that a task on the worker runs for longer than that. We would then need to recompute that task again.

Another failure mode during graceful retirement is the eviction of the worker's data. When downscaling a worker, it is typically holding result data (even if it's just None), and before we close a worker we attempt to move that result off to another worker. If this fails for whatever reason, we have to recompute those tasks as well. There is a fix starting with version 2023.10.0 that accelerates this eviction process, which might be worth trying if you're on an older version.

I saw these options in the distributed.worker.get_data method about limiting the number of incoming and outgoing connections when exchanging key → data mappings. I bumped those numbers up thinking, why not try.

It should not be necessary to bump those. This is basically used for rate limiting and the defaults should be fine.

You may also want to give API — Dask.distributed 2023.11.0 documentation a shot. This should help Dask release no-longer-needed data more quickly once a computation branch is done, so that the impact of dying workers is more contained (the problem is still there but the likelihood is smaller).

Thanks so much to both for the responses.

It really does seem like there is a worker that is either killed outright, or unable to exchange result data. In the latter case, is there an expected message I could try grep’ing for?

A common cause for a worker not being able to retire in time (it typically gets 30s before we terminate the process) is that a task on the worker runs for longer than that. We would then need to recompute that task again.

This part confuses me. Why would a worker be asked to scale down if it is currently running a task (which adds to the worker's data store)? I naively would have thought that the adapt method would only scale down workers that are not doing anything. Or is this a case where the scheduler does not have knowledge of what a worker is doing when it requests the worker to shut down? Some of my longest running tasks are calling out, through subprocess, to a compiled application (in a container) that can take tens of minutes to hours to run.

Another failure mode during graceful retirement is the eviction of the worker's data. When downscaling a worker, it is typically holding result data (even if it's just None), and before we close a worker we attempt to move that result off to another worker. If this fails for whatever reason, we have to recompute those tasks as well. There is a fix starting with version 2023.10.0 that accelerates this eviction process, which might be worth trying if you're on an older version.

I will have a look at this version. I am currently stuck on version 2023.3.1, as my Python is locked to 3.8. I know this is pretty outdated, but a separate package is tied to it. I am frustrated by this…

Just to tack onto this: the package keeping me tied to Python 3.8 is slowly being updated (not by me!), so hopefully I can move onto the new distributed version soon.

Even so, I was able to get a workaround going. Prefect supports tasks caching their results in a persistent manner (at least for the lifetime of the workflow). Turning that on has resolved all of my issues - by issues I mean functions with side effects (e.g. zipping files when done) being rerun and crashing.
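
Roughly the sort of thing I turned on (using Prefect 2's task caching options; the task name and expiration here are placeholders, not my real configuration):

from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

# Cache on a hash of the task inputs so a re-submitted task returns its
# previously persisted result instead of re-running its side effects.
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
    persist_result=True,
)
def zip_measurement_set(path):   # placeholder task, not my real one
    ...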

I am sure this is probably me 'fixing' the problem in the wrong spot. I would still like to understand what is causing this to happen in the Dask scheduler / Dask workers, if for nothing else than to help my own understanding.

Once I am able to move onto the new distributed version listed above, I will rerun my initial testing without the magic Prefect fix and let you know what happens. For the moment though I am happy.

It is also known that dask-jobqueue has several problems with adaptive setups; this might be the source of your problem.

See some GitHub issues: