Using 'soft' resource constraints with client.submit

Hi,

I am using Dask Distributed with Dask CUDA workers and regular Dask workers.

I have a bunch of tasks submitted to the scheduler via client.submit, which can run on either CPU or GPU (I actually detect whether I am on a GPU worker based on the worker name, since for some reason worker.state.total_resources gives an empty dict within the tasks). Obviously I prefer using the GPU when available, as it is much faster.

Basically, when I am on a GPU worker I use CuPy instead of NumPy for that task's calculations; this is the only difference.

I have only one GPU but a lot of CPUs, so what I want is to ensure the GPU is always working full-time, and then use the available CPUs to carry out the extra work.

Currently I am using submit's keyword argument resources, which is a hard constraint.

But this is not optimal: either the CPU workers are not used and only the GPU worker is, or, if I don't set any resources restrictions, the GPU worker is not used full-time: there is some load balancing that seems to prevent it.
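To make this concrete, here is a minimal sketch of the hard-constraint behavior I am describing, with a hypothetical compute_task and an in-process LocalCluster standing in for my real CLI-launched cluster:

```python
from distributed import Client, LocalCluster

def compute_task(x):
    # Placeholder for the real CPU/GPU computation.
    return x * 2

# One in-process worker advertising a GPU resource, standing in for the
# real dask-cuda CUDAWorker (processes=False keeps everything in-process).
cluster = LocalCluster(n_workers=1, threads_per_worker=1,
                       processes=False, resources={"GPU": 1})
client = Client(cluster)

# resources= is a hard constraint: this task will only ever run on a
# worker advertising at least GPU: 1, never on a plain CPU worker.
future = client.submit(compute_task, 21, resources={"GPU": 1})
print(future.result())  # 42

client.close()
cluster.close()
```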

How would you recommend solving this problem?

I saw that Dask resources annotations could be used, as they are kind of 'soft constraints'. But it seems they do the same thing as the resources keyword. I also saw on the forum that resources annotations do not work when using submit.

Is there some way to tell the scheduler that workers with a specific resource should be preferred (but not required) for a given task submitted using client.submit?

Thanks

Vianney

Hi @vianneyl, welcome to Dask Discourse forum!

Well, it looks like this is not easily feasible in Dask.

Annotations and resources use the same mechanism on the backend, so both are hard constraints.

I can make several suggestions:

  • First, you say:

It might be due to queuing. If this is not a problem for you, you might want to try disabling it by setting distributed.scheduler.worker-saturation to inf.
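If you create the scheduler in the same Python process (for example via LocalCluster), setting the value through the standard dask.config API before creating the cluster is enough; a minimal sketch:

```python
import dask

# Disable scheduler-side queuing: worker-saturation must be in effect
# before the scheduler starts, so set it before creating any cluster.
dask.config.set({"distributed.scheduler.worker-saturation": float("inf")})

# Any scheduler created after this point (e.g. LocalCluster()) will pick
# up the setting.
print(dask.config.get("distributed.scheduler.worker-saturation"))  # inf
```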

Hi Guillaume,

Thanks for replying.

Looks like we are neighbours: my wife and you apparently work at the same company, and I work at 'a big space company' on the other side of the 'rocade' :smiley:

I'm unpacking your comment about queuing, because I am facing some unexpected results.

Side question: since I am using the dask scheduler CLI to launch the scheduler, I don't see among its CLI options how to pass the worker-saturation parameter (https://distributed.dask.org/en/stable/scheduling-policies.html#queuing mentions that it needs to be set on the scheduler before it starts; does that mean at scheduler instantiation, or before any call to compute/submit?). From my tests, it seems that using dask.config.set with a context manager in Python does not change the value when the scheduler is run from the CLI.

My cluster is as follows: worker-saturation not set (default value), 40 regular CPU workers with resources=CPU:1 (single-threaded workers, i.e. nthreads=1), and 1 CUDAWorker (from dask-cuda) with resources=GPU:1 (also single-threaded). Both workers and scheduler are started from the CLI.

I have N 'compute_metrics' tasks. Each of them needs one 'propagate_TLE' task (resources=CPU:1) and one 'propagate_keplerians' task (resources=GPU:1).

Actually, all 'compute_metrics' tasks use the same 'propagate_keplerians' task result, but a different 'propagate_TLE' task.

All tasks are submitted using client.submit with the resources keyword.

Putting all the submit calls inside a with dask.config.set({"distributed.scheduler.worker-saturation": np.inf}): block does not change this behavior (but it is very likely, see above, that this parameter is not taken into account by my CLI-run scheduler).

On the processing plot, I was expecting no worker to exceed ceil(1.1 * 1) = 2 processing tasks, but the number of tasks being processed largely exceeds 2 on every worker (about 10 per worker on average).
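For reference, the per-worker cap I am computing here follows the documented queuing rule, ceil(worker-saturation * nthreads); a quick sanity check:

```python
import math

def max_processing_tasks(nthreads, worker_saturation=1.1):
    # With queuing enabled, a worker is considered saturated once it has
    # ceil(worker_saturation * nthreads) tasks processing.
    return math.ceil(worker_saturation * nthreads)

print(max_processing_tasks(1))  # 2 -> the cap I expected per worker
print(max_processing_tasks(4))  # 5
```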

When I remove my CUDAWorker and disable all resources keywords (so having only 40 single-threaded regular CPU workers, with no resources required at submit() level), I can see some effect, but it is a bit strange: the scheduler seems to split all tasks into two batches. The first batch is sent to all workers, and memory rises rapidly. Once the first batch of 'propagate_TLE' tasks (dark green) is completed, the 'compute_metrics' tasks are started (light green). At this point, each worker has about 10 tasks in memory and memory use rises rapidly (although the worker memory limit is not reached; it seems to me that queuing behavior does not depend on worker memory anyway?).

Then, as the 'compute_metrics' tasks of the first batch are completed, the scheduler submits new 'propagate_TLE' tasks, but this time it fulfills the 'max 2 tasks per worker' default rule (worker-saturation=1.1 and nthreads=1). We can see that each worker executes one or two propagate_TLE tasks, then a 'compute_metrics' task, then starts over with one or two propagate_TLE tasks.

I can't manage to upload pictures, probably 'thanks' to my company's security rules…

Nice coincidence, I’ll give my professional email by private message!

Yes, you need to define the configuration before starting the Scheduler. There are several ways to do that; an easy one when launching from the CLI is to set the corresponding environment variable.
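Dask maps configuration keys to environment variables: a DASK_ prefix, path segments joined by double underscores, with dashes replaced by underscores. A sketch showing the variable for this setting, and verifying in-process that Dask picks it up:

```python
import os
import dask

# distributed.scheduler.worker-saturation is controlled by this variable;
# export it in the shell before running `dask scheduler` and the scheduler
# will start with queuing disabled.
os.environ["DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION"] = "inf"

# Pick up the variable in this process too, just to check it is read.
dask.config.refresh()
print(dask.config.get("distributed.scheduler.worker-saturation"))
```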

That is strange. How are you configuring your workers, and how do you launch them from the CLI? You indicate below that there is only one thread per worker?

It might depend on your workflow, a code example might be helpful here.

This is definitely what I would expect right from the start.

For the configuration, it is the same as the '1st configuration' I mentioned in this issue:

https://github.com/dask/distributed/issues/9128#issuecomment-3466994271

For the code example, basically I do the following:

for i in range(N):
    client.submit(propagate_kep, args_kep[i], resources={'GPU': 1})

Then I store each future in a dict, dict_future_keps.

Then I do another loop of length M using propagate_tle as the function, args_tle as arguments, and CPU:1 as resources. I store these futures in dict_futures_tle.

Finally I run the following loop:

for i, j in product(range(N), range(M)):
    client.submit(metrics, dict_future_keps[i], dict_futures_tle[j])  # resources: tested various values, nothing or {'GPU': 1}

And finally I compute, then wait (metrics does not return anything; it only writes files).

Vianney

So I gave a quick look by building this example:

from distributed import LocalCluster, Client
import time
from itertools import product

cluster = LocalCluster(n_workers=40, threads_per_worker=1, memory_limit='1GiB')
client = cluster.get_client()

def propagate_kep(n):
    time.sleep(4)
    return n

def propagate_tle(m):
    time.sleep(3)
    return m

def metrics(n,m):
    time.sleep(0.3)
    return n*m

N = 100
M = 150

futures_kep = []
for i in range(N):
    futures_kep.append(client.submit(propagate_kep, i))

futures_tle = []
for i in range(M):
    futures_tle.append(client.submit(propagate_tle, i))

futures_metrics = []
for i,j in product(range(N), range(M)):
    futures_metrics.append(client.submit(metrics, futures_kep[i], futures_tle[j]))

And I indeed saw more than 2 tasks placed on each worker (3 or 4), and then, when the metrics tasks start, an increasing number of tasks placed on each worker. I also tried with client.map and got the same behavior.
Trying to submit only unrelated tasks, I noticed that each new submit loop increased the initial placement by 2 on each worker, which gets regulated after the first executions. This is probably due to the way the Futures API works, Dask not knowing at first how everything is related, but it might be a bug in the queuing mechanism.

I also tried implementing it through the Delayed API, but again, task queuing does not seem to happen; I'm really not sure why.

from dask import delayed

delayed_kep = []
for i in range(N):
    delayed_kep.append(delayed(propagate_kep)(i))

delayed_tle = []
for i in range(M):
    delayed_tle.append(delayed(propagate_tle)(i))

delayed_metrics = []
for i,j in product(range(N), range(M)):
    delayed_metrics.append(delayed(metrics)(delayed_kep[i],delayed_tle[j]))

client.compute(delayed_metrics)