got "TypeError: Argument 'b' has incorrect type (expected cupy._core.core.ndarray, got numpy.ndarray)"

I’m trying to calculate the svd of large matrix by using dask+rapidsai on gpus, refering to the following two links:
https://blog.dask.org/2020/05/13/large-svds

I copied the codes from the referances, but it did not work, and returned

TypeError: Argument 'b' has incorrect type (expected cupy._core.core.ndarray, got numpy.ndarray)

here is the python code (because I’m using one node, I have deleted some configurations about multi-node):

import time
import numpy as np

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask_cuda.initialize import initialize
from dask.utils import parse_bytes
from dask.distributed import performance_report
from dask.distributed import wait
from dask.distributed import get_task_stream
from collections import defaultdict

import cupy
import rmm
import cudf
import dask.array as da

def setup_rmm_pool(client):
    client.run(
        cudf.set_allocator,
        pool=True,
        initial_pool_size= parse_bytes("25.6GB"),
        allocator="default"
    )
    client.run(
        cupy.cuda.set_allocator,
        rmm.rmm_cupy_allocator
    )

if __name__ == "__main__":

    initialize(create_cuda_context=True)
    cluster = LocalCUDACluster(local_directory="./tmp/")
    client = Client(cluster)
    setup_rmm_pool(client)

    SIZE = 25_000
    rs = da.random.RandomState(RandomState=cupy.random.RandomState)  
    x = rs.randint(0, 3, size=(SIZE, SIZE), chunks=(5_000, 5_000), dtype="uint8")
    print("x:",x)

    t0=time.time()
    u, s, vh = da.linalg.svd_compressed(x, k=1000)
    print("u:",u)
    print("s:",s)
    print("vh",vh)
    u, s, vh = da.compute(u, s, vh)
    t1=time.time()

    print("time:{:10.4e}s".format(t1-t0))
    print(u)
    print(s)
    print(vh)

here is the error information:

