Any way to group / name workers and tasks?

Here is the workflow:

  • Create a LocalCluster with N+M workers
  • The main thread creates N outer tasks (futures) with keys like “ds=en”, “ds=de”, … (language datasets). This part is fine: there are not many of them, and I can see them in the Dashboard.
  • Each of these tasks creates M inner tasks (futures), where M varies, roughly 1–2000 per dataset. I cannot assign keys to them, because that adds thousands of bars in every color. The processing is the same for all inner tasks (“audio_processing” in chunks).

Problem: I can follow the outer tasks, but not the inner ones, and the default naming does not help.
Desire: I’d like to see inner tasks (“audio_processing”) generated by “ds=en” separately from “ds=de” (and thus given another color).
Maybe something like “ds=en::chunk=123” which might be parsed to group them?
Q1: Is there any method for this?

Q2: Is there a way to name the workers something other than 1, 2, 3, …?

Q3: Is there a way to get the task keys assigned to a specific worker? I.e. something like (I just invented this) client.scheduler_info["workers"].assigned_tasks: list[object]?

Q1: yes - ish

You can control task names by passing the key parameter to client.submit(). If you want to group tasks in a way that reflects your workflow, you can use hierarchical keys like “ds=en::chunk=123”. Dask doesn’t automatically group tasks by such keys, but the naming pattern helps to distinguish the tasks visually on the Dask dashboard.

client.submit(audio_processing, chunk, key=f"ds={lang}::chunk={chunk_id}")

Unfortunately, as far as I know, Dask doesn’t have built-in task grouping for the dashboard, but with this approach the task keys become hierarchical and easier to track.

If you’re seeing too many task bars in the dashboard, consider using a less granular naming scheme or grouping the tasks at a higher level (e.g., “ds=en::audio_processing”).
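A minimal sketch of the hierarchical-key idea (the helper names are mine, not a Dask API). The only hard requirement is that every task key must be unique, so the chunk id has to be part of the key:

```python
# Sketch: hierarchical task keys per language dataset (helper names are
# illustrative, not part of Dask).

def make_key(lang, chunk_id):
    """Build a key like 'ds=en::chunk=123'."""
    return f"ds={lang}::chunk={chunk_id}"

def parse_key(key):
    """Recover (lang, chunk_id) from a key built by make_key."""
    ds_part, chunk_part = key.split("::")
    return ds_part.split("=")[1], int(chunk_part.split("=")[1])

def submit_chunks(client, audio_processing, lang, chunks):
    # One future per chunk; the key makes the language visible per task.
    return [
        client.submit(audio_processing, chunk, key=make_key(lang, i))
        for i, chunk in enumerate(chunks)
    ]
```

A key like this can later be parsed back to group results by language.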

Q2 - yes

You can name the workers using the worker API, though this will require some work.
Worker API

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=5, worker_kwargs={'name': 'worker_name_prefix'})
client = Client(cluster)

Each worker will be named with the specified prefix. You could further extend this to give workers more specific names.

If you want more control over worker names (for example, naming them based on the dataset), you’d need to set up your workers manually using the Worker API:

from dask.distributed import Worker

# Worker is asynchronous, so it has to be awaited inside a running event loop:
worker = await Worker('scheduler_address', name='ds_en_worker')

Q3: also yes, BUT (and this is a big BUT) it requires you to either create the keys, manage them and pass them to the workers yourself, or, if you already have them, you still need to manage them and pass them to the workers.
Again using the worker API above, you would need the data parameter.

You can define worker resources and then submit tasks using the resources argument, ensuring they only run on workers that match those resource tags.
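A small sketch of the resources approach (the OUTER/INNER tags and the worker commands are assumptions for illustration, not something from this thread). Workers declare abstract resources when they start, e.g. `dask worker tcp://scheduler:8786 --resources "OUTER=1"`, and tasks submitted with a matching resources= argument only run on workers that advertise that resource:

```python
# Sketch: route outer vs. inner tasks via abstract worker resources.
# Assumes workers were started with --resources "OUTER=1" or "INNER=1".

def submit_by_role(client, outer_fn, inner_fn, datasets, chunks):
    # Outer tasks only run on workers advertising an OUTER resource...
    outer = [client.submit(outer_fn, ds, resources={"OUTER": 1}) for ds in datasets]
    # ...and inner tasks only on workers advertising INNER.
    inner = [client.submit(inner_fn, c, resources={"INNER": 1}) for c in chunks]
    return outer, inner
```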

To track which tasks are running on specific workers, you can query the scheduler:

for worker, keys in client.processing().items():
    print(worker, keys)  # task keys currently running on this worker

(Similarly, client.has_what() returns, per worker, the keys whose results are held in memory.)

Something like this, I’d say.


I really appreciate you taking the time for such a good and detailed explanation, thank you.

My question stems from a “deadlock” issue I encountered, which I presented in another thread. When using LocalCluster (which works very nicely), all workers are identical (I know about SpecCluster, but I could not get it to work at 100% CPU usage).

worker_kwargs={'name': 'worker_name_prefix'}

This unfortunately gives the same prefix to all workers, so I cannot distinguish between outer and inner workers.

print(worker, keys)  # task keys currently running on this worker

I’ll go after this, which might solve my problem.

The “kwargs” style in the documentation makes it difficult to follow.

Thank you!

I think I found a solution for my problem from this SO post (last answer about annotate).

All I wanted was to assign different types of tasks to different workers.
So, I’ll use all_workers[:M] to feed outer tasks, and all_workers[M:] for inner tasks.

It is a rather old post, but I hope it is still valid.

Edit: Oh, of course it is already in the documentation (under User Control)!
https://distributed.dask.org/en/latest/locality.html?highlight=client.submit
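The worker-splitting idea above can be sketched as follows (the function names are mine, not an official API; the workers= argument of client.submit() restricts where a future may run, as described in the locality docs):

```python
# Sketch: pin outer vs. inner tasks to disjoint worker pools
# (function names and addresses are illustrative).

def split_workers(client, n_outer):
    """Split the cluster's worker addresses into an outer and an inner pool."""
    addrs = sorted(client.scheduler_info()["workers"])
    return addrs[:n_outer], addrs[n_outer:]

def submit_split(client, outer_fn, inner_fn, datasets, chunks, n_outer):
    outer_pool, inner_pool = split_workers(client, n_outer)
    # workers= restricts on which workers each future may run.
    outer = [client.submit(outer_fn, ds, workers=outer_pool) for ds in datasets]
    inner = [client.submit(inner_fn, c, workers=inner_pool) for c in chunks]
    return outer, inner
```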

That’s great to hear that you managed to find some additional assistance.

What I meant is that you need to create the workers manually and give each one a different name; LocalCluster creates them for you.

You will need to put in some work to achieve what you want.

Using
https://distributed.dask.org/en/latest/locality.html?highlight=client.submit
can also work fine; this time you pass the worker names or IPs to control which worker does what.

I’ve never heard of that either, but if the naming solution works, I’d be glad to hear back about it!

My real question is: why would you want to do that? The only reason I see would be to have local data. From what I understand of your workflow, you shouldn’t have to do this.