Using `distributed` workers (for example, via `distributed.LocalCluster`) I can do the following:
```python
import dask
import distributed
import time

# Advertise 3 "CPU" units and 4e9 bytes of "MEM" on each worker.
with dask.config.set({
    "distributed.worker.resources.CPU": 3,
    "distributed.worker.resources.MEM": 4e9,
}):
    cluster = distributed.LocalCluster()
    with distributed.Client(cluster) as client:
        # Each submit declares the resources the task needs on its worker.
        task1 = client.submit(time.sleep, 1, resources={'CPU': 1, 'MEM': 1e9})
        task2 = client.submit(time.sleep, 2, resources={'CPU': 3, 'MEM': 1e9})
        task1.result()
        task2.result()
```
In particular, Dask will assign tasks to workers such that, for any given task, the task’s declared CPU/MEM resource “requirements” (given by the `resources=...` argument to `client.submit`) can be satisfied by the remaining CPU/MEM on the worker Dask selects to execute it. This behavior is described in the docs. One implication is that `task2` above will never execute alongside `task1` on the same worker, because `task2` requires enough CPU to reserve an entire worker for itself.
I want to achieve the same “resource matching” use case, but with `YarnCluster` instead: I want to be able to submit a task `T` for execution while ensuring that the `YarnCluster` reserves a specific number of cores and amount of memory on a worker specifically for consumption by task `T`.

Is this supported? How can it be achieved? If it is not supported, any recommendations?
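For concreteness, here is roughly what I imagine this could look like. This is purely a guess on my part: I'm assuming that `YarnCluster`'s `worker_env` argument, combined with Dask's environment-variable config mechanism (`DASK_DISTRIBUTED__WORKER__RESOURCES__*`), would set the advertised resources on each YARN worker, and `environment.tar.gz` is just a placeholder for my packaged environment.

```python
import time

import distributed
from dask_yarn import YarnCluster

# Guess: advertise resources on YARN workers through Dask's env-var config
# mechanism, hoping it mirrors the dask.config.set call in the local example.
cluster = YarnCluster(
    environment="environment.tar.gz",  # placeholder for my packaged environment
    worker_vcores=3,
    worker_memory="4GiB",
    worker_env={
        "DASK_DISTRIBUTED__WORKER__RESOURCES__CPU": "3",
        "DASK_DISTRIBUTED__WORKER__RESOURCES__MEM": "4e9",
    },
)

with distributed.Client(cluster) as client:
    # Same resource-matching submission I use with LocalCluster.
    task = client.submit(time.sleep, 1, resources={"CPU": 1, "MEM": 1e9})
    task.result()
```

If there is a more direct hook for this, or if this approach simply doesn't work on YARN, I'd be happy to hear it.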
Note that task `T` may use more or less than the reserved cores/memory, so this is not a hard, enforced resource constraint, but rather a tool to optimize scheduling. For example, if I know task `T` is CPU- or memory-heavy, this “resource matching” functionality would let me ensure that `T` always executes on a worker with plenty of CPU available, increasing the performance and stability of the task.