Accessing worker state on dask-gateway

I can access worker state on a distributed cluster using:

import distributed
client = distributed.Client()
workers = client.cluster.scheduler.workers  # dict of worker address -> WorkerState

When I try the same thing on a dask-gateway cluster, the cluster does not have a scheduler member. I can get a scheduler by going through a client, but it’s a different sort of object, and I’m not sure how to get the workers information from it.

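from dask_gateway import Gateway
gateway = Gateway()  # assumes the gateway address is set in the dask config
name = "xyz"
cluster = gateway.connect(name)
client = cluster.get_client()
workers = client.scheduler.workers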

The workers object is a:
<function distributed.core.PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc(**kwargs)>

Can I get the WorkerState with this function? My goal is to find the # of tasks processing on each worker.

An easier way to get the tasks processing on each worker is Client.processing().

(Note that this will just tell you what tasks the scheduler has sent each worker and enqueued for it to run, not necessarily which tasks are currently executing at the moment. But that’s the same as you were getting from client.cluster.scheduler.workers.)
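To make that concrete, here is a minimal sketch of turning the result of Client.processing() into per-worker counts. It assumes the return value is a mapping from worker address to a collection of task keys; the helper name count_processing and the sample addresses and keys are made up for illustration.

```python
from typing import Dict, Iterable, Mapping


def count_processing(processing: Mapping[str, Iterable[str]]) -> Dict[str, int]:
    """Map each worker address to the number of task keys assigned to it."""
    return {address: len(tuple(keys)) for address, keys in processing.items()}


# Stand-in for the return value of client.processing();
# the addresses and task keys below are invented.
sample = {
    "tcp://10.0.0.1:4000": ("inc-1", "inc-2", "add-3"),
    "tcp://10.0.0.2:4000": (),
}

counts = count_processing(sample)
print(counts)
```

Against a live cluster you would call count_processing(client.processing()) instead of passing the sample dict.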

The local client is unusual in that the scheduler runs in the same process (and event loop) as your code, which is why client.cluster.scheduler gives you an actual Scheduler instance. It’s an unfortunate design that it’s this easy to access locally, because it probably shouldn’t be considered part of the public API: when your scheduler is on a different machine, you only have access to the RPC, as you’re seeing.
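If you really do need the WorkerState objects on a remote scheduler, one possible route is Client.run_on_scheduler, which runs a function inside the scheduler process and fills in a dask_scheduler argument if the function declares one. The sketch below is hedged: it reads scheduler internals (workers, WorkerState.processing) that are not a stable public API, and whether your dask-gateway deployment allows run_on_scheduler is an assumption. The function name processing_counts and the fake scheduler used for the local check are hypothetical.

```python
from types import SimpleNamespace


def processing_counts(dask_scheduler=None):
    """Runs inside the scheduler process; returns only plain, picklable data."""
    return {
        address: len(ws.processing)
        for address, ws in dask_scheduler.workers.items()
    }


# Local stand-in for a Scheduler, only so the function can be checked
# without a cluster. Real WorkerState objects have many more attributes.
fake_scheduler = SimpleNamespace(
    workers={
        "tcp://worker-a": SimpleNamespace(processing={"task-1", "task-2"}),
        "tcp://worker-b": SimpleNamespace(processing=set()),
    }
)

print(processing_counts(dask_scheduler=fake_scheduler))
```

On a real cluster the call would be client.run_on_scheduler(processing_counts). Returning a plain dict rather than the WorkerState objects themselves keeps the result easy to send back to the client.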

Thanks! Is it the same info I’m getting from the info/main/workers.html page? Currently I scrape that page.

I notice clusters deadlock with a few workers with very high processing counts. So I have a process that kills the pods housing those workers. I may also want to look at the CPU for those workers – is this available, as well?

Yup, same info as info/main/workers.html, just without scraping :grinning_face_with_smiling_eyes:

client.scheduler_info() will (confusingly) give you all those worker metrics you’d see on the dashboard, including CPU. Testing locally, I see something like:

In [3]: client.scheduler_info()
{'type': 'Scheduler',
 'id': 'Scheduler-a81a9c65-8eaa-4164-8dc9-09016bb23574',
 'address': 'tcp://',
 'services': {'dashboard': 8787},
 'started': 1641523098.2240808,
 'workers': {'tcp://': {'type': 'Worker',
   'id': 2,
   'host': '',
   'resources': {},
   'local_directory': '/Users/gabe/dev/dask/dask-worker-space/worker-1jw9np4_',
   'name': 2,
   'nthreads': 4,
   'memory_limit': 8589934592,
   'last_seen': 1641523105.8124008,
   'services': {'dashboard': 62955},
   'metrics': {'executing': 0,
    'in_memory': 0,
    'ready': 0,
    'in_flight': 0,
    'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}},
    'spilled_nbytes': 0,
    'cpu': 2.8,
    'memory': 74620928,
    'time': 1641523105.810308,
    'read_bytes': 12294.81236837962,
    'write_bytes': 18442.21855256943,
    'read_bytes_disk': 0.0,
    'write_bytes_disk': 0.0,
    'num_fds': 30},
   'nanny': 'tcp://'},

Notice that you can also find out how many tasks are actually executing (vs queued) on each worker too, if that’s interesting. Though note that these metrics are what workers report to the scheduler at regular intervals, so they’ll be slightly (milliseconds-seconds) out of date and may be slightly inconsistent with the scheduler’s task counts from Client.processing(). But that may not matter for your use case, so you could probably just use scheduler_info() and skip processing().
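As a rough sketch of pulling just the interesting numbers out of that structure, the helper below walks a scheduler_info()-shaped dict and keeps cpu, executing, and ready for each worker. The helper name worker_metrics and the sample address are hypothetical; the keys are the ones visible in the output above, and other versions of distributed may report different metrics.

```python
from typing import Any, Dict, Mapping


def worker_metrics(info: Mapping[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Extract a few per-worker metrics from a scheduler_info()-shaped dict."""
    wanted = ("cpu", "executing", "ready")
    result = {}
    for address, worker in info.get("workers", {}).items():
        metrics = worker.get("metrics", {})
        # .get() leaves None for any metric a worker did not report
        result[address] = {name: metrics.get(name) for name in wanted}
    return result


# Trimmed stand-in for client.scheduler_info(); the address is invented.
sample_info = {
    "type": "Scheduler",
    "workers": {
        "tcp://10.0.0.1:4000": {
            "nthreads": 4,
            "metrics": {"executing": 0, "ready": 0, "cpu": 2.8},
        },
    },
}

summary = worker_metrics(sample_info)
print(summary)
```

With a live client, worker_metrics(client.scheduler_info()) gives the same shape, which you could compare against the counts from Client.processing() before deciding to restart a worker.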

Is there an issue open for this? Having to hack around deadlocks via k8s is really not ideal :disappointed:

Oh, yes, there are several deadlock issues (mine and others) that have been filed (just search for deadlock in the issues).

For every hour of coding up dask (for dask-gateway/k8s), I’ve easily spent 8+ hours working around issues in scaling, deadlocking, and lopsided scheduling. It’s difficult to iterate when the problems are in production and at scale.

I sort of have something that works, but it is likely ~10% less efficient for the finicky workloads.