How to find the count of idle workers from scheduler_info?

In my workflow, I can have 3 outer-level distributed futures, and each one spawns chunked sub-tasks. Say these are the train, dev, and test sets of a dataset, with an 80-10-10% split.

If I run all 3 together and divide the available cores between them, the two smaller ones finish quickly, and the train set only uses 1/3 of the available cores thereafter (I feed tasks eagerly with as_completed.add() whenever a task finishes).

I want to find the number of idle cores, and if there are 3 idle, I want to add 3 more tasks. I thought I could use client.scheduler_info()["workers"], but I could not figure out how (that part is not documented in detail). E.g., is checking worker_info["metrics"]["cpu"] == 0, or better, worker_info["metrics"]["task_counts"]["executing"] != 1 (which needs a try/except, I think), a safe way to determine this?

Or is there any other way?

Here is one worker’s info from the current version:

{
    "type": "Worker",
    "id": 6,
    "host": "127.0.0.1",
    "resources": {"io_bound": 1, "cpu_bound": 1},
    "local_directory": "C:\\TEMP1\\dask-scratch-space\\worker-ebfhxx8w",
    "name": 6,
    "nthreads": 1,
    "memory_limit": 5712486400,
    "last_seen": 1726011699.6877353,
    "services": {"dashboard": 59905},
    "metrics": {
        "task_counts": {"executing": 1},
        "bandwidth": {"total": 100000000, "workers": {}, "types": {}},
        "digests_total_since_heartbeat": {
            "tick-duration": 0.9965400695800781,
            "latency": 0.02536606788635254,
            "profile-duration": 0.008771181106567383,
        },
        "managed_bytes": 0,
        "spilled_bytes": {"memory": 0, "disk": 0},
        "transfer": {
            "incoming_bytes": 0,
            "incoming_count": 0,
            "incoming_count_total": 0,
            "outgoing_bytes": 0,
            "outgoing_count": 0,
            "outgoing_count_total": 0,
        },
        "event_loop_interval": 0.020341262221336365,
        "cpu": 99.9,
        "memory": 748695552,
        "time": 1726011699.6566687,
        "host_net_io": {
            "read_bps": 1177.4330340287108,
            "write_bps": 1649.5915829093515,
        },
        "host_disk_io": {"read_bps": 0.0, "write_bps": 283216.10697710735},
    },
    "status": "running",
    "nanny": "tcp://127.0.0.1:59882",
}
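
For context, here is roughly the helper I have in mind, as a minimal sketch, assuming the "task_counts" metric is always present in the heartbeat (it may be missing right after a worker starts, hence the .get() chain):

from dask.distributed import Client

def count_idle_workers(client: Client) -> int:
    # Count workers whose last heartbeat reports no executing tasks.
    workers = client.scheduler_info()["workers"]
    idle = 0
    for info in workers.values():
        # Treat missing metrics as "not executing anything yet"
        executing = (
            info.get("metrics", {})
            .get("task_counts", {})
            .get("executing", 0)
        )
        if executing == 0:
            idle += 1
    return idle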

First of all: why are you doing that? Isn’t it possible to submit all the tasks at once? Could you give a more precise example of code/workflow?

Otherwise, you might find interesting things in this method, which is used for adaptive deployments.

My problem is that I don’t know what is coming, and I’m trying to make some “more intelligent” decisions before starting. Sorry for the long post :frowning:

  • I’m dealing with voice datasets (mostly Common Voice, which is versioned, but also others like VoxPopuli and FLEURS). First of all, I import them into parquet format for further access (I keep deltas to avoid replication), or, if requested, I extract them as files. It is CLI-driven, and the user can specify the languages and versions, whether transcoding is required, whether the audio should be analyzed, etc. Datasets can be small (a recently added language: a couple of MBs) or very large (e.g. English: 80 GB or so). Also, some non-Common-Voice datasets are provided as separate train, dev, and test tar.gz files, which must be merged. In short, the data and tasks are not “horizontal”.
  • If audio transcription/analysis is not requested, the job is simply io-bound, so I just do it (taking care of disk-io bottlenecks). But if audio transcription/analysis is requested, it is a mostly cpu-intensive job, and I need to “spawn” sub-tasks to handle it in chunks, so I end up with N io-bound tasks, each one creating M cpu-bound tasks. Here, the cpu-bound tasks return result chunks, which are collected in the io-bound ones; when they reach/exceed a certain count, they are written out as a single parquet part file, to keep the parquet parts within a certain size range and to free memory (see the sketch after this list).
  • Based on the task and depending on the files I have, I create a LocalCluster with different settings, and I can use more processes than logical cores.
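
To make that concrete, here is a simplified sketch of the collect-and-flush logic inside an io-bound task (pyarrow-based; the function names and the flush threshold are made up for illustration):

import pyarrow as pa
import pyarrow.parquet as pq

def collect_and_flush(result_chunks, out_dir, flush_threshold=10_000):
    # Collect cpu-bound results and write a parquet part file whenever
    # the buffer reaches the threshold, keeping part sizes in range
    # and freeing memory.
    buffer, part_no = [], 0
    for records in result_chunks:  # each item: a list of row dicts
        buffer.extend(records)
        if len(buffer) >= flush_threshold:
            pq.write_table(pa.Table.from_pylist(buffer),
                           f"{out_dir}/part-{part_no:05d}.parquet")
            buffer.clear()
            part_no += 1
    if buffer:  # flush the remainder
        pq.write_table(pa.Table.from_pylist(buffer),
                       f"{out_dir}/part-{part_no:05d}.parquet")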

The problem is: on a single cluster, io-bound and cpu-bound tasks can compete. I need to control the flow. Here is an example:

  • I transcode & import the Common Voice v3 datasets, 29 languages (= .tar.gz files) in total. In this case I want to use 3 workers for the upper-level io-tasks (one per file), and all logical cores for the cpu-bound tasks, each of which reads a chunk of 800 records and processes it (I do not know the number of clips in each dataset; I can only estimate it from the file size with some statistics).
  • So I want to dedicate an equal number of logical cores to each of these io-bound tasks at the start. My laptop has a 6c/12t CPU, so I have 3 io-bound and 12 cpu-bound tasks, 15 workers in total. To each io-bound task I initially dedicate 12/3 = 4 cpu-bound tasks.
  • This works well at the start. But the English dataset has 900k records, and the others are much smaller. Although I sort the tasks by file size in descending order so that the larger datasets start first (to lower the wall time), all the other tasks eventually finish and the English dataset is left with 4 cores - thus the question.
  • Therefore I am trying to find a way to extend the cores allocated to that worker. At that point, if more than 1 worker is idle, I get the idle count and add that many new chunks (new futures) so that all cores get busy.
  • I actually coded this with worker_info["metrics"]["task_counts"]["executing"], and it works. But it returns erratic counts between 0-2, so it is not very reliable. For the 0 cases, I return 1, because I already know that one task has just finished (sketched below).
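
The workaround, as described above, looks roughly like this (a sketch; the clamp encodes my “I already know one just finished” assumption):

def executing_count(worker_info) -> int:
    # The metric is erratic right after a task finishes: it can briefly
    # report 0 even though the worker is about to pick up the next task,
    # so a raw 0 is clamped to 1.
    try:
        n = worker_info["metrics"]["task_counts"]["executing"]
    except KeyError:
        n = 0
    return max(n, 1)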

I’m currently working with a LocalCluster, but I want to scale to a LAN for the upper-level tasks, so that each node handles a set of datasets; with this implementation I will have the same problem on each node… I tried many approaches (delayed, bag, adaptive, priorities, resources, etc.), but none were optimal, so I decided to handle it myself by feeding the futures in a controlled manner - probably the wrong direction.

  • Outer level: I create 3 futures for the io-bound tasks (files) and start them; when one finishes, I add another.
  • Inner level: I create 4 futures (per io-bound task) for the cpu-bound chunks and start them; when one finishes, I add another. HERE comes the problem with the very long task at the end (see the sketch below).
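
For reference, the feeding pattern at the inner level looks roughly like this (a sketch; process_chunk, chunk_iter, and collect are placeholders for my real code):

import itertools
from dask.distributed import as_completed

def run_inner_level(client, chunk_iter, n_parallel=4):
    # Keep n_parallel cpu-bound futures in flight, topping up from
    # chunk_iter whenever one finishes (the as_completed.add() pattern).
    futures = [client.submit(process_chunk, chunk)
               for chunk in itertools.islice(chunk_iter, n_parallel)]
    ac = as_completed(futures)
    for fut in ac:
        collect(fut.result())  # gather results into the io-bound task
        try:
            ac.add(client.submit(process_chunk, next(chunk_iter)))
        except StopIteration:
            pass  # no more chunks left for this file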

One thing I didn’t try (it came to my mind while writing this): create a LocalCluster with (say) 3 workers and rescale it with 12 more in my case. But this raises related questions:

  • When I add all 29 futures to the 3 workers at the very start (each worker would have 9-10 tasks), will they compete, or will only the first ones get executed?
  • How can I create 3 workers only for io-bound tasks and 12 workers only for cpu-bound tasks? This is also needed for memory management: as the outer tasks collect data and then write it out in batches, they need more memory (and cluster settings apply to all workers).
    • Do I need to create two different clusters (as available resources are defined at the cluster level)?
    • Or must I use worker objects directly?

Or what would you suggest for the scenario above? I’ve been dealing with this for a month now and I’m out of ideas.

This is how it works:

Here I have English and German datasets, and English is about 3x larger than German.
So I have 2 io_bound and 12 cpu_bound tasks, 14 in total at the start.
When German ends, that extra worker is also used for cpu_bound tasks (not something I want, because of context switching).
Also, as you can see, some workers have more than one task (20 jobs in total in this image), because not-running tasks sometimes report 2, which increased the count. Not a problem though, they are queued. But I don’t want to queue them all, so that I can rescale to idle cores. Would cluster.adapt() be helpful for that?
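
For reference, what I’m asking about would be something like the following (the worker-count bounds are guesses on my side):

# Let the cluster scale itself between bounds based on load,
# instead of topping up futures manually.
cluster.adapt(minimum=3, maximum=15)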

If I had left it alone, only 6 logical cores would be running English chunks - so I “steal workers”. Is work stealing meant for cases like these?

I’m having trouble understanding what you are trying to do. Why do you try to manually assign tasks to only 4 CPUs? Why don’t you let Dask handle the distribution of cpu_bound tasks between workers? That way, you don’t care if the dataset sizes are imbalanced.

With resources, you shouldn’t have to do that. Just create all your io-bound futures, and when one finishes, create all the corresponding cpu-bound tasks. Is there a problem with this approach?
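
As a minimal sketch of what I mean (the function names are placeholders, and the resources={...} annotations assume your workers were started with matching resource labels, as in your worker info above):

from dask.distributed import as_completed

# Submit every io-bound import up front; each one that finishes fans
# out into cpu-bound chunk tasks, pinned via worker resources.
io_futures = [client.submit(import_dataset, path, resources={"io_bound": 1})
              for path in tar_paths]

for io_fut in as_completed(io_futures):
    chunks = io_fut.result()
    cpu_futures = [client.submit(process_chunk, chunk, resources={"cpu_bound": 1})
                   for chunk in chunks]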


You are right. I will solve it with SpecCluster and use it at the outer level by pushing all the futures.

It might cause trouble at the inner level, as it involves reading chunks of data from multiple files. I need to test it.


Unfortunately, when I switched to SpecCluster, my CPU utilization dropped to ~40-45%.
I created a worker_spec dict with 3 outer and 12 inner workers and fed 3 files. I also tried not specifying any scheduler, specifying a default one, and feeding fewer futures to the inner level - with no change…

It seems like it is using threads (is there no setting to force processes?) and/or it is context switching, or the scheduling just has too much overhead.
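
For what it’s worth, the spec I’m describing looks roughly like this (a sketch; if I understand correctly, using Nanny as the worker class is what should force separate processes, while Worker stays in-process):

from dask.distributed import Nanny, Scheduler, SpecCluster

# 3 io-bound workers + 12 cpu-bound workers, one thread each.
workers = {}
for i in range(3):
    workers[f"io-{i}"] = {
        "cls": Nanny,  # Nanny spawns the worker in a separate process
        "options": {"nthreads": 1, "resources": {"io_bound": 1}},
    }
for i in range(12):
    workers[f"cpu-{i}"] = {
        "cls": Nanny,
        "options": {"nthreads": 1, "resources": {"cpu_bound": 1}},
    }

cluster = SpecCluster(
    workers=workers,
    scheduler={"cls": Scheduler, "options": {"dashboard_address": ":8787"}},
)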

One interesting thing: there is a large delay after the audio processing is finished, which was not there with LocalCluster:

Also, memory usage in each worker increased considerably, throttling the workers, so I had to set the limit to 0 (although I see 32 GB of free RAM in Task Manager). This is probably caused by the fact that I have to read the chunk at the outer level and pass it on AFTER some pre-processing. When I give 3*12 processes to the inner level, memory usage triples.

Any advice will be very much appreciated.

Could you share your code?

And share a simplified example?