Taking max projection of a large array

Hi everyone,

I have a large light-sheet imaging dataset in the form of a large number of .tiff files. In the initial stages of analysis, I want to take the maximum projection of the data along one axis. For this, I load the images as a Dask array using dask.delayed, perform median subtraction, and then take the maximum projection.

I am currently using our institute's HPC cluster, which is managed through Slurm. I first log in to a compute node and then set up the Dask cluster as follows:

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(queue='quick', cores=16, memory='32GB', walltime='01:05:00')

cluster.scale(jobs=10)
client = Client(cluster)

This is how one of the Dask arrays, corresponding to a single channel, looks:

rfp
dask.array<stack, shape=(4209, 13088, 7415), dtype=uint16, chunksize=(1, 13088, 7415), chunktype=numpy.ndarray>

Currently, it takes about 10 minutes to compute the max projection along axis=0. It also displays the following warning:

UserWarning: Sending large graph of size 22.27 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.

I want to understand whether chunking the data in a specific way would help speed up this process. When I rechunk the data along axis=0 to a chunk size of 500 and then perform the max projection, it gives the following error:

> 2024-06-26 17:28:49,619 - distributed.scheduler - ERROR - Task ('rechunk-merge-chunk_max-9854bcb041246ed32ffe88d6efb54f51', 7, 0, 0) has 90.38 GiB worth of input dependencies, but worker tcp://10.56.1.79:44823 has memory_limit set to 14.90 GiB

At a conceptual level, I am not able to understand how Dask distributes this computation across the requested nodes and how the computation is done on each node individually. Intuitively, I would imagine rechunking the data along the last two axes (axis=1 and axis=2) into smaller pieces, taking the maximum projection on each chunk, and then somehow running these computations in parallel. Sorry for my lack of understanding of how things work.

Hi @dub2s,

Could you provide a minimal reproducer, or at least a code snippet of your Dask computation? You are probably serializing some object in the graph, although 22 MiB is not that much.

Well, rechunking can be an expensive operation, and it is not always necessary for computing a maximum (the median is more complicated, though). Since you're reducing along axis 0, it would definitely help to have chunks that contain this complete dimension.

However, if I understand correctly, you are trying to create chunks of size (500, 13088, 7415), which means 90 GiB per chunk, way too big. You should aim for chunks of around 100 MiB.

Maximum is simple: you can compute it per chunk and just reduce the results. Median is a bit more complex, though.
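
To illustrate with a stand-in array of the same shape and dtype as your rfp (an untested sketch; the chunk sizes are only illustrative, not a recommendation for your data):

import numpy as np
import dask.array as da

# Stand-in with the same shape, dtype and per-slice chunking as `rfp` above.
rfp = da.zeros((4209, 13088, 7415), dtype=np.uint16, chunks=(1, 13088, 7415))

# For a max projection no rechunking is strictly required: Dask takes the
# maximum of each chunk and then combines the per-chunk results in a tree
# reduction across workers.
max_z = rfp.max(axis=0)        # lazy (13088, 7415) result
# max_z.compute() triggers the actual distributed computation.

# If smaller chunks are needed (e.g. for the median step), split the spatial
# axes instead of building (500, 13088, 7415) blocks; the sizes below give
# roughly 128 MiB per uint16 chunk.
rfp_small = rfp.rechunk((64, 1024, 1024))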

Hi @guillaumeeb,

Thank you for your response. Here is the code I am using.

import os
from tqdm import tqdm
from skimage.io import imread
from dask import delayed
import dask.array as da
import numpy as np
import napari
import time

def import_process(dir_path, folder_channel_ls):
    # folder_channel_ls is a list of (channel_name, folder_name) pairs
    path = dir_path
    filenames_d = {}
    for i in folder_channel_ls:
        filenames = os.listdir(path + i[1] + '/')
        filenames = [x for x in filenames if '.tif' in x]
        filenames.sort()
        filenames = [path + i[1] + '/' + x for x in filenames]
        filenames_d[i[0]] = filenames
    print('Finished Sorting directories and filenames')
    ## lazy reading of channels; one sample file gives the shape and dtype
    lazy_imread = delayed(imread)
    sample = imread(filenames_d[i[0]][0])
    image_d = {}
    for j in list(filenames_d):
        lazy_arrays = [lazy_imread(image) for image in filenames_d[j]]
        dask_arrays = [da.from_delayed(delayed_reader, shape=sample.shape, dtype=sample.dtype)
                       for delayed_reader in lazy_arrays]
        image_d[j] = da.stack(dask_arrays, axis=0)
    print('Finished Lazy Reading')
    ## median across channels, subtracted from each channel (still lazy)
    median = da.median(da.stack([image_d[x] for x in list(image_d)]), axis=0)
    subtracted_d = {}
    for i in list(image_d):
        subtracted_d[i] = image_d[i] - median
    return subtracted_d

After importing and doing a bit of processing, I then set up the cluster.

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(queue='normal', cores=32, memory='64GB', walltime='03:00:00')

cluster.scale(jobs=4)
client = Client(cluster)

def max_x(val):
	return val.max(axis=1)

def max_y(val):
	return val.max(axis=2)

def max_z(val):
	return val.max(axis=0)

from skimage.io import imsave

max_p_x = {}
max_p_y = {}
max_p_z = {}

for i in tqdm(list(sb_crop1)):
    dat_sc = client.scatter(sb_crop1[i])
    result = client.submit(max_x, dat_sc).result()
    print('Starting X-projection')
    start_time = time.time()
    max_p_x[i] = result.compute()
    print(i, time.time() - start_time)
    print('Starting Y-projection')
    result = client.submit(max_y, dat_sc).result()
    start_time = time.time()
    max_p_y[i] = result.compute()
    print(i, time.time() - start_time)
    print('Starting Z-projection')
    result = client.submit(max_z, dat_sc).result()
    start_time = time.time()
    max_p_z[i] = result.compute()
    print(i, time.time() - start_time)

imsave('sb_sk_crop1_x_maxproject.tiff', np.stack([max_p_x[i] for i in list(max_p_x)]))
imsave('sb_sk_crop1_y_maxproject.tiff', np.stack([max_p_y[i] for i in list(max_p_y)]))
imsave('sb_sk_crop1_z_maxproject.tiff', np.stack([max_p_z[i] for i in list(max_p_z)]))

Going through some questions about the warning, I was not sure whether the data chunks are read individually by the workers. I was also wondering whether storing the images as Zarr arrays or OME-Zarr arrays would be beneficial.
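
What I have in mind for the Zarr route is roughly the following (an untested sketch; the output path, channel key and chunk sizes are made up):

import dask.array as da

# 'rfp' is assumed to be one of the channel keys returned by import_process,
# and 'rfp.zarr' is a made-up output path.
rfp_rechunked = sb_crop1['rfp'].rechunk((64, 1024, 1024))   # ~128 MiB chunks
da.to_zarr(rfp_rechunked, 'rfp.zarr')

# Reading it back gives chunks matching the on-disk layout, so each worker
# can load its own pieces directly instead of receiving them through the graph.
rfp_zarr = da.from_zarr('rfp.zarr')
max_z = rfp_zarr.max(axis=0).compute()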

So, the first part of the code looks good to me: you're building a big Dask array and taking the median.

This, however, sounds weird: why are you not setting up the cluster before computing the median?

Also, I have trouble following the second part: you submit a lot of functions and take the result right away, sending the data in and not taking advantage of parallelization. I don't think this is the correct way of doing what you want.
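
To give a rough idea of what I would expect instead (untested, and assuming sb_crop1 is the dict of Dask arrays returned by your import_process), keep everything lazy and compute the three projections together, rather than scattering the arrays and submitting one function at a time:

import dask

max_p_x, max_p_y, max_p_z = {}, {}, {}
for name, arr in sb_crop1.items():
    # Build the three lazy projections first...
    x_proj = arr.max(axis=1)
    y_proj = arr.max(axis=2)
    z_proj = arr.max(axis=0)
    # ...then compute them together, so the TIFF reads and the median
    # subtraction are shared between the three reductions.
    max_p_x[name], max_p_y[name], max_p_z[name] = dask.compute(
        x_proj, y_proj, z_proj)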

Hi @guillaumeeb,

Thank you for the reply. I thought that the import function, which includes lazily reading the data and subtracting the median projection, does not compute anything until I set up the cluster and then specifically call compute.

The end result I am trying to achieve is as follows:

  1. Take a median projection of the image across the 3 channels (i.e. along the channel axis). Each channel is an array of shape (z, x, y). Then subtract this median from each channel to get “cleared” channels for further processing (or visualization).

  2. After this, calculate the maximum projection along each of the axes (z, x and y) for the 3 channels.

At the end of processing, I expect to get one 2-D array each for the x, y and z max projections (for each channel).

Do I need to compute the median (and then the subtraction) first, before I call the maximum projection along each axis?

Correct: if you don't call compute before setting up the cluster, then it's OK.

Well, not necessarily. In the end, it all comes down to setting the right chunks to ease the computation (see the rough sketch at the end of this post). It would help if you could provide a reproducible example for a single channel. Also, this kind of workflow looks very similar to what is done in the Pangeo community (https://discourse.pangeo.io/). I'm cc'ing some of them here, but you might want to post there if they don't answer.

cc @rabernat @TomAugspurger @keewis
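
To make the chunking point a bit more concrete, here is a rough sketch (the channel names and chunk sizes are made up, and this is untested; it just shows the shape of the pipeline):

import numpy as np
import dask.array as da

# Three stand-in channels with ~128 MiB chunks instead of one full
# (13088, 7415) plane per chunk; the sizes are purely illustrative.
channels = {c: da.zeros((4209, 13088, 7415), dtype=np.uint16,
                        chunks=(16, 2048, 2048))
            for c in ('ch0', 'ch1', 'ch2')}

# Median across the channel axis, subtracted from each channel; all lazy.
median = da.median(da.stack(list(channels.values()), axis=0), axis=0)
cleaned = {c: arr - median for c, arr in channels.items()}

# Nothing is read or computed until a projection is requested, so the median
# does not need to be computed separately beforehand.
z_projection = cleaned['ch0'].max(axis=0)   # still lazy until .compute()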
