# How to properly use Dask delayed on a function that calls other functions

Hello everyone,

I’ve recently started using Dask to parallelize some functions, but I have some doubts on how to use it properly and efficiently.

I’ll describe you the function I’m trying to parallelize, I don’t think it’s important the actual function, I just need to understand what to do.
I’m basically computing a metric on multiple images with their corresponding ground truths. Now, let’s consider a single image, which has multiple channels (so dimension is H x W x C). The metric takes as input the image and its ground truth, and then start computing “things” on sub-matrices of that image (let’s say, for example, 32x32 sub-arrays), and for each sub-matrices other “things” are computed on each channel. So, the code is something like:

``````def compute_metric(im1, im2):
for i in range(channels):
im1[:, :, i] = another_function(im1[:, :, i])
im2[:, :, i] = another_function(im1[:, :, i])
# other computations on the images
...
...
return computed_metric

def metric(image, gt, block_size, shift)
for j in range(step_x): # number of sub-arrays on X dimension
for i in range(step_y): # number of sub-arrays on Y dimension
values[i, j, :] = compute_metric(image[j * shift : j * shift + block_size, i * shift: i *shift + block_size, :],
gt[j *shift : j * shift + block_size, i * shift : i * shift + block_size, :])
return metric

``````

So, basically, the function I call to compute the metric calls other functions to actually compute the metric.
Now, the images are really big and with lots of channels, so I was trying to parallelize the code. I tried to add the decorator delayed to the function `compute_metric` and change the function `metric` appropriately (so I’m not loggin the values in a matrix anymore, I’m just appending them to a list, and finally computing the sum), and finally launch it, but the computation was slower than the base code with no parallelization. How should I use delayd in this context? I mean, should I add the decorator to each function that is called by `compute_metric` (like `another_function`)?

Thanks in advance for the help!

Hi @notprime, welcome to Dask community,

I think you want to have a look at Dask Arrays, I imagine you are currently using Numpy?. At first, I thought your code was embarrassingly parallel, but looking closer, you want to achieve a moving window across the first 2 dimensions in a given channel, is that right?

Depending on your Array shape, there might be different optimal solutions for this. Could you tell us more about it?

For example, a really simple solution would be to chunk the image along the channel dimension, and then apply almost the same code as the one above to each of these chunks in parallel.

Hi @guillaumeeb ,
thanks for the answer!

Things are getting interesting , I’ll go have a look at Dask Arrays immediately.

So, there you can find the metric:

In particular, I’m using the function `Q2n` (line 294), which calls `q_index_metric` (line 221) which, in turn, calls other functions. I can’t chunk the images along the channel dimension, because they are all needed to compute the metric.

Basically the images can be either pretty big (let’s say 3000x3000x200 [H x W x C]) or small (let’s say 200x200x200 [H x W x C]) but in a large number (I’m basically computing this metric over images generated by a model, either in the inference or test phase, respectively).

My idea was to parallelize the moving window step (so the double `for` loop) and I thought that just adding the `delayed` decorator would magically solve all of my problems

What type of pixel? float64? That would mea 13GiB array?

OK, so this will not be the simpler way ;). I guess you’ll need to look at `map_overlap` function. What is your typical window size and shift? You will need to chunk on H and W, or at least on one of them, and use a `depth` that allows you to have enough adjacent pixels in one chunk.

Let’s say your window size is 32, and your shift 16. You could use chunks of size 3000x32x200, with `depth=16`, and then just move your window on two rows in each chunk.

Float32, but the images can be even bigger (let’s say 7000x7000x200ish, 40GiB circa). Or, other case, if I compute the metric during the test phase, I have 1000/2000 images of dimension 200x200x200ish

Tipically window size is 32 and shift is also 32, there is no overlap of the sub-patches, so I was looking at `map_partitions` and `map_blocks`.

