Correct usage of "cluster.adapt"

I want to use the adaptive scaling for running jobs on HPC clusters, but it keeps crashing after a while. Using the exact same code by static scaling works perfectly. I have reduced my project to a minimal failing example:

import time
from dask_jobqueue import SLURMCluster
from distributed import Client, progress

cluster = SLURMCluster()
cluster.adapt(minimum_jobs=1, maximum_jobs=4)

def dummy_work(my_arg):
    print(my_arg)
    time.sleep(600)
    return my_arg*2

args = range(10)
with Client(cluster) as client:
    fut = client.map(dummy_work, args)
    progress(fut, interval=10.0)
    res = client.gather(fut)
    print(res)

args = range(100,110)
with Client(cluster) as client:
    fut = client.map(dummy_work, args)
    progress(fut, interval=10.0)
    res = client.gather(fut)
    print(res)

args = range(200,230)
with Client(cluster) as client:
    fut = client.map(dummy_work, args)
    progress(fut, interval=10.0)
    res = client.gather(fut)
    print(res)

print("SUCCESS")

When running this code, it initially works fine, but upon completion of the last client.map call, I get this error:

Traceback (most recent call last):
  File "dummy_dask.py", line 31, in <module>
    res = client.gather(fut)
  File "/home/raphael/env/lib/python3.7/site-packages/distributed/client.py", line 2152, in gather
    asynchronous=asynchronous,
  File "/home/raphael/env/lib/python3.7/site-packages/distributed/utils.py", line 310, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/raphael/env/lib/python3.7/site-packages/distributed/utils.py", line 376, in sync
    raise exc.with_traceback(tb)
  File "/home/raphael/env/lib/python3.7/site-packages/distributed/utils.py", line 349, in f
    result = yield future
  File "/home/raphael/env/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/raphael/env/lib/python3.7/site-packages/distributed/client.py", line 2009, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('dummy_work-bf3f0e9bfdc71bfb52bb831ade5f40fd', <WorkerState 'tcp://10.0.11.21:46643', name: SLURMCluster-2-5, status: closed, memory: 0, processing: 2>)
distributed.utils - ERROR -
Traceback (most recent call last):
  File "/home/raphael/env/lib/python3.7/site-packages/distributed/utils.py", line 693, in log_errors
    yield
  File "/home/raphael/env/lib/python3.7/site-packages/distributed/scheduler.py", line 7116, in feed
    await asyncio.sleep(interval)
  File "/home/raphael/env/lib/python3.7/asyncio/tasks.py", line 595, in sleep
    return await future
concurrent.futures._base.CancelledError

I’ve messed around a lot to try to get it to work, but it always gives this error. Does anyone have experience with this?

dask_jobqueue==0.7.2
dask==2022.2.0
distributed==2022.2.0

Hi @RaphaelRobidas and welcome to discourse!

Thanks for providing the snippet, I was able to reproduce a very similar issue locally. Adjusting cluster.adapt by setting the interval and target_duration seems to fix the issue (example snippet below). The defaults seem to assume your task will be completed far sooner than it actually is, and then deleting workers accordingly.

import time
from distributed import Client, LocalCluster, progress

cluster = LocalCluster()
cluster.adapt(
    minimum=1, maximum=4, interval='10s', target_duration='60s'
)
client = Client(cluster)

def dummy_work(my_arg):
    print(my_arg)
    time.sleep(20)
    return my_arg*2

args = range(10)
with Client(cluster) as client:
    fut = client.map(dummy_work, args)
    progress(fut, interval=10.0)
    res = client.gather(fut)
    print(res)

args = range(100,110)
with Client(cluster) as client:
    fut = client.map(dummy_work, args)
    progress(fut, interval=10.0)
    res = client.gather(fut)
    print(res)

args = range(200,230)
with Client(cluster) as client:
    fut = client.map(dummy_work, args)
    progress(fut, interval=10.0)
    res = client.gather(fut)
    print(res)

print("SUCCESS")

Thanks for insight. Increasing the target_duration does influence the time the task runs before the workers get killed. However, this isn’t exactly a solution for me, as the task duration can vary greatly in an unpredictable fashion. Using a long target_duration will delay scale-up and still lead to the workers getting killed if the task runs for longer than expected.

I found that using a very large wait_count prevents the workers from being killed (at least locally). However, I guess this must also prevent scale-down?

That’s a good point about increasing wait_count, I think your assumption is correct that this could prevent the cluster from scaling down workers. It sounds like this might require striking the right balance between interval, target_duration, and wait_count. I don’t have extensive experience using adapt, but I’m wondering if @guillaumeeb has anything to add here?

If you come up with a good solution I’d encourage you to share it here or even add to the dask-jobqueue docs page for advanced tips and tricks.

1 Like

Hi @RaphaelRobidas and @scharlottej13,
Sorry for the delay.

First thing I would say is: adaptive is hard to get right, always wonder first if you really need it. Do you:

  • Perform interactive analysis, say in a notebook, with the same cluster object, submitting a few computations a day?
  • Submit a batch job, but with different parts, some which need a lot of workers, other which don’t?
  • Have a workload you don’t know how much time will take, and need to automatically scale up again when worker jobs reach walltime?

Other than that, I’d say you don’t need adaptive scaling on HPC clusters. And for each point, there may be alternative that are easier to manage.

If you feel you really need it, then another thing: I think the underlying mechanisms have not been developed with long tasks (I see a sleep(600) in your example code?) with a

duration (that) can vary greatly in an unpredictable fashion

But anyway, you can probably try as @scharlottej13 say to find the good balance between arguments. A good thing is that this seems unrelated to dask-jobqueue, as @scharlottej13 reproduced the problem witj LocalCluster. This will be easier to debug. I greatly encourage you to look into adaptive logic which is not that complicated (but it has change a lot since the last time I looked into it).

One important thing: adaptive should not try to kill a worker if a task is running on it, or in very specific case (lot of resources available on other workers).

Some other hints or remarks I can think of:

  • target_duration should not be less than a single task duration. Probably something like the average task duration.
  • scaling needs is computed thanks to the scheduler estimation of occupancy (distributed/scheduler.py at e8c06696146bc797c7c6d05e0f29c7d0debe8293 · dask/distributed · GitHub). If your task duration varies a lot, it will not be estimated correctly. Scheduler estimate this based on previous similar task duration.
  • You can give Dask some hints about your task duration (see `Adaptive` doesn't scale quickly for long-running jobs · Issue #3627 · dask/distributed · GitHub), This should make scaling up faster (don’t have to wait ofr the end of a task).
  • In your initial post, you don’t give information about how you SlurmCluster is configured (walltime, job resources, worker resources, etc.). With Dask-jobqueue, you should set those accordingly to your workload.
  • But again, if this can be reproduced with LocalCluster, you should focus on this to narrow the problem.
2 Likes

Hello @scharlottej13 and @guillaumeeb,

Thanks for the details. In the end, I decided to simply use the number of tasks to choose how many workers should be requested. This way, there is some sort of scaling with respect to the amount of work to be done, but it is much simpler to handle than the adaptive scaling.

The main disadvantage from my point of view is that, towards the end of the run, many nodes will have no work to perform yet remain in the cluster. This will add a bit of unnecessary HPC time billing, but shouldn’t be dramatic. I guess that some more complex custom scaling mechanics could mitigate that, but it is not worth it for my uses.

Best

I often try to have at least twice as many tasks as I have available Dask threads/workers. So that the load is evenly split among all workers. But if you really have tasks that are much longer than other, this might not be sufficient.

I agree that Adaptive scaling should work for this.

1 Like