Pre-load worker plugin not working (Docker)

Hi,
I’m trying to pre-load a worker plugin in a Docker setup. Below is the relevant snippet from the dask.yaml file.
The script loads successfully, but the plugin does not get registered.
The only way I’ve been able to register the worker plugin is by calling client.register_plugin from the client side. However, I want the plugin to already be available when the worker starts up.

distributed:
  worker:
    preload:
    - /etc/dask/workerStartup.py
import logging
import sys

import click
from dask.distributed import Client, WorkerPlugin

log = logging.getLogger(__name__)

class ErrorLogger(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        log.info('somewhere here...')
        sys.exit(-1)  # Just a test.

def dask_setup(worker):
    plugin = ErrorLogger()
    worker.client.register_worker_plugin(plugin)
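For comparison, the client-side registration that does work looks roughly like this (a minimal sketch; the `register_on` helper and the plugin name are just illustrative, not part of my actual setup):

```python
from dask.distributed import Client, WorkerPlugin

class ErrorLogger(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker

def register_on(client: Client) -> None:
    # This works, but only once a client has connected --
    # I want the plugin present at worker startup instead.
    client.register_plugin(ErrorLogger(), name="error-logger")
```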

Alternatively, I tried setting it up through the scheduler preload.
It loads as well, but again the worker plugin is not available.

@click.command()
def dask_setup(scheduler):
    plugin = ErrorLogger()  # same ErrorLogger class as in the worker preload above

    async def setup_worker(worker):
        await worker.plugin_add(plugin=plugin, name='my_worker_plugin')

    scheduler.worker_plugins.append({'setup': setup_worker})
    for worker in scheduler.workers.values():
        scheduler.loop.add_callback(setup_worker, worker)

Is there a way to do this?
Any help would be appreciated.
Regards,
Christian

I wonder if your config YAML isn’t getting picked up.

Could you try setting it via an environment variable in the worker container instead?

DASK_DISTRIBUTED__WORKER__PRELOAD='["/etc/dask/workerStartup.py"]'
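For example, in a docker-compose file it might look like this (the service name and image are hypothetical; use whatever your setup already has):

```yaml
services:
  worker:
    image: ghcr.io/dask/dask   # whatever image you already use
    environment:
      DASK_DISTRIBUTED__WORKER__PRELOAD: '["/etc/dask/workerStartup.py"]'
```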

Hi,

The YAML is being picked up, because other settings (logging) are reflected correctly.
Moreover, I get a preload error message whenever there’s an error in the file (workerStartup.py).

Christian

I ran a few more tests and the script is definitely being executed. What I’m running is below (schedulerStartup.py). As a test, it should kill each worker right away. I know the script is being executed, because when I just put a sys.exit(-1) at module level, none of the workers start up.

To make the question clearer: how can I register the worker plugin inside the dask_setup function?

import sys

import click
from dask.distributed import Client, Scheduler, Worker, WorkerPlugin

class KillAfterWorkPlugin(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        sys.exit(-1)

def dask_setup(scheduler: Scheduler):
    scheduler.register_worker_plugin(plugin=KillAfterWorkPlugin(), name='mmm')

Hi @spheredb,

Reading the posts above, I’m not sure what is working and what isn’t.

Can’t you use something like:

@click.command()
async def dask_setup(worker):
    plugin = ErrorLogger()
    # plugin_add is a coroutine, so it needs to be awaited
    await worker.plugin_add(plugin)
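Put into the preload file itself, that might look like the sketch below. This is not tested against your setup; it assumes `Worker.plugin_add` (a coroutine) and that the worker preload accepts an async `dask_setup`, and the plugin name is just illustrative:

```python
# /etc/dask/workerStartup.py
import logging

from dask.distributed import WorkerPlugin

log = logging.getLogger(__name__)

class ErrorLogger(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        log.info("task %s moved from %s to %s", key, start, finish)

async def dask_setup(worker):
    # Register directly on this worker -- no client round-trip needed.
    await worker.plugin_add(plugin=ErrorLogger(), name="error-logger")
```

The point is that the worker preload’s `dask_setup` receives the `Worker` instance itself, so you can register the plugin on it directly instead of going through a client.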