Iterative array update using persist() - Scheduler tasks accumulation

Wednesday, August 20, 2025

15:25

Hello,

I am working on a case where I need to iteratively update an array that lives on the cluster. In the small example below the array fits in memory, but in real life it will be much bigger. We therefore want to avoid pulling the computation results back into our main process, and instead want Dask to perform the computation and keep the results on the workers.

Using persist(), I get the expected result: the array is properly computed on the cluster. However, I am troubled by the task retention on the scheduler. I expected the scheduler to clean up the old tasks at each iteration, but instead a growing number of ‘released’ tasks are kept around until the last iteration.

Only when I delete the last result in my main process are the tasks finally removed from the scheduler.

I tried cancelling the old futures, but that triggers FutureCancelledErrors, and I suspect that persist() does not break the dependency chain properly.

I also tried assigning the persisted array to a temporary variable and deleting the old array at each iteration, but to no avail.
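For clarity, here is roughly what those two attempts looked like inside the loop (a sketch, not my exact code; a and client are the ones from the script below):

from distributed import futures_of

# Attempt 1: cancel the previous iteration's futures explicitly.
# This is where the FutureCancelledErrors show up.
new = (a + 2).persist()
client.cancel(futures_of(a))
a = new

# Attempt 2: rebind through a temporary variable and delete the old reference.
tmp = (a + 2).persist()
del a
a = tmp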

Am I missing something? I assumed that cleaning up tasks across iterations is good practice, but maybe it has no impact whatsoever?

import time
import dask.array
from distributed import LocalCluster
import numpy as np

def get_tasks(dask_scheduler=None):
    # Count the scheduler's tasks, grouped by state (e.g. 'memory', 'released')
    return dict(zip(*np.unique([t.state for t in dask_scheduler.tasks.values()], return_counts=True)))

if __name__ == '__main__':

    cluster = LocalCluster(processes=False, n_workers=4, threads_per_worker=1)
    client = cluster.get_client()

    # Build the initial array and persist it on the workers
    a = dask.array.ones((10, 10), chunks=(5, 5))
    a = a.persist()

    itr = 0
    while itr < 10:
        print("ITR %d - Tasks in scheduler: %s" % (itr, client.run_on_scheduler(get_tasks)))
        # Update the array and re-persist; the previous result should be released
        a = (a + 2).persist()
        time.sleep(0.5)
        itr += 1

    time.sleep(1)
    print('Before array deletion: %s' % client.run_on_scheduler(get_tasks))
    del a
    time.sleep(1)
    print('After array deletion: %s' % client.run_on_scheduler(get_tasks))

    cluster.close()
ITR 0 - Tasks in scheduler: {}
ITR 1 - Tasks in scheduler: {'memory': 4, 'released': 4}
ITR 2 - Tasks in scheduler: {'memory': 4, 'released': 8}
ITR 3 - Tasks in scheduler: {'memory': 4, 'released': 12}
ITR 4 - Tasks in scheduler: {'memory': 4, 'released': 16}
ITR 5 - Tasks in scheduler: {'memory': 4, 'released': 20}
ITR 6 - Tasks in scheduler: {'memory': 4, 'released': 24}
ITR 7 - Tasks in scheduler: {'memory': 4, 'released': 28}
ITR 8 - Tasks in scheduler: {'memory': 4, 'released': 32}
ITR 9 - Tasks in scheduler: {'memory': 4, 'released': 36}
Before array deletion: {'memory': 4, 'released': 40}
After array deletion: {}

Hi @robin-cls, welcome to the Dask community!

Thanks for the detailed post and the reproducible example! I haven’t tried to reproduce it yet; just some thoughts first:

Well, it might, but it might not be a good thing: if you lose a chunk of your persisted array, you might want Dask to be able to recompute it by re-running the released tasks.

The most important thing in your case would be to make sure that previous persisted results are not kept in the Workers’ memory. Task cleaning on the Scheduler side only really matters when the number of tasks being computed is huge (e.g. > 1 million).

This leads to another question, do you really need to persist after each iteration?

Hi, and thanks for the warm welcome!

The most important thing in your case would be to make sure that previous persisted results are not kept in the Workers’ memory.

I didn’t see any memory leak in the full case, so I am quite sure that the memory is released from one iteration to the next.
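For what it’s worth, here is a minimal check (a sketch, not part of the original run) that could confirm this by counting the keys each worker still holds between iterations:

def keys_in_memory(dask_worker):
    # Number of task results currently stored on this worker
    return len(dask_worker.data)

# Returns e.g. {worker_address: key_count, ...}
print(client.run(keys_in_memory))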

Well, it might, but it might not be a good thing: if you lose a chunk of your persisted array, you might want Dask to be able to recompute it by re-running the released tasks.

I understand that if I lose a chunk of the persisted array, Dask will have to recompute everything from the first iteration up to the current one. Wouldn’t it be better to introduce something similar to a checkpoint?

This leads to another question, do you really need to persist after each iteration?

Perhaps more details about the original problem would be useful here. This minimal example is a simplified version of a large matrix inversion using the conjugate gradient method. The matrix is split into blocks on the cluster, and intermediate arrays are updated at each iteration. Each of the intermediate arrays (3 in total) is needed by every matrix block to compute its next state, so I think I need them persisted.
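To give an idea of the structure, the iteration looks roughly like the blocked conjugate gradient below (an illustrative sketch with made-up sizes, not the actual production code), where x, r and p are the three persisted intermediates:

import dask.array as da

# Illustrative problem; the real matrix is much larger and block-distributed
A = da.random.random((1000, 1000), chunks=(250, 250))
A = ((A + A.T) / 2 + 1000 * da.eye(1000, chunks=250)).persist()  # symmetric positive definite
b = da.random.random(1000, chunks=250).persist()

x = da.zeros(1000, chunks=250)
r = (b - A @ x).persist()
p = r
for _ in range(10):
    Ap = A @ p
    alpha = (r @ r) / (p @ Ap)
    x = (x + alpha * p).persist()
    r_new = (r - alpha * Ap).persist()
    beta = (r_new @ r_new) / (r @ r)
    p = (r_new + beta * p).persist()
    r = r_new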

I guess checkpointing would mean writing the result to disk, which can be expensive, but you could probably do that.
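Something like this could work (a rough sketch, assuming Zarr is installed): write the current state to disk and read it back, so that the reloaded array’s graph collapses to a single read per chunk and no longer depends on the previous iterations.

import dask.array as da

# Hypothetical checkpoint path; write the current state, then reload it
a.to_zarr("checkpoint.zarr", overwrite=True)
a = da.from_zarr("checkpoint.zarr")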

Hm, I’m not fully understanding the processing here, but you are probably right. You could try without persisting, though, to see what’s happening.