I’m trying to calculate the svd of large matrix by using dask+rapidsai on gpus, refering to the following two links:
https://blog.dask.org/2020/05/13/large-svds
I copied the codes from the referances, but it did not work, and returned
TypeError: Argument 'b' has incorrect type (expected cupy._core.core.ndarray, got numpy.ndarray)
here is the python code (because I’m using one node, I have deleted some configurations about multi-node):
import time
import numpy as np
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask_cuda.initialize import initialize
from dask.utils import parse_bytes
from dask.distributed import performance_report
from dask.distributed import wait
from dask.distributed import get_task_stream
from collections import defaultdict
import cupy
import rmm
import cudf
import dask.array as da
def setup_rmm_pool(client):
client.run(
cudf.set_allocator,
pool=True,
initial_pool_size= parse_bytes("25.6GB"),
allocator="default"
)
client.run(
cupy.cuda.set_allocator,
rmm.rmm_cupy_allocator
)
if __name__ == "__main__":
initialize(create_cuda_context=True)
cluster = LocalCUDACluster(local_directory="./tmp/")
client = Client(cluster)
setup_rmm_pool(client)
SIZE = 25_000
rs = da.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.randint(0, 3, size=(SIZE, SIZE), chunks=(5_000, 5_000), dtype="uint8")
print("x:",x)
t0=time.time()
u, s, vh = da.linalg.svd_compressed(x, k=1000)
print("u:",u)
print("s:",s)
print("vh",vh)
u, s, vh = da.compute(u, s, vh)
t1=time.time()
print("time:{:10.4e}s".format(t1-t0))
print(u)
print(s)
print(vh)
here is the error information:
/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/ucx.py:58: UserWarning: A CUDA context for device 0 already exists on process ID 112480. This is often the result of a CUDA-enabled library calling a CUDA runtime function before Dask-CUDA can spawn worker processes. Please make sure any such function calls don't happen at import time or in the global scope of a program.
warnings.warn(
2022-08-12 14:57:23,563 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-08-12 14:57:23,565 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-08-12 14:57:23,573 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-08-12 14:57:23,626 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-08-12 14:57:26,001 - distributed.worker - WARNING - Compute Failed
Key: ('sum-7503ff5abcef15c45377e6fab6b08c0f', 1, 1, 0)
Function: subgraph_callable-3bb057bd-d992-46d9-85bb-c7a2ccc0
args: (array([[1, 2, 2, ..., 2, 1, 1],
[0, 2, 1, ..., 1, 0, 0],
[2, 1, 1, ..., 1, 0, 0],
...,
[0, 0, 1, ..., 2, 0, 2],
[1, 2, 0, ..., 1, 2, 0],
[2, 2, 2, ..., 1, 1, 2]], dtype=uint8), array([[-4.06509678e-01, 1.64600491e+00, -3.39715455e-01, ...,
-1.00570528e+00, 1.83279271e+00, -3.68053344e-01],
[ 3.98488480e-01, -9.21862199e-01, 5.33124772e-01, ...,
1.19739800e+00, -8.87315324e-01, -3.23711103e-02],
[-1.44600309e+00, 5.16558544e-01, 5.41410230e-01, ...,
-5.90658668e-01, -1.21138653e+00, 3.01011608e-01],
...,
[ 3.72695626e-01, 1.35016683e+00, -3.33027427e-01, ...,
1.32543200e+00, -1.09257878e+00, -4.88091643e-01],
[ 3.23098284e-01, -7.25119894e-01, 1.88832146e-01, ...,
-1.21142630e-01, 1.05581178e+00, -7.19700660e-02],
[-1.14190826e+00, -1.78644899e-01, -6.67522821e-01, ...,
9.06188042e-02, 8.57061617e-01, 5.58890910e-04]]))
kwargs: {}
Exception: 'TypeError("Argument \'b\' has incorrect type (expected cupy._core.core.ndarray, got numpy.ndarray)")'
x: dask.array<randint, shape=(25000, 25000), dtype=uint8, chunksize=(5000, 5000), chunktype=cupy.ndarray>
u: dask.array<mul, shape=(25000, 1000), dtype=float64, chunksize=(5000, 1000), chunktype=numpy.ndarray>
s: dask.array<getitem, shape=(1000,), dtype=float64, chunksize=(1000,), chunktype=numpy.ndarray>
vh dask.array<mul, shape=(1000, 25000), dtype=float64, chunksize=(1000, 5000), chunktype=numpy.ndarray>
Traceback (most recent call last):
File "/work/0/TRG/luoxiao/test/dasktest/dasktest4.py", line 47, in <module>
u, s, vh = da.compute(u, s, vh)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/base.py", line 600, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/client.py", line 3014, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/client.py", line 2188, in gather
return self.sync(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 320, in sync
return sync(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 387, in sync
raise exc.with_traceback(tb)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 360, in f
result = yield future
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/client.py", line 2051, in _gather
raise exception.with_traceback(traceback)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/optimization.py", line 990, in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 149, in get
result = _execute_task(task, cache)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 119, in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 113, in _execute_task
return [_execute_task(a, cache) for a in arg]
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 113, in <listcomp>
return [_execute_task(a, cache) for a in arg]
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/utils.py", line 41, in apply
return func(*args, **kwargs)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/dask/array/routines.py", line 272, in _tensordot
x = tensordot(a, b, axes=axes)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/cupy/linalg/_product.py", line 349, in tensordot
return _core.tensordot_core(a, b, None, n, m, k, ret_shape)
TypeError: Argument 'b' has incorrect type (expected cupy._core.core.ndarray, got numpy.ndarray)
2022-08-12 14:57:26,043 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:38034 -> tcp://127.0.0.1:45398
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/tornado/iostream.py", line 971, in _handle_write
num_bytes = self.write_to_fd(self._write_buffer.peek(size))
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/tornado/iostream.py", line 1148, in write_to_fd
return self.socket.send(data) # type: ignore
BrokenPipeError: [Errno 32] Broken pipe
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 1704, in get_data
response = await comm.read(deserializers=serializers)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 148, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://127.0.0.1:38034 remote=tcp://127.0.0.1:55704>: BrokenPipeError: [Errno 32] Broken pipe
2022-08-12 14:57:26,046 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:38034 -> tcp://127.0.0.1:41655
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 1704, in get_data
response = await comm.read(deserializers=serializers)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://127.0.0.1:38034 remote=tcp://127.0.0.1:55700>: Stream is closed
2022-08-12 14:57:26,046 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:38034 -> tcp://127.0.0.1:41989
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 1704, in get_data
response = await comm.read(deserializers=serializers)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://127.0.0.1:38034 remote=tcp://127.0.0.1:55702>: Stream is closed
2022-08-12 14:57:26,147 - distributed.worker - ERROR -
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 761, in wrapper
return await func(*args, **kwargs)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 3277, in gather_dep
response = await get_data_from_worker(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4593, in get_data_from_worker
return await retry_operation(_get_data, operation="get_data_from_worker")
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
return await retry(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
return await coro()
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4573, in _get_data
response = await send_recv(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/core.py", line 739, in send_recv
response = await comm.read(deserializers=deserializers)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 236, in read
n = await stream.read_into(chunk)
asyncio.exceptions.CancelledError
2022-08-12 14:57:26,149 - distributed.worker - ERROR -
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 761, in wrapper
return await func(*args, **kwargs)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 3277, in gather_dep
response = await get_data_from_worker(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4593, in get_data_from_worker
return await retry_operation(_get_data, operation="get_data_from_worker")
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
return await retry(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
return await coro()
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4573, in _get_data
response = await send_recv(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/core.py", line 739, in send_recv
response = await comm.read(deserializers=deserializers)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 236, in read
n = await stream.read_into(chunk)
asyncio.exceptions.CancelledError
2022-08-12 14:57:26,151 - distributed.worker - ERROR -
Traceback (most recent call last):
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils.py", line 761, in wrapper
return await func(*args, **kwargs)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 3277, in gather_dep
response = await get_data_from_worker(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4593, in get_data_from_worker
return await retry_operation(_get_data, operation="get_data_from_worker")
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
return await retry(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
return await coro()
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/worker.py", line 4573, in _get_data
response = await send_recv(
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/core.py", line 739, in send_recv
response = await comm.read(deserializers=deserializers)
File "/home/DENSEQCD/luoxiao/.miniconda3/envs/rapids-test/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError
It seems that the function is using numpy.ndarray (not cupy.ndarray) during the svd calculation. And the u,s,vh
u, s, vh = da.linalg.svd_compressed(x, k=1000)
are numpy.ndarray, which should be cupy.array.
By the way, the addition of two matrixes can be calculated on gpu:
a = cupy.asarray([[cupy.random.rand()*10 for j in range(10) ] for i in range(10)],dtype=cupy.int64)
a = da.array(a)
b = cupy.asarray([[cupy.random.rand()*10 for j in range(10) ] for i in range(10)],dtype=cupy.int64)
b = da.array(a)
s = a + b
s=da.compute(s)
Is anybody know how to fix this issue? Thanks.