Dask LocalCudaCluster compute error when `threads_per_worker` not equal to 1

I've found a weird error with LocalCUDACluster. My workflow is: use Dask to load data from zarr, transfer it to GPU memory, do some computation on multiple GPUs, transfer the result back to CPU memory, and finally save it to zarr. Here is my code:

import numpy as np
import cupy as cp
import zarr
from dask import array as da
from dask import delayed
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

coh_path = '../CLI/co/ds_can_coh.zarr'
ph_path = './ds_can_ph.zarr'
emi_quality_path = './ds_can_emi_quality.zarr'

cluster = LocalCUDACluster(n_workers=8, threads_per_worker=3)
client = Client(cluster)
# time.sleep(1)

coh_zarr = zarr.open(coh_path,mode='r')
pc_chunk_size = 246799
cpu_coh = da.from_zarr(coh_path, chunks=(pc_chunk_size,*coh_zarr.shape[1:]))

coh = cpu_coh.map_blocks(cp.asarray)
coh_delayed = coh.to_delayed()
coh_delayed = np.squeeze(coh_delayed,axis=(-2,-1))

ph_delayed = np.empty_like(coh_delayed,dtype=object)
emi_quality_delayed = np.empty_like(coh_delayed,dtype=object)

with np.nditer(coh_delayed,flags=['multi_index','refs_ok'], op_flags=['readwrite']) as it:
    for block in it:
        idx = it.multi_index
        ph_delayed[idx], emi_quality_delayed[idx] = delayed(emi,pure=True,nout=2)(coh_delayed[idx])
        ph_delayed[idx] = da.from_delayed(ph_delayed[idx],shape=coh.blocks[idx].shape[0:2],meta=cp.array((),dtype=coh.dtype))
        emi_quality_delayed[idx] = da.from_delayed(emi_quality_delayed[idx],shape=coh.blocks[idx].shape[0:1],meta=cp.array((),dtype=cp.float32))

ph = da.block(ph_delayed[...,None].tolist())
emi_quality = da.block(emi_quality_delayed.tolist())

cpu_ph = ph.map_blocks(cp.asnumpy)
cpu_emi_quality = emi_quality.map_blocks(cp.asnumpy)

_cpu_ph = cpu_ph.to_zarr(ph_path,compute=False,overwrite=True)
_cpu_emi_quality = cpu_emi_quality.to_zarr(emi_quality_path,compute=False,overwrite=True)

da.compute(_cpu_ph)

where the `emi` function is defined as:

def emi(coh:cp.ndarray, # complex coherence matrix, dtype cupy.complex
        ref:int=0, # index of the reference image in the phase history output, optional. Default: 0
       )-> tuple[cp.ndarray,cp.ndarray]: # estimated phase history `ph`, dtype complex; quality (minimum eigenvalue), dtype float
    coh_mag = abs(coh)
    coh_mag_inv = cp.linalg.inv(coh_mag)
    min_eigval, min_eig = cp.linalg.eigh(coh_mag_inv*coh)
    min_eigval = min_eigval[...,0]
    # min_eig = min_eig[...,0]
    min_eig = min_eig[...,0]*min_eig[...,[ref],0].conj()

    return min_eig/abs(min_eig), min_eigval

Please forgive me that I can't provide a clean, minimal test case, since the error disappears when I make even a small change to the code.

The error is

2023-10-19 23:16:08,095 - distributed.worker - WARNING - Compute Failed
Key:       ('asarray-concatenate-emi-from-value-getitem-asnumpy-8538ece2468390fdbe992622b836a920', 0, 0)
Function:  execute_task
args:      ((subgraph_callable-34d87e96-8737-4514-bf3f-0ec3b17723a2, (<built-in function getitem>, (<function emi at 0x15544060f640>, (subgraph_callable-a431c24f-cb39-4c39-8b01-3f8f75b4b337, <zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(0, 246799, None), slice(0, 17, None), slice(0, 17, None)))), 0)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

2023-10-19 23:16:08,098 - distributed.worker - WARNING - Compute Failed
Key:       ('asarray-concatenate-emi-from-value-getitem-asnumpy-8538ece2468390fdbe992622b836a920', 2, 0)
Function:  execute_task
args:      ((subgraph_callable-34d87e96-8737-4514-bf3f-0ec3b17723a2, (<built-in function getitem>, (<function emi at 0x15544060f640>, (subgraph_callable-a431c24f-cb39-4c39-8b01-3f8f75b4b337, <zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(493598, 740397, None), slice(0, 17, None), slice(0, 17, None)))), 0)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

---------------------------------------------------------------------------
CUDARuntimeError                          Traceback (most recent call last)
Cell In[11], line 1
----> 1 da.compute(_cpu_ph)

File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/work/lib/python3.10/site-packages/cupy/_creation/from_data.py:76, in asarray()
     49 def asarray(a, dtype=None, order=None):
     50     """Converts an object to array.
     51 
     52     This is equivalent to ``array(a, dtype, copy=False)``.
   (...)
     74 
     75     """
---> 76     return _core.array(a, dtype, False, order)

File cupy/_core/core.pyx:2360, in cupy._core.core.array()

File cupy/_core/core.pyx:2384, in cupy._core.core.array()

File cupy/_core/core.pyx:2532, in cupy._core.core._array_default()

File cupy/cuda/memory.pyx:491, in cupy.cuda.memory.MemoryPointer.copy_from_host_async()

File cupy_backends/cuda/api/runtime.pyx:588, in cupy_backends.cuda.api.runtime.memcpyAsync()

File cupy_backends/cuda/api/runtime.pyx:143, in cupy_backends.cuda.api.runtime.check_status()

CUDARuntimeError: cudaErrorInvalidValue: invalid argument

The weirdest thing is that when I set threads_per_worker to 1, this error disappears. And when I add time.sleep(1) after creating the cluster and client, the error also disappears. It is hard for me to understand.
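For reference, here is a sketch of the only two changes I make to avoid the error (everything else stays exactly as above):

import time

# Variant 1: one thread per worker -- with this the error never shows up.
cluster = LocalCUDACluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)

# Variant 2: keep 3 threads per worker but pause briefly after start-up --
# this also made the error disappear in these runs.
cluster = LocalCUDACluster(n_workers=8, threads_per_worker=3)
client = Client(cluster)
time.sleep(1)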

Hi @kanglcn,

It seems the error is raised right at the beginning of your computation, when converting the input Array to a cupy Array, i.e. when copying the data into GPU memory.

Can you reproduce the issue with just a simple piece of code like:

coh = cpu_coh.map_blocks(cp.asarray)
coh.mean().compute()

Do you always get the error without the time.sleep call and when using several threads per worker?

Does calling time.sleep(1) always make the error disappear? This is really strange!

Hi @guillaumeeb,

Thank you for your help! I really appreciate that!

With the mean() code, I sometimes get a similar error:

2023-10-20 16:07:34,233 - distributed.worker - WARNING - Compute Failed
Key:       ('mean_chunk-mean_combine-partial-a0d2906d554d3be538686807052898da', 1, 0, 0)
Function:  execute_task
args:      ((functools.partial(<function mean_combine at 0x155456833640>, axis=(0, 1, 2), keepdims=True), [[[(subgraph_callable-36965da8-9f47-49c9-9ee2-9e82cab2415c, <zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(493598, 740397, None), slice(0, 17, None), slice(0, 17, None)))]]]))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

---------------------------------------------------------------------------
CUDARuntimeError                          Traceback (most recent call last)
Cell In[53], line 2
      1 coh = cpu_coh.map_blocks(cp.asarray)
----> 2 coh.mean().compute()

File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
    318 def compute(self, **kwargs):
    319     """Compute this dask collection
    320 
    321     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    340     dask.compute
    341     """
--> 342     (result,) = compute(self, traverse=False, **kwargs)
    343     return result

File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/work/lib/python3.10/site-packages/cupy/_creation/from_data.py:76, in asarray()
     49 def asarray(a, dtype=None, order=None):
     50     """Converts an object to array.
     51 
     52     This is equivalent to ``array(a, dtype, copy=False)``.
   (...)
     74 
     75     """
---> 76     return _core.array(a, dtype, False, order)

File cupy/_core/core.pyx:2360, in cupy._core.core.array()

File cupy/_core/core.pyx:2384, in cupy._core.core.array()

File cupy/_core/core.pyx:2532, in cupy._core.core._array_default()

File cupy/cuda/memory.pyx:491, in cupy.cuda.memory.MemoryPointer.copy_from_host_async()

File cupy_backends/cuda/api/runtime.pyx:588, in cupy_backends.cuda.api.runtime.memcpyAsync()

File cupy_backends/cuda/api/runtime.pyx:143, in cupy_backends.cuda.api.runtime.check_status()

CUDARuntimeError: cudaErrorInvalidValue: invalid argument
2023-10-20 16:07:34,394 - distributed.worker - WARNING - Compute Failed
Key:       ('mean_chunk-9539a40ab711b977753f7a6a6067ef67', 1, 0, 0)
Function:  subgraph_callable-36965da8-9f47-49c9-9ee2-9e82cab2
args:      (<zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(246799, 493598, None), slice(0, 17, None), slice(0, 17, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

or errors like:

2023-10-20 16:14:18,180 - distributed.worker - WARNING - Compute Failed
Key:       ('mean_chunk-mean_combine-partial-eb4b6b755a760e6d742fb59e72f569a1', 1, 0, 0)
Function:  execute_task
args:      ((functools.partial(<function mean_combine at 0x15545686f640>, axis=(0, 1, 2), keepdims=True), [[[(subgraph_callable-6cbb4118-2830-4df1-afd9-c2dc8704d774, <zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(493598, 740397, None), slice(0, 17, None), slice(0, 17, None)))]]]))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

---------------------------------------------------------------------------
CUDARuntimeError                          Traceback (most recent call last)
Cell In[12], line 2
      1 coh = cpu_coh.map_blocks(cp.asarray)
----> 2 coh.mean().compute()

File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
    318 def compute(self, **kwargs):
    319     """Compute this dask collection
    320 
    321     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    340     dask.compute
    341     """
--> 342     (result,) = compute(self, traverse=False, **kwargs)
    343     return result

File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/work/lib/python3.10/site-packages/cupy/_creation/from_data.py:76, in asarray()
     49 def asarray(a, dtype=None, order=None):
     50     """Converts an object to array.
     51 
     52     This is equivalent to ``array(a, dtype, copy=False)``.
   (...)
     74 
     75     """
---> 76     return _core.array(a, dtype, False, order)

File cupy/_core/core.pyx:2360, in cupy._core.core.array()

File cupy/_core/core.pyx:2384, in cupy._core.core.array()

File cupy/_core/core.pyx:2532, in cupy._core.core._array_default()

File cupy/cuda/memory.pyx:491, in cupy.cuda.memory.MemoryPointer.copy_from_host_async()

File cupy_backends/cuda/api/runtime.pyx:588, in cupy_backends.cuda.api.runtime.memcpyAsync()

File cupy_backends/cuda/api/runtime.pyx:143, in cupy_backends.cuda.api.runtime.check_status()

CUDARuntimeError: cudaErrorInvalidValue: invalid argument

Sometimes it just works perfectly.

time.sleep(1) does not always help; sometimes the error is still there with it.
But with threads_per_worker=1, the error always disappears.

A new finding: after the calculation, when I call cluster.close(); client.close(), I sometimes get

2023-10-20 16:04:20,379 - distributed.worker - ERROR - Unexpected exception during heartbeat. Closing worker.
Traceback (most recent call last):
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/worker.py", line 1253, in heartbeat
    response = await retry_operation(
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 454, in retry_operation
    return await retry(
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 433, in retry
    return await coro()
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1344, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1543, in connect
    raise RuntimeError("ConnectionPool is closed")
RuntimeError: ConnectionPool is closed
2023-10-20 16:04:20,380 - tornado.application - ERROR - Exception in callback <bound method Worker.heartbeat of <Worker 'tcp://127.0.0.1:34371', name: 3, status: closed, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>>
Traceback (most recent call last):
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/compatibility.py", line 146, in _run
    await val
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/worker.py", line 1253, in heartbeat
    response = await retry_operation(
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 454, in retry_operation
    return await retry(
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 433, in retry
    return await coro()
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1344, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1543, in connect
    raise RuntimeError("ConnectionPool is closed")
RuntimeError: ConnectionPool is closed

or

2023-10-20 15:59:09,091 - distributed.nanny - WARNING - Worker process still alive after 3.1999981689453127 seconds, killing
2023-10-20 15:59:09,092 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2023-10-20 15:59:09,092 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2023-10-20 15:59:09,093 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-10-20 15:59:09,097 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2023-10-20 15:59:09,098 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-10-20 15:59:09,101 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-10-20 15:59:09,106 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-10-20 15:59:09,892 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x15542dfba770>>, <Task finished name='Task-15042' coro=<SpecCluster._correct_state_internal() done, defined at /users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/spec.py:346> exception=TimeoutError()>)
Traceback (most recent call last):
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
asyncio.exceptions.TimeoutError
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File ~/miniconda3/envs/work/lib/python3.10/asyncio/tasks.py:456, in wait_for(fut, timeout)
    455 try:
--> 456     return fut.result()
    457 except exceptions.CancelledError as exc:

CancelledError: 

The above exception was the direct cause of the following exception:

TimeoutError                              Traceback (most recent call last)
Cell In[13], line 1
----> 1 cluster.close()

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/spec.py:293, in SpecCluster.close(self, timeout)
    292 def close(self, timeout: float | None = None) -> Awaitable[None] | None:
--> 293     aw = super().close(timeout)
    294     if not self.asynchronous:
    295         self._loop_runner.stop()

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/cluster.py:226, in Cluster.close(self, timeout)
    223     return None
    225 try:
--> 226     return self.sync(self._close, callback_timeout=timeout)
    227 except RuntimeError:  # loop closed during process shutdown
    228     return None

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils.py:359, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    357     return future
    358 else:
--> 359     return sync(
    360         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    361     )

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils.py:426, in sync(loop, func, callback_timeout, *args, **kwargs)
    424 if error:
    425     typ, exc, tb = error
--> 426     raise exc.with_traceback(tb)
    427 else:
    428     return result

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils.py:399, in sync.<locals>.f()
    397         future = wait_for(future, callback_timeout)
    398     future = asyncio.ensure_future(future)
--> 399     result = yield future
    400 except Exception:
    401     error = sys.exc_info()

File ~/miniconda3/envs/work/lib/python3.10/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/spec.py:446, in SpecCluster._close(self)
    444 if isawaitable(f):
    445     await f
--> 446 await self._correct_state()
    447 await asyncio.gather(*self._futures)
    449 if self.scheduler_comm:

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/spec.py:359, in SpecCluster._correct_state_internal(self)
    353         await self.scheduler_comm.retire_workers(workers=list(to_close))
    354     tasks = [
    355         asyncio.create_task(self.workers[w].close())
    356         for w in to_close
    357         if w in self.workers
    358     ]
--> 359     await asyncio.gather(*tasks)
    360 for name in to_close:
    361     if name in self.workers:

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/nanny.py:619, in Nanny.close(self, timeout, reason)
    617 self.stop()
    618 if self.process is not None:
--> 619     await self.kill(timeout=timeout, reason=reason)
    621 self.process = None
    622 await self.rpc.close()

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/nanny.py:396, in Nanny.kill(self, timeout, reason)
    393     return
    395 deadline = time() + timeout
--> 396 await self.process.kill(reason=reason, timeout=0.8 * (deadline - time()))

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/nanny.py:867, in WorkerProcess.kill(self, timeout, executor_wait, reason)
    863     logger.warning(
    864         f"Worker process still alive after {wait_timeout} seconds, killing"
    865     )
    866     await process.kill()
--> 867     await process.join(max(0, deadline - time()))
    868 except ValueError as e:
    869     if "invalid operation on closed AsyncProcess" in str(e):

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/process.py:330, in AsyncProcess.join(self, timeout)
    327     return
    328 # Shield otherwise the timeout cancels the future and our
    329 # on_exit callback will try to set a result on a canceled future
--> 330 await wait_for(asyncio.shield(self._exit_future), timeout)

File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils.py:1920, in wait_for(fut, timeout)
   1919 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1920     return await asyncio.wait_for(fut, timeout)

File ~/miniconda3/envs/work/lib/python3.10/asyncio/tasks.py:458, in wait_for(fut, timeout)
    456             return fut.result()
    457         except exceptions.CancelledError as exc:
--> 458             raise exceptions.TimeoutError() from exc
    459 finally:
    460     timeout_handle.cancel()

TimeoutError: 

This error does not always appear, either. But with threads_per_worker=1, such errors never appear.

One interesting finding: if I first load all the needed data into host memory and then use da.from_array rather than da.from_zarr, the error always disappears, even with threads_per_worker != 1.
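To make that concrete, a minimal sketch of the workaround (array name, store path, and chunking are just illustrative, matching the small reproducer below):

# Workaround sketch: read the zarr store fully into host memory first,
# then build the Dask array from the in-memory numpy array.
a_np = zarr.open('./a.zarr', mode='r')[:]            # whole array in RAM
a_da = da.from_array(a_np, chunks=(10, -1, -1, -1))  # instead of da.from_zarr
a_cu_da = a_da.map_blocks(cp.asarray)                # per-block host -> GPU copy
# ...the rest of the computation is unchanged, and the cudaErrorInvalidValue
# error no longer appears, even with threads_per_worker != 1.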

Now I can provide a minimal piece of code that reproduces the error:

import numpy as np
import cupy as cp
import zarr
import dask
from dask import array as da
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# prepare the test data
a_np = np.random.random((100, 100, 5, 5))
a_zarr = zarr.open('./a.zarr','w',shape=a_np.shape,chunks=(10,-1,-1,-1))
a_zarr[:] = a_np

cluster = LocalCUDACluster(n_workers=3, threads_per_worker=3)
client = Client(cluster)

a_da = da.from_zarr('./a.zarr')
a_cu_da = a_da.map_blocks(cp.asarray)

is_b = (a_cu_da < 0.5) & (a_cu_da >= 0)

b_num = da.count_nonzero(is_b,axis=(-2,-1)).astype(cp.int32)

is_b_cpu = is_b.map_blocks(cp.asnumpy)

_is_b_cpu = is_b_cpu.to_zarr('./is_b.zarr',compute=False,overwrite=True)

da.compute(_is_b_cpu)

I will get:

2023-10-22 18:55:21,760 - distributed.worker - WARNING - Compute Failed
Key:       ('asnumpy-ac4fe35f67de082b63465a864bc6b4eb', 2, 0, 0, 0)
Function:  subgraph_callable-e9fbb40b-c7fe-4438-8765-7c7e2bac
args:      (0, 0.5, <zarr.core.Array (100, 100, 5, 5) float64 read-only>, (slice(20, 30, None), slice(0, 100, None), slice(0, 5, None), slice(0, 5, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

2023-10-22 18:55:21,760 - distributed.worker - WARNING - Compute Failed
Key:       ('asnumpy-ac4fe35f67de082b63465a864bc6b4eb', 1, 0, 0, 0)
Function:  subgraph_callable-e9fbb40b-c7fe-4438-8765-7c7e2bac
args:      (0, 0.5, <zarr.core.Array (100, 100, 5, 5) float64 read-only>, (slice(10, 20, None), slice(0, 100, None), slice(0, 5, None), slice(0, 5, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

2023-10-22 18:55:22,041 - distributed.worker - WARNING - Compute Failed
Key:       ('asnumpy-ac4fe35f67de082b63465a864bc6b4eb', 4, 0, 0, 0)
Function:  subgraph_callable-e9fbb40b-c7fe-4438-8765-7c7e2bac
args:      (0, 0.5, <zarr.core.Array (100, 100, 5, 5) float64 read-only>, (slice(40, 50, None), slice(0, 100, None), slice(0, 5, None), slice(0, 5, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

2023-10-22 18:55:22,042 - distributed.worker - WARNING - Compute Failed
Key:       ('asnumpy-ac4fe35f67de082b63465a864bc6b4eb', 3, 0, 0, 0)
Function:  subgraph_callable-e9fbb40b-c7fe-4438-8765-7c7e2bac
args:      (0, 0.5, <zarr.core.Array (100, 100, 5, 5) float64 read-only>, (slice(30, 40, None), slice(0, 100, None), slice(0, 5, None), slice(0, 5, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

---------------------------------------------------------------------------
CUDARuntimeError                          Traceback (most recent call last)
Cell In[9], line 1
----> 1 da.compute(_is_b_cpu)

File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/work/lib/python3.10/site-packages/cupy/_creation/from_data.py:76, in asarray()
     49 def asarray(a, dtype=None, order=None):
     50     """Converts an object to array.
     51 
     52     This is equivalent to ``array(a, dtype, copy=False)``.
   (...)
     74 
     75     """
---> 76     return _core.array(a, dtype, False, order)

File cupy/_core/core.pyx:2360, in cupy._core.core.array()

File cupy/_core/core.pyx:2384, in cupy._core.core.array()

File cupy/_core/core.pyx:2532, in cupy._core.core._array_default()

File cupy/cuda/memory.pyx:491, in cupy.cuda.memory.MemoryPointer.copy_from_host_async()

File cupy_backends/cuda/api/runtime.pyx:588, in cupy_backends.cuda.api.runtime.memcpyAsync()

File cupy_backends/cuda/api/runtime.pyx:143, in cupy_backends.cuda.api.runtime.check_status()

CUDARuntimeError: cudaErrorInvalidValue: invalid argument

Thanks @kanglcn, this will really help!

The nice thing is that you have a workaround to make the error disappear.

I don't think I'll be able to help further, because this goes beyond my knowledge. It seems that when using multiple threads some race condition is occurring.

cc @jakirkham @jacobtomlinson @quasiben.

Hi @guillaumeeb,

I also contacted the dask_cuda team, and they said that using more than one thread per worker (the default is 1) is not officially supported for LocalCUDACluster at the moment. Please see Dask LocalCudaCluster compute error when threads_per_worker not equal to 1 · Issue #1262 · rapidsai/dask-cuda (github.com).
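For anyone who lands here later, a minimal sketch of the configuration that stays within what is officially supported (one worker process per GPU, one thread per worker, which is the LocalCUDACluster default; n_workers=8 just matches my original setup):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# A single compute thread per worker; threads_per_worker=1 is the default,
# written out here only for clarity.
cluster = LocalCUDACluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)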

Thanks for your help anyway.
