My threaded plugin is failing when I run with UCX

Hi folks,

I recently wrote a plugin to monitor GPU usage. It uses threading.Thread and asyncio.
It works well in a TCP environment, but when I switch to UCX, it fails with an asyncio-related error.

Does anyone know what I should do to fix this? Once I'm able to reproduce the problem, I can paste the Python exception here.
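Roughly, the plugin follows this pattern (a simplified sketch, not the actual code; dask_setup is the real preload hook, everything else is illustrative):

import asyncio
import threading

async def poll_gpus(interval=1.0):
    while True:
        # ... collect GPU usage here (e.g. via pynvml) ...
        await asyncio.sleep(interval)

def _run_loop():
    # A second event loop, running in a second thread
    asyncio.run(poll_gpus())

def dask_setup(scheduler):
    thread = threading.Thread(target=_run_loop, daemon=True)
    thread.start()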

Hi @jcfaracco,

Thanks for sharing this work!

Could you give us more information about the error (at least the stack trace)? I see you are working on it; it would really help.

On my side, I don’t know anything about this communication protocol.

cc @crusaderky @jacobtomlinson


Happy to help, but we will need more information, the traceback at a minimum.

Hi @jacobtomlinson

Sorry for my late response; I was getting access to my clusters. Now I have the logs. Here is what I get when I run my plugin with asyncio and UCX enabled.

0: 2024-07-29 00:02:35,154 - distributed.preloading - INFO - Run preload setup: dask_memusage_gpus_plugin
0: Task exception was never retrieved
0: future: <Task finished name='Task-42' coro=<_listener_handler_coroutine() done, defined at /usr/local/lib/python3.10/dist-packages/ucp/core.py:128> exception=RuntimeError('<distributed.core.AsyncTaskGroup object at 0x7f8ed587ab60> is bound to a different event loop')>
0: Traceback (most recent call last):
0:   File "/usr/local/lib/python3.10/dist-packages/ucp/core.py", line 178, in _listener_handler_coroutine
0:     await func(ep)
0:   File "/usr/local/lib/python3.10/dist-packages/distributed/comm/ucx.py", line 523, in serve_forever
0:     await self.comm_handler(ucx)
0:   File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 731, in handle_comm
0:     self._ongoing_background_tasks.call_soon(self._handle_comm, comm)
0:   File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 195, in call_soon
0:     task = self._get_loop().create_task(afunc(*args, **kwargs))
0:   File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 137, in _get_loop
0:     raise RuntimeError(f"{self!r} is bound to a different event loop")
0: RuntimeError: <distributed.core.AsyncTaskGroup object at 0x7f8ed587ab60> is bound to a different event loop
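From what I can tell, this is asyncio's standard guard against using a loop-bound object from a second event loop. Here is a minimal standalone reproducer of the same class of failure (only the pattern, not my plugin):

import asyncio
import threading

lock = None

async def bind_lock():
    global lock
    lock = asyncio.Lock()
    async with lock:   # first use binds the lock to this loop
        pass

async def reuse_lock():
    async with lock:   # used again from a different loop
        pass

asyncio.run(bind_lock())    # event loop #1, main thread

t = threading.Thread(target=lambda: asyncio.run(reuse_lock()))
t.start()                   # event loop #2 raises RuntimeError:
t.join()                    # "... is bound to a different event loop"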

Can you also share a code example of how you are starting your cluster?

This is how I’m starting the cluster:

dask scheduler --scheduler-file /my/path/scheduler.json --protocol ucx --interface ib0 --preload dask_memusage_gpus_plugin --memusage-gpus-path /my/path/test.csv &

client.py &

dask-cuda-worker --scheduler-file /my/path/scheduler.json

For the other workers, I just repeat that last dask-cuda-worker command. I'm not using any special environment variables or anything similar.

It looks like some communication code is being run in the wrong thread.
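For example, if your preload starts its own asyncio event loop in a background thread and something there ends up touching distributed's comms, you would get exactly this error. One way to avoid it is to run the periodic work on the scheduler's own loop instead. Here is a rough sketch (assuming a recent distributed where dask_setup may be a coroutine; the polling logic is just a placeholder, not your plugin):

import asyncio

_task = None

async def poll(scheduler, interval=1.0):
    while True:
        # ... collect GPU usage and write the CSV here ...
        await asyncio.sleep(interval)

async def dask_setup(scheduler):
    # An async dask_setup runs on the scheduler's own event loop,
    # so this task is bound to the correct loop.
    global _task
    _task = asyncio.create_task(poll(scheduler))

async def dask_teardown(scheduler):
    if _task is not None:
        _task.cancel()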

Given that we don’t see this when your plugin is not in use, it’s likely that something in there is causing it. If you can reproduce this without your plugin, then I recommend opening an issue on the distributed repo.

I also just want to check that you are aware that distributed already tracks GPU memory: it is available in the dashboard, via performance reports, and via the Prometheus integration. Perhaps one of those solutions would save you from having to write a plugin at all?
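For instance, when prometheus_client is installed, the scheduler exposes a /metrics endpoint on its dashboard port; whether GPU-specific metrics appear there depends on your setup. A quick way to peek at it (the hostname is a placeholder, and I'm assuming the default dashboard port):

import urllib.request

url = "http://scheduler-host:8787/metrics"   # hypothetical hostname
with urllib.request.urlopen(url) as resp:
    print(resp.read().decode())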
