Dask LocalCUDACluster compute error when `threads_per_worker` is not equal to 1

I found a weird error with LocalCUDACluster. My workflow is to use dask to load data from zarr, transfer it to GPU memory, do some computation on multiple GPUs, transfer the result back to CPU memory, and finally save it to zarr. Here is my code:

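import numpy as np
import cupy as cp
import zarr
from dask import array as da, delayed
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# NOTE: `emi` is defined further below; it must be defined before this code runs
coh_path = '../CLI/co/ds_can_coh.zarr'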
ph_path = './ds_can_ph.zarr'
emi_quality_path = './ds_can_emi_quality.zarr'

cluster = LocalCUDACluster(n_workers=8, threads_per_worker=3)
client = Client(cluster)
# time.sleep(1)

coh_zarr = zarr.open(coh_path,mode='r')
pc_chunk_size = 246799
cpu_coh = da.from_zarr(coh_path, chunks=(pc_chunk_size,*coh_zarr.shape[1:]))

coh = cpu_coh.map_blocks(cp.asarray)
coh_delayed = coh.to_delayed()
coh_delayed = np.squeeze(coh_delayed,axis=(-2,-1))

ph_delayed = np.empty_like(coh_delayed,dtype=object)
emi_quality_delayed = np.empty_like(coh_delayed,dtype=object)

with np.nditer(coh_delayed,flags=['multi_index','refs_ok'], op_flags=['readwrite']) as it:
    for block in it:
        idx = it.multi_index
        ph_delayed[idx], emi_quality_delayed[idx] = delayed(emi,pure=True,nout=2)(coh_delayed[idx])
        ph_delayed[idx] = da.from_delayed(ph_delayed[idx],shape=coh.blocks[idx].shape[0:2],meta=cp.array((),dtype=coh.dtype))
        emi_quality_delayed[idx] = da.from_delayed(emi_quality_delayed[idx],shape=coh.blocks[idx].shape[0:1],meta=cp.array((),dtype=cp.float32))

ph = da.block(ph_delayed[...,None].tolist())
emi_quality = da.block(emi_quality_delayed.tolist())

cpu_ph = ph.map_blocks(cp.asnumpy)
cpu_emi_quality = emi_quality.map_blocks(cp.asnumpy)

_cpu_ph = cpu_ph.to_zarr(ph_path,compute=False,overwrite=True)
_cpu_emi_quality = cpu_emi_quality.to_zarr(emi_quality_path,compute=False,overwrite=True)

def emi(coh:cp.ndarray, # complex coherence matrix, complex dtype
        ref:int=0, # index of reference image in the phase history output, optional. Default: 0
       )-> tuple[cp.ndarray,cp.ndarray]: # estimated phase history `ph`, dtype complex; quality (minimum eigenvalue), dtype float
    coh_mag = abs(coh)
    coh_mag_inv = cp.linalg.inv(coh_mag)
    min_eigval, min_eig = cp.linalg.eigh(coh_mag_inv*coh)
    min_eigval = min_eigval[...,0]
    # min_eig = min_eig[...,0]
    min_eig = min_eig[...,0]*min_eig[...,[ref],0].conj()

    return min_eig/abs(min_eig), min_eigval

Please forgive me for not providing a clean, minimal test case: the error disappears when I make even a small change to the code.

The error is

2023-10-19 23:16:08,095 - distributed.worker - WARNING - Compute Failed
Key:       ('asarray-concatenate-emi-from-value-getitem-asnumpy-8538ece2468390fdbe992622b836a920', 0, 0)
Function:  execute_task
args:      ((subgraph_callable-34d87e96-8737-4514-bf3f-0ec3b17723a2, (<built-in function getitem>, (<function emi at 0x15544060f640>, (subgraph_callable-a431c24f-cb39-4c39-8b01-3f8f75b4b337, <zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(0, 246799, None), slice(0, 17, None), slice(0, 17, None)))), 0)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

2023-10-19 23:16:08,098 - distributed.worker - WARNING - Compute Failed
Key:       ('asarray-concatenate-emi-from-value-getitem-asnumpy-8538ece2468390fdbe992622b836a920', 2, 0)
Function:  execute_task
args:      ((subgraph_callable-34d87e96-8737-4514-bf3f-0ec3b17723a2, (<built-in function getitem>, (<function emi at 0x15544060f640>, (subgraph_callable-a431c24f-cb39-4c39-8b01-3f8f75b4b337, <zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(493598, 740397, None), slice(0, 17, None), slice(0, 17, None)))), 0)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

CUDARuntimeError                          Traceback (most recent call last)
Cell In[11], line 1
----> 1 da.compute(_cpu_ph)

File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/work/lib/python3.10/site-packages/cupy/_creation/from_data.py:76, in asarray()
     49 def asarray(a, dtype=None, order=None):
     50     """Converts an object to array.
     52     This is equivalent to ``array(a, dtype, copy=False)``.
     75     """
---> 76     return _core.array(a, dtype, False, order)

File cupy/_core/core.pyx:2360, in cupy._core.core.array()

File cupy/_core/core.pyx:2384, in cupy._core.core.array()

File cupy/_core/core.pyx:2532, in cupy._core.core._array_default()

File cupy/cuda/memory.pyx:491, in cupy.cuda.memory.MemoryPointer.copy_from_host_async()

File cupy_backends/cuda/api/runtime.pyx:588, in cupy_backends.cuda.api.runtime.memcpyAsync()

File cupy_backends/cuda/api/runtime.pyx:143, in cupy_backends.cuda.api.runtime.check_status()

CUDARuntimeError: cudaErrorInvalidValue: invalid argument

The weirdest thing is that when I set threads_per_worker to 1, this error disappears. And when I call time.sleep(1) after creating the cluster and client, the error also disappears. It is hard for me to understand.

Hi @kanglcn,

It seems the error is raised right at the beginning of your computation, when converting the input array to a cupy array, that is, when copying the data into GPU memory.

Can you reproduce the issue with just a simple snippet like:

coh = cpu_coh.map_blocks(cp.asarray)

Do you always get the error without the time.sleep call and when using several threads per worker?

Does calling time.sleep(1) always make the error disappear? This is really strange!

Hi @guillaumeeb ,

Thank you for your help! I really appreciate that!

With the mean() code, it sometimes gives a similar error:

or errors like:

Sometimes it just works perfectly.

time.sleep(1) does not always help. Sometimes the error is still there with time.sleep(1).
But with threads_per_worker=1, the error always disappears.
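
Since the failure is intermittent, one way to compare settings is to run the same computation several times and count how often it fails. A minimal sketch, assuming a hypothetical helper `count_failures` (my own name, not part of Dask) that accepts any zero-argument callable, for example `lambda: da.compute(_cpu_ph)`:

```python
from collections import Counter


def count_failures(run, repeats=10):
    """Call `run` `repeats` times and tally outcomes by exception type name.

    `run` is any zero-argument callable (hypothetical helper, not a Dask API).
    Successful calls are tallied under the key "ok".
    """
    outcomes = Counter()
    for _ in range(repeats):
        try:
            run()
        except Exception as exc:  # record the failure and keep going
            outcomes[type(exc).__name__] += 1
        else:
            outcomes["ok"] += 1
    return outcomes
```

The returned counter can then be compared between runs with different threads_per_worker values, or with and without time.sleep(1).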

A new finding is that, after the calculation, when I call cluster.close(); client.close(), I sometimes get

This error does not always occur, either. But with threads_per_worker=1, it never appears.

One interesting finding is that if I first load all the needed data into memory and then use da.from_array rather than da.from_zarr, the error always disappears, even with threads_per_worker!=1.
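
The load-into-memory workaround can be sketched roughly as follows. This is only an illustration under assumptions: the helper name `load_then_wrap` is hypothetical, the whole array must fit in host memory, and I do not know why this avoids the error. The `source` argument can be a zarr array or anything else that supports `source[:]`.

```python
import numpy as np
import dask.array as da


def load_then_wrap(source, chunks):
    """Read `source` fully into host memory, then wrap it as a dask array.

    Hypothetical helper: `source` is anything that supports `source[:]`,
    such as a zarr array or a NumPy array.
    """
    host = np.asarray(source[:])  # eager read; nothing is loaded lazily by workers
    return da.from_array(host, chunks=chunks)


# Small CPU-only demonstration with a NumPy array standing in for the zarr array.
demo = load_then_wrap(np.arange(12).reshape(3, 4), chunks=(1, 4))
```

In the GPU pipeline the result would then go through `.map_blocks(cp.asarray)` as before, for example `load_then_wrap(zarr.open(coh_path, mode='r'), chunks).map_blocks(cp.asarray)`.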

Now I can provide a minimal example that reproduces the error:

import numpy as np
import cupy as cp
import zarr
import dask
from dask import array as da
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# prepare the data
a_np = np.random.random((100, 100, 5, 5))
a_zarr = zarr.open('./a.zarr','w',shape=a_np.shape,chunks=(10,-1,-1,-1))
a_zarr[:] = a_np

cluster = LocalCUDACluster(n_workers=3, threads_per_worker=3)
client = Client(cluster)

a_da = da.from_zarr('./a.zarr')
a_cu_da = a_da.map_blocks(cp.asarray)

is_b = (a_cu_da < 0.5) & (a_cu_da >= 0)

b_num = da.count_nonzero(is_b,axis=(-2,-1)).astype(cp.int32)

is_b_cpu = is_b.map_blocks(cp.asnumpy)

_is_b_cpu = is_b_cpu.to_zarr('./is_b.zarr',compute=False,overwrite=True)


I will get:

Thanks @kanglcn, this will be really helpful for tracking down the problem!

The nice thing is that you have a workaround that makes the error disappear.

I don’t think I’ll be able to help, because this goes beyond my knowledge. It seems there is some race condition when using multiple threads.

cc @jakirkham @jacobtomlinson @quasiben.

Hi @guillaumeeb ,

I also contacted the dask-cuda team, and they said that using more than one thread per worker (one is the default) is not officially supported for LocalCUDACluster at the moment. Please see Dask LocalCudaCluster compute error when threads_per_worker not equal to 1 · Issue #1262 · rapidsai/dask-cuda (github.com)

Thanks for your help anyway.

