Hi @guillaumeeb,
Thank you for your help! I really appreciate it!
With the mean() code, I sometimes get a similar error:
2023-10-20 16:07:34,233 - distributed.worker - WARNING - Compute Failed
Key: ('mean_chunk-mean_combine-partial-a0d2906d554d3be538686807052898da', 1, 0, 0)
Function: execute_task
args: ((functools.partial(<function mean_combine at 0x155456833640>, axis=(0, 1, 2), keepdims=True), [[[(subgraph_callable-36965da8-9f47-49c9-9ee2-9e82cab2415c, <zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(493598, 740397, None), slice(0, 17, None), slice(0, 17, None)))]]]))
kwargs: {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"
---------------------------------------------------------------------------
CUDARuntimeError Traceback (most recent call last)
Cell In[53], line 2
1 coh = cpu_coh.map_blocks(cp.asarray)
----> 2 coh.mean().compute()
File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
318 def compute(self, **kwargs):
319 """Compute this dask collection
320
321 This turns a lazy Dask collection into its in-memory equivalent.
(...)
340 dask.compute
341 """
--> 342 (result,) = compute(self, traverse=False, **kwargs)
343 return result
File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
625 postcomputes.append(x.__dask_postcompute__())
627 with shorten_traceback():
--> 628 results = schedule(dsk, keys, **kwargs)
630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/miniconda3/envs/work/lib/python3.10/site-packages/cupy/_creation/from_data.py:76, in asarray()
49 def asarray(a, dtype=None, order=None):
50 """Converts an object to array.
51
52 This is equivalent to ``array(a, dtype, copy=False)``.
(...)
74
75 """
---> 76 return _core.array(a, dtype, False, order)
File cupy/_core/core.pyx:2360, in cupy._core.core.array()
File cupy/_core/core.pyx:2384, in cupy._core.core.array()
File cupy/_core/core.pyx:2532, in cupy._core.core._array_default()
File cupy/cuda/memory.pyx:491, in cupy.cuda.memory.MemoryPointer.copy_from_host_async()
File cupy_backends/cuda/api/runtime.pyx:588, in cupy_backends.cuda.api.runtime.memcpyAsync()
File cupy_backends/cuda/api/runtime.pyx:143, in cupy_backends.cuda.api.runtime.check_status()
CUDARuntimeError: cudaErrorInvalidValue: invalid argument
2023-10-20 16:07:34,394 - distributed.worker - WARNING - Compute Failed
Key: ('mean_chunk-9539a40ab711b977753f7a6a6067ef67', 1, 0, 0)
Function: subgraph_callable-36965da8-9f47-49c9-9ee2-9e82cab2
args: (<zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(246799, 493598, None), slice(0, 17, None), slice(0, 17, None)))
kwargs: {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"
or errors like:
2023-10-20 16:14:18,180 - distributed.worker - WARNING - Compute Failed
Key: ('mean_chunk-mean_combine-partial-eb4b6b755a760e6d742fb59e72f569a1', 1, 0, 0)
Function: execute_task
args: ((functools.partial(<function mean_combine at 0x15545686f640>, axis=(0, 1, 2), keepdims=True), [[[(subgraph_callable-6cbb4118-2830-4df1-afd9-c2dc8704d774, <zarr.core.Array (740397, 17, 17) complex64 read-only>, (slice(493598, 740397, None), slice(0, 17, None), slice(0, 17, None)))]]]))
kwargs: {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"
---------------------------------------------------------------------------
CUDARuntimeError Traceback (most recent call last)
Cell In[12], line 2
1 coh = cpu_coh.map_blocks(cp.asarray)
----> 2 coh.mean().compute()
File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
318 def compute(self, **kwargs):
319 """Compute this dask collection
320
321 This turns a lazy Dask collection into its in-memory equivalent.
(...)
340 dask.compute
341 """
--> 342 (result,) = compute(self, traverse=False, **kwargs)
343 return result
File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
625 postcomputes.append(x.__dask_postcompute__())
627 with shorten_traceback():
--> 628 results = schedule(dsk, keys, **kwargs)
630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/miniconda3/envs/work/lib/python3.10/site-packages/cupy/_creation/from_data.py:76, in asarray()
49 def asarray(a, dtype=None, order=None):
50 """Converts an object to array.
51
52 This is equivalent to ``array(a, dtype, copy=False)``.
(...)
74
75 """
---> 76 return _core.array(a, dtype, False, order)
File cupy/_core/core.pyx:2360, in cupy._core.core.array()
File cupy/_core/core.pyx:2384, in cupy._core.core.array()
File cupy/_core/core.pyx:2532, in cupy._core.core._array_default()
File cupy/cuda/memory.pyx:491, in cupy.cuda.memory.MemoryPointer.copy_from_host_async()
File cupy_backends/cuda/api/runtime.pyx:588, in cupy_backends.cuda.api.runtime.memcpyAsync()
File cupy_backends/cuda/api/runtime.pyx:143, in cupy_backends.cuda.api.runtime.check_status()
CUDARuntimeError: cudaErrorInvalidValue: invalid argument
Sometimes it just works perfectly. Adding time.sleep(1) does not always help: sometimes the error is still there even with time.sleep(1). But with threads_per_worker=1, the error always disappears.
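For reference, the cluster is created roughly like this (a minimal sketch; the worker count is illustrative and the other arguments are defaults, so the exact call may differ from my script):

from dask.distributed import LocalCluster, Client

# With several threads per worker the cudaErrorInvalidValue shows up
# intermittently; forcing a single thread per worker avoids it.
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)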
A new finding: after the calculation, when I call cluster.close(); client.close(), I sometimes get
2023-10-20 16:04:20,379 - distributed.worker - ERROR - Unexpected exception during heartbeat. Closing worker.
Traceback (most recent call last):
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/worker.py", line 1253, in heartbeat
response = await retry_operation(
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 454, in retry_operation
return await retry(
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 433, in retry
return await coro()
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1344, in send_recv_from_rpc
comm = await self.pool.connect(self.addr)
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1543, in connect
raise RuntimeError("ConnectionPool is closed")
RuntimeError: ConnectionPool is closed
2023-10-20 16:04:20,380 - tornado.application - ERROR - Exception in callback <bound method Worker.heartbeat of <Worker 'tcp://127.0.0.1:34371', name: 3, status: closed, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>>
Traceback (most recent call last):
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/compatibility.py", line 146, in _run
await val
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/worker.py", line 1253, in heartbeat
response = await retry_operation(
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 454, in retry_operation
return await retry(
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 433, in retry
return await coro()
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1344, in send_recv_from_rpc
comm = await self.pool.connect(self.addr)
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1543, in connect
raise RuntimeError("ConnectionPool is closed")
RuntimeError: ConnectionPool is closed
or
2023-10-20 15:59:09,091 - distributed.nanny - WARNING - Worker process still alive after 3.1999981689453127 seconds, killing
2023-10-20 15:59:09,092 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2023-10-20 15:59:09,092 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2023-10-20 15:59:09,093 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-10-20 15:59:09,097 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2023-10-20 15:59:09,098 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-10-20 15:59:09,101 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-10-20 15:59:09,106 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-10-20 15:59:09,892 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x15542dfba770>>, <Task finished name='Task-15042' coro=<SpecCluster._correct_state_internal() done, defined at /users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/spec.py:346> exception=TimeoutError()>)
Traceback (most recent call last):
File "/users/kangl/miniconda3/envs/work/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
asyncio.exceptions.TimeoutError
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File ~/miniconda3/envs/work/lib/python3.10/asyncio/tasks.py:456, in wait_for(fut, timeout)
455 try:
--> 456 return fut.result()
457 except exceptions.CancelledError as exc:
CancelledError:
The above exception was the direct cause of the following exception:
TimeoutError Traceback (most recent call last)
Cell In[13], line 1
----> 1 cluster.close()
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/spec.py:293, in SpecCluster.close(self, timeout)
292 def close(self, timeout: float | None = None) -> Awaitable[None] | None:
--> 293 aw = super().close(timeout)
294 if not self.asynchronous:
295 self._loop_runner.stop()
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/cluster.py:226, in Cluster.close(self, timeout)
223 return None
225 try:
--> 226 return self.sync(self._close, callback_timeout=timeout)
227 except RuntimeError: # loop closed during process shutdown
228 return None
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils.py:359, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
357 return future
358 else:
--> 359 return sync(
360 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
361 )
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils.py:426, in sync(loop, func, callback_timeout, *args, **kwargs)
424 if error:
425 typ, exc, tb = error
--> 426 raise exc.with_traceback(tb)
427 else:
428 return result
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils.py:399, in sync.<locals>.f()
397 future = wait_for(future, callback_timeout)
398 future = asyncio.ensure_future(future)
--> 399 result = yield future
400 except Exception:
401 error = sys.exc_info()
File ~/miniconda3/envs/work/lib/python3.10/site-packages/tornado/gen.py:762, in Runner.run(self)
759 exc_info = None
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/spec.py:446, in SpecCluster._close(self)
444 if isawaitable(f):
445 await f
--> 446 await self._correct_state()
447 await asyncio.gather(*self._futures)
449 if self.scheduler_comm:
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/deploy/spec.py:359, in SpecCluster._correct_state_internal(self)
353 await self.scheduler_comm.retire_workers(workers=list(to_close))
354 tasks = [
355 asyncio.create_task(self.workers[w].close())
356 for w in to_close
357 if w in self.workers
358 ]
--> 359 await asyncio.gather(*tasks)
360 for name in to_close:
361 if name in self.workers:
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/nanny.py:619, in Nanny.close(self, timeout, reason)
617 self.stop()
618 if self.process is not None:
--> 619 await self.kill(timeout=timeout, reason=reason)
621 self.process = None
622 await self.rpc.close()
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/nanny.py:396, in Nanny.kill(self, timeout, reason)
393 return
395 deadline = time() + timeout
--> 396 await self.process.kill(reason=reason, timeout=0.8 * (deadline - time()))
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/nanny.py:867, in WorkerProcess.kill(self, timeout, executor_wait, reason)
863 logger.warning(
864 f"Worker process still alive after {wait_timeout} seconds, killing"
865 )
866 await process.kill()
--> 867 await process.join(max(0, deadline - time()))
868 except ValueError as e:
869 if "invalid operation on closed AsyncProcess" in str(e):
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/process.py:330, in AsyncProcess.join(self, timeout)
327 return
328 # Shield otherwise the timeout cancels the future and our
329 # on_exit callback will try to set a result on a canceled future
--> 330 await wait_for(asyncio.shield(self._exit_future), timeout)
File ~/miniconda3/envs/work/lib/python3.10/site-packages/distributed/utils.py:1920, in wait_for(fut, timeout)
1919 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1920 return await asyncio.wait_for(fut, timeout)
File ~/miniconda3/envs/work/lib/python3.10/asyncio/tasks.py:458, in wait_for(fut, timeout)
456 return fut.result()
457 except exceptions.CancelledError as exc:
--> 458 raise exceptions.TimeoutError() from exc
459 finally:
460 timeout_handle.cancel()
TimeoutError:
This error does not always occur either, but with threads_per_worker=1 it never appears.
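To summarise, the end-to-end pattern that triggers these errors (or, with threads_per_worker=1, avoids them) is roughly the following sketch; the zarr path and worker settings are placeholders, not my exact script:

import cupy as cp
import dask.array as da
from dask.distributed import LocalCluster, Client

# With the default thread count the CUDA error is intermittent;
# threads_per_worker=1 makes both the compute and the teardown reliable.
cluster = LocalCluster(threads_per_worker=1)
client = Client(cluster)

# Coherence stack stored as a zarr array of shape (740397, 17, 17), complex64.
cpu_coh = da.from_zarr('coh.zarr')      # path is a placeholder
coh = cpu_coh.map_blocks(cp.asarray)    # move each chunk to the GPU
print(coh.mean().compute())

cluster.close()                          # this teardown sometimes raises the errors above
client.close()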