Error when using custom function

Hey there,

I’m new to Dask and I’ve run into a problem. I’ve adapted an anisotropic diffusion filter for smoothing CT images, and when running it with map_blocks and computing the result, I can’t get Dask to concatenate the output.

Workflow

I have a list of uint8 PNGs which reside on my disk. I load them in a delayed fashion like this:

Loading

from glob import glob

import dask.array as da
import dask.bag as db
import imageio.v3  # explicit submodule import so imageio.v3.imread is available

# alphanumeric_key: my helper for natural sorting of the filenames (defined elsewhere)
filenames = sorted(glob("path/to/*.png"), key=alphanumeric_key)

# Read one image to get the shape of a single slice
sample = imageio.v3.imread(filenames[0], mode='L')

# Use a faster reader and parallelize the reading
lazy_arrays = db.from_sequence(filenames).map(lambda x: imageio.v3.imread(x, mode='L')).compute()

# Convert to Dask arrays and stack them into one volume
dask_arrays = [da.from_array(arr, chunks=sample.shape) for arr in lazy_arrays]
stack = da.stack(dask_arrays, axis=0)

The function I use is an adaptation of awangenh’s GitHub code. When using it on a single image it works very well and as expected. I’ve adapted the code to work with the RAPIDS library (CuPy).

Custom function

import cucim.skimage.util as csku
import cupy as cp

def anisodiff_gpu2(img, niter=1, kappa=50, gamma=0.1, step=(1.,1.), option=1):

    if img.ndim == 3:
        #warnings.warn("Only grayscale images allowed, converting to 2D matrix")
        img = img[0,:,:]

    img = csku.img_as_float(img)
    imgout = img.copy()

    # Gradient and conduction buffers (south/east directions)
    deltaS = cp.zeros_like(imgout)
    deltaE = deltaS.copy()
    gS = cp.ones_like(imgout)
    gE = gS.copy()

    for _ in range(niter):
        # Finite differences along both axes
        deltaS[:-1, :] = cp.diff(imgout, axis=0)
        deltaE[:, :-1] = cp.diff(imgout, axis=1)

        # Conduction coefficients (Perona-Malik option 1 or 2)
        if option == 1:
            gS, gE = cp.exp(-(deltaS/kappa)**2.)/step[0], cp.exp(-(deltaE/kappa)**2.)/step[1]
        elif option == 2:
            gS, gE = 1./(1.+(deltaS/kappa)**2.)/step[0], 1./(1.+(deltaE/kappa)**2.)/step[1]

        # Flux differences and image update
        NS, EW = gS*deltaS, gE*deltaE
        NS[1:,:] -= deltaS[:-1,:]
        EW[:,1:] -= deltaE[:,:-1]

        imgout += gamma*(NS+EW)

    return cp.asarray(imgout)
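
For context, running it on a single 2D image works as expected, e.g. (a quick sketch, with random data standing in for a real CT slice):

# Random uint8 image of the same size as one of my slices
img = cp.random.randint(0, 256, size=(1000, 500)).astype(cp.uint8)
smoothed = anisodiff_gpu2(img, niter=50, kappa=8, gamma=0.166, option=1)
print(smoothed.shape, smoothed.dtype)  # (1000, 500) float64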

Computing

I define a function that operates on my blocks:

def process_block2(block):
    # Move the block to the GPU
    block = cp.asarray(block)
    # Perform anisotropic denoising (sf is the module holding anisodiff_gpu2 from above)
    block = sf.anisodiff_gpu2(block, niter=50, kappa=8, gamma=0.166, option=1)
    return block

and executing it like this:

import dask.array as da

# Perform denoising on each block
vol_Den2 = da.map_blocks(process_block2, stack, dtype='float32')

# Persist the data in memory
vol_Den2 = vol_Den2.persist()

# Compute the result
vol_Den2 = vol_Den2.compute()

When monitoring the process with the Dask distributed dashboard, I can see that something is happening. But when vol_Den2.compute() is executed, it seems to have trouble concatenating the arrays back together.

Error Traceback
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[6], line 8
      5 vol_Den2 = vol_Den2.persist()
      7 # Compute the result
----> 8 vol_Den2 = vol_Den2.compute()

File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
    318 def compute(self, **kwargs):
    319     """Compute this dask collection
    320 
    321     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    340     dask.compute
    341     """
--> 342     (result,) = compute(self, traverse=False, **kwargs)
    343     return result

File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/base.py:630, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    627 with shorten_traceback():
    628     results = schedule(dsk, keys, **kwargs)
--> 630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/base.py:630, in <listcomp>(.0)
    627 with shorten_traceback():
    628     results = schedule(dsk, keys, **kwargs)
--> 630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:1282, in finalize(results)
   1280 while isinstance(results2, (tuple, list)):
   1281     if len(results2) > 1:
-> 1282         return concatenate3(results)
   1283     else:
   1284         results2 = results2[0]

File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:5299, in concatenate3(arrays)
   5297 if concatenate_lookup.dispatch(type(advanced)) is not np.concatenate:
   5298     x = unpack_singleton(arrays)
-> 5299     return _concatenate2(arrays, axes=list(range(x.ndim)))
   5301 ndim = ndimlist(arrays)
   5302 if not ndim:

File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:403, in _concatenate2(arrays, axes)
    401     return arrays
    402 if len(axes) > 1:
--> 403     arrays = [_concatenate2(a, axes=axes[1:]) for a in arrays]
    404 concatenate = concatenate_lookup.dispatch(
    405     type(max(arrays, key=lambda x: getattr(x, "__array_priority__", 0)))
    406 )
    407 if isinstance(arrays[0], dict):
    408     # Handle concatenation of `dict`s, used as a replacement for structured
    409     # arrays when that's not supported by the array library (e.g., CuPy).

File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:403, in <listcomp>(.0)
    401     return arrays
    402 if len(axes) > 1:
--> 403     arrays = [_concatenate2(a, axes=axes[1:]) for a in arrays]
    404 concatenate = concatenate_lookup.dispatch(
    405     type(max(arrays, key=lambda x: getattr(x, "__array_priority__", 0)))
    406 )
    407 if isinstance(arrays[0], dict):
    408     # Handle concatenation of `dict`s, used as a replacement for structured
    409     # arrays when that's not supported by the array library (e.g., CuPy).

File ~/miniconda3/envs/segmentTest2/lib/python3.10/site-packages/dask/array/core.py:417, in _concatenate2(arrays, axes)
    415     return ret
    416 else:
--> 417     return concatenate(arrays, axis=axes[0])

File <__array_function__ internals>:200, in concatenate(*args, **kwargs)

File cupy/_core/core.pyx:1475, in cupy._core.core._ndarray_base.__array__()

TypeError: Implicit conversion to a NumPy array is not allowed. Please use `.get()` to construct a NumPy array explicitly.

So at some point we end up with a NumPy array instead of a CuPy array, or does Dask need a NumPy array as the result?

BUT

When running a cuCIM function like this:

import cucim.skimage.restoration as cskr 

eps1 = 0.001
weight1 = 0.1
max_num_iter1 = 50

def process_block(block):
    block = cp.asarray(block)
    #print(block.shape)
    # Perform Chambolle denoising
    block = cskr.denoise_tv_chambolle(block, weight=weight1, eps=eps1, max_num_iter=max_num_iter1)
    return block

# Perform Chambolle denoising on each block
vol_Den = da.map_blocks(process_block, stack, dtype='float64')

# Persist the data in memory
vol_Den = vol_Den.persist()

# Compute the result
vol_Den = vol_Den.compute()

The stack gets correctly processed and concatenated.

Question

Why do both functions behave differently? What am I overlooking?

Is someone able to help me figure this out?

If you have follow-up questions I’m happy to answer them. If you think something is missing or I’m in the wrong topic, please correct me.

Thank you very much in advance. I hope this is not too much text.

Hi @UTOBY, welcome to Dask Discourse!

This is a complex topic for me, but I’ll try to offer some suggestions.

First, on the data loading: are you really calling compute() when building lazy_arrays? That would mean you first load every image into memory before building a Dask Array. If so, there is probably some way to optimize, but that is not the question here.
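
(For reference, a fully lazy variant could look roughly like this; just a sketch reusing the filenames and sample from your post, with dask.delayed:)

import dask
import dask.array as da
import imageio.v3

# Build one delayed read per file instead of computing everything up front
lazy_reads = [dask.delayed(imageio.v3.imread)(f, mode='L') for f in filenames]
lazy_arrays = [da.from_delayed(r, shape=sample.shape, dtype=sample.dtype)
               for r in lazy_reads]
stack = da.stack(lazy_arrays, axis=0)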

Some suggestions on how to debug:

  • You said you tried to run your implementation on a single image: what was the result dtype? Was it correct? (one way to check this is sketched after this list)
  • Does the original code without CuPy work with your Dask code?
  • Why are you calling cp.asarray(imgout) at the end of the method? Isn’t imgout already on the GPU?
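
For instance, one way to check the first point on a single block, outside of Dask (a sketch reusing stack and process_block2 from your post):

block = stack.blocks[0].compute()        # one (1, y, x) block as a NumPy array
out = process_block2(block)              # run the custom function on it directly
print(type(out), out.dtype, out.shape)   # do the dtype and shape match the block?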

Dask Array can work perfectly well with CuPy arrays as blocks.


Hi @guillaumeeb,

thank you for taking the time to reply.

You are right, calling compute() means that I read all the images into memory. Since my data is small enough to fit into memory I don’t really need lazy reading, and I found that having the images in RAM is faster than reading them lazily from disk (the dataset is not bigger than 2 GB, which is pretty small). But then again, this is my first venture into Dask, and I was more interested in using it for distributed work, since executing the function in a traditional for loop on my CPU or GPU is slow due to the iteration loop within the anisotropic diffusion function.

Regarding your other suggestions:

  • The test of my function without using Dask resulted in a CuPy array as output.
  • When refactoring the function to use NumPy instead, the problem persists and I get the same error (which seems strange to me, since NumPy should work with Dask, right? And as far as I can tell, I’m not using any NumPy functions that aren’t natively supported by Dask).
  • I was using cp.asarray(imgout) to force the output to be a CuPy array, even though I knew it already was one; it was still in the code for debugging reasons.

I guess one could test my function and code on their own machine by creating a 3D array filled with random uint8 integers (0 to 255). The function should still work, even though it’s only noisy images.
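
For example, something like this should reproduce it (a sketch with arbitrary sizes, reusing process_block2 from above):

import numpy as np
import dask.array as da

# Random uint8 volume of 20 slices, chunked one slice per block like the PNG stack
stack = da.from_array(
    np.random.randint(0, 256, size=(20, 1000, 500), dtype=np.uint8),
    chunks=(1, 1000, 500),
)
vol = da.map_blocks(process_block2, stack, dtype='float32').compute()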

I’m still puzzled about what I’m doing wrong.

Do you get the same error coming from CuPy?

It seems it was an error of mine.

The function I used only works on images with 2 dimensions (img.shape == (1000, 500)). But map_blocks passes a slice of the array with 3 dimensions to the function (block.shape == (1, 1000, 500)).

Since the image returned by my function has only 2 dimensions, I just need to add cp.expand_dims(imgout, axis=0). That’s why I got the same error message with both NumPy and CuPy.
Stupid me… Last week I was obviously not aware enough to catch that obvious fact.
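
In other words, the block-wise wrapper could look roughly like this (just a sketch of the fix):

def process_block2(block):
    block = cp.asarray(block)
    # anisodiff_gpu2 drops the leading axis and returns a 2D image ...
    out = sf.anisodiff_gpu2(block, niter=50, kappa=8, gamma=0.166, option=1)
    # ... so restore the (1, y, x) shape that map_blocks expects for this chunking
    return cp.expand_dims(out, axis=0)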

For completeness:

I decided to rework the image loading process like this:

import dask_image.imread

# Read the PNGs lazily, directly as CuPy blocks, and keep only the first channel
array_delayed = dask_image.imread.imread("/path/toPNG/files/*IncludePattern1*[!_excludePattern1_]*.png",
                                         arraytype='cupy')[:,:,:,0]

When executing process_block2 with

result = da.map_blocks(process_block2, array_delayed, dtype='float64')

I noticed that the longest time (~500 ms) is spent in getitem (when looking at the Dask dashboard). But this was still faster than reading the entire array into RAM (or VRAM) and running the computations on a rechunked array.

One more question:

If I passed a block of shape (10, 1000, 500), i.e. (slice, x, y), to my function, and I wanted to process every slice, I could modify my function with a for loop so that every worker processes the 10 slices of its block sequentially, right? Transferring would then be slower, but the worker could spend more time computing (a rough sketch is below).
Or what would be the best approach here? (But I think this is out of scope for this question.)
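
Such a loop could look roughly like this (just a sketch, assuming anisodiff_gpu2 keeps its current 2D signature and the array is rechunked to 10 slices per block along axis 0):

def process_block2_multi(block):
    block = cp.asarray(block)
    out = cp.empty(block.shape, dtype=cp.float64)
    # Process the slices of a (10, y, x) block one after the other on the GPU
    for i in range(block.shape[0]):
        out[i] = sf.anisodiff_gpu2(block[i], niter=50, kappa=8, gamma=0.166, option=1)
    return out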

I’m glad that we resolved this :sunny:

Thank you @guillaumeeb for your help and patience!

So long

UTOBY