Using dask distributed scheduler with CuPy

Recently I have been running some large image reconstruction computations on a GPU using the dask local scheduler and CuPy. I was trying to change to the distributed scheduler to take advantage of the nice visualization tools, but am having some problems. In particular, when I switch scheduler I start seeing CUDA runtime errors. However, I don’t think this is really a CuPy/CUDA problem because the same script runs without error using the local scheduler.

I am looking for some insight into what is going wrong here. Am I misusing the distributed scheduler? Is there some reason I should not expect this to play nicely with CuPy?

Here’s a simple script showing this problem.
I am running Python 3.9.7
dask 2022.05.2
CuPy: cupy-cuda112 10.1.0

import dask.array as da
import cupy as cp
from dask.distributed import Client
import numpy as np

if __name__ == "__main__":
    # script generates error if using distributed scheduler
    client = Client() 

    # generate random numbers on cpu
    m = da.random.normal(size=(10, 1024, 1204), chunks=(1, 1024, 1024))

    # move to gpu
    n = da.map_blocks(lambda a: cp.asarray(a),
                      m,
                      dtype=float,
                      meta=cp.array([]),
                      )

    # take average
    c = n.mean(axis=0).compute()

Running this script gives errors like:

2023-03-12 14:13:14,812 - distributed.worker - WARNING - Compute Failed
Key:       ('normal-mean_chunk-a63538303ea5d3aa30770a1cffa52d23', 6, 0, 1)
Function:  execute_task
args:      ((subgraph_callable-ca6bd2bd-67ff-4833-9c0b-0e3dbe99880f, (<function _apply_random at 0x00000257FFB01D30>, None, 'normal', array([2091449326, 1212383101, 2368016930,  142118476, 2123126501,
         34296365, 3071889805, 2443673445, 2637676760, 3752601443,
       3968457419, 1266953699, 2717250215, 4281392320, 1417859837,
       3808676731, 3925839290, 4281767247, 3410509063, 3707335665,
       4000099702, 2596823318, 1645030008, 3144614706,  414108185,
       3256532987, 1704711130, 1732176453, 4122088066,  625945448,
       1607808019,  487631450, 2896540170, 1707515452, 2988695013,
       3716837561, 1280465119, 1538272760,  339428695, 1872723080,
       2481544243, 2765609795, 3179652502,  811024844, 3684187116,
       2925976497, 4111897842, 1401907683, 3793865124,  507433036,
       3854105569, 2275119304,  855828169, 1664696094, 2237304806,
       2629391234, 4272235888, 3733506322,  115218582, 3026614138,
        538769257, 3339688838,  932858209, 2914506355, 3986535113,

kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

Traceback (most recent call last):
  File "C:\Users\q2ilab\Documents\mcsim_private\misc_scripts\2023_03_12_dask_distributed_test.py", line 31, in <module>
    c = n.mean(axis=0).compute()
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\base.py", line 312, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\distributed\client.py", line 3014, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\distributed\client.py", line 2188, in gather
    return self.sync(
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\distributed\utils.py", line 320, in sync
    return sync(
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\distributed\utils.py", line 387, in sync
    raise exc.with_traceback(tb)
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\distributed\utils.py", line 360, in f
    result = yield future
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\tornado\gen.py", line 769, in run
    value = future.result()
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\distributed\client.py", line 2051, in _gather
    raise exception.with_traceback(traceback)
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\core.py", line 149, in get
    result = _execute_task(task, cache)
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\core.py", line 113, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\core.py", line 113, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\q2ilab\Documents\mcsim_private\misc_scripts\2023_03_12_dask_distributed_test.py", line 24, in <lambda>
    n = da.map_blocks(lambda a: xp.asarray(a),
  File "C:\Users\q2ilab\miniconda3\envs\mcsim39\lib\site-packages\cupy\_creation\from_data.py", line 76, in asarray
    return _core.array(a, dtype, False, order)
  File "cupy\_core\core.pyx", line 2249, in cupy._core.core.array
  File "cupy\_core\core.pyx", line 2270, in cupy._core.core.array
  File "cupy\_core\core.pyx", line 2418, in cupy._core.core._array_default
  File "cupy\cuda\memory.pyx", line 470, in cupy.cuda.memory.MemoryPointer.copy_from_host_async
  File "cupy_backends\cuda\api\runtime.pyx", line 560, in cupy_backends.cuda.api.runtime.memcpyAsync
  File "cupy_backends\cuda\api\runtime.pyx", line 132, in cupy_backends.cuda.api.runtime.check_status

Hi @ptbrown, welcome here!

Even though you should be able to use your GPU with a standard distributed cluster, it might be easier to use LocalCUDACluster, see:
https://dask-cuda.readthedocs.io/en/stable/quickstart.html

Could you try that and report back?

@guillaumeeb thanks for the response!

I should have also mentioned that I am running Windows 10 Enterprise. My understanding is that dask-cuda and other packages from RAPIDS would need to run through the Windows Subsystem for Linux (WSL). Is that right? It will take me some time to set up the environment, but I will give it a shot.

@guillaumeeb after installing Windows Subsystem for Linux (WSL2) I was able to create a new environment and install RAPIDS (including dask_cuda). Modifying the client creation in my script to

from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
client = Client(cluster)

then allows the script to complete successfully

That is great; however, I am not very keen on moving my entire workflow over to WSL. So I guess the question is: why does LocalCUDACluster work when the standard distributed cluster does not? Is there a way to install this package on Windows? Or should I accept that on Windows I am stuck using a local scheduler?

Well, I have to admit I don’t know. On this page:
https://dask-cuda.readthedocs.io/en/stable/specializations.html

There is a list of the specializations that dask-cuda does. It is also said:

It is known that main line Dask and Distributed packages can be used to leverage GPU computing, utilizing libraries such as cuDF, CuPy and Numba

The first thing I would try is to launch a LocalCluster with only one process, and after that with only one thread:

client = Client(processes=False)

Maybe there are some race conditions happening with multiple processes per GPU. I guess that if it doesn’t solve your problem, the next step would be to dig a bit into the code of LocalCUDACluster.
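One way to start digging, sketched here under assumptions, is to ask every worker which process it runs in and which CUDA device it sees. `worker_gpu_info` and `report` are hypothetical helpers, not part of Dask or CuPy. They rely only on `cupy.cuda.runtime.getDeviceCount`, `cupy.cuda.runtime.getDevice`, and `Client.run`, which executes a function on every worker.

```python
import os


def worker_gpu_info():
    """Report the PID and CUDA device seen by the calling process.

    Hypothetical diagnostic helper (not part of Dask or CuPy).
    """
    info = {"pid": os.getpid(), "device": None, "n_devices": None, "error": None}
    try:
        # Imported lazily so the import happens inside the worker process
        import cupy as cp

        info["n_devices"] = cp.cuda.runtime.getDeviceCount()
        info["device"] = cp.cuda.runtime.getDevice()
    except Exception as exc:  # CuPy missing, or a CUDA runtime error
        info["error"] = repr(exc)
    return info


def report(client):
    """Run the diagnostic on every worker of a dask.distributed Client."""
    # Client.run returns a dict keyed by worker address
    for address, info in client.run(worker_gpu_info).items():
        print(address, info)
```

With `client = Client()` created inside the `if __name__ == "__main__":` block, calling `report(client)` prints one line per worker. Several workers with different PIDs reporting the same device would confirm that multiple processes are sharing one GPU.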

I’ve never tried it, but I see nowhere that dask-cuda is restricted to Linux. Did you try to just pip or conda install it?

@guillaumeeb thanks for your help!

The first thing I would try is to launch a LocalCluster with only one process, and after that with only one thread:
client = Client(processes=False)

Running with threads instead of processes on the distributed scheduler (but without dask-cuda) does work. As you mentioned, LocalCUDACluster is designed to use only one process, so it seems like there is something funny going on with multiple processes. When I use the local scheduler I am also running with multiple processes, so I am not sure what is different here. But I agree the only way to understand this would be to dig into LocalCUDACluster.

The question I have to answer is: am I winning anything from using multiple processes with the GPU? If the answer is “no”, then my problem is solved. I have to do some more testing to see if this is the case.

I’ve never tried it, but I see nowhere that dask-cuda is restricted to Linux. Did you try to just pip or conda install it?

Yes, I tried to install it following their instructions. Using conda, solving the environment hangs for more than 20 minutes. Using pip, the install completes right away, but upon trying
import dask_cuda I get the error

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\q2ilab\mambaforge\envs\dask-cuda2\lib\site-packages\dask_cuda\__init__.py", line 4, in <module>
    raise ImportError("Only Linux is supported by Dask-CUDA at this time")
ImportError: Only Linux is supported by Dask-CUDA at this time

The RAPIDS project, of which dask-cuda is a part, is not compatible with Windows in general. This is why they only describe installation using WSL. In some cases I have been able to install subprojects on Windows. In particular with cuCIM. They discuss this issue here for cuCIM in particular. Maybe there is a similar workaround for dask-cuda.

You are able to run your workflow with the multiprocessing scheduler, but not with a LocalCluster using processes? That sounds illogical, I really don’t know what can be different either.

Well, that is a clear error message!

Hey there! I shared this thread with some of my colleagues in RAPIDS and they have some useful answers. Dask CUDA is part of the RAPIDS project so if you have more questions I recommend you open an issue over there.

Does Dask-CUDA work on Windows without WSL2?

It may or may not work, but it isn’t supported and there are no guarantees it will work in the future

Why does LocalCUDACluster work while LocalCluster does not?

There are some specializations in LocalCUDACluster to make it work with multiple GPUs, by selecting the correct GPU for each process. This can be done by the user with LocalCluster, but it won’t be out-of-the-box, thus it is recommended to use LocalCUDACluster.
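The per-process GPU selection described above could be sketched roughly as follows. This is an illustration under assumptions, not Dask-CUDA's actual code: `rotated_visible_devices` is a hypothetical helper that builds a `CUDA_VISIBLE_DEVICES` value in which each worker sees a different GPU first, so that device 0 inside worker `i` maps to a different physical GPU.

```python
def rotated_visible_devices(worker_index, device_ids):
    """Return a CUDA_VISIBLE_DEVICES value with a different first GPU per worker.

    Hypothetical helper for illustration only. `device_ids` is the list of
    physical GPU ids on the machine, e.g. [0, 1, 2, 3].
    """
    if not device_ids:
        raise ValueError("device_ids must not be empty")
    # Rotate the list so that worker i starts at device i (modulo GPU count)
    start = worker_index % len(device_ids)
    rotated = list(device_ids[start:]) + list(device_ids[:start])
    return ",".join(str(d) for d in rotated)


if __name__ == "__main__":
    # Example: four workers on a machine with GPUs 0..3
    for i in range(4):
        print(i, rotated_visible_devices(i, [0, 1, 2, 3]))
```

The value only takes effect if it is set in the environment of each worker process before CUDA is initialized there, that is, before the first CuPy call. How to pass it to each worker depends on how the workers are launched and is left out of this sketch.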

Am I winning anything from using multiple processes with the GPU?

LocalCUDACluster supports multiple GPUs by selecting one GPU per process it instantiates. Other uses (multiple GPUs in a single-process worker) are not currently supported.

@jacobtomlinson thanks for your input!

I was not able to run Dask-CUDA on Windows without WSL2. When I tried, I got the error message I posted above. It looks like, because the software is not developed or tested for Windows, it checks the OS at import time and raises an error on Windows (see here). Fair enough! I guess your point is that if I were to remove that check, Dask-CUDA might run on Windows.

I think the reason my script worked using Dask-CUDA is that Dask-CUDA enforces one process per GPU. So at the end of the day my question is not about Dask-CUDA, but about the different behavior of the dask local scheduler and the distributed scheduler when using multiprocessing.

The core of the problem seems to be that (1) the local multiprocessing scheduler + CuPy works, but (2) the distributed scheduler with multiple processes + CuPy does not. Both multithreading schedulers work. As @guillaumeeb puts it:

You are able to run your workflow with the multiprocessing scheduler, but not with a LocalCluster using processes? That sounds illogical, I really don’t know what can be different either.

Here’s a minimal example showing that behavior. I’m curious if others can reproduce this.

import numpy as np
import cupy as cp
import dask
from dask.diagnostics import ProgressBar
import dask.array as da
from dask.distributed import Client, LocalCluster
# from dask_cuda import LocalCUDACluster # only for Linux/Windows + WSL2

if __name__ == "__main__":
    # local scheduler with processes
    dask.config.set(scheduler='processes')
    # this succeeds

    # local scheduler with threads
    # dask.config.set(scheduler='threads')
    # this succeeds

    # distributed scheduler with processes
    # cluster = LocalCluster(processes=True)
    # client = Client(cluster)
    # this fails

    # distributed scheduler with threads
    # cluster = LocalCluster(processes=False)
    # client = Client(cluster)
    # this succeeds

    # distributed scheduler + dask_cuda
    # note: 1 process per GPU and only works on Linux/Windows + WSL2
    # cluster = LocalCUDACluster()
    # client = Client(cluster)
    # this succeeds


    # generate random numbers on cpu
    m = da.random.normal(size=(1000, 1024, 1204), chunks=(1, 1024, 1024))

    # move to gpu
    n = da.map_blocks(lambda a: cp.asarray(a),
                      m,
                      dtype=float,
                      meta=cp.array([]),
                      )

    # take average
    c = n.mean(axis=0).compute()

Hi @ptbrown,

I just tested your code (thanks a lot for the reproducer). In my case, the LocalCluster with processes=True works. It is almost 3 times faster than with LocalCUDACluster (given that I have 4 processes). I’m running this on Ubuntu 22.04 (no WSL).

@guillaumeeb thank you for testing!

In my case, the LocalCluster with processes=True works. It is almost 3 times faster than with LocalCUDACluster (given that I have 4 processes). I’m running this on Ubuntu 22.04 (no WSL).

I just tested the same scheduler on Linux Mint 19.3 with dask 2022.12.1, distributed 2023.1.1, and cupy-cuda11x 11.4.0, and it also succeeded. Here I am using a GK110 GeForce GTX Titan GPU.

Looks like this is a Windows problem then.
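Given that, a script that has to run on both platforms could pick its cluster by operating system. This is a minimal sketch based only on what worked in this thread: `choose_cluster_kind` and `make_client` are hypothetical helpers, and the mapping (LocalCUDACluster on Linux or WSL2, a threads-only LocalCluster elsewhere) is an assumption drawn from the tests reported above, not an official recommendation.

```python
import sys


def choose_cluster_kind(platform=sys.platform):
    """Pick a cluster setup based on what worked in this thread.

    Returns "cuda" (dask_cuda.LocalCUDACluster) on Linux, including WSL2,
    and "threads" (LocalCluster(processes=False)) on other platforms.
    """
    return "cuda" if platform.startswith("linux") else "threads"


def make_client(kind):
    """Create a dask.distributed Client for the chosen setup."""
    # Imported here so that choosing a kind does not require Dask itself
    from dask.distributed import Client, LocalCluster

    if kind == "cuda":
        from dask_cuda import LocalCUDACluster  # importable on Linux only

        return Client(LocalCUDACluster())
    # Single process with multiple threads: this worked on Windows above
    return Client(LocalCluster(processes=False))
```

Inside the `if __name__ == "__main__":` block of the reproducer, `client = make_client(choose_cluster_kind())` would then replace the commented-out cluster choices.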
