Hey there,
I’m new to dask and I’ve run into a situation. I’ve adapted an anisotropic diffusion filter for smoothing CT images. And when runing it using map_block and compute it, I do not get the dask to concatenate the output.
Workflow
I have a list of unit8 png’s which reside on my disk. I load them delayed like this:
Loading
filenames = sorted(glob("path/to/*.png"), key=alphanumeric_key)
sample = imageio.v3.imread(filenames[0], mode='L')
# Use a faster reader and parallelize the reading
lazy_arrays = db.from_sequence(filenames).map(lambda x: imageio.v3.imread(x, mode='L')).compute()
# Convert to Dask arrays and stack
dask_arrays = [da.from_array(arr, chunks=sample.shape) for arr in lazy_arrays]
stack = da.stack(dask_arrays, axis=0)
The function I used is an adaption of the code of awangenh github code. when using it on a single image it works very well and as expected. I’ve adatpted the code to work with the rapids
library (cupy
) .
Custom function
import cucim.skimage.util as csku
import cupy as cp
def anisodiff_gpu2(img, niter=1, kappa=50, gamma=0.1, step=(1.,1.), option=1):
if img.ndim == 3:
#warnings.warn("Only grayscale images allowed, converting to 2D matrix")
img = img[0,:,:]
img = csku.img_as_float(img)
imgout = img.copy()
deltaS = cp.zeros_like(imgout)
deltaE = deltaS.copy()
gS = cp.ones_like(imgout)
gE = gS.copy()
for _ in range(niter):
deltaS[:-1,: ] = cp.diff(imgout,axis=0)
deltaE[: ,:-1] = cp.diff(imgout,axis=1)
if option == 1:
gS, gE = cp.exp(-(deltaS/kappa)**2.)/step[0], cp.exp(-(deltaE/kappa)**2.)/step[1]
elif option == 2:
gS, gE = 1./(1.+(deltaS/kappa)**2.)/step[0], 1./(1.+(deltaE/kappa)**2.)/step[1]
NS, EW = gS*deltaS, gE*deltaE
NS[1:,:] -= deltaS[:-1,:]
EW[:,1:] -= deltaE[:,:-1]
imgout += gamma*(NS+EW)
return cp.asarray(imgout)
Computing
I define a function which is operating on my blocks
def process_block2(block):
block = cp.asarray(block)
# Perform anisotropic denoising
block = sf.anisodiff_gpu2(block, niter=50, kappa=8, gamma=0.166, option=1)
return block
and executing it like this:
import dask as da
# Perform denoising on each block
vol_Den2 = da.map_blocks(process_block2, stack, dtype='float32') #,
# Persist the data in memory
vol_Den2 = vol_Den2.persist()
# Compute the result
vol_Den2 = vol_Den2.compute()
when monitoring the process with dask distributed dashboard, I can see that somethin is happening. But when vol_Den2.compute()
is executed it seems to have trouble to concatenate the arrays together.
Error Traceback
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[6], line 8
5 vol_Den2 = vol_Den2.persist()
7 # Compute the result
----> 8 vol_Den2 = vol_Den2.compute()
File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
318 def compute(self, **kwargs):
319 """Compute this dask collection
320
321 This turns a lazy Dask collection into its in-memory equivalent.
(...)
340 dask.compute
341 """
--> 342 (result,) = compute(self, traverse=False, **kwargs)
343 return result
File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/base.py:630, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
627 with shorten_traceback():
628 results = schedule(dsk, keys, **kwargs)
--> 630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/base.py:630, in <listcomp>(.0)
627 with shorten_traceback():
628 results = schedule(dsk, keys, **kwargs)
--> 630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:1282, in finalize(results)
1280 while isinstance(results2, (tuple, list)):
1281 if len(results2) > 1:
-> 1282 return concatenate3(results)
1283 else:
1284 results2 = results2[0]
File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:5299, in concatenate3(arrays)
5297 if concatenate_lookup.dispatch(type(advanced)) is not np.concatenate:
5298 x = unpack_singleton(arrays)
-> 5299 return _concatenate2(arrays, axes=list(range(x.ndim)))
5301 ndim = ndimlist(arrays)
5302 if not ndim:
File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:403, in _concatenate2(arrays, axes)
401 return arrays
402 if len(axes) > 1:
--> 403 arrays = [_concatenate2(a, axes=axes[1:]) for a in arrays]
404 concatenate = concatenate_lookup.dispatch(
405 type(max(arrays, key=lambda x: getattr(x, "__array_priority__", 0)))
406 )
407 if isinstance(arrays[0], dict):
408 # Handle concatenation of `dict`s, used as a replacement for structured
409 # arrays when that's not supported by the array library (e.g., CuPy).
File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:403, in <listcomp>(.0)
401 return arrays
402 if len(axes) > 1:
--> 403 arrays = [_concatenate2(a, axes=axes[1:]) for a in arrays]
404 concatenate = concatenate_lookup.dispatch(
405 type(max(arrays, key=lambda x: getattr(x, "__array_priority__", 0)))
406 )
407 if isinstance(arrays[0], dict):
408 # Handle concatenation of `dict`s, used as a replacement for structured
409 # arrays when that's not supported by the array library (e.g., CuPy).
File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:417, in _concatenate2(arrays, axes)
415 return ret
416 else:
--> 417 return concatenate(arrays, axis=axes[0])
File <__array_function__ internals>:200, in concatenate(*args, **kwargs)
File cupy/_core/core.pyx:1475, in cupy._core.core._ndarray_base.__array__()
TypeError: Implicit conversion to a NumPy array is not allowed. Please use `.get()` to construct a NumPy array explicitly.
So at some point we end up with an numpy array instead of a cupy array, or dask needs to have a numpy array as a result?
BUT
When runnig a cucim function like this:
import cucim.skimage.restoration as cskr
eps1 = 0.001
weight1 = 0.1
max_num_iter1 = 50
def process_block(block):
block = cp.asarray(block)
#print(block.shape)
# Perform Chambolle denoising
block = cskr.denoise_tv_chambolle(block, weight=weight1, eps=eps1, max_num_iter=max_num_iter1)
return block
# Perform Chambolle denoising on each block
vol_Den = da.map_blocks(process_block, stack, dtype='float64') #,
# Persist the data in memory
vol_Den = vol_Den.persist()
# Compute the result
vol_Den = vol_Den.compute()
The stack gets correctly processed and concatenated.
Question
Why do both functions behave differently? What am I overlooking?
Is some able to help me figure this out?
If you have followup questions I’m happy to answer them.If you think something is missing or I’m in the wrong topic, please correct me.
Tank you very much in advance. I hope this is not to much text.