In my workflow I can have 3 outer-level distributed futures, each of which spawns its own chunked tasks. Say these are the train, dev, and test sets of a dataset, with an 80-10-10% split. If I run all 3 together and divide the available cores between them, the two smaller sets finish quickly, and the train set only uses 1/3 of the available cores thereafter (I feed tasks eagerly via as_completed.add() whenever a task finishes).
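For context, the feeding loop looks roughly like this (a sketch only; make_chunks, process_chunk, and handle_result are placeholder names, not my real functions):

```python
import itertools
from distributed import Client, as_completed

client = Client()

# Placeholder helpers: make_chunks() yields work items,
# process_chunk() processes one, handle_result() consumes one result.
chunks = iter(make_chunks())

# Total threads across all workers, taken from scheduler_info().
n_cores = sum(w["nthreads"] for w in client.scheduler_info()["workers"].values())

# Seed one task per core, then refill eagerly as results come in.
seq = as_completed(client.map(process_chunk, itertools.islice(chunks, n_cores)))
for future in seq:
    handle_result(future.result())
    try:
        seq.add(client.submit(process_chunk, next(chunks)))
    except StopIteration:
        pass  # no chunks left to submit
```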
I want to find the number of idle cores so that if, say, 3 are idle, I can add 3 more tasks. I thought I could use client.scheduler_info()["workers"], but I could not find out how (that part is not documented in detail). For example, is checking worker_info["metrics"]["cpu"] == 0, or better worker_info["metrics"]["task_counts"]["executing"] != 1 (which needs a try/except, I think, in case the key is missing), a safe way to determine this? Or is there any other way?
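To make that concrete, this is the kind of check I have in mind, reusing the names from the sketch above (again a sketch; I am guessing that task_counts can be absent, hence the .get() calls):

```python
def count_idle_cores(client):
    """Count worker threads that are currently executing nothing,
    based on the per-worker heartbeat metrics shown below.
    Sketch only: I am not sure "task_counts" is always present."""
    idle = 0
    for info in client.scheduler_info()["workers"].values():
        executing = info["metrics"].get("task_counts", {}).get("executing", 0)
        if executing == 0:
            idle += info["nthreads"]
    return idle

# Top up the task pool whenever enough cores sit idle.
for _ in range(count_idle_cores(client)):
    try:
        seq.add(client.submit(process_chunk, next(chunks)))
    except StopIteration:
        break
```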
Here is one worker’s info from the current version:
{
    "type": "Worker",
    "id": 6,
    "host": "127.0.0.1",
    "resources": {"io_bound": 1, "cpu_bound": 1},
    "local_directory": "C:\\TEMP1\\dask-scratch-space\\worker-ebfhxx8w",
    "name": 6,
    "nthreads": 1,
    "memory_limit": 5712486400,
    "last_seen": 1726011699.6877353,
    "services": {"dashboard": 59905},
    "metrics": {
        "task_counts": {"executing": 1},
        "bandwidth": {"total": 100000000, "workers": {}, "types": {}},
        "digests_total_since_heartbeat": {
            "tick-duration": 0.9965400695800781,
            "latency": 0.02536606788635254,
            "profile-duration": 0.008771181106567383,
        },
        "managed_bytes": 0,
        "spilled_bytes": {"memory": 0, "disk": 0},
        "transfer": {
            "incoming_bytes": 0,
            "incoming_count": 0,
            "incoming_count_total": 0,
            "outgoing_bytes": 0,
            "outgoing_count": 0,
            "outgoing_count_total": 0,
        },
        "event_loop_interval": 0.020341262221336365,
        "cpu": 99.9,
        "memory": 748695552,
        "time": 1726011699.6566687,
        "host_net_io": {
            "read_bps": 1177.4330340287108,
            "write_bps": 1649.5915829093515,
        },
        "host_disk_io": {"read_bps": 0.0, "write_bps": 283216.10697710735},
    },
    "status": "running",
    "nanny": "tcp://127.0.0.1:59882",
}
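Alternatively, would it be more robust to ask the scheduler itself, e.g. via client.run_on_scheduler? A sketch of what I mean (WorkerState.processing is a scheduler internal, so I am not sure it is safe to rely on):

```python
def idle_threads(dask_scheduler=None):
    # Runs inside the scheduler process; dask_scheduler is injected by
    # client.run_on_scheduler(). ws.processing holds the tasks currently
    # assigned to a worker (scheduler internals, not a stable public API).
    return sum(
        ws.nthreads
        for ws in dask_scheduler.workers.values()
        if not ws.processing
    )

n_idle = client.run_on_scheduler(idle_threads)
```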