Culling old nodes from the task graph

I am using Dask to implement an iterative algorithm (see [1] for context). Each iteration adds ~100K tasks to the scheduler.tasks dictionary, so over the course of a run this can grow to several million tasks.

IIUC Dask's performance deteriorates beyond 1M nodes. Does this count include older tasks which have already been computed and persisted?
If they are going to be a problem, I am curious whether I can manually reduce the size of the graph. One mechanism would be to restart the algorithm from a stored checkpoint, but this necessitates re-executing some expensive initial operations (e.g. data download) which I would prefer to avoid.

I am wondering if it is possible to reduce the size of the task graph by removing older tasks, possibly replacing them with another task which fetches the relevant output from a cache location.
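
As an aside, the size of scheduler.tasks can be inspected from the client; a minimal sketch using Client.run_on_scheduler (the cluster address is whatever your setup uses):

from dask.distributed import Client

client = Client()  # connect to the running cluster

def count_tasks(dask_scheduler):
    # Scheduler.tasks is the dictionary mentioned above
    return len(dask_scheduler.tasks)

print(client.run_on_scheduler(count_tasks))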

[1]: Resume capabilities from a persistent cache


NOT A CONTRIBUTION

Are you gathering results between iterations? Which Dask API are you using?

Submitting a graph containing over 1M tasks to the Scheduler begins to overload it. But depending on your workflow, adding tasks after some of them have terminated might be OK.

Are you gathering results between iterations? Which Dask API are you using?

Yes, results are gathered between iterations. I use dask.Bag with three synchronous compute calls per iteration. However, the persisted collections are used in the next iteration, which means I keep adding to the same graph across iterations.
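
To make the pattern concrete, a minimal sketch of the loop (names, sizes, and the single compute call are illustrative; the real code issues three compute calls per iteration):

from dask.distributed import Client
import dask.bag as db

client = Client()

def step(x):
    return x + 1  # stand-in for the real per-iteration work

bag = db.from_sequence(range(100_000), npartitions=100)
for _ in range(10):
    # The persisted Bag stays on the cluster and feeds the next iteration,
    # so tasks accumulate in the same graph across iterations
    bag = bag.map(step).persist()
    total = bag.sum().compute()  # synchronous compute within the iteration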

adding tasks after some of them has terminated might be OK

Thanks, this is good to know. I assume the graph optimization phase excludes previously executed tasks.

As soon as you release all references to the futures, the tasks on the scheduler are forgotten.

For instance

from dask.distributed import Client

client = Client()

def inc(x):
    return x + 1

f1 = client.submit(inc, 1)
f2 = client.submit(inc, f1)

# Both tasks are still known to the scheduler at this point
assert f2.result() == 3

# After this del we lose our local reference to f1, but the scheduler is
# still holding on to f1 since it is required by f2
del f1

# After this del, the scheduler will forget both tasks
del f2

If you are forgetting tasks and generating new ones over and over again, you shouldn't expect any performance implications, even if the total count in the end is a couple of million.

If you do not forget the tasks you'll likely need more memory on the scheduler, but otherwise you should be fine. The issue with large graphs often stems from the tasks being very small, such that the per-task overhead becomes a problem. You can expect something like 50ms-100ms of overhead per task, so the runtime of a task should be large enough to amortize this overhead; for example, a task that runs for 10s reduces a 100ms overhead to about 1%.

In my case I have an iterative algorithm where each iteration uses the output of the previous one, so the Futures from older iterations are not deleted until the end of the algorithm.

If you do not forget the tasks you'll likely need more memory on the scheduler, but otherwise you should be fine. The issue with large graphs often stems from the tasks being very small, such that the per-task overhead becomes a problem. You can expect something like 50ms-100ms of overhead per task, so the runtime of a task should be large enough to amortize this overhead.

The tasks each run for a few tens of seconds, so I hope this is OK for now.

More generally, the idea of task graph manipulation is also of interest to me due to [1] (Resume capabilities from a persistent cache).
I have been trying to create a mechanism to modify the task graph so that, after a node failure, the computation is not redone from the first task in a 100-task chain, but is instead fetched from the cache.
My thinking here is to modify the graph to create a load-from-cache task as the new source after cutting the older tasks out of the chain, as sketched below.
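
A minimal sketch of what I have in mind (load_from_cache and the cache path are hypothetical):

import pickle

from dask.distributed import Client

client = Client()

def load_from_cache(path):
    # Hypothetical helper: read a previously computed intermediate from disk
    with open(path, "rb") as f:
        return pickle.load(f)

def step(x):
    return x + 1  # stand-in for one link of the long task chain

# Instead of resubmitting tasks 1..50 of the chain, start from the cached
# output of the last checkpointed task and continue building from there
head = client.submit(load_from_cache, "/cache/step_50.pkl")
tail = client.submit(step, head)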

One way to achieve this behavior is to periodically restart the algorithm from a cached point, but that would necessitate re-executing the data fetch, which can be incredibly expensive in my case. So a mechanism to create per-node task affinity to existing nodes would be helpful: I could restart the pipeline and fetch the data from local memory/disk instead of downloading it from remote sources again.
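
From what I can tell, Client.submit already accepts a workers= restriction, which could be a building block for this; a minimal sketch (the worker address and download function are illustrative):

from dask.distributed import Client

client = Client()

def download_data(url):
    return url  # stand-in for the expensive remote fetch

# Pin the expensive fetch to a specific worker so a restarted pipeline can
# reuse data that worker already holds; allow_other_workers=True makes this
# a soft preference rather than a hard requirement
future = client.submit(
    download_data,
    "s3://bucket/data",
    workers=["tcp://10.0.0.5:40000"],
    allow_other_workers=True,
)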

Would implementing task affinity be easier to accomplish?


NOT A CONTRIBUTION