Dear Dask community,
I’m from the astrophysics community and recently discovered Dask with delight, so a big thanks! However, my needs quickly grew, and it seems I am now venturing into uncharted (let’s say, without high-level documentation) territory!
Here’s my problem: I’m simulating an instrument with 3 independent laser sources, represented by 3 independent noise time series that I model as 3 Dask arrays. They are transformed in complicated ways as they propagate through the system, all the way down to our measurements, which are the results of my simulation.
Here’s the catch: the lasers are actually locked onto each other. An easy way to implement this with very minimal changes (in my head) is to slightly edit the task graph at the end: I substitute some “random generation” tasks with the result of a propagated beam. However, it seems that there is no interface (even a low-level one) to “rebind” or “substitute” one task with another.
How would you go about doing this?
Thanks a lot for your help!
1 Like
Welcome @j2b.bayle!
I’m no expert in HighLevelGraph (HLG) technology, so really someone more conversant with it than I am should be answering you. But since no one has replied to your query yet, I’ll venture an answer in the meantime.
After having used dask for a while now, I’m not aware of any functionality that allows task graphs to be “parameterized” or edited in the way you describe. Normally, once constructed, a task graph is not expected to be tampered with by the end user.
But if you must go this way, I can at least point out that HLGs are implemented as Python dicts, and can therefore be modified in the same way you would modify a dict.
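For illustration, here is a minimal, untested sketch (not an official API) of the kind of low-level poking that makes possible: flatten an array’s HLG into a plain dict of tasks, inspect it, and rebuild an equivalent array from the (possibly edited) dict.

import dask.array as da

# Build a small array and flatten its HighLevelGraph into a plain dict of tasks.
x = da.random.normal(size=10, chunks=5)
dsk = dict(x.__dask_graph__())

# Keys look like ('normal-<token>', 0), ('normal-<token>', 1), ...
print(sorted(dsk.keys()))

# In principle one could overwrite an entry here, e.g.
#     dsk[some_key] = some_other_task_or_key
# (at one's own risk) before rebuilding an array from the edited graph.
rebuilt = da.Array(dsk, x.name, chunks=x.chunks, dtype=x.dtype)
print(rebuilt.compute())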
At first glance, however, excising a part of an HLG in order to “graft” another (belonging to a different array) in its place does not strike me as a performance enhancer, and I’m wondering if you wouldn’t be better off reconstructing the graph from scratch with the required array included instead.
It appears dask developers are still contemplating an HLG API/protocol, with very few people at the moment understanding how HLG construction actually works (see link). Still, you might be able to figure out how to manipulate HighLevelGraphs in the way you want by studying the source code in the modules dask/dask/highlevelgraph.py and dask/dask/graph_manipulation.py, and their accompanying unit tests in dask/dask.
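For what it’s worth, the helpers that currently live in dask.graph_manipulation (clone, bind, checkpoint, wait_on) rewrite graphs by copying layers or adding dependencies rather than by substituting one task for another, but they may still be a useful reference for how such rewrites are done. A quick illustration of clone (a sketch only, not a solution to your problem):

import dask.array as da
from dask.graph_manipulation import clone

x = da.random.normal(size=10, chunks=5)

# clone() returns an equivalent array whose graph keys have been regenerated,
# so computing it does not reuse any of x's tasks by key.
y = clone(x)
assert y.name != x.name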
Perhaps, if you share some (pseudo-) code detailing what you’re aiming to calculate, I might be able to suggest a convenient alternative.
1 Like
Thanks for the reply @ParticularMiner.
I’m actually not sure that you can reconstruct the graph from scratch easily. Here’s a small snippet of pseudo-code showing what I intend to do:
# Independent sources of noise
source_1 = da.random.normal(size=size)
source_2 = da.random.normal(size=size)
source_3 = da.random.normal(size=size)
# Propagation of this noise
propag_1 = delayed(propagate)(source_1)
propag_2 = delayed(propagate)(source_2)
propag_3 = delayed(propagate)(source_3)
# Measurements
meas_1 = propag_2 - source_1
meas_2 = propag_3 - source_2
meas_3 = propag_1 - source_3
# Source 1 not locked so it is not changed
# Source 3 locked so that
# meas_3 = 0 => source_3 = propag_1
dask.rebind(source_3, propag_1)
# Source 2 locked so that
# meas_2 = 0 => source_2 = propag_3
dask.rebind(source_2, propag_3)
where the rebind() method actually replaces the tasks corresponding to its first argument with the graph of its second argument.
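For what it’s worth, here is the kind of low-level surgery I imagine rebind() could do, written as an untested sketch: flatten the graph of a downstream result and alias each chunk key of the old source to the corresponding chunk key of its replacement. This assumes 1-D Dask arrays with identical chunking and that the replacement already appears in that graph; rebind_in() is a hypothetical helper, not a dask API.

import dask.array as da

def rebind_in(result, old, new):
    # Untested sketch of a hypothetical helper, not part of dask.
    # Flatten the HighLevelGraph of `result` into a plain dict of tasks.
    dsk = dict(result.__dask_graph__())
    # Alias every chunk key of `old` to the matching chunk key of `new`;
    # a value that is itself a key is treated as an alias by dask.
    for old_key, new_key in zip(old.__dask_keys__(), new.__dask_keys__()):
        dsk[old_key] = new_key
    # Rebuild an array equivalent to `result`, but backed by the edited graph.
    return da.Array(dsk, result.name, chunks=result.chunks, dtype=result.dtype)

# Hypothetical usage: lock source 3 onto beam 1, so that meas_3 == 0
# meas_3_locked = rebind_in(meas_3, source_3, propag_1)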
Thanks for the references – I’ll take a look at them as well!
@j2b.bayle
I guess the important question here is: how do you intend to use source_2 and source_3 after calling rebind()? That is what will determine how rebind() should be implemented.
For instance, if the pseudo-code block you posted is going to be inside a loop, then simple assignment (as in my previous post) will suffice to implement rebind(), though performance implications might necessitate additionally persisting source_2 and source_3 to memory (if running the code on a multiple-machine cluster), or to disk (if running on a single machine).
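By “persisting” I just mean something along these lines (assuming a distributed client has been set up when running on a cluster):

import dask

# Keep the source arrays in cluster memory so that re-running the measurement
# code does not regenerate and re-propagate them from scratch.
source_2, source_3 = dask.persist(source_2, source_3)

On a single machine one would instead write the intermediate arrays to disk, e.g. with da.to_zarr(), and read them back.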
1 Like
Dear @ParticularMiner,
Thanks for your help and your answers – they are much appreciated.
Regarding your first reply:
- That’s a good point, thanks for reminding me of that. Hopefully this is not something I’d do in a real block of code.
- As you pointed out, if I just reassign source_3 = propag_1 and source_2 = propag_3, then I have to duplicate the code (or use a loop) to recompute meas_1, meas_2 and meas_3 accordingly. In fact, because changing source_3 changes propag_3, the entire code has to be re-run one more time, for a total of 3 loop iterations.
So really, and to answer your last post, I don’t intend to use source_2 and source_3 in the end. I’m really just interested in the measurements, assuming that the rebinding “took place from the beginning”. However, the way we rebind the sources depends on the user input, so we do not know it in advance.
Maybe this is a bit clearer with this slight variation of the pseudo-code I gave earlier:
# Independent sources of noise
source_1 = da.random.normal(size=size)
source_2 = da.random.normal(size=size)
source_3 = da.random.normal(size=size)
# Propagation of this noise
propag_1 = source_1.map_blocks(propagate)
propag_2 = source_2.map_blocks(propagate)
propag_3 = source_3.map_blocks(propagate)
# Measurements
meas_1 = propag_2 - source_1
meas_2 = propag_3 - source_2
meas_3 = propag_1 - source_3
def lock(config):
    if config['source_1']:
        # Source 1 locked: meas_1 = 0 => source_1 = propag_2
        dask.rebind(source_1, propag_2)
    # otherwise source 1 is not locked, so it is not changed
    if config['source_2']:
        # Source 2 locked: meas_2 = 0 => source_2 = propag_3
        dask.rebind(source_2, propag_3)
    # otherwise source 2 is not locked, so it is not changed
    if config['source_3']:
        # Source 3 locked: meas_3 = 0 => source_3 = propag_1
        dask.rebind(source_3, propag_1)
    # otherwise source 3 is not locked, so it is not changed

# And you can hope to get the same result as my first example with
lock({'source_1': False, 'source_2': True, 'source_3': True})
With what you suggest, I think we have to run the entire code above (without the random-number-generation initialization) three times: the first time, all sources are randomly generated and in the end source_3 is correctly set; the second time, source_3 is correctly propagated and source_2 is correctly set; the third time, source_2 is correctly propagated and we finally have all measurements.
Hopefully this is clearer!
1 Like
Hi @j2b.bayle,
Your last post was much clearer. Thank you.
Your problem is also quite interesting. It is indeed an exercise in task-graph construction. And if I understood you correctly, then the following code snippet should serve your needs. [You can also examine the task graph of meas_1, for instance, by executing meas_1.visualize() after running the following code snippet.]
import dask.array as da


def propagate(src):
    # This function is just a dummy for now.
    # Replace the body of this function with its true implementation.
    return src


def lock(config, source_1, source_2, source_3, propag_1, propag_2, propag_3):
    continue_ = False
    if (
        config['source_1']
        and source_1.dask.dependencies != propag_2.dask.dependencies
    ):
        # meas_1 = 0 => source_1 = propag_2
        source_1 = propag_2
        continue_ = True
    if (
        config['source_2']
        and source_2.dask.dependencies != propag_3.dask.dependencies
    ):
        # meas_2 = 0 => source_2 = propag_3
        source_2 = propag_3
        continue_ = True
    if (
        config['source_3']
        and source_3.dask.dependencies != propag_1.dask.dependencies
    ):
        # meas_3 = 0 => source_3 = propag_1
        source_3 = propag_1
        continue_ = True
    return continue_, source_1, source_2, source_3


config = {'source_1': False, 'source_2': True, 'source_3': True}
size = 10

# Independent sources of noise
source_1 = da.random.normal(size=size, chunks=5)
source_2 = da.random.normal(size=size, chunks=5)
source_3 = da.random.normal(size=size, chunks=5)

while True:
    # Propagation of sources
    propag_1 = source_1.map_blocks(propagate)
    propag_2 = source_2.map_blocks(propagate)
    propag_3 = source_3.map_blocks(propagate)

    continue_, source_1, source_2, source_3 = lock(
        config, source_1, source_2, source_3, propag_1, propag_2, propag_3
    )
    if not continue_:
        break

# Measurements
meas_1 = propag_2 - source_1
meas_2 = propag_3 - source_2
meas_3 = propag_1 - source_3
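For example, after running the snippet you could then inspect the resulting graph and evaluate the measurements like this (the graphviz package is needed for visualize()):

import dask

# Render the task graph of one of the measurements.
meas_1.visualize(filename='meas_1.svg')

# Evaluate all three measurements in a single pass over the shared graph.
m1, m2, m3 = dask.compute(meas_1, meas_2, meas_3)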
1 Like
I think it does work: at least this simple version of the problem yields the correct task graph.
The construction of the task graph is a bit suboptimal, because we have to loop n times for n “locking conditions”, but that’s good enough for now. Hopefully, with the full, more complex problem, the loop will not take up too much time.
Thank you very much!
2 Likes