/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/ucx.py:58: UserWarning: A CUDA context for device 0 already exists on process ID 112480. This is often the result of a CUDA-enabled library calling a CUDA runtime function before Dask-CUDA can spawn worker processes. Please make sure any such function calls don't happen at import time or in the global scope of a program.
  warnings.warn(
2022-08-12 14:57:23,563 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-08-12 14:57:23,565 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-08-12 14:57:23,573 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-08-12 14:57:23,626 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-08-12 14:57:26,001 - distributed.worker - WARNING - Compute Failed
Key:       ('sum-7503ff5abcef15c45377e6fab6b08c0f', 1, 1, 0)
Function:  subgraph_callable-3bb057bd-d992-46d9-85bb-c7a2ccc0
args:      (array([[1, 2, 2, ..., 2, 1, 1],
       [0, 2, 1, ..., 1, 0, 0],
       [2, 1, 1, ..., 1, 0, 0],
       ...,
       [0, 0, 1, ..., 2, 0, 2],
       [1, 2, 0, ..., 1, 2, 0],
       [2, 2, 2, ..., 1, 1, 2]], dtype=uint8), array([[-4.06509678e-01,  1.64600491e+00, -3.39715455e-01, ...,
        -1.00570528e+00,  1.83279271e+00, -3.68053344e-01],
       [ 3.98488480e-01, -9.21862199e-01,  5.33124772e-01, ...,
         1.19739800e+00, -8.87315324e-01, -3.23711103e-02],
       [-1.44600309e+00,  5.16558544e-01,  5.41410230e-01, ...,
        -5.90658668e-01, -1.21138653e+00,  3.01011608e-01],
       ...,
       [ 3.72695626e-01,  1.35016683e+00, -3.33027427e-01, ...,
         1.32543200e+00, -1.09257878e+00, -4.88091643e-01],
       [ 3.23098284e-01, -7.25119894e-01,  1.88832146e-01, ...,
        -1.21142630e-01,  1.05581178e+00, -7.19700660e-02],
       [-1.14190826e+00, -1.78644899e-01, -6.67522821e-01, ...,
         9.06188042e-02,  8.57061617e-01,  5.58890910e-04]]))
kwargs:    {}
Exception: 'TypeError("Argument \'b\' has incorrect type (expected cupy._core.core.ndarray, got numpy.ndarray)")'

x: dask.array<randint, shape=(25000, 25000), dtype=uint8, chunksize=(5000, 5000), chunktype=cupy.ndarray>
u: dask.array<mul, shape=(25000, 1000), dtype=float64, chunksize=(5000, 1000), chunktype=numpy.ndarray>
s: dask.array<getitem, shape=(1000,), dtype=float64, chunksize=(1000,), chunktype=numpy.ndarray>
vh dask.array<mul, shape=(1000, 25000), dtype=float64, chunksize=(1000, 5000), chunktype=numpy.ndarray>
Traceback (most recent call last):
  File "/work/0/TRG/luoxiao/test/dasktest/dasktest4.py", line 47, in <module>
    u, s, vh = da.compute(u, s, vh)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/client.py", line 3014, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/client.py", line 2188, in gather
    return self.sync(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 320, in sync
    return sync(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 387, in sync
    raise exc.with_traceback(tb)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 360, in f
    result = yield future
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/client.py", line 2051, in _gather
    raise exception.with_traceback(traceback)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 113, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 113, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/utils.py", line 41, in apply
    return func(*args, **kwargs)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/array/routines.py", line 272, in _tensordot
    x = tensordot(a, b, axes=axes)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/cupy/linalg/_product.py", line 349, in tensordot
    return _core.tensordot_core(a, b, None, n, m, k, ret_shape)
TypeError: Argument 'b' has incorrect type (expected cupy._core.core.ndarray, got numpy.ndarray)
2022-08-12 14:57:26,043 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:38034 -> tcp://127.0.0.1:45398
Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/tornado/iostream.py", line 971, in _handle_write
    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/tornado/iostream.py", line 1148, in write_to_fd
    return self.socket.send(data)  # type: ignore
BrokenPipeError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 1704, in get_data
    response = await comm.read(deserializers=serializers)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 148, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed)  local=tcp://127.0.0.1:38034 remote=tcp://127.0.0.1:55704>: BrokenPipeError: [Errno 32] Broken pipe
2022-08-12 14:57:26,046 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:38034 -> tcp://127.0.0.1:41655
Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 1704, in get_data
    response = await comm.read(deserializers=serializers)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed)  local=tcp://127.0.0.1:38034 remote=tcp://127.0.0.1:55700>: Stream is closed
2022-08-12 14:57:26,046 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:38034 -> tcp://127.0.0.1:41989
Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 1704, in get_data
    response = await comm.read(deserializers=serializers)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed)  local=tcp://127.0.0.1:38034 remote=tcp://127.0.0.1:55702>: Stream is closed
2022-08-12 14:57:26,147 - distributed.worker - ERROR - 
Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 761, in wrapper
    return await func(*args, **kwargs)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 3277, in gather_dep
    response = await get_data_from_worker(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4593, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4573, in _get_data
    response = await send_recv(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/core.py", line 739, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 236, in read
    n = await stream.read_into(chunk)
asyncio.exceptions.CancelledError
2022-08-12 14:57:26,149 - distributed.worker - ERROR - 
Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 761, in wrapper
    return await func(*args, **kwargs)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 3277, in gather_dep
    response = await get_data_from_worker(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4593, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4573, in _get_data
    response = await send_recv(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/core.py", line 739, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 236, in read
    n = await stream.read_into(chunk)
asyncio.exceptions.CancelledError
2022-08-12 14:57:26,151 - distributed.worker - ERROR - 
Traceback (most recent call last):
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 761, in wrapper
    return await func(*args, **kwargs)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 3277, in gather_dep
    response = await get_data_from_worker(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4593, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4573, in _get_data
    response = await send_recv(
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/core.py", line 739, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

It seems that the function is using numpy.ndarray (not cupy.ndarray) during the svd calculation. And the u,s,vh

u, s, vh = da.linalg.svd_compressed(x, k=1000)

are numpy.ndarray, which should be cupy.array.

By the way, the addition of two matrixes can be calculated on gpu:

a = cupy.asarray([[cupy.random.rand()*10 for j in range(10) ] for i in range(10)],dtype=cupy.int64)
a = da.array(a)
b = cupy.asarray([[cupy.random.rand()*10 for j in range(10) ] for i in range(10)],dtype=cupy.int64)
b = da.array(a)
s = a + b
s=da.compute(s)

Is anybody know how to fix this issue? Thanks.