Worker plugins for Nanny workers

Hey Everyone!

I'm trying to add a worker plugin to a worker that is managed by a Nanny. The goal is to send logs to our cloud system from the workers that run in a different network from our scheduler.

I have tried to fetch the worker from the nanny and then add the plugin to the worker, but it fails:

async def start_nanny(contact_address):
    nanny = await Nanny(
        scheduler_address,
        name="Worker-local",
        contact_address=contact_address,
        protocol="tcp",
        worker_port="13370",
        host="localhost"
    )

    comms_plugin = WorkerCommsPlugin()
    await nanny.Worker.plugin_add(comms_plugin)

Unfortunately, this leads to the following exception:

2024-08-26 12:06:45,906 - distributed.utils - ERROR - Worker.plugin_add() missing 1 required positional argument: 'plugin'
Traceback (most recent call last):
  File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/utils.py", line 837, in wrapper
    return await func(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^
TypeError: Worker.plugin_add() missing 1 required positional argument: 'plugin'
Traceback (most recent call last):
  File "/home/dhliv/Desktop/raiinmaker/Depin/worker.py", line 64, in <module>
    asyncio.run(start_nanny(relay_address))
  File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/dhliv/Desktop/raiinmaker/Depin/worker.py", line 58, in start_nanny
    await nanny.Worker.plugin_add(comms_plugin)
  File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/utils.py", line 837, in wrapper
    return await func(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^
TypeError: Worker.plugin_add() missing 1 required positional argument: 'plugin'

If I then try to work around this by passing `self` and `plugin` explicitly as keyword arguments, it still fails:

async def start_nanny(contact_address):
    nanny = await Nanny(
        scheduler_address,
        name="Worker-local",
        contact_address=contact_address,
        protocol="tcp",
        worker_port="13370",
        host="localhost"
    )

    comms_plugin = WorkerCommsPlugin()
    await nanny.Worker.plugin_add(self=nanny.Worker, plugin=comms_plugin)

with the following exception:

2024-08-26 12:08:29,444 - distributed.worker - ERROR - type object 'Worker' has no attribute 'plugins'
Traceback (most recent call last):
  File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/utils.py", line 837, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/worker.py", line 1863, in plugin_add
    if name in self.plugins:
               ^^^^^^^^^^^^
AttributeError: type object 'Worker' has no attribute 'plugins'
Traceback (most recent call last):
  File "/home/dhliv/Desktop/raiinmaker/Depin/worker.py", line 64, in <module>
    asyncio.run(start_nanny(relay_address))
  File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/dhliv/Desktop/raiinmaker/Depin/worker.py", line 58, in start_nanny
    await nanny.Worker.plugin_add(self=nanny.Worker, plugin=comms_plugin)
  File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/utils.py", line 837, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/worker.py", line 1863, in plugin_add
    if name in self.plugins:
               ^^^^^^^^^^^^
AttributeError: type object 'Worker' has no attribute 'plugins'

What can I do to register Worker plugins for my workers and still use the Nanny?

Hi @Dhliv, welcome to the Dask community!

I am not sure I understand what you are trying to do. Why are you not using the Client registration method described in the Plugins page of the Dask.distributed documentation (2024.8.1+11.g4aeed40)?