Image Segmentation Using Large Dask Array

I am relatively new to Dask and am trying to perform threshold-based image segmentation on a large (~100 GB) Dask array, which I create from a Zarr store, and I am struggling to debug it. I start a distributed client on a local cluster with 1.5 TB of memory and have been relying on the dashboard to monitor progress. A lot of time seems to be spent transferring data, and there are also periods (around 10 minutes) in which seemingly no progress is made. Eventually the memory blows up and the run crashes.

I also frequently get the warning:

Event loop was unresponsive in Worker for 3.14s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

which I believe is associated with the data transfer issue. I am wondering whether, and how, the performance of my code can be improved. Here is the gist of my algorithm:

I use dask_image functions for filtering where possible, along with some scipy functions where there is no Dask support. Pandas is used to wrap the results and allow for analysis later on. When I run this on a smaller array with no client, it finishes without issue and in a reasonable time, so it is likely my misuse of dask.distributed that is causing the problem.
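Roughly, the scipy calls look something like this (a sketch rather than my exact code; the filter choice, array name, and overlap depth are placeholders):

    import dask.array as da
    import scipy.ndimage

    # Apply a scipy.ndimage filter chunk-wise, with a halo so block edges
    # match the global result (filter and depth are placeholders).
    filtered = da.map_overlap(
        scipy.ndimage.median_filter,
        data,
        size=5,              # forwarded to scipy.ndimage.median_filter
        depth=5,             # overlap depth, at least half the filter footprint
        boundary='reflect',
    )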

Hi @walterjw, welcome to Dask discourse forum!

It’s pretty hard to look at your code in this form: an image is not ideal, and there is a lot of content in it! Could you try to simplify your code and provide a minimal reproducer of your issue?

If not, I would advise you to split your code up a bit to identify which part the problem is in.
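For example, something like this (just a sketch with placeholder step names) persists and waits on one stage at a time, so the dashboard tells you which step is slow or memory-hungry:

    from dask.distributed import wait

    # persist keeps the result on the workers; wait blocks until that stage
    # has finished, so each step can be watched separately on the dashboard
    stage1 = first_step(data).persist()     # placeholder for your first operation
    wait(stage1)

    stage2 = second_step(stage1).persist()  # placeholder for the next operation
    wait(stage2)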

Thanks for the reply! I have not yet been able to create a minimal reproducer, but I have started to split everything into steps to see what works. The following snippet runs and completes as expected

    import dask
    import numpy as np
    import dask.array as dar
    import dask_image.ndfilters
    import dask_image.ndmeasure
    import dask_image as dimg  # submodule imports above make dimg.ndfilters / dimg.ndmeasure usable
    from dask.distributed import Client, LocalCluster

    dask.config.set({'distributed.admin.tick.limit': '180s'})

    cluster = LocalCluster(n_workers=10, memory_limit='40GB')
    client = Client(cluster)

    # prepro_data is my preprocessed input array
    ap_blur = prepro_data
    ap_blur.values = dimg.ndfilters.gaussian_filter(ap_blur.data, sigma=(100, 5))

    # machine epsilon keeps the log below away from zero
    me = np.finfo(np.float32).eps
    ap_blur.data = ap_blur.data + me
    ap_blur.data = ap_blur.data.persist()

    threshold_filter = 10

    # per-pixel threshold from log statistics over time
    log_norm = dar.log(ap_blur)
    u = log_norm.mean(dim='time')
    std = log_norm.std(dim='time')
    _lo = dar.exp(u - std)

However, the problem appears when I add these lines to the code above:

    channel_thresholds = dimg.ndfilters.minimum_filter(_lo.data, size=threshold_filter)
    off = dar.where(ap_blur < channel_thresholds, True, False)
    lbl_img, n_lbls = dimg.ndmeasure.label(off)
    lbl_img.compute()

The scheduler seems to hang for some time, and the dashboard shows no progress for about 20 minutes before resuming. Could this be the result of calling compute on the dask_image object? From the Client docs:

  1. The Client registers itself as the default Dask scheduler, and so runs all dask collections like dask.array, dask.bag, dask.dataframe and dask.delayed

I notice that dask_image is not listed here. Could it be that the client is not registered as the scheduler for dask_image objects, and the computation is therefore run in the local process? That would explain why no progress is shown on the dashboard. If so, how can I run image computations on the distributed client? After the dashboard shows the client’s work has completed, htop shows a huge increase in memory in my parent process, well above the 400 GB limit I set on the cluster. I’d like to be able to run everything on the client to keep control over memory usage, since I am working on a shared machine.
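As a sanity check I tried something like this (not sure it is the right way to verify where the work runs):

    # dask-image functions return plain dask arrays, so in principle they
    # should use whatever scheduler is registered
    print(type(lbl_img))              # dask.array.core.Array

    # explicitly routing the computation through the client
    future = client.compute(lbl_img)  # runs on the workers, returns a Future
    result = future.result()          # gathers the (large) result locally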

I have managed to create something akin to a minimal reproducer. Here is my array read from a zarr store. To reproduce this on a different machine it should be sufficient to fill a similarly shaped zarr store with random data.
[screenshot of the Dask array repr, ~154 GiB total]

    import zarr
    import dask.array as da
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=10, memory_limit='40GB')
    client = Client(cluster)

    # data_path points at the zarr store shown above
    data = zarr.open(store=data_path, mode='r')
    data = da.from_zarr(data)

The following simple addition seems to hang the scheduler for a long time after the computation has completed:

    %%time
    data = data + 2
    result = data.compute()

    CPU times: user 4min 11s, sys: 7min 54s, total: 12min 6s
    Wall time: 11min 10s
    Event loop was unresponsive in Scheduler for 5.22s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

The dashboard however shows that the computation takes just over a second.

Does your client machine possess enough memory to hold 154GiB of data in memory? Calling compute will gather all the data into your Client Python process.

Yes, the machine has 1.5 TB of memory, so I don’t see how that would be the problem here. Even in my main workflow the most memory I ever need to hold is around 300 GB, since I trim the objects I use. I have tried something similar from IPython on the command line and see the same result: the dashboard indicates that the computation has completed, but it does not return.

I should add that I am using a single machine, but opted to use the distributed scheduler since it seems to give more control over resource usage.

One clarification: the pause time is probably spent copying all the image data from the workers and into the main process. Since the client can only handle one connection at a time, this is slow, and memory is necessarily duplicated in the meantime. It is not a good idea to .compute() something big, since if there was a good reason to have it distributed on workers in the first place, there’s an equally good reason not to copy it all into one process. If it does fit in one process easily, maybe you don’t need dask!

To investigate, I bet the following happens much faster:

>>> result = data.max().compute()

because although it handles just as much data, only a small fraction of that is copied back.

So: do what you need to do via dask-image and dask-array only, which will use the workers, and don’t call .compute() when the result would be big.
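For example (a sketch; the output path is just an example), you can write the big labelled image straight from the workers to a zarr store and only compute small reductions on the client:

    import dask.array as da

    # the big array never passes through the client process
    da.to_zarr(lbl_img, 'labels.zarr')

    # small reductions are cheap to gather
    n_objects = int(lbl_img.max().compute())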


My primary reason for using Dask is its parallel interface for numpy and scipy.ndimage. At some point I will need to call compute on a large image. Based on what you’re saying about copying data back to the client, might it be best to use a single process with multi threading instead of a client and worker processes?

might it be best to use a single process with multi threading

Worth a try!
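A sketch of what that looks like: drop the Client entirely and use the default threaded scheduler, either globally or per call (small_summary is a placeholder for whatever reduction you compute):

    import dask

    # all computation happens in this one process, using a thread pool
    dask.config.set(scheduler='threads')
    result = small_summary.compute()

    # or per call:
    result = small_summary.compute(scheduler='threads')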

Is it possible to put the scheduler in one process and the multithreaded worker in a separate one? Using a single process has helped with some of the memory issues, but it has significantly slowed my code, which I suspect is due to scheduling overhead, since I have a fairly large task graph.

The threaded scheduler has the lowest overhead, but suffers from the GIL - which should not be an issue for pure-array operations normally.

You can have distributed run a single worker with threads only, but you will still have communication between client and worker. You can also run distributed in-process (processes=False), just to see if the graph-optimisation done for distributed is helping you.
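Both setups look roughly like this (the thread count and memory limit are just examples to adapt):

    from dask.distributed import Client, LocalCluster

    # one worker process with many threads, separate from the client process
    cluster = LocalCluster(n_workers=1, threads_per_worker=16, memory_limit='400GB')
    client = Client(cluster)

    # or everything in the client process, threads only
    client = Client(processes=False)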

I went ahead and ran with client = Client(processes=False) and ended up with ~80,000 tasks in the first steps of the workflow, and there is still a lot of communication time. This is the gist of my current pipeline:

    import dask.array as da
    import dask_image.ndfilters
    import dask_image.ndmeasure
    import dask_image as di

    # smooth, and keep values strictly positive for the log below
    data = di.ndfilters.gaussian_filter(data, sigma=(100, 5))
    data = data + machine_epsilon

    # Compute segmentation thresholds
    threshold_filter = 10
    log_norm = da.log(data)
    u = log_norm.mean(dim='time')
    std = log_norm.std(dim='time')
    _lo = da.exp(u - std)
    thresholds = di.ndfilters.minimum_filter(_lo, size=threshold_filter)

    # Segment image
    off = da.where(data < thresholds, True, False)
    lbl_img, n_lbls = di.ndmeasure.label(off)

    min_lbl = lbl_img[lbl_img > 0].min()
    max_lbl = lbl_img.max()
    lbls = da.arange(min_lbl, max_lbl + 1)

    # Get centers of mass and minima for labeled subregions
    com = di.ndmeasure.center_of_mass(data, lbl_img, index=lbls)
    pk = di.ndmeasure.minimum(data, lbl_img, index=lbls)

    return lbl_img, com, pk, lbls

To me this seems complicated, but I am unsure how to improve it. What are the best practices for simplifying the task graph in this kind of workflow?
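Things I am considering trying, though I am not sure they are the right approach (chunk sizes here are placeholders, not tuned for my data):

    # fewer, larger chunks mean fewer tasks for the scheduler to track
    data = data.rechunk((8192, 8192))

    # persist intermediates that feed several branches (data is used both for
    # the log statistics and for the final comparison against the thresholds)
    data = data.persist()

    # write large outputs straight to storage instead of computing them locally
    da.to_zarr(lbl_img, 'labels.zarr')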