Hi all – I have a fairly large data pipeline that processes data from a series of radio telescopes. I have used prefect to write the actual workflow, but under the hood it is using dask to manage the computation. That is to say, the dask scheduler and workers are used to actually execute tasks that prefect constructs.
Some of these are short-lived tasks (~20 seconds), others are much longer (>3 hours). I am running them on a SLURM-based high-performance computing cluster. This cluster is pretty oversubscribed, so I am hoping / relying on dask's adaptive scaling mode to allow some level of dynamic scaling.
I am not using any dask array / dataframe objects. I am using functions decorated with something akin to @delayed (in prefect world it is @task). No large data is being passed around between functions - it is mostly Path objects, or specific named tuples of Paths / strings.
My problem is a simple one - it appears as though dask is rescheduling tasks that have already been completed. This tends to happen when the SLURM cluster is congested and I do not get a block of workers instantly. Functions which are not idempotent will fail when they are executed again. That is, they complete successfully, and some hours later they might be re-executed and then fall over, e.g. trying to access a file that does not exist because it has already been zipped.
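To make the failure mode concrete, the pattern looks roughly like this (a hypothetical sketch, not my actual task):

```python
import shutil
from pathlib import Path

from prefect import task


@task
def zip_and_remove(ms_path: Path) -> Path:
    # On a re-execution ms_path has already been archived and removed,
    # so this step raises FileNotFoundError.
    archive = Path(shutil.make_archive(
        str(ms_path), "zip", root_dir=ms_path.parent, base_dir=ms_path.name
    ))
    shutil.rmtree(ms_path)
    return archive
```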
I am pretty sure this has something to do with either work stealing, or the dask scheduler deciding that it would be quicker to recompute work rather than transfer it. I was under the impression that work stealing is transactional, i.e. that it would not repeat work that has been verified to have completed (ignoring node / network outages).
Are there any suggestions here? Are there any logs I can access that give me more information from the scheduler about why it is repeating completed tasks? Can I outright tell it not to? Are there any other reasons why dask would want to redo work that has already been completed? Ideally, my workflow would have 36 workers, and these errors seem to happen when I am only able to get a fraction of these initially, with more workers joining the cluster later in the run once SLURM allocates them resources.
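For reference, the only logs I currently know how to get at are the ones fetchable through the client, roughly like this (a sketch; the scheduler address is a placeholder):

```python
# Log verbosity is governed by dask's `logging` config
# (e.g. setting distributed.scheduler to debug before the scheduler starts).
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address

# (level, message) tuples from the scheduler's in-memory log buffer
for level, message in client.get_scheduler_logs(n=200):
    print(level, message)

# dict of worker address -> recent log lines from each connected worker
worker_logs = client.get_worker_logs(n=200)
```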
I am using dask_jobqueue to manage communicating with SLURM and establishing workers.
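The cluster itself is constructed along these lines (the queue name and resource numbers below are placeholders, not my real job spec):

```python
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue="workq",        # placeholder partition name
    cores=8,              # cores per SLURM job / dask worker
    memory="64GB",        # memory per SLURM job
    walltime="12:00:00",  # long enough for the >3 hour tasks
)
```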
I have increased the stability a little with settings like the following:
export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=0.01
export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True
export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING_INTERVAL="120s"
export DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="3600s"
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=100
export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL="10000ms"
export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE="1000000ms"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="300s"
export DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG="16384"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="300s"
export DASK_DISTRIBUTED__COMM__RETRY__COUNT=12
and I am using these options when enabling adaptive scaling:
adapt_kwargs:
minimum: 1
maximum: 36
wait_count: 10
target_duration: "300s"
interval: "30s"
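As I understand it, these get passed straight through to the cluster's adapt() call, i.e. effectively (using the cluster object from the sketch above):

```python
cluster.adapt(
    minimum=1,
    maximum=36,
    wait_count=10,            # only retire a worker after 10 consecutive "shrink" recommendations
    target_duration="300s",   # how quickly we would like outstanding work to finish; drives scale-up
    interval="30s",           # how often the adaptive logic re-evaluates
)
```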
An alternative way around this problem is to try to make my tasks idempotent. This might be possible without too much work, but these errors highlight behavior of Dask that I want to actually understand.
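For the zip example above, the idempotent version would be something like this (a minimal sketch with hypothetical names): skip the work if the expected output already exists, so a re-execution becomes a cheap no-op.

```python
import shutil
from pathlib import Path

from prefect import task


@task
def zip_and_remove(ms_path: Path) -> Path:
    zipped = Path(str(ms_path) + ".zip")
    if zipped.exists() and not ms_path.exists():
        # A previous execution already archived and removed the input,
        # so just return the existing output instead of failing.
        return zipped
    shutil.make_archive(str(ms_path), "zip", root_dir=ms_path.parent, base_dir=ms_path.name)
    shutil.rmtree(ms_path)
    return zipped
```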