I have a permanent Dask cluster in Kubernetes. Workers need access to multiple databases. To avoid reconnects, I build a custom Dask image with the required libraries and a custom preload_entrypoint.py:
from dask.distributed import Worker

# dask calls this function
# all code which might fail must exist inside try block
async def dask_setup(worker: Worker):
    try:
        from backend.dask.preload import preload

        await preload(worker)
    except Exception as e:
        import sys

        print("preload failed:", e)
        # explicitly exit to prevent worker from running without preload code
        # this does not kill pod but prevents it from registering in scheduler
        # worker.stop() does not work for some reason
        sys.exit(1)
The file it imports assigns settings, DB pools, etc. to the worker object:
from typing import cast

from dask.distributed import Worker

async def preload(worker_: Worker):
    # MyWorker is a subclass that declares the my_* attribute types;
    # settings, make_logger and connect_mongo are project helpers
    worker = cast(MyWorker, worker_)
    worker.my_settings = settings
    worker.my_log = make_logger("worker")
    worker.my_mongo = await connect_mongo()
There are multiple issues with this:
- clashing with the worker namespace. Are there better ways to set properties on the worker in a type-safe way?
- how to asynchronously close all these connections on worker exit?
- as mentioned in the comment, the worker ignores errors in preload.py for some reason and just skips initialization if not explicitly killed. And even then it doesn't kill the pod, it just keeps retrying in a loop. Is there a way to kill the entire pod to indicate that something is wrong with initialization?
I tried using WorkerPlugin, but it seems to give even less control over the worker. Errors are ignored, stdout is suppressed, and other weird things make it hard to debug.
Hi @Fogapod,
I'm not sure what you mean here. WorkerPlugin gives you a teardown method.
Could you elaborate? Maybe this is an issue in your preload.
Do the retries happen at the pod level or in Dask? Maybe this is more of a Kubernetes setting?
Maybe you should use the worker logger to print things. I'm not sure why stdout might be suppressed. What other weird things did you see? I think WorkerPlugin would be the correct approach in your case.
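For reference, here is a minimal sketch of the kind of plugin I mean: an async setup/teardown pair that opens and closes a connection-like resource on each worker. I'm using an httpx client only as an illustration (the attribute name mirrors yours); adapt it for your Mongo pool and other resources:

import httpx
from dask.distributed import Worker, WorkerPlugin

class ResourcesPlugin(WorkerPlugin):
    # a stable name lets you replace or remove the plugin later
    name = "resources"

    async def setup(self, worker: Worker):
        # runs once per worker when the plugin is attached
        worker.my_http_client = httpx.AsyncClient()

    async def teardown(self, worker: Worker):
        # runs when the worker shuts down or the plugin is removed
        await worker.my_http_client.aclose()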
By clashing with the worker namespace I mean I don't like setting things on the worker directly and having to use a prefix for my variables.
The worker type I cast to:
class MyWorker(Worker):
    my_log: APPLogger
    my_settings: ServiceSettings
    my_object_store: ObjectStore
    my_mongo: AIOMongoDBServer[Any]
    my_http_client: httpx.AsyncClient
Simple example:
async def dask_setup(worker):
    1 / 0
    worker.a = 1234  # this will never run and will cause AttributeError at runtime inside tasks
2024-06-28 14:15:26,013 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.244.9.106:44589'
2024-06-28 14:15:26,476 - distributed.preloading - INFO - Creating preload: /opt/backend/dask/bin.runfiles/_main/backend/dask/preload_entrypoint.py
2024-06-28 14:15:26,477 - distributed.utils - INFO - Reload module preload_entrypoint from .py file
2024-06-28 14:15:26,477 - distributed.preloading - INFO - Import preload module: /opt/backend/dask/bin.runfiles/_main/backend/dask/preload_entrypoint.py
2024-06-28 14:15:26,876 - distributed.preloading - INFO - Run preload setup: /opt/backend/dask/bin.runfiles/_main/backend/dask/preload_entrypoint.py
2024-06-28 14:15:26,876 - distributed.preloading - ERROR - Failed to start preload: /opt/backend/dask/bin.runfiles/_main/backend/dask/preload_entrypoint.py
Traceback (most recent call last):
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/preloading.py", line 234, in start
    await preload.start()
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/preloading.py", line 213, in start
    await future
  File "/tmp/dask-scratch-space/worker-78qpzjdj/preload_entrypoint.py", line 8, in dask_setup
    1 / 0
    ~~^~~
ZeroDivisionError: division by zero
The preload fails, but the worker just ignores it and continues initialization normally.
Restarts happen inside the worker; the Kubernetes pod stays alive.
I will use that, thanks. I think some exceptions were not shown as well, but I will need to double-check since I moved away from plugins.
This might be a bug; I encourage you to raise an issue on the distributed issue tracker with a reproducer.
Well, I don’t think there really is another solution.
Is there a way to register plugins from --preload though? It feels a bit wrong registering plugins by doing the following at each worker startup:
async def dask_setup(worker: Worker):
    client = worker.client
    client.register_plugin(MyPlugin())
Since I have a long-running cluster, I can't register plugins from the client side because that creates versioning complications.
EDIT: found undocumented dask_teardown: distributed/distributed/preloading.py at a57ab42e2600d0589342fe596e443c0e40112976 · dask/distributed · GitHub
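A rough sketch of how I intend to use it for the async cleanup question above, assuming dask_teardown is awaited on worker shutdown the same way dask_setup is awaited on startup (the my_* attributes are the ones my preload sets):

from dask.distributed import Worker

async def dask_teardown(worker: Worker):
    # close the resources opened in preload / dask_setup
    await worker.my_http_client.aclose()
    worker.my_mongo.close()  # or however your Mongo client is closed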
I'm not sure of your problem, but from the Dask Plugins documentation page, for the setup function of a plugin:
Run when the plugin is attached to a worker. This happens when the plugin is registered and attached to existing workers, or when a worker is created after the plugin has been registered.
So you should only have to register this plugin once.
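For example, a one-off registration from any client connected to the long-running cluster should be enough; workers started later pick the plugin up automatically when they join (the scheduler address and the ResourcesPlugin class from the sketch earlier in the thread are illustrative):

from dask.distributed import Client

client = Client("tcp://dask-scheduler:8786")  # illustrative address
# register once: existing workers run setup() now, new workers on creation
client.register_plugin(ResourcesPlugin())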