I also managed to get the `delayed` decorator work and the speed increase is stunning, so maybe this is enough, but I tried to launch the script on a big image (again, 7000x7000x200ish), and after some time I got the error `signal 9: SIGKILL` (obviously a RAM issue, even if I’m using a HPC with 512GB of RAM). I switched to a node with 1024GB of RAM, the computation was going on but later I got a time error (don’t remember it, I’ll retrieve it tomorrow asap). So I was thinking if it is okay to call `compute` multiple times to avoid this huge memory issue and append the computed values in a new list to be used later. Or maybe with `map_partitions` or `map_blocks` I can solve the problem in a easier way, what do you think?

Btw thanks again for the help, it’s fun when you start making things work and stumble upon new problems, I’m really starting to like dask lol

This simplifies things a lot!

First, how are you currently loading the input image? With Numpy only? I’ll guess this will work using Numpy + Delayed on it, but I don’t really like this pattern, ths will probably lead to more data excgange between processes and it will be difficult to scale on larger images.

What I would do is read the input image (or images in the case of the test) using Dask Array with an appropriate block size (like (512, 512, 200) for example in the case of the real workflow). Then, just a map_block on each chunk using your `metric` function. You shouldn’t run into any memory issue.

So, again, there are subtle differences between the images.
The big images are `.tif` files, so I first use `rasterio/gdal` to oper the image, then convert it into a numpy array if necessary. While the test images are already saved as `.npy` files, so I just load them using numpy.

Obviously your idea is way better than mine. Just to check if I got it, I first load the image, than I chunk it using Dask Array (maybe 510x510 is a bit high, the metric is really slow, maybe 300/400ish is better), and then I parallelize the computation on the sub-chunks using `map_block`, and then I gather the results appropriately. Just to put it into code, it should be something like:

``````array = dask.from_array(x, chunks = (510, 510, 200)) # H and W must be divisible by 32
results = dask.array.map_blocks(metric, array, chunks = 200).compute()
""" chunks needed in map_blocks because metric computes, for each
32x32x200 chunk a 1D array of lenght 200 """
``````

But I have some doubts:

1. if I use `from_array` on a numpy array, obviosly I need to load the image into memory as a numpy array, there is no escape from this, right?
2. From the docs, `map_blocks` map a function across all blocks of a dask array, so, in this case, what happens is that I’m actually parallelizing over chunks of the whole image, right? Because `metric` will be applied in parallel to each sub-chunk of dimension 510x510x200, am I right? While with delayed I’m just parallelizing directly on the 32x32x200 sub-patches.

I was also thinking, for the test phase, being the images all of dimension 200x200x200ish loaded using `DataLoader` from pytorch, that maybe is best to just use `delayed` and to call `compute` for each image and, finally, gather all the results.

What do you think?

This is the most important point! You should find a way to avoid loading the full image into memory from the main process. If you do this, there is not much advantage using Dask Arrays. The idea would be to load the image directly by chunk on Dask worker. I believe it is possible from a rasterio point of view to load by chunk, I think Xarray is already doing this. So you don’t want to use `from_array`. If you don’t find a built-in reading function, then you might have to played with Delayed, have a function taht is able to read a chunk, and user `dask.array.from_delayed`.

Yes you are right!

Which generates a lot of small tasks where the data is sent from the main process to the workers, which you want to avoid.

Yes, in this case this would be perfectly okay. You could also built a Dask Array from all the images and apply map_blocks, but it’s quite the same performance wise I think.

Hi @guillaumeeb ,

thanks again for the reply, it was very resourceful!
Even the HPC is on vacation, I’ll try to use map_blocks asap and I’ll let you know!
I’d like to ask you another similar question:
I’m also computing another metric, in this case there is no moving window, the metric is computed between two images, but for each channel independently. So, let’s say we have two images `7000x7000x200`, the metric will compute `200` values, one for each pair of channels. I thought that being the computation only performed over `200` channels, it would have been enought to use `delayed`, and this is a little reproducible code of how I coded it:

