How to find the count of idle workers from scheduler_info?

My problem is that I don’t know in advance what is coming, so I’m trying to make some “more intelligent” decisions before starting. Sorry for the long post :frowning:

  • I’m dealing with voice datasets (mostly Common Voice, which comes in versions, but also others like Voxpopuli and Fleurs). First of all I import them into parquet format for further access (I keep deltas to avoid replicating data), or, if requested, I extract them as files. The process is CLI driven and the user can specify the languages and versions, whether transcoding is required, whether the audio should be analyzed, etc. Datasets can be small (a recently added language: a couple of MBs) or very large (English: 80 GB or so). Also, some non-Common-Voice datasets are provided as separate train, dev and test tar.gz files, which must be merged. In short, the data and tasks are not “horizontal”.
  • If audio transcription/analysis is not requested, the job is simply io-bound, so I just do it (taking care of disk-io bottlenecks). But if audio transcription/analysis is requested, it is mostly a cpu-intensive job, and I need to “spawn” sub-tasks to handle the audio in chunks, so I end up with N io-bound tasks, each one creating M cpu-bound tasks. The cpu-bound tasks return result chunks, which are collected in the io-bound ones; when the collected results reach/exceed a certain count, they are written out as a single parquet part file (this keeps each parquet part within a certain size range and frees memory). There is a minimal sketch of this pattern right after this list.
  • Based on the task, and depending on the files I have, I create a LocalCluster with different settings, and I may use more processes than logical cores.
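
Here is roughly what that pattern looks like. read_chunks(), process_chunk() and write_parquet_part() are hypothetical stand-ins for my real reader/transcoder/writer, CHUNK_SIZE and PART_SIZE are illustrative, and the inner futures are submitted all at once for brevity (the controlled feeding version is further below):

```python
from dask.distributed import as_completed, worker_client

CHUNK_SIZE = 800     # records handled by each cpu-bound task
PART_SIZE = 10_000   # flush a parquet part once this many results collect

def import_dataset(tar_path):
    """io-bound outer task: fans out cpu-bound chunk tasks, collects results."""
    results, part_no = [], 0
    with worker_client() as client:        # submit sub-tasks from inside a task
        futures = [client.submit(process_chunk, chunk)
                   for chunk in read_chunks(tar_path, CHUNK_SIZE)]
        for fut in as_completed(futures):
            results.extend(fut.result())
            if len(results) >= PART_SIZE:  # keep part files in a size range
                write_parquet_part(tar_path, part_no, results)
                results, part_no = [], part_no + 1
    if results:                            # final partial part
        write_parquet_part(tar_path, part_no, results)
```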

The problem is: on a single cluster, io-bound and cpu-bound tasks can compete, and I need to control the flow. Here is an example:

  • I transcode & import the Common Voice v3 datasets: 29 languages in total (= 29 .tar.gz files). In this case I want to use 3 workers for the upper-level io-tasks (one per file), and all logical cores for the cpu-bound tasks, each of which reads a chunk of 800 records and processes it (I do not know the number of clips in each dataset; I can only estimate it from the file size with some statistics).
  • So, at the start, I want to dedicate an equal number of logical cores to each of these io-bound tasks. My laptop has a 6c/12t CPU, so I have 3 io-bound and 12 cpu-bound tasks, 15 workers in total. For each io-bound task, I initially dedicate 12/3 = 4 cpu-bound tasks.
  • This works well at the start. But the English dataset has 900k records and the others are much smaller. Although I sort the tasks by file size, descending, so that the larger datasets start first (to lower the wall time), all the other tasks eventually finish and the English dataset is left with only 4 cores, hence the question.
  • Therefore I’m trying to find a way to extend the cores allocated to that worker. At that point, if more than one worker is idle, I get the idle count and add that many new chunks (new futures) so that all cores get busy.
  • I actually coded this with worker_info["metrics"]["task_counts"]["executing"] and it works, but it returns erratic counts between 0 and 2, so it is not reliable. For the 0 cases I return 1, because I already know that one task has just finished. A sketch of this is right after this list.
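
For reference, this is roughly what I’m doing now; the assumption that “executing == 0” means idle is exactly the part that behaves erratically:

```python
def idle_worker_count(client):
    # a worker is assumed idle when its executing count is 0; this is
    # the metric that comes back erratic (0-2)
    workers = client.scheduler_info()["workers"]
    idle = sum(
        1 for w in workers.values()
        if w["metrics"]["task_counts"].get("executing", 0) == 0
    )
    return max(idle, 1)  # a task just finished, so at least one core is free
```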

I’m currently working with a LocalCluster, but I want to scale this out to a LAN for the upper-level tasks, so that each node handles a set of datasets; with this implementation I’ll have the same problem on each node… I tried many approaches (delayed, bag, adaptive, priorities, resources, etc.), but none were optimal, so I decided to handle it myself by feeding the futures in a controlled manner, which is probably the wrong direction.

  • Outer level: I create 3 futures for the io-bound tasks (one per file) and start them; when one finishes, I add another.
  • Inner level: per io-bound task, I create 4 futures for the cpu-bound chunk tasks and start them; when one finishes, I add another. HERE comes the problem with the very long task at the end. (A sketch of this feeding loop follows the list.)
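
The feeding loop is shaped like this (a sketch; import_dataset is the hypothetical outer task from the first snippet, and the inner level uses the same shape with 4 futures per io-bound task):

```python
from dask.distributed import as_completed

MAX_IO = 3  # io-bound tasks in flight at any time

def run_outer(client, datasets):
    queue = list(datasets)                 # already sorted by size, descending
    seed = [client.submit(import_dataset, queue.pop(0))
            for _ in range(min(MAX_IO, len(queue)))]
    ac = as_completed(seed)
    for finished in ac:
        finished.result()                  # re-raise errors, release the future
        if queue:                          # keep MAX_IO tasks in flight
            ac.add(client.submit(import_dataset, queue.pop(0)))
```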

One thing I didn’t try (it came to mind while writing this): create a LocalCluster with (say) 3 workers and then scale it up by 12 more in my case. But related questions:

  • When I add all 29 futures to the 3 workers at the very start (each worker would get 9-10 tasks), will they compete, or will only the first ones get executed?
  • How can I create 3 workers only for io-bound tasks and 12 workers only for cpu-bound tasks? This is also required for memory management: as the outer tasks collect data and then write it out in batches, they need more memory (and cluster settings apply to all workers alike).
    • Do I need to create two different clusters (as available resources are defined per cluster)?
    • Or must I use worker objects directly? (See the sketch after this list.)
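
To illustrate that last question, here is what I imagine the “worker objects” variant could look like: one scheduler with two pools of Nanny workers, distinguished by (hypothetical) IO/CPU resource tags and different memory limits, with resources= pinning each task to its pool. All names, counts and limits here are illustrative; please correct me if this isn’t the intended usage:

```python
import asyncio
from dask.distributed import Client, Nanny, Scheduler

async def main():
    async with Scheduler() as scheduler:
        # 3 io-bound workers: single-threaded, more memory for batch collection
        io_pool = [await Nanny(scheduler.address, nthreads=1,
                               resources={"IO": 1}, memory_limit="4GiB")
                   for _ in range(3)]
        # 12 cpu-bound workers: one per logical core, tighter memory limit
        cpu_pool = [await Nanny(scheduler.address, nthreads=1,
                                resources={"CPU": 1}, memory_limit="1GiB")
                    for _ in range(12)]
        async with Client(scheduler.address, asynchronous=True) as client:
            # import_dataset is the outer task sketched earlier;
            # resources= sends it to an io-pool worker only
            fut = client.submit(import_dataset, "cv-corpus/en.tar.gz",
                                resources={"IO": 1})
            await fut
        for nanny in io_pool + cpu_pool:
            await nanny.close()

asyncio.run(main())
```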

Or, what would your suggestion be for the scenario above? I’ve been dealing with this for a month now and I’m out of ideas.