Understanding persisted task names

I’m working on an optimization for some processing code and have started playing with persisting different pieces of it. I noticed that the dask task name of the persisted array seems to be the same as that of the original dask array. Is this expected? Wouldn’t this cause issues if both the unpersisted and persisted arrays were computed at the same time? I mean, the scheduler would think both are the same task, right?

Can someone shine some light on this?


:man_facepalming: Forgot to paste the example:

In [1]: import dask.array as da

In [2]: a = da.zeros((5, 5)) ** 2 + 1

In [3]: b = a.persist()

In [4]: a.name
Out[4]: 'add-6c2072910faa9fa64b5e20ea328a0fca'

In [5]: b.name
Out[5]: 'add-6c2072910faa9fa64b5e20ea328a0fca'

@djhoese Good question!

@ian and I think this is to be expected. Having the same name for the same computations allows Dask to avoid re-running them if their results are still available.
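A small sketch of why the names match: dask derives a collection’s name by hashing the expression that produced it, so identical expressions always get identical names (this is what lets the scheduler deduplicate work):

```python
import dask.array as da

# Dask array names are deterministic hashes of the expression that
# built them, so two independently-built identical expressions get
# identical names.
a = da.zeros((5, 5)) ** 2 + 1
b = da.zeros((5, 5)) ** 2 + 1  # same expression, built separately
assert a.name == b.name

# A different expression gets a different name.
c = da.zeros((5, 5)) ** 2 + 2
assert c.name != a.name
```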

Do you have a use case where you don’t want this to happen?

With the distributed scheduler, where the scheduler knows the persisted data exists, this behavior makes sense and is expected and desired. But is the same true for the threaded scheduler? In my experience, and in past bugs I’ve reported, computing two separate tasks that share a name can cause major, hard-to-debug issues where the scheduler ends up executing whichever task it finds first. My concern is the threaded-scheduler case, where I might have something like this:

import dask.array as da

a = da.zeros((5, 5)) ** 2 + 1
b = a.persist()
c = a + 1
d = b + 1

da.compute(c, d)

In this case, is any of the following outcomes guaranteed by both the threaded and distributed schedulers?

  1. a is used for c and b is used for d: a is computed twice in total, once during the persist and once during the compute of c. The compute of d uses the persisted b.
  2. b is used for both c and d: this is the positive outcome of the name collision, where the scheduler sees that b and a share a name, chooses b first, and a is computed only once (when it is persisted).
  3. a is used for both c and d: this is the negative outcome of the name collision, where the scheduler sees that b and a share a name, chooses a first, and a is computed twice (once when it is persisted to b and once when c and d are computed).

I hope this makes sense.

@djhoese Based on the following task graphs, it looks like b will be used for both c and d (though I’m not sure it’s guaranteed).

import dask.array as da
import dask

a = da.zeros((5, 5)) ** 2 + 1
b = a.persist()
c = a + 1
d = b + 1
e = c + d

e.visualize()

e.visualize(optimize_graph=True)

[task graph images from e.visualize() and e.visualize(optimize_graph=True)]
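You can also check this without reading the images, by comparing the raw graphs of c and d. This is just an inspection sketch (assuming a recent dask where `__dask_graph__()` behaves as a mapping):

```python
import dask.array as da

a = da.zeros((5, 5)) ** 2 + 1
b = a.persist()
c = a + 1
d = b + 1

# Because b kept a's name, the graphs of c and d contain overlapping
# keys for the shared sub-computation; this overlap is what allows
# the scheduler to deduplicate the work.
c_keys = set(dict(c.__dask_graph__()))
d_keys = set(dict(d.__dask_graph__()))
assert c_keys & d_keys  # non-empty intersection of task keys
```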

However, in the single-machine threaded scheduler case, persist() isn’t helpful in general because it triggers an immediate computation. Do you have a specific case where you need to use persist with the threaded scheduler?

Here’s some more documentation around persist (we can definitely improve this though!):
https://docs.dask.org/en/stable/custom-collections.html#persist

The keys of this graph are the same as __dask_keys__ for the corresponding collection, and the values are computed results (for the single-machine scheduler) or futures (for the distributed scheduler).
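That quoted behavior can be observed directly. A small sketch, assuming the default single-machine threaded scheduler and a recent dask where the graph can be viewed as a mapping:

```python
import numpy as np
import dask.array as da

a = da.zeros((5, 5)) ** 2 + 1
b = a.persist()  # default threaded scheduler: computes immediately

# After persist, the values in b's graph are concrete numpy chunks,
# while a's graph still holds unexecuted tasks under the same keys.
b_values = list(dict(b.__dask_graph__()).values())
assert all(isinstance(v, np.ndarray) for v in b_values)
```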

Good idea on visualizing the graphs. As for a specific case of using persist with the threaded scheduler: yes, I have one. In some satellite data processing, certain arrays (the geolocation lon/lat coordinates) are used multiple times for boolean/scalar operations, like determining the min/max of the data or generating a bounding polygon, and are then used one last time in a computation that touches every pixel. When the calculations producing the geolocation are costly (e.g. they involve interpolating data from a file) and every downstream usage requires computation, it is more efficient to persist the geolocation at the beginning, hold it in memory, and then continue with the rest of the processing.

I could fully compute the data and use numpy arrays for the rest of the processing. But by keeping things as dask arrays, I get the benefit of dask for the initial computation, and each component that uses these arrays later on doesn’t need to know whether a persist or compute happened. So I can write all the code assuming dask. Put another way, I persist the computations that drive other dask computations; both benefit speed-wise from using dask.
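A hypothetical sketch of that pattern (the array names, shapes, and random stand-in data are made up for illustration):

```python
import dask.array as da

# Hypothetical stand-ins for costly geolocation arrays (in reality
# produced by e.g. interpolation from file data); persisted once,
# then reused many times.
lons = (da.random.random((1000, 1000)) * 360 - 180).persist()
lats = (da.random.random((1000, 1000)) * 180 - 90).persist()

# Cheap scalar/boolean reductions reuse the in-memory chunks...
lon_min, lon_max = da.compute(lons.min(), lons.max())

# ...and later per-pixel steps still compose lazily as dask arrays.
valid = (lats > -60) & (lats < 60)
```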

And I should point out that the behavior I’m concerned about would probably be considered a bug. I’ve run into this in the past, where a “bug” in da.map_blocks produced the same task name for two different calls. Dask didn’t notice the problem until it was executing the computations and the numpy arrays being passed in weren’t the right size/dtype.

And I just ran your first snippet of code locally and I get the same graph, but if you swap d and c (so e = d + c) you get:

[task graph image]

This shows that what is computed/used depends on the order in which dask traverses the graph, and that there is a collision between the two names. I think it also means that if I had something other than dask arrays (Delayed functions, maybe?) that I wanted recomputed because of a side effect, that recomputation might not happen, depending on the order in which the user specifies things.
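The Delayed worry can be demonstrated by deliberately forcing two different tasks onto the same key. This is a contrived sketch using `dask_key_name`; a real collision would come from a bug like the map_blocks one mentioned above:

```python
import dask
from dask import delayed

calls = []

def record(tag):
    calls.append(tag)  # side effect we'd like to observe per task
    return tag

# Two distinct tasks forced to share a key: when the graphs are
# merged at compute time, only one definition survives, so only one
# side effect actually runs.
x = delayed(record)("x", dask_key_name="shared-key")
y = delayed(record)("y", dask_key_name="shared-key")
rx, ry = dask.compute(x, y)
assert rx == ry          # both "results" come from the same task
assert len(calls) == 1   # the other side effect never happened
```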
