Using Dask with ITK

Hello all,

I hope my question is OK in this category. I am pretty new to Dask and wanted to see if it is possible to combine ITK with it.
I read in the previous ITK release candidate post that ITK images should be compatible with Dask. Still, I wanted to hear from the experts here first whether it would be reasonable to venture deeper into making Dask work with ITK (maybe by converting the images somehow into Dask arrays and storing the metadata separately?), especially for writing single volumes from multiple processes, and the same for resampling images. I am not sure I grasped the proposed compatibility correctly.

I already found the blog post from back in 2019, but the filter-based approach used there does not translate to resampling or writing an image.

Again, I hope this question is appropriate, and someone can give me a hint or more information.

Hi @Keyn34, welcome to this forum.

Unfortunately, I’m not sure there has been significant work on the Dask side on ITK compatibility since the blog post you mentioned. And I have not been able to find any actual documentation on the ITK side about what changed in the 5.3 RC4 version that may have improved it (maybe extended support for NumPy arrays?).

I’m clearly not an expert in image processing and ITK. I think the best option would be to post on their forum and ask there (I see you already opened a thread).

Could you at least explain a bit more about the operations you want to perform, and what you would need to be able to execute them with Dask?


Hello @guillaumeeb,

Thank you for the welcome and your response.

So, in a nutshell: when working with medical images (especially images of the whole human body), the images get pretty large. My idea was to use Dask to speed up several processing steps by running them in multiple processes.

For example, I tried to read two images in parallel, resample them, and write them back to disk. The code below, which works well, should explain what I'm doing:

# Assumed imports for this sample; load_image, resample_itk and write_itk
# are small user-defined ITK helpers, and moving_image, fixed_image and
# working_directory are defined earlier (not shown).
import os
import time

import dask.bag as db
import dask.distributed as dd

# This is needed as a code guard for multiprocessing
if __name__ == '__main__':
    # Create a local Dask cluster and a client attached to it
    cluster = dd.LocalCluster()
    client = dd.Client(cluster)

    # Load a list of image filenames into a Dask bag
    filenames = db.from_sequence([moving_image, fixed_image])

    # Use Dask to parallelize image loading
    start_time = time.time()
    images = filenames.map(load_image)
    images.compute()
    print(f"[DASK ITK] read took {(time.time() - start_time):.2f} seconds.")

    # Use Dask to parallelize image resampling
    start_time = time.time()
    resamples = images.map(resample_itk)
    resamples.compute()
    print(f"[DASK ITK] resample took {(time.time() - start_time):.2f} seconds.")

    # Use Dask to parallelize image writing
    start_time = time.time()
    writes = resamples.map(write_itk, os.path.join(working_directory, "test.nii.gz"))
    writes.compute()
    print(f"[DASK ITK] write took {(time.time() - start_time):.2f} seconds.")

I now want to take this further: instead of working on whole images in parallel, I want to work on chunks of one image in parallel.
I was just curious whether I need to preprocess the ITK image to divide it into chunks, or whether Dask accepts ITK images and can divide them into chunks itself. I hope my explanation is understandable, as I have only limited knowledge of Dask for now.

Thanks again!

First, some comments on the code above:

  • I think that if you don’t have many images, you should go with the Delayed API instead of Bag.
  • Every time you call .compute() on a lazy operation, you trigger that operation and bring the result back into the main process’s memory, which takes unnecessary time and resources. It’s acceptable to do this for debugging purposes, but I just wanted to be sure you were aware of that (see the Delayed sketch below).
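
For illustration, here is a rough sketch of the same pipeline with Delayed and a single compute() at the end. It reuses moving_image, fixed_image, working_directory and the helper functions from your sample; the per-image output naming is made up here:

import os

import dask
from dask import delayed

tasks = []
for i, filename in enumerate([moving_image, fixed_image]):
    image = delayed(load_image)(filename)
    resampled = delayed(resample_itk)(image)
    # hypothetical output naming, one file per input image
    out_path = os.path.join(working_directory, f"test_{i}.nii.gz")
    tasks.append(delayed(write_itk)(resampled, out_path))

# a single compute() triggers read -> resample -> write as one graph,
# without bringing intermediate images back into the main process
dask.compute(*tasks)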

Is an ITK image a file format in itself? Or are you using standard formats such as TIFF?
If it's the former, then I'm afraid Dask doesn't provide anything for it.

How would you want to load these image chunks? As NumPy arrays? To build a Dask Array in the end?
Is there some way of reading only part of an ITK image with the ITK APIs?

Thank you for pointing those two things out. This was just a test case; I basically took one code sample and tried to adapt it. I called compute just to see how long each step would take, but it is good to know that removing the intermediate calls may reduce computing time.

The images themselves are in NIfTI (.nii) or compressed NIfTI (.nii.gz) format. ITK image is the class an image is stored in after loading it; basically, the container that holds the metadata and the image array. It is a general class for loading any kind of image. It is convertible to a NumPy array, but the ITK post I mentioned above states that ITK images are now natively supported by Dask.
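
For reference, the conversion is a one-liner in each direction. A minimal sketch (the file name is made up; note that metadata such as spacing, origin and direction lives on the itk.Image, not on the array, so it has to be carried along separately and re-attached):

import itk

image = itk.imread('volume.nii.gz')      # itk.Image: pixel data + metadata
array = itk.array_from_image(image)      # plain NumPy array, (z, y, x) axis order
spacing = image.GetSpacing()             # metadata stays on the itk.Image

new_image = itk.image_from_array(array)  # back to itk.Image, default metadata
new_image.CopyInformation(image)         # re-attach spacing/origin/direction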

I actually just found an example of loading and processing in chunks with ITK. It might be useful and worth a try; I will look into it in more detail.

Well, aside from the old Dask blog post and the statement in the ITK post, I’ve not been able to find anything about native ITK image support in Dask. You should ask the ITK folks if they have some details about that.

Okay, the code is a bit complicated, but if you want to give it a shot, I recommend using the Delayed API.

Basically, you should do something like this:

import dask
import itk
from dask import delayed

# Assumed to be defined as in the ITK streaming example you linked:
# `median` is an ITK filter already wired to its input, `Dimension` is the
# image dimension, `fullSize` is the full image size, `start`/`end`/`size`
# are pre-allocated 3-element lists, and xDiv/yDiv/zDiv are the number of
# chunks along each axis.

delayed_chunk_process = []

def read_chunk(start, size):
    region = itk.ImageRegion[Dimension]()
    region.SetIndex(start)
    region.SetSize(size)

    # now that we have our chunk, ask the filter to only process that region
    median.GetOutput().SetRequestedRegion(region)
    median.Update()
    result = median.GetOutput()
    return result

def my_itk_process(itk_object):
    # Do something with the itk chunk, then return the result
    return itk_object

for z in range(zDiv):
    start[2] = int(fullSize[2] * float(z) / zDiv)
    end[2] = int(fullSize[2] * (z + 1.0) / zDiv)
    size[2] = end[2] - start[2]

    for y in range(yDiv):
        start[1] = int(fullSize[1] * float(y) / yDiv)
        end[1] = int(fullSize[1] * (y + 1.0) / yDiv)
        size[1] = end[1] - start[1]

        for x in range(xDiv):
            start[0] = int(fullSize[0] * float(x) / xDiv)
            end[0] = int(fullSize[0] * (x + 1.0) / xDiv)
            size[0] = end[0] - start[0]

            # pass copies: the tasks run later, so each one needs its own
            # snapshot of start/size rather than the shared, mutated lists
            delayed_read = delayed(read_chunk)(list(start), list(size))
            delayed_process = delayed(my_itk_process)(delayed_read)
            delayed_write = ...  # a delayed write step for the processed chunk

            delayed_chunk_process.append(delayed_write)

dask.compute(*delayed_chunk_process)

This would be the simplest approach, but you could also convert the chunks you read into NumPy arrays and build a Dask Array from them, as sketched below.
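
A rough sketch of that variant, splitting along z only for simplicity. It reuses the read_chunk helper and the fullSize/zDiv names from the sketch above; from_delayed needs each chunk's shape and dtype up front, so the pixel type here is an assumption:

import itk
import numpy as np
import dask.array as da
from dask import delayed

def chunk_as_array(start, size):
    # read an ITK region and hand it to Dask as a plain NumPy array
    return itk.array_from_image(read_chunk(start, size))

blocks = []
for z in range(zDiv):
    z0 = int(fullSize[2] * z / zDiv)
    z1 = int(fullSize[2] * (z + 1) / zDiv)
    start = [0, 0, z0]
    size = [fullSize[0], fullSize[1], z1 - z0]
    blocks.append(
        da.from_delayed(
            delayed(chunk_as_array)(start, size),
            shape=(z1 - z0, fullSize[1], fullSize[0]),  # NumPy order is (z, y, x)
            dtype=np.float32,                           # assumption: pixel type
        )
    )

# one lazy Dask Array covering the whole volume, chunked along z
dask_image = da.concatenate(blocks, axis=0)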


Thank you so much, @guillaumeeb. I really appreciate your support and effort in helping me out with this. I will try the example you provided and the alternative suggestion in converting the itk image into dask-array and report back if I get it working. Thanks again!


Hi @Keyn34 ,

A few points that may be helpful:

  • itk-5.3.0 (the latest version as of this writing) has greatly improved Dask support, so make sure to use this version or newer
  • If your files are NIfTI volumes, then you can convert them to an xarray image representation:

image = itk.imread('volume.nii.gz')
data_array = itk.xarray_from_image(image)
# Chunk as preferred: https://docs.xarray.dev/en/stable/generated/xarray.DataArray.chunk.html
# process in chunks with map_blocks, map_overlap, etc., producing
# processed_data_array
result = itk.image_from_xarray(processed_data_array)
itk.imwrite(result, 'result.nii.gz')
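
To make the chunked middle step concrete, here is a minimal sketch of one way it could look. Assumptions: the DataArray uses ITK's ('z', 'y', 'x') dimension names, and scipy.ndimage.median_filter stands in for the real per-chunk operation (depth=1 in map_overlap gives each chunk the one-voxel halo a size-3 filter needs):

import itk
import scipy.ndimage

image = itk.imread('volume.nii.gz')
data_array = itk.xarray_from_image(image).chunk({'z': 64})

# run the filter per chunk on the underlying Dask array, with a one-voxel
# overlap so values near chunk borders come out right
filtered = data_array.data.map_overlap(
    scipy.ndimage.median_filter, depth=1, size=3
)

# put the processed values back into the xarray container, trigger the
# computation, and convert back to an ITK image
processed_data_array = data_array.copy(data=filtered).compute()
result = itk.image_from_xarray(processed_data_array)
itk.imwrite(result, 'result.nii.gz')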

You can also convert NIfTIs to OME-NGFF Zarr, storing the data as Dask arrays under the hood, with multiscale-spatial-image or ngff-zarr.

Hope this helps.


As per the Dask tutorial at Image Processing, they created separate chunks and stored them for parallel processing later. Something similar can be done here, either with NumPy or with ITK; see the sketch below.
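
A minimal sketch of that pattern with plain NumPy chunk files (the volume here is random stand-in data; to_npy_stack/from_npy_stack are just one simple way to persist and reload chunks):

import dask.array as da
import numpy as np

# a stand-in volume, chunked along the first axis
volume = da.random.random((256, 256, 256), chunks=(64, 256, 256))

# store one .npy file per chunk ...
da.to_npy_stack('chunks/', volume, axis=0)

# ... and load the stack back lazily for parallel processing later
lazy_volume = da.from_npy_stack('chunks/')
result = lazy_volume.map_blocks(np.sqrt).compute()  # stand-in operation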


Hi @thewtex, Hi @PranjalSahu,

Thank you a lot for the responses and the helpful links. For now, I have resolved the issue with a combination of SciPy and Dask. For anyone curious, I posted a code sample in the ITK forum. Maybe @guillaumeeb can also point out areas for improvement, but the snippet worked immensely well for me and sped up the process nicely.


That’s good news, @Keyn34!

I just looked at your code sample; the only thing bothering me is the IO operations. It looks like you are reading and writing on the Client side. I’m talking about the itk_image = itk.imread(input_image_path) part. That means you are reading the data in the master process/thread and then sending parts of it to the Dask cluster using from_array. I guess it is the same when writing it back.

This won’t be too much of a problem if the image is not too big and the computation time is an order of magnitude greater than the IOs. But ideally, you should only read metadata on the Client side and do the real data IO on the workers' side, as sketched below.
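
As an illustration of the difference, a minimal sketch (process_image stands in for your scipy/dask processing step, and the path variables are assumptions; the point is only where imread/imwrite run):

import itk
from dask import delayed

@delayed
def process_on_worker(input_path, output_path):
    itk_image = itk.imread(input_path)    # the read happens on a worker
    result = process_image(itk_image)     # hypothetical processing step
    itk.imwrite(result, output_path)      # the write happens on the same worker

# only the file paths travel from the Client to the cluster
process_on_worker(input_image_path, output_image_path).compute()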


Thanks for pointing that out, @guillaumeeb.
You are right, and it is the same for reading and writing. The images are around 700 MB each. Reading is done quickly, but writing takes a long time because the image is being compressed (*.nii.gz).

Would it make sense to look into that as well? I think the output file format I need might be the problem.

Thanks again!

If you are stuck with ITK for IO, and it is not taking too much time compared to the computation, then it is not a problem.
