Hey Everyone!
What I’m trying to do is to add a worker plugin to the worker that is managed by a Nanny. The idea behind this is to add some logs to our cloud system for the workers that we have in a different network from our scheduler.
I have tried to fetch the worker from the nanny and then add the plugin to the worker, but it fails:
async def start_nanny(contact_address):
    """Start a Nanny and try to add a ``WorkerCommsPlugin`` through ``nanny.Worker``.

    ``contact_address`` is forwarded unchanged to the ``Nanny`` constructor.
    """
    nanny = await Nanny(
        scheduler_address,
        name="Worker-local",
        contact_address=contact_address,
        protocol="tcp",
        worker_port="13370",
        host="localhost",
    )
    comms_plugin = WorkerCommsPlugin()
    # This is the call that raises the TypeError shown in the traceback below.
    await nanny.Worker.plugin_add(comms_plugin)
Unfortunately, this leads to the following exception:
2024-08-26 12:06:45,906 - distributed.utils - ERROR - Worker.plugin_add() missing 1 required positional argument: 'plugin'
Traceback (most recent call last):
File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/utils.py", line 837, in wrapper
return await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
TypeError: Worker.plugin_add() missing 1 required positional argument: 'plugin'
Traceback (most recent call last):
File "/home/dhliv/Desktop/raiinmaker/Depin/worker.py", line 64, in <module>
asyncio.run(start_nanny(relay_address))
File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/home/dhliv/Desktop/raiinmaker/Depin/worker.py", line 58, in start_nanny
await nanny.Worker.plugin_add(comms_plugin)
File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/utils.py", line 837, in wrapper
return await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
TypeError: Worker.plugin_add() missing 1 required positional argument: 'plugin'
If I then try to fix this by passing both arguments explicitly as keyword arguments (including `self`), as shown below, it still fails:
async def start_nanny(contact_address):
    """Start a Nanny and call ``plugin_add`` with ``self`` passed explicitly.

    ``contact_address`` is forwarded unchanged to the ``Nanny`` constructor.
    """
    nanny = await Nanny(
        scheduler_address,
        name="Worker-local",
        contact_address=contact_address,
        protocol="tcp",
        worker_port="13370",
        host="localhost",
    )
    comms_plugin = WorkerCommsPlugin()
    # ``nanny.Worker`` itself is passed as ``self``; this is the call that
    # raises the AttributeError shown in the traceback below.
    await nanny.Worker.plugin_add(self=nanny.Worker, plugin=comms_plugin)
with the following exception:
2024-08-26 12:08:29,444 - distributed.worker - ERROR - type object 'Worker' has no attribute 'plugins'
Traceback (most recent call last):
File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/utils.py", line 837, in wrapper
return await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/worker.py", line 1863, in plugin_add
if name in self.plugins:
^^^^^^^^^^^^
AttributeError: type object 'Worker' has no attribute 'plugins'
Traceback (most recent call last):
File "/home/dhliv/Desktop/raiinmaker/Depin/worker.py", line 64, in <module>
asyncio.run(start_nanny(relay_address))
File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/dhliv/.pyenv/versions/3.11.2/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/home/dhliv/Desktop/raiinmaker/Depin/worker.py", line 58, in start_nanny
await nanny.Worker.plugin_add(self=nanny.Worker, plugin=comms_plugin)
File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/utils.py", line 837, in wrapper
return await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/dhliv/.cache/pypoetry/virtualenvs/depin-lvuHD2G4-py3.11/lib/python3.11/site-packages/distributed/worker.py", line 1863, in plugin_add
if name in self.plugins:
^^^^^^^^^^^^
AttributeError: type object 'Worker' has no attribute 'plugins'
What can I do to register Worker plugins for my workers and still use the Nanny?