Submit task with required resources to YarnCluster

With distributed workers (for example, a distributed.LocalCluster), I can do the following:

import dask
import distributed
import time

# Declare abstract resources that each worker will advertise to the scheduler.
with dask.config.set({
    "distributed.worker.resources.CPU": 3,
    "distributed.worker.resources.MEM": 4e9,
}):
    cluster = distributed.LocalCluster()

with distributed.Client(cluster) as client:
    task1 = client.submit(time.sleep, 1, resources={'CPU': 1, 'MEM': 1e9})
    task2 = client.submit(time.sleep, 2, resources={'CPU': 3, 'MEM': 1e9})
    task1.result()
    task2.result()

In particular, Dask assigns each task to a worker whose remaining CPU/MEM can satisfy the task's declared resource "requirements" (the resources= argument to client.submit). This behavior is described in the docs. One implication is that task2 above will never execute alongside task1 on the same worker, because task2 requires enough CPU to reserve an entire worker for itself.

I want to achieve the same "resource matching" use case, but with the YarnCluster instead: I want to be able to submit a task T for execution, while ensuring that the YarnCluster will reserve a specific number of cores / amount of memory on a worker specifically for consumption by task T.

Is this supported? How can it be achieved? If it is not supported, any recommendations?

Note that task T may use more or less than the reserved cores/memory. So this is not a hard, enforced resource constraint, but rather a tool to optimize scheduling. For example, if I know task T is CPU- or memory-heavy, this "resource matching" functionality would allow me to ensure that task T always executes on a worker with plenty of CPUs available, increasing the performance and stability of my task T.
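For context, here is the kind of thing I was imagining, though I have not verified it on a real YARN deployment. It assumes that dask-yarn's worker_env parameter can forward environment variables to each worker, and that the workers parse DASK_-prefixed variables into config at startup, so they would advertise the same abstract resources as in the LocalCluster example (the environment file name and time.sleep payload are just placeholders):

```python
import time

from dask_yarn import YarnCluster
from distributed import Client

# Hypothetical sketch: forward Dask config to workers as environment
# variables so each worker advertises abstract CPU/MEM resources.
cluster = YarnCluster(
    environment="environment.tar.gz",  # placeholder packaged environment
    worker_vcores=4,
    worker_memory="8GiB",
    worker_env={
        "DASK_DISTRIBUTED__WORKER__RESOURCES__CPU": "3",
        "DASK_DISTRIBUTED__WORKER__RESOURCES__MEM": "4e9",
    },
)

with Client(cluster) as client:
    # Same resource matching as with LocalCluster: this task should only
    # be scheduled on a worker with 3 "CPU" units currently unreserved.
    future = client.submit(time.sleep, 2, resources={"CPU": 3, "MEM": 1e9})
    future.result()
```

If the environment-variable route doesn't work, I could presumably fall back to setting distributed.worker.resources in a dask config file that is shipped inside the packaged worker environment.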

@jcj Hi! I believe there was some discussion about this issue on the Dask Slack, were those solutions helpful?