Why do already gathered tasks run a second time 10-20h later?

Hi,

Win10, dask 2023.10.0, no MCVE, any hint welcome.

After ~20h of nominal dask operation, several hundred tasks that had already been run and gathered are surprisingly being run again. These tasks had been submitted and gathered successfully ~15h earlier with the same client.
I am confident that I am not wrongly re-submitting them.

Some context:

  • I process 10k tasks on a local, process-based, dask.distributed client over the course of 24h
  • a new task is submitted every ~10 seconds with client.submit(..., retries=1)
  • tasks complete in 10-20 seconds
  • moderate memory usage: remains at ~30% of available memory
  • moderate CPU usage: spikes to 100% for 1-2 seconds at every task submission, then drops back to 20%, for an overall usage below 40%
  • some tasks launch tasks themselves, using the worker_client context manager for a clean secede/rejoin (a sketch of this overall pattern follows this list)
  • this behavior is reproducible; it has happened in >4 different 24h-long compute sessions
  • it occurred about 10-20h after start
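
For reference, here is a minimal sketch of this setup; main_task, subtask and the work items are placeholders for illustration, not my actual code:

from dask.distributed import Client, LocalCluster, worker_client

def subtask(part):
    return part  # stands in for the real sub-computation

def main_task(item):
    # some main tasks launch subtasks themselves; worker_client
    # secedes from the worker thread pool and rejoins on exit
    with worker_client() as wc:
        futures = [wc.submit(subtask, part) for part in range(item)]
        return sum(wc.gather(futures))

if __name__ == "__main__":
    client = Client(LocalCluster(processes=True))  # local, process-based
    for item in range(10_000):                     # ~10k tasks over ~24h
        fut = client.submit(main_task, item, retries=1)
        result = client.gather(fut)                # each task takes 10-20s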

Questions

  1. why?
  2. what should I log to catch what is happening? Maybe log the states and transitions of all the tasks known to the scheduler? Is there already a way to activate such logs or should I do it manually?

Exploration

  1. I added the following safeguards, but they did not solve the problem (I understand that, in theory, these are not needed). After I successfully gather a future with client.gather:

    • explicitly cancel the future with client.cancel
    • delete the (only) reference to the future on my application side (OK to do because tasks are independent; no data is reused later in other tasks)
  2. I am aware of the work stealing mechanism mentioned here or here.
    It would be capable of re-running tasks. I have not yet tried to turn stealing off as suggested here; that will likely be my next fix attempt (a combined sketch of items 1 and 2 follows this list).

  3. The task journey docs mention:
    “The graph/recipe to create the result of z persists in the scheduler for all time”.
    That is a confirmation that dask has the capability to re-run tasks as long as the scheduler is alive.
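
For concreteness, here is a minimal sketch combining items 1 and 2 above; the submitted function is just a stand-in, and my understanding is that the work-stealing config key has to be set before the cluster/scheduler is created:

import dask
from dask.distributed import Client, LocalCluster

# item 2: turn work stealing off before the scheduler starts
dask.config.set({"distributed.scheduler.work-stealing": False})

client = Client(LocalCluster(processes=True))

# item 1: safeguards applied after a successful gather
fut = client.submit(pow, 2, 10, retries=1)  # pow stands in for the real task
result = client.gather(fut)
client.cancel(fut)  # explicitly cancel the future
del fut             # drop the only remaining reference to it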

Thanks for any hint

Edit

a similar behavior was reported here:

it is on a SLURM cluster as opposed to a local machine here, but their duplicate runs also seem to occur over long time scales of multi-hour/day compute sessions, like here

Hi @templiert, welcome to Dask community!

Thanks for the effort of detailing your problem so much.

Unfortunately, right now I’ve no idea what could cause this behavior. A task can be retried if it has failed once, or if its result is in cluster memory, referenced by a Future, and the worker holding it in memory dies.

If there is no task or worker failure, and moreover you release the future explicitly, there is no reason a Future would be re-executed.

Is it always different tasks that are being re-run, or did you identify some pattern? For example, are only tasks launched from other tasks being re-run?

Did you try to look at the default Scheduler log to see if there was any hint of what was happening?

Having more information about the task transitions should surely help. I’ve not looked at whether there are already logs you can enable, or whether you would need to use a SchedulerPlugin.
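
Something like this minimal SchedulerPlugin sketch could be a starting point (untested; it assumes a connected Client named client, and the logger and class names are only illustrative):

import logging
from distributed.diagnostics.plugin import SchedulerPlugin

logger = logging.getLogger("transition-audit")

class TransitionLogger(SchedulerPlugin):
    # called by the scheduler on every task state transition
    def transition(self, key, start, finish, *args, **kwargs):
        logger.info("task %s: %s -> %s", key, start, finish)

# newer versions may prefer client.register_plugin instead
client.register_scheduler_plugin(TransitionLogger())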

Thanks @guillaumeeb.

OK, that is good to know

Here are two graphs showing two instances:

  • x axis: time
  • y axis: incremental task count
  • each run of a task is plotted as a dot at (time, id_task): black for the first occurrence, orange for the second occurrence
  • for a task run more than once, a vertical line is drawn:
  • blue vertical line from the bottom up to the marker: first run of the task
  • orange vertical line from the marker up to the top: second run of the task

case 1

  • hundreds of tasks suddenly got re-run (the orange patch on the right)
  • the re-run tasks had first run at times spread from the beginning of the compute session up until the unexpected re-run event
  • the zoomed inset shows that only a subset of the tasks were re-run, not all of them. No obvious pattern to me.

zoomed inset:

case 2

  • similar to case #1, except that a smaller subset got re-run, covering tasks only up to an earlier point in time. Also, a short re-run episode occurred early on.

case 3

I looked at a case 3 (no plot shown), which occurred after I implemented the safeguards (explicitly cancel the future + delete the future reference). I noticed that there were fewer re-runs than I thought, and those could be explained by work stealing (they were launched close together in time). So the safeguards may actually have worked. I will edit my original post (actually no, I apparently cannot edit my original post any more).

Good point: these are the main tasks, not the subtasks launched from main tasks.

I am not sure how much of the Scheduler logs I am seeing by default. I only know that nothing in the general dask logs was suspicious.

Actually, there is: the docs mention Scheduler.story, which returns the past states of a task identified by its key.

Looking at the Scheduler code, I believe it is keeping all past tasks in its Scheduler.tasks dictionary, regardless of the task state (even in “forgotten” state). So, periodically and/or at the end of a compute session I could do something like the following to inspect the state transitions:

for key in scheduler.tasks:  # Scheduler.tasks maps task key -> TaskState
    logger.debug(scheduler.story(key))
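
For example, something like this untested sketch could dump all stories from the client side; run_on_scheduler passes the scheduler as the dask_scheduler keyword argument, and the result may be large for 10k tasks:

def dump_stories(dask_scheduler=None):
    # one story (list of recorded transitions) per task key
    return {key: dask_scheduler.story(key) for key in dask_scheduler.tasks}

stories = client.run_on_scheduler(dump_stories)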

summary

  • I still think my original duplicates were real
  • cancelling the future and/or deleting the future reference may have solved the problem
  • I will use Scheduler.story to dig deeper if it happens again

Thanks again @guillaumeeb

Thanks to you for another in-depth analysis.

I admit I have no clue here: the original duplicates should not happen, and I can’t see anything in your code that could cause this…

Anyway, I’m glad you may have found a solution, and that you have a plan for the future in case it turns out not to be a real one.