Substitute an array by another in a task graph

Dear Dask community,

I’m from the astrophysics community and recently discovered Dask with delight, so a big thanks! However, my needs quickly grew, and it seems that I am now in uncharted territory (or at least territory without high-level documentation)!

Here’s my problem: I’m simulating an instrument where we have 3 independent laser sources represented by 3 independent noise time series. I model them by 3 Dask arrays. They are transformed in complicated ways as they propagate through the system — all the way down to our measurements, the results of my simulation.

Here’s the catch: the lasers are actually locked onto each other. An easy way to implement this with minimal changes (in my head) is to slightly edit the task graph at the end: I would substitute some “random generation” tasks with the result of a propagated beam. However, it seems that there’s no interface (even a low-level one) to “rebind” or “substitute” one task with another.

How would you go about doing this?

Thanks a lot for your help!


Welcome @j2b.bayle !

I’m no expert in HighLevelGraph (HLG) technology. So really someone more conversant with it than I should be answering you. But since no one has replied to your query yet, I’ll venture an answer in the meantime.

After having used dask for a while now, I’m not aware of any functionality that allows task-graphs to be “parameterized” or edited in the way you describe. Normally, once constructed, a task-graph is not expected to be tampered with by the end-user.

But if you must go this way, I can at least point out that HLGs behave like Python mappings (their layers and dependencies attributes are dicts), so they can be inspected, and in principle modified, much as you would a dict.
At first glance, however, excising a part of an HLG in order to “graft” another (belonging to a different array) in its place does not strike me as a performance enhancer, and I wonder whether you would be better off reconstructing the graph from scratch with the required array included instead.
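
To make the mapping-like nature concrete, here is a minimal sketch that only inspects a HighLevelGraph without modifying it. The arrays x and y are illustrative and not taken from the question.

```python
import dask.array as da

# Illustrative arrays (not from the original question).
x = da.ones(10, chunks=5)
y = x + 1

hlg = y.dask  # the HighLevelGraph behind the array

# Layers are stored by name; each layer is itself a mapping of tasks.
layer_names = list(hlg.layers)

# Layer-level dependencies: layer name -> set of layer names it depends on.
layer_deps = dict(hlg.dependencies)

# Flattening gives the low-level graph: one entry per task key.
low_level = dict(hlg)

print(layer_names)
print(layer_deps[y.name])
print(len(low_level))
```

The layer of y lists the layer of x among its dependencies, which is the information a graft would have to keep consistent.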

It appears dask developers are still contemplating an HLG API/protocol, with very few people at the moment understanding how HLG construction actually works (see link). Still, you might be able to figure out how to manipulate HighLevelGraphs in the way you want by studying the source code in the modules dask/dask/highlevelgraph.py and dask/dask/graph_manipulation.py, and their accompanying unit tests in dask/dask.

Perhaps, if you share some (pseudo-) code detailing what you’re aiming to calculate, I might be able to suggest a convenient alternative.


Thanks for the reply @ParticularMiner.

I’m actually not sure that you can reconstruct the graph from scratch easily. Here’s a small snippet of pseudo code, showing what I intend to do.

# Independent sources of noise
source_1 = da.random.normal(size=size)
source_2 = da.random.normal(size=size)
source_3 = da.random.normal(size=size)

# Propagation of this noise
propag_1 = delayed(propagate)(source_1)
propag_2 = delayed(propagate)(source_2)
propag_3 = delayed(propagate)(source_3)

# Measurements
meas_1 = propag_2 - source_1
meas_2 = propag_3 - source_2
meas_3 = propag_1 - source_3

# Source 1 not locked so it is not changed

# Source 3 locked so that
# meas_3 = 0  =>  source_3 = propag_1
dask.rebind(source_3, propag_1)

# Source 2 locked so that
# meas_2 = 0  =>  source_2 = propag_3
dask.rebind(source_2, propag_3)

where the rebind() method actually replaces the tasks corresponding to its first argument with the graph of its second argument.
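
On a plain low-level graph (a dict of tasks), this kind of rebinding amounts to turning one key into an alias of another. The following is a minimal sketch under stated assumptions: scalars stand in for the noise series, propagate is a hypothetical placeholder, and rebind is an illustrative helper, not a Dask function. It does not operate on dask arrays or HighLevelGraphs.

```python
import operator

import dask


def propagate(x):
    # Hypothetical stand-in for the real propagation.
    return 2 * x


# Low-level graph: key -> value, or key -> (function, *argument keys).
dsk = {
    "source_1": 1.0,
    "source_2": 10.0,
    "source_3": 100.0,
    "propag_1": (propagate, "source_1"),
    "propag_2": (propagate, "source_2"),
    "propag_3": (propagate, "source_3"),
    "meas_1": (operator.sub, "propag_2", "source_1"),
    "meas_2": (operator.sub, "propag_3", "source_2"),
    "meas_3": (operator.sub, "propag_1", "source_3"),
}


def rebind(graph, old_key, new_key):
    """Return a copy of graph in which old_key is an alias of new_key."""
    out = dict(graph)
    out[old_key] = new_key
    return out


locked = rebind(dsk, "source_3", "propag_1")
locked = rebind(locked, "source_2", "propag_3")

# Evaluate with the synchronous scheduler.
meas = dask.get(locked, ["meas_1", "meas_2", "meas_3"])
print(meas)
```

Because every task refers to its inputs by key, everything downstream of the rebound key picks up the new value without being rebuilt. Rebinding all three sources this way would create a cycle, which the sketch does not guard against.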

Thanks for the references – I’ll take a look at them as well!

@j2b.bayle

  1. The following is not recommended practice (see the docs):

    # Propagation of this noise
    propag_1 = delayed(propagate)(source_1)
    propag_2 = delayed(propagate)(source_2)
    propag_3 = delayed(propagate)(source_3)
    

    It may be replaced with this:

    # Propagation of this noise
    propag_1 = source_1.map_blocks(propagate)
    propag_2 = source_2.map_blocks(propagate)
    propag_3 = source_3.map_blocks(propagate)
    
  2. Regarding your function rebind(), unless I’m misunderstanding something, I would expect simple assignment to do the trick:

    source_3 = propag_1
    source_2 = propag_3
    

    But do feel free to counter if this does not suit your needs.
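
A minimal sketch of the difference in point 1, assuming a hypothetical propagate that acts on each block independently: the delayed version collapses the array into a single opaque task, whereas map_blocks returns a chunked dask array.

```python
import dask.array as da
from dask import delayed


def propagate(block):
    # Hypothetical stand-in; assumed to act on each block independently.
    return 2 * block


source = da.ones(10, chunks=5)

# One Delayed object: propagate receives the whole computed array.
via_delayed = delayed(propagate)(source)

# Still a dask array: propagate is applied to each chunk separately.
via_map_blocks = source.map_blocks(propagate)

print(type(via_delayed).__name__)
print(via_map_blocks.chunks)
```

Only the map_blocks result can be combined with other dask arrays chunk by chunk, as in the measurement subtractions.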

@j2b.bayle

I guess, the important question here is: how do you intend to use source_2 and source_3 after calling rebind()? That is what will determine how rebind() should be implemented.

For instance, if the pseudo-code block you posted is going to be inside a loop, then simple assignment (as in my previous post) will suffice to implement rebind() — though performance implications might necessitate additionally persisting source_2 and source_3 to memory (if running the code on a multiple-machine cluster), or to disk (if running on a single machine).
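
As a rough illustration of the in-memory case only, assuming a hypothetical propagate and small illustrative sizes, persisting after the assignment could look like this:

```python
import dask.array as da


def propagate(block):
    # Hypothetical stand-in for the real propagation.
    return 2 * block


source_1 = da.random.normal(size=10, chunks=5)
propag_1 = source_1.map_blocks(propagate)

# Lock source 3 onto the propagated beam 1 and keep its chunks in memory,
# so later graph-building steps start from computed data.
source_3 = propag_1.persist()

meas_3 = propag_1 - source_3
print(meas_3.compute())
```

Whether persisting pays off depends on how expensive propagate is and on how often the locked source is reused.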


Dear @ParticularMiner,

Thanks for your help and your answers – they are much appreciated.

Regarding your first reply:

  1. That’s a good point, thanks for reminding me of that. Hopefully this is not something I’d do in a real block of code.

  2. As you pointed out, if I just reassign source_3 = propag_1 and source_2 = propag_3, then I have to duplicate the code (or use a loop) to recompute meas_1, meas_2 and meas_3 accordingly. In fact, because changing source_3 changes propag_3, the entire code has to be re-run one more time, for a total of 3 loop iterations.

So really, and to answer your last post, I don’t intend to use source_2 and source_3 in the end. I’m really just interested in the measurements, assuming that the rebinding “took place from the beginning”. However, the way we rebind the sources depends on the user input, so we do not know it in advance.

Maybe this is a bit more clear with this slight variation of the pseudo-code I was giving:

# Independent sources of noise
source_1 = da.random.normal(size=size)
source_2 = da.random.normal(size=size)
source_3 = da.random.normal(size=size)

# Propagation of this noise
propag_1 = source_1.map_blocks(propagate)
propag_2 = source_2.map_blocks(propagate)
propag_3 = source_3.map_blocks(propagate)

# Measurements
meas_1 = propag_2 - source_1
meas_2 = propag_3 - source_2
meas_3 = propag_1 - source_3

def lock(config):

    # A source whose config entry is False is not locked and is left unchanged.
    # dask.rebind() is the hypothetical function from my first snippet.

    if config['source_1']:
        # meas_1 = 0  =>  source_1 = propag_2
        dask.rebind(source_1, propag_2)

    if config['source_2']:
        # meas_2 = 0  =>  source_2 = propag_3
        dask.rebind(source_2, propag_3)

    if config['source_3']:
        # meas_3 = 0  =>  source_3 = propag_1
        dask.rebind(source_3, propag_1)

# And you can hope to get the same result as
# my first example with
lock({'source_1': False, 'source_2': True, 'source_3': True})

With what you suggest, I think we have to run the entire code above (without the random number generation initialization) three times: the first time, all sources are randomly generated and in the end source_3 is correctly set; the second time, source_3 is correctly propagated and source_2 is correctly set; the third time, source_2 is correctly propagated and we finally have all measurements.

Hopefully this is clearer!


Hi @j2b.bayle ,

Your last post was much clearer. Thank you.

Your problem is also quite interesting. It is indeed an exercise in task-graph construction. And if I understood you correctly, then the following code-snippet should serve your needs. [You can also examine the task-graph of meas_1, for instance, by executing meas_1.visualize() after running the following code-snippet.]

import dask.array as da


def propagate(src):
    # This function is just a dummy for now.
    # Replace the body of this function with its true implementation.
    return src


def lock(config, source_1, source_2, source_3, propag_1, propag_2, propag_3):
    continue_ = False
    
    if (
        config['source_1']
        and source_1.dask.dependencies != propag_2.dask.dependencies
    ):
        # meas_1 = 0  =>  source_1 = propag_2
        source_1 = propag_2
        continue_ = True

    if (
        config['source_2']
        and source_2.dask.dependencies != propag_3.dask.dependencies
    ):
        # meas_2 = 0  =>  source_2 = propag_3
        source_2 = propag_3
        continue_ = True

    if (
        config['source_3']
        and source_3.dask.dependencies != propag_1.dask.dependencies
    ):
        # meas_3 = 0  =>  source_3 = propag_1
        source_3 = propag_1
        continue_ = True
    
    return continue_, source_1, source_2, source_3


config = {'source_1': False, 'source_2': True, 'source_3': True}
size = 10
# Independent sources of noise
source_1 = da.random.normal(size=size, chunks=5)
source_2 = da.random.normal(size=size, chunks=5)
source_3 = da.random.normal(size=size, chunks=5)

while True:
    # Propagation of sources
    propag_1 = source_1.map_blocks(propagate)
    propag_2 = source_2.map_blocks(propagate)
    propag_3 = source_3.map_blocks(propagate)

    continue_, source_1, source_2, source_3 = lock(
        config, source_1, source_2, source_3, propag_1, propag_2, propag_3
    )
    if not continue_:
        break

# Measurements
meas_1 = propag_2 - source_1
meas_2 = propag_3 - source_2
meas_3 = propag_1 - source_3

I think it does work :partying_face: – at least this simple version of the problem yields the correct task graph.

The construction of the task graph is a bit sub-optimal, because we have to loop n times for n “locking conditions”, but that’s good enough for now. Hopefully with the full, more complex problem the loop will not take up too much time.
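
One possible way to avoid the repeated passes, offered only as a sketch and not as the approach used in this thread, is to follow each lock chain recursively until it reaches a free-running source, so that every array is built once. The names LOCKS_ONTO, build_sources and resolve are hypothetical, and propagate is a placeholder.

```python
import dask.array as da


def propagate(block):
    # Hypothetical stand-in for the real propagation.
    return 0.5 * block


# Which propagated beam each laser locks onto, as in the thread:
# source_1 -> propag_2, source_2 -> propag_3, source_3 -> propag_1.
LOCKS_ONTO = {1: 2, 2: 3, 3: 1}


def build_sources(config, size=10, chunks=5):
    """Return {index: source array}, following lock chains recursively."""
    sources = {}

    def resolve(i, visiting=()):
        if i in sources:
            return sources[i]
        if i in visiting:
            raise ValueError("at least one source must stay unlocked")
        if config[f"source_{i}"]:
            # Locked: the source is the propagated beam of another laser.
            upstream = resolve(LOCKS_ONTO[i], visiting + (i,))
            sources[i] = upstream.map_blocks(propagate)
        else:
            # Free-running: independent noise.
            sources[i] = da.random.normal(size=size, chunks=chunks)
        return sources[i]

    for i in LOCKS_ONTO:
        resolve(i)
    return sources


config = {"source_1": False, "source_2": True, "source_3": True}
sources = build_sources(config)
propag = {i: s.map_blocks(propagate) for i, s in sources.items()}

# meas_i = propag_j - source_i, with j = LOCKS_ONTO[i].
meas = {i: propag[j] - sources[i] for i, j in LOCKS_ONTO.items()}
```

With this configuration, meas[2] and meas[3] are identically zero by construction, while meas[1] carries the noise of the only free-running laser. Locking all three sources raises an error, since the chain then has no starting point.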

Thank you very much!
