Wednesday, 20 August 2025
15:25
Hello,
I am working on a case where I want to update an array that lives on the cluster. In the small example below the array fits in memory, but in the real use case it will be much larger. We therefore want to avoid pulling the computation results back into the main process, and instead want Dask to perform the computation and keep the results on the workers.
Using persist(), I get the expected result: the array is properly computed on the cluster. However, I am troubled by task retention on the scheduler. I expected the scheduler to clean up the old tasks at each iteration, but instead we see a growing number of ‘released’ tasks that are kept until the last iteration.
Only when I delete the last result in my main process are the tasks finally removed from the scheduler.
I tried cancelling the old futures, but that triggers FutureCancelledError, and I suspect that persist() does not break the dependency chain properly.
I also tried assigning the persisted array to a temporary variable and deleting the old array at each iteration, but to no avail.
Am I missing something? I assumed that cleaning up the tasks as the iterations proceed is good practice, but maybe it has no impact whatsoever?
import time

import dask.array
import numpy as np
from distributed import LocalCluster


def get_tasks(dask_scheduler=None):
    # Runs on the scheduler: count its known tasks, grouped by state.
    states = [t.state for t in dask_scheduler.tasks.values()]
    return dict(zip(*np.unique(states, return_counts=True)))


if __name__ == '__main__':
    cluster = LocalCluster(processes=False, n_workers=4, threads_per_worker=1)
    client = cluster.get_client()

    a = dask.array.ones((10, 10), chunks=(5, 5))
    a = a.persist()

    for itr in range(10):
        print("ITR %d - Tasks in scheduler: %s" % (itr, client.run_on_scheduler(get_tasks)))
        # Update the array on the cluster; the previous version is no longer referenced here.
        a = (a + 2).persist()
        time.sleep(0.5)

    time.sleep(1)
    print('Before array deletion: %s' % client.run_on_scheduler(get_tasks))
    del a
    time.sleep(1)
    print('After array deletion: %s' % client.run_on_scheduler(get_tasks))
    cluster.close()
ITR 0 - Tasks in scheduler: {}
ITR 1 - Tasks in scheduler: {'memory': 4, 'released': 4}
ITR 2 - Tasks in scheduler: {'memory': 4, 'released': 8}
ITR 3 - Tasks in scheduler: {'memory': 4, 'released': 12}
ITR 4 - Tasks in scheduler: {'memory': 4, 'released': 16}
ITR 5 - Tasks in scheduler: {'memory': 4, 'released': 20}
ITR 6 - Tasks in scheduler: {'memory': 4, 'released': 24}
ITR 7 - Tasks in scheduler: {'memory': 4, 'released': 28}
ITR 8 - Tasks in scheduler: {'memory': 4, 'released': 32}
ITR 9 - Tasks in scheduler: {'memory': 4, 'released': 36}
Before array deletion: {'memory': 4, 'released': 40}
After array deletion: {}
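For anyone who wants to reproduce these counts without the NumPy round trip, here is a minimal sketch of the same per-state tally using collections.Counter. It assumes only what the script above already relies on: that the scheduler exposes a `tasks` mapping whose values carry a `.state` attribute. The names `count_task_states` and `get_task_counts` are hypothetical helpers, and the stand-in objects in the demo are fakes, not real scheduler tasks.

```python
from collections import Counter
from types import SimpleNamespace


def count_task_states(tasks):
    """Count tasks per state; `tasks` maps task key -> object with a `.state` attribute."""
    return dict(Counter(task.state for task in tasks.values()))


def get_task_counts(dask_scheduler=None):
    # Same calling convention as get_tasks in the post: meant to be passed to
    # client.run_on_scheduler, which supplies the `dask_scheduler` argument.
    return count_task_states(dask_scheduler.tasks)


if __name__ == "__main__":
    # Stand-in objects (not real scheduler tasks) to exercise the helper locally.
    fake_tasks = {
        "x-0": SimpleNamespace(state="memory"),
        "x-1": SimpleNamespace(state="released"),
        "x-2": SimpleNamespace(state="released"),
    }
    print(count_task_states(fake_tasks))
```

With a real client this would be called as `client.run_on_scheduler(get_task_counts)`; the result is a plain dict with str keys and int counts, which can be easier to log or compare than NumPy scalar types.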