Optimal way to store connections in distributed worker

I have a permanent Dask cluster in Kubernetes. Workers need access to multiple databases. To avoid reconnects, I built a custom Dask image with the required libraries and a custom preload_entrypoint.py:

from dask.distributed import Worker


# dask calls this function
# all code which might fail must exist inside try block
async def dask_setup(worker: Worker):
    try:
        from backend.dask.preload import preload

        await preload(worker)
    except Exception as e:
        import sys

        print("preload failed:", e)

        # explicitly exit to prevent worker from running without preload code
        # this does not kill pod but prevents it from registering in scheduler
        # worker.stop() does not work for some reason
        sys.exit(1)

The file it imports assigns settings, DB pools, etc. to the worker object:

from typing import cast

from dask.distributed import Worker


async def preload(worker_: Worker):
    # cast to a subclass that declares the my_* attributes for type checkers
    worker = cast(MyWorker, worker_)

    worker.my_settings = settings
    worker.my_log = make_logger("worker")
    worker.my_mongo = await connect_mongo()

There are multiple issues with this:

  • clashing with the worker namespace. Are there better ways to set properties on the worker in a type-safe way?
  • how to asynchronously close all these connections on worker exit?
  • as mentioned in the comment, the worker ignores errors in preload.py for some reason and just skips initialization if not explicitly killed. And even then it doesn't kill the pod, it just keeps retrying in a loop. Is there a way to kill the entire pod to indicate that something is wrong with initialization?

I tried using WorkerPlugin but it seems to give even less control over the worker. Errors are ignored, stdout is suppressed, and other weird things happen, making it hard to debug.

Hi @Fogapod,

I’m not sure what you mean here.

WorkerPlugin gives you a teardown method.
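
Something along these lines, as a rough sketch (ConnectionsPlugin is just an example name, and connect_mongo stands in for your own helper):

import httpx
from dask.distributed import Worker, WorkerPlugin


class ConnectionsPlugin(WorkerPlugin):
    async def setup(self, worker: Worker):
        # runs when the plugin is attached to the worker
        self.mongo = await connect_mongo()  # placeholder for your own connect helper
        self.http_client = httpx.AsyncClient()

    async def teardown(self, worker: Worker):
        # runs when the worker shuts down or the plugin is removed
        await self.mongo.close()  # assuming your Mongo client exposes close()
        await self.http_client.aclose()

Tasks could then reach the connections through the worker's plugins mapping (keyed by the name the plugin was registered under) instead of attributes set directly on the worker.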

Could you elaborate? Maybe this is an issue in preload.

Do the retries happen at the pod level or in Dask? Maybe this is more of a Kubernetes setting?

Maybe you should use the worker logger to print things. Not sure why stdout might be suppressed. What other weird things did you see? I think WorkerPlugin would be the correct way in your case.
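
For example, as a minimal sketch, logging through the standard logging machinery should show up in the worker logs alongside the distributed.* output even when plain prints do not:

import logging

logger = logging.getLogger("distributed.worker")


async def dask_setup(worker):
    logger.info("preload starting on %s", worker.address)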

By clashing with the worker namespace I mean that I don’t like setting things on the worker directly and having to use a prefix for my variables.

The worker type I cast to:

class MyWorker(Worker):
    my_log: APPLogger
    my_settings: ServiceSettings
    my_object_store: ObjectStore
    my_mongo: AIOMongoDBServer[Any]
    my_http_client: httpx.AsyncClient
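
One alternative I am considering to avoid the prefixes is grouping everything under a single attribute, so only one extra name lives on the worker (just a sketch; Resources is a name I made up, and the quoted field types are the same application types as above):

from dataclasses import dataclass
from typing import Any

import httpx
from dask.distributed import Worker


@dataclass
class Resources:
    log: "APPLogger"
    settings: "ServiceSettings"
    object_store: "ObjectStore"
    mongo: "AIOMongoDBServer[Any]"
    http_client: httpx.AsyncClient


class MyWorker(Worker):
    # the only attribute added to the worker namespace
    my_resources: Resources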

Simple example:

async def dask_setup(worker):
    1 / 0
    worker.a = 1234  # this will never run and will cause AttributeError at runtime inside tasks
The worker log:

2024-06-28 14:15:26,013 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.244.9.106:44589'
2024-06-28 14:15:26,476 - distributed.preloading - INFO - Creating preload: /opt/backend/dask/bin.runfiles/_main/backend/dask/preload_entrypoint.py
2024-06-28 14:15:26,477 - distributed.utils - INFO - Reload module preload_entrypoint from .py file
2024-06-28 14:15:26,477 - distributed.preloading - INFO - Import preload module: /opt/backend/dask/bin.runfiles/_main/backend/dask/preload_entrypoint.py
2024-06-28 14:15:26,876 - distributed.preloading - INFO - Run preload setup: /opt/backend/dask/bin.runfiles/_main/backend/dask/preload_entrypoint.py
2024-06-28 14:15:26,876 - distributed.preloading - ERROR - Failed to start preload: /opt/backend/dask/bin.runfiles/_main/backend/dask/preload_entrypoint.py
Traceback (most recent call last):
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/preloading.py", line 234, in start
    await preload.start()
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/preloading.py", line 213, in start
    await future
  File "/tmp/dask-scratch-space/worker-78qpzjdj/preload_entrypoint.py", line 8, in dask_setup
    1 / 0
    ~~^~~
ZeroDivisionError: division by zero

Preload failed, but the worker just ignores it and continues initializing normally.

The restarts happen inside the worker; the Kubernetes pod stays alive.

I will use that, thanks. I think some exceptions were not shown as well, but I will need to double-check since I moved away from plugins.


This might be an issue; I encourage you to raise an issue on the distributed issue tracker with a reproducer.

Well, I don’t think there really is another solution.

Is there a way to register plugins from --preload, though? It feels a bit wrong to define plugins by doing the following at each worker startup:

from dask.distributed import Worker

# the preload hook has to be named dask_setup for Dask to call it
async def dask_setup(worker: Worker):
    client = worker.client
    client.register_plugin(MyPlugin())

Since I have a long-running cluster, I can’t register plugins from the client side because that creates versioning complications.

EDIT: found the undocumented dask_teardown: distributed/distributed/preloading.py at a57ab42e2600d0589342fe596e443c0e40112976 · dask/distributed · GitHub
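
For reference, a sketch of how I plan to use it to close the connections opened in dask_setup (close()/aclose() are assumptions about my own clients):

from typing import cast

from dask.distributed import Worker


# the preload machinery awaits this on worker shutdown, mirroring dask_setup
async def dask_teardown(worker: Worker):
    w = cast(MyWorker, worker)
    await w.my_mongo.close()
    await w.my_http_client.aclose()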

I’m not sure I understand your problem, but from the Dask Plugins documentation page, about the setup function of a plugin:

Run when the plugin is attached to a worker. This happens when the plugin is registered and attached to existing workers, or when a worker is created after the plugin has been registered.

So you should only have to register this plugin once.
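
For example (a sketch; the address is a placeholder and MyPlugin is your plugin from above), a single registration from any client should be enough:

from dask.distributed import Client

client = Client("tcp://scheduler:8786")
# attached to all current workers and to any worker that joins later
client.register_plugin(MyPlugin(), name="my-connections")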