Worker resources minimum threshold

Is there a way to define a minimum resource request that must be met before a task can be scheduled on a worker? For example, say you have a cluster with one high-memory worker and several low-memory workers, and most tasks in the DAG are low memory, with a few high-memory tasks. Without a minimum threshold, the one high-memory worker could be tied up running low-memory tasks, leaving the high-memory tasks with nowhere to run. If we could define a resource floor, though, we could ensure that the high-memory worker stays reserved for high-memory tasks.

Have you had a look at the worker resources documentation?
https://distributed.dask.org/en/stable/resources.html

As resources are abstract, you can simply mark some workers as high-memory and others as low-memory, and then annotate your tasks accordingly.

Thank you for the suggestion. I’m familiar with the paradigm of using ordinal tags in that manner, and I could make it work for my use case, but it would be less than ideal.

The issue is that I don’t just have high and low categories. I have an arbitrary number of discrete memory tiers, depending on the memory requirements of the tasks in my DAG. The exact memory requirements of my tasks are known and are currently not bucketed. If there were a minimum threshold, my execution scheme could look like this (a worker range such as 4000-9000 means minimum-maximum):

task_resources:
taskA: MEMORY_MB: 3343
taskB: MEMORY_MB: 2036
taskC: MEMORY_MB: 10674
taskD: MEMORY_MB: 5543
taskE: MEMORY_MB: 9000
taskF: MEMORY_MB: 14245

worker_resources:
worker1: MEMORY_MB: 6000
worker2: MEMORY_MB: 6000
worker3: MEMORY_MB: 4000-9000
worker4: MEMORY_MB: 8000-15000
worker5: MEMORY_MB: 5000-8000

If I want to make this work with ordinal tags then it would look like this:

task_resources:
taskA: MT6: 1
taskB: MT6: 1
taskC: MT815: 1
taskD: MT6: 1, MT49: 1, MT58: 1
taskE: MT49: 1, MT815: 1
taskF: MT815: 1

worker_resources:
worker1: MT6: inf
worker2: MT6: inf
worker3: MT49: inf
worker4: MT815: inf
worker5: MT58: inf

You can see how the task resources now need to list every tag they can run on, rather than a single continuous value. This gets even worse with more tiers. It also puts the burden on the user to (1) know which tiers are actually available on the cluster, which makes it easier for the task resources and the cluster configuration to fall out of sync, and (2) work out which tags to specify on each task.
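To make burden (2) concrete, here is a minimal sketch of the bookkeeping the tag scheme requires. The helper `tier_tags` and the `WORKER_TIERS` table are hypothetical names introduced for illustration; the ranges are copied from the listings above, with two assumptions: the 6000 MB workers have a floor of 0, and both range bounds are inclusive.

```python
# Hypothetical tier table: tag name -> (min_mb, max_mb), taken from the example above.
# Assumption: the plain 6000 MB workers have no floor, so their minimum is 0.
WORKER_TIERS = {
    "MT6": (0, 6000),
    "MT49": (4000, 9000),
    "MT815": (8000, 15000),
    "MT58": (5000, 8000),
}

# Exact, unbucketed memory requirements of each task, in MB.
TASK_MEMORY_MB = {
    "taskA": 3343,
    "taskB": 2036,
    "taskC": 10674,
    "taskD": 5543,
    "taskE": 9000,
    "taskF": 14245,
}


def tier_tags(memory_mb, tiers=WORKER_TIERS):
    """Return {tag: 1} for every tier whose inclusive range contains memory_mb."""
    return {tag: 1 for tag, (low, high) in tiers.items() if low <= memory_mb <= high}


# Every task has to be re-tagged whenever the set of tiers on the cluster changes.
task_resources = {name: tier_tags(mem) for name, mem in TASK_MEMORY_MB.items()}

for name, tags in task_resources.items():
    print(name, tags)
```

The derived tags match the hand-written listing, but the table of tiers has to be kept in step with the actual cluster, which is exactly the desync risk described above.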

Right, I see you are perfectly familiar with this mechanism. AFAIK, resources do not offer a way to add a minimum requirement, and I’m not sure how this could be done without some abstract tagging.

This kind of complex resource scheduling policy is better handled by scheduling or orchestration tools on top of Dask, such as HPC schedulers or Kubernetes.

So I actually just found a way to do it with existing functionality. It’s a little more verbose than I would like, but it works perfectly. The solution is negative resources! It seems that Dask doesn’t treat negative values any differently (it neither caps them at 0 nor disallows them), so you can do this:

task resources:
task1: MEM: 9000, MEM_MIN: -9000
task2: MEM: 4000, MEM_MIN: -4000

worker resources:
worker1: MEM: 15000, MEM_MIN: -8000

In this case task1 will be able to run on worker1, but task2 will not: the worker’s MEM_MIN of -8000 is at least -9000 but less than -4000.

It’s still not ideal, because a task submitted without MEM_MIN can run on a worker that has a minimum threshold, and a task submitted with MEM_MIN can’t run on a worker created without MEM_MIN. But that shouldn’t be too big a problem if you control everything related to the cluster and DAG submission.
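For anyone who wants to sanity-check the arithmetic, here is a small pure-Python model of the trick. It does not call Dask; it assumes a worker is eligible when it supplies at least the requested amount of every resource the task names, and that a worker lacking a named resource is never eligible. The helpers `can_run`, `task_request` and `worker_supply` are hypothetical names, not Dask API.

```python
def can_run(task_resources, worker_resources):
    """Assumed eligibility rule: for every resource the task requests,
    the worker must declare it and supply at least the requested amount."""
    return all(
        resource in worker_resources and worker_resources[resource] >= required
        for resource, required in task_resources.items()
    )


def task_request(memory):
    """Encode a task's memory need as a normal request plus a negated one."""
    return {"MEM": memory, "MEM_MIN": -memory}


def worker_supply(memory_max, memory_min):
    """Encode a worker's capacity and its floor (the floor is negated)."""
    return {"MEM": memory_max, "MEM_MIN": -memory_min}


worker1 = worker_supply(memory_max=15000, memory_min=8000)
task1 = task_request(9000)
task2 = task_request(4000)

print(can_run(task1, worker1))  # True: 15000 >= 9000 and -8000 >= -9000
print(can_run(task2, worker1))  # False: -8000 >= -4000 does not hold

# The two caveats, under the same assumed rule:
task_without_floor = {"MEM": 4000}   # no MEM_MIN requested
worker_without_floor = {"MEM": 6000}  # no MEM_MIN declared

print(can_run(task_without_floor, worker1))   # True: slips past the floor
print(can_run(task2, worker_without_floor))   # False: MEM_MIN is missing
```

Negating both sides turns the "task memory must be at least the worker floor" condition into the usual "supplied is at least requested" comparison, which is why no special support is needed.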
