Computing a global array only once, if needed

Hi,

I am working on a problem where I need different dask workers to have access to an entire array that can itself be lazily computed. The easiest way to explain this is with an example. We first set up a dask array whose internal details are not important – it is an array that might be expensive to compute:

import numpy as np
import dask.array as da
from dask import delayed

@delayed(pure=True)
def make_global_array():
    print('Making global array')
    return np.random.random((2048, 2048))

global_array = make_global_array()

I could also simply write:

global_array = da.random.random((2048, 2048))

for this example, but I am using the code above so that it is easier to see when the array is actually computed. The details of the array are unimportant, but in the real-life scenario it is an array whose computation could be expensive and could potentially be done efficiently in parallel.

I will now set up another, regular dask array:

array = da.random.random((1024, 1024), chunks=(512, 512))

I then want to operate over chunks of this array, and for each chunk I need to call a C function that needs the data for that chunk, as well as the entire global_array, as C-contiguous buffers. The C function might look like:

def c_function(a, global_array):
    return a + global_array.mean()

Again, the details inside the function are unimportant, but the point is that both the chunk data and the whole (not just part of the) global array are needed.
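
As an aside, here is a minimal sketch of how the contiguity requirement could be handled (the call_c_function wrapper is hypothetical, not the actual code); np.ascontiguousarray copies only when the input is not already C-contiguous:

import numpy as np

def call_c_function(a, global_array):
    # Ensure both buffers are C-contiguous before calling into C;
    # this is a no-op if they already are
    a = np.ascontiguousarray(a)
    global_array = np.ascontiguousarray(global_array)
    return c_function(a, global_array)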

I can then use map_blocks to iterate over chunks and do the operation I need to do:

def process(array):
    
    def do_computation(a, block_info=None):
        computed_global_array = global_array.compute()
        return c_function(a, computed_global_array)
    
    return da.map_blocks(do_computation, array)

Note that above I need to compute the global array before passing it to the C function.

Finally, I compute the result array:

dask_result = process(array)
dask_result.compute()

Running this prints the following output:

Making global array
Making global array
Making global array
Making global array

The bottom line of the issue I am trying to solve is that I want to make sure the global array gets computed only once, using all available workers, rather than being computed once per worker or chunk (here it is computed four times, once for each chunk of array).

An obvious solution is to move:

computed_global_array = global_array.compute()

outside of the do_computation function as such:

def process(array):

    computed_global_array = global_array.compute()
    
    def do_computation(a, block_info=None):
        return c_function(a, computed_global_array)
    
    return da.map_blocks(do_computation, array)

and the global array does get computed only once, but the computation happens during the following step:

dask_result = process(array)

not during the compute step:

dask_result.compute()

What I really want is for process(array) to be fast and not do any computation, but for the computation of the global array to happen at most once when the resulting dask array (or an array depending on it) is computed.

Does anyone see a way to achieve this?

For background, this is for the astropy reproject package, and the internal C functions are similar to (but not exactly the same as) map_coordinates, in that each chunk of the output array might be some arbitrary combination of points sampled from anywhere in the input array, which is why the internal C functions need access to the whole input array.

Hi @astrofrog, welcome to the Dask community!

Thanks a lot for this really detailed problem description; it helps a lot!

I don't think there is a lot to change to achieve what you really want.

Your code currently builds two different Dask graphs, and the one making global_array is just sent to every worker uncomputed; each worker then computes it independently, with no way to share the result with the others.

You just need to make the main Dask graph aware of its dependency on the global_array construction graph.

I just modified your code as below:

def c_function(a, garray):
    return a + garray.mean()

def process(array, garray):
    def do_computation(a, garray, block_info=None):
        return c_function(a, garray)
    
    return da.map_blocks(do_computation, array, garray)

dask_result = process(array, global_array)

This way, the Dask scheduler knows that each block needs the same input, which can be verified by visualizing the Dask graph.
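
For example, something along these lines (assuming graphviz is installed) should show a single make_global_array task feeding all of the do_computation blocks:

# Render the task graph to a file (the filename is arbitrary)
dask_result.visualize('graph.png')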

Let me know if it solved your problem or if I missed anything.

Thanks @guillaumeeb, that indeed solves my issue!

@guillaumeeb I have run into an issue with the approach you have suggested - in the case where by chance global_array and array have the same shape and chunk size, da.map_blocks will pass only a chunk of the global array to each do_computation call. Is there a way to prevent this from happening?
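
For reference, here is a minimal illustration of what I mean, using hypothetical shapes and a global array that is itself a dask array chunked like array:

import dask.array as da

array = da.random.random((1024, 1024), chunks=(512, 512))
global_dask_array = da.random.random((1024, 1024), chunks=(512, 512))

def show_shapes(a, garray, block_info=None):
    # Skip dask's dtype/meta inference call on empty dummy arrays
    if block_info is None or 0 in a.shape:
        return a
    print('chunk shape:', a.shape, '- piece of global array:', garray.shape)
    return a

da.map_blocks(show_shapes, array, global_dask_array).compute()

Each call receives only a single (512, 512) piece of the global array rather than the whole thing.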

Yes, absolutely. The solution works in the example because the delayed make_global_array returns a NumPy array, so the whole array is passed to each block. I see two potential solutions:

  • Continue to use a delayed function call, and call compute on your Dask Array result inside this function instead of returning it directly (see the sketch below).
  • Rechunk the dask array so that it has only one chunk.

This works only if your result fits inside a worker's memory!
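
A minimal sketch of the first option, assuming the global array is itself built with dask.array inside the delayed function: calling compute() before returning means the delayed object yields a plain NumPy array, which map_blocks then passes whole to every block:

import dask.array as da
from dask import delayed

@delayed(pure=True)
def make_global_array():
    # Build the global array lazily (the recipe here is just a placeholder)...
    garray = da.random.random((2048, 2048), chunks=(512, 512))
    # ...but convert it to a plain NumPy array before returning it
    return garray.compute()

global_array = make_global_array()
dask_result = process(array, global_array)  # process() as defined above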

Ok, that makes sense, thanks! I am running into another issue related to the scheduler when using this approach - I am trying to use the ‘processes’ scheduler and limit the number of processes, which normally works fine but not here. If I take the complete example and make the ‘C function’ raise an error:

import numpy as np
import dask.array as da
from dask import delayed

@delayed(pure=True)
def make_global_array():
    print('Making global array')
    return np.random.random((2048, 2048))

global_array = make_global_array()

array = da.random.random((1024, 1024), chunks=(256, 256))

def c_function(a, garray):
    raise ValueError()

def process(array, garray):
        
    def do_computation(a, garray, block_info=None):
        if a.ndim == 0 or block_info is None or block_info == []:
            return a
        print('processing chunk...')
        return c_function(a, garray)
    
    return da.map_blocks(do_computation, array, garray)

dask_result = process(array, global_array)

import dask
with dask.config.set(scheduler="processes", num_workers=2):
    dask_result.compute()

Then if I set num_workers to 2, I see the message ‘processing chunk’ 12 times, which suggests that 12 chunks are being processed before the computation errors out. If I set num_workers to 1 I get the message 6 times, and if I set it to 4 I see the message 16 times. If I have 4 workers, I would have expected the message to be printed at most four times - or am I misunderstanding what is happening behind the scenes?

I'm not totally sure what happens behind the scenes, but my guess is that in multiprocessing mode the scheduler sends several tasks to each worker before getting the results back, so each worker tries to process several chunks before the scheduler receives the error and stops the computation. You can see that each chunk processed is different by looking at the block_info kwarg.
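
For instance (reusing the names from the complete example above), printing block_info[0]['chunk-location'] inside do_computation should show that each of those messages corresponds to a different block:

def do_computation(a, garray, block_info=None):
    if a.ndim == 0 or block_info is None or block_info == []:
        return a
    # block_info[0] describes the first input array; 'chunk-location' is
    # the index of the block currently being processed
    print('processing chunk...', block_info[0]['chunk-location'])
    return c_function(a, garray)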