Hi all – I have a fairly large data pipeline that processes data from a series of radio telescopes. I have used prefect to write the actual workflow, but under the hood it is using dask to manage the computation. That is to say, the dask scheduler and workers are used to actually execute tasks that prefect constructs.
Some of these are short-lived tasks (~20 seconds), while others are much longer (>3 hours). I am running them on a SLURM-based high-performance computing cluster. This cluster is fairly oversubscribed, so I am hoping for / relying on dask's adaptive scaling mode to provide some level of dynamic scaling.
I am not using any dask array / dataframe objects. I am using functions decorated with something akin to @delayed - in prefect world it is @task. No large data is being passed around between functions - it is mostly Path objects, or specific named tuples of Paths / strings.
My problem is a simple one - it appears as though dask is rescheduling tasks that have already been completed. This tends to happen when the SLURM cluster is congested and I do not get a block of workers instantly. Functions which are not idempotent will fail when they are executed again. That is, they complete successfully, and some hours later they might be re-executed and then fall over, e.g. by trying to access a file that no longer exists because it has already been zipped.
I am pretty sure this has something to do with either work stealing, or the dask scheduler deciding that it would be quicker to recompute work than to transfer it. I was under the impression that work stealing is transactional, i.e. that it would not repeat work that has been verified as completed (ignoring node / network outages).
Are there any suggestions here? Are there any logs I can access that give me more information from the scheduler about why it is repeating completed tasks? Can I outright tell it not to? Are there any other reasons why dask would want to redo work that has already been completed? Ideally, my workflow would have 36 workers, and these errors seem to happen when I am only able to get a fraction of these initially, with more workers joining the cluster later in the run once SLURM allocates them resources.
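For reference, the one diagnostic I know of is asking the scheduler for a task's transition "story". A minimal sketch of the pattern (using a throwaway in-process client and a trivial `inc` function as stand-ins for my real setup):

```python
from distributed import Client

# A throwaway in-process client purely for illustration -- in my real setup
# this would connect to the SLURMCluster's scheduler instead.
client = Client(processes=False)

def inc(x):  # stand-in for a real pipeline task
    return x + 1

fut = client.submit(inc, 1)
fut.result()

# Ask the scheduler for the transition history ("story") of this task's key.
# A re-run task should show it leaving "memory" again (e.g. released ->
# waiting -> processing) after it had already completed once.
story = client.run_on_scheduler(
    lambda dask_scheduler, key=fut.key: dask_scheduler.story(key)
)
print(story)
client.close()
```

Is something like this the right way to see why a completed key transitioned back out of memory, or is there a better log to look at?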
I am using dask_jobqueue to manage communicating with SLURM and establishing workers.
I have increased the stability a little with settings like the following:
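The kinds of settings I mean are along these lines (the values here are illustrative rather than my exact ones; the keys are standard dask configuration options):

```python
import dask

# Illustrative values, not my exact ones -- standard dask config keys
# that affect scheduler behaviour on a congested cluster:
dask.config.set({
    # tolerate more worker failures per task before erroring out
    "distributed.scheduler.allowed-failures": 10,
    # allow longer for new workers to connect while SLURM jobs sit queued
    "distributed.comm.timeouts.connect": "60s",
    # disable work stealing entirely, to try to rule it out as the cause
    "distributed.scheduler.work-stealing": False,
})
```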
and by passing options like these when calling adapt:
An alternative approach to this problem is to try to make my tasks idempotent. This might be possible without too much work. But these errors highlight behaviour of Dask that I want to actually understand.