``````def metric_delayed(im1, im2, im3):

@dask.delayed
def compute_metric():
q_high = metric(im1, im2)
q_low = metric(im2, im3)
metric = np.abs(q_high - q_low) ** 2
return metric

nbands = im1.shape[-1]
metric_vals = []
for i in range(nbdands):
metric_i = compute_metric(im1, im2, im3)
metric_vals.append(metric_i)
metric = dask.compute(metric_vals)
return metric

if __name__ == "__main__":
with LocalCluster(n_workers = int(0.8 * mp.cpu_count()),
processes = True, memory_limit = "auto",
threads_per_worker = 1
) as cluster, Client(cluster) as client:
a = np.random.rand(7000, 7000, 200)
b = np.random.rand(7000, 7000, 200)
c = np.random.rand(7000, 7000, 200)

value = metric_delayed(a, b, c)
``````

I tried to use 0.8 of the total cpus, but I quickly ran into memory issue (again D:), so I later tried to use only 12 cpus, and managed to make it work, even if it was always consuming 500/600giga of RAM. I think that the best choice is again to use `map_blocks` slicing over channel dimension (and also to use gpus, given that the actual metric computes some convolution using pytorch, just to speed things up), but I’m curious to know how `delayed` actually work under the hood and why the ram fills up so fast. Obviously I expect delayed to actually create 48 processes for example if `n_workers = 48`, but three `7000x7000x1` float32 numpy arrays can’t need 500/600 giga of RAM, right? Or am I missing something?

So first here, you’re loading three entire images of 37 GB into your client process memory.

Don’t you slice the input images keeping only one channel here? Because reading this code, I’m under the impression that each delayed call will have to take in input the entire three images with all channels!

Not sure if I understand your use case correctly, but this is how I’d do it:

``````import dask.array as da
import numpy as np
from distributed import Client

def metric_blocks(im1, im2, im3):

def metric(im1, im2):
return np.sum(im1+im2)

q_high = metric(im1, im2)
q_low = metric(im2, im3)
metric = np.abs(q_high - q_low) ** 2
return np.array([[[metric]]]) # I don't very much like this, but I've got trouble returning a scalar changing shape of output array

a = da.random.random((7000, 7000, 200), chunks=(7000,7000,1)).astype(np.float32)
b = da.random.random((7000, 7000, 200), chunks=(7000,7000,1)).astype(np.float32)
c = da.random.random((7000, 7000, 200), chunks=(7000,7000,1)).astype(np.float32)

result = da.map_blocks(metric_blocks, a, b, c, chunks=(1,1,1))

client = Client()

result.compute()
``````

One of the key part is again to be able to load the data directly by chunk, this will greatly optimized the process, and make it able to run even on a low memory setup, I’ve been able to run this with only 20GB of memory (and probably didn’t really used more than 10GB).

Well, I just omitted the slicing part, obviously I’m passing `im1[7000, 7000, i]`, `im2[7000, 7000, i]` and `im3[7000, 7000, i]` to `compute_metric`, slicing over the channel dimension, that’s why I was wondering why `delayed` was consuming so much memory: if I understand correctly, the three 37 GB arrays are loaded only in the client process memory, and the processes just get the sliced data, right? If this is the case, I can’t figure out where is the memory going lol. I know it’s a bad practice to use `delayed`, but I’m curious

By the way, your script is exactly how I’m computing the metric at the moment, using `map_blocks` and slicing over the channel dimension.
I don’t remember how I actually solved the shape problem while returning the metric, probably it’s due to the chunk size specified in `map_blocks`, maybe setting `chunks = 1` or `chunks = (1, 1)` will solve the issue, I’ll let you know asap.

Thanks again for the help

1 Like

Well, I would assume so too, so I’m not sure about that. The main problem would be that every slice is passed through a graph and so through the Scheduler, which might be really bad for performance.

Great, that could help other people!