Memory Management of Dask Cluster and a few new user questions

I am looking to integrate dask.distributed for jobs evaluating machine learning pipelines in GAMA. I coded an MWE abstraction below and have a few questions:

  1. Jobs are very heterogeneous in memory usage. Is it possible to define a memory limit per cluster instead of per worker, so that when the limit is exceeded, the job using the most memory is killed?
  2. If the workers frequently run into memory issues, I would like to scale down the number of workers so that each may use more memory. Is there a built-in for something like this, or should I keep track of workers being killed (and how?) and call cluster.scale manually?
  3. If a job fails because the nanny kills the worker process for exceeding memory, I do not want the job to be retried. It seems like client.cluster.scheduler.allowed_failures = 0 should achieve this, but might it also unexpectedly affect other failure cases? Or is this already covered by the retries=0 default of client.submit?
  4. Jobs have a time deadline (in principle the same for each job); if a job is not finished within that period, the system treats it as a failure. Is there a way to have Dask handle this, or do I have to build it into the function I submit?
  5. Sometimes I get errors after I shut down the client (see the traceback below the code snippet). 5a. Why are there errors for jobs on a cluster that is shut down? 5b. What is the intended way to avoid these errors?
  6. Is the general usage of dask functions below appropriate?
from random import Random
import numpy as np
from sklearn.datasets import load_iris, fetch_covtype
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_validate
import time

def do_experiment(est: Pipeline, X, y, i) -> float:
  results = cross_validate(est, X, y)
  if i > 10:
    raise ValueError("This is something that occurs after shutdown")
  return np.mean(results["test_score"])

X, y = np.random.rand(10_000, 10), np.random.randint(0, 4, size=(10_000,))

# Dask setup
from dask.distributed import Client, LocalCluster, as_completed
import dask

cluster = LocalCluster(processes=False, n_workers=5, memory_limit='auto')
client = Client(cluster)
client.cluster.scheduler.allowed_failures = 0

# Dask Work
# Distribute the data to the workers once
Xf = client.scatter(X, broadcast=True)
yf = client.scatter(y, broadcast=True)

# Start with an initial set of experiments:
experiments = [RandomForestClassifier() for _ in range(5)]
futures = as_completed(
  [client.submit(do_experiment, experiment, Xf, yf, 0) for experiment in experiments]
)

for i, future in enumerate(futures):
  # Analyze result:
  print(future.result())
  # Design and submit a new experiment based on results:
  futures.add(
    client.submit(
      do_experiment, RandomForestClassifier(), Xf, yf, i + 5
    )
  )
  
  if i > 10:
    break  # Normally would be interrupted after a certain time by a third party module

client.close()

client.shutdown()

output:

0.25539999999999996
0.2606
0.2548
0.2543
0.25529999999999997
0.25399999999999995
0.2569
0.2542
0.2485
0.25520000000000004
0.25070000000000003
2022-08-11 14:42:16,621 - distributed.worker - WARNING - Compute Failed
Key:       do_experiment-2666225130121b0bd3ec6c02e7be91e8
Function:  do_experiment
args:      (RandomForestClassifier(), array([[0.16894852, 0.44864922, 0.74075361, ..., 0.03255224, 0.59704995,
        0.97329783],
       [0.14537666, 0.83880844, 0.86717988, ..., 0.42677332, 0.20920538,
        0.04401537],
       [0.48149274, 0.25890699, 0.45397284, ..., 0.11396569, 0.99095189,
        0.99550115],
       ...,
       [0.44279767, 0.97881847, 0.10597641, ..., 0.04717878, 0.34330635,
        0.63875415],
       [0.20210841, 0.487176  , 0.38618452, ..., 0.62213457, 0.31806978,
        0.52941727],
       [0.99302234, 0.86366038, 0.54327158, ..., 0.07484623, 0.93126837,
        0.26536044]]), array([2, 0, 2, ..., 1, 1, 0]), 11)
kwargs:    {}
Exception: "ValueError('This is something that occurs after shutdown')"

Traceback (most recent call last):
  File "/Users/pietergijsbers/repositories/gama/dask-test.py", line 38, in <module>
    print(future.result())
  File "/Users/pietergijsbers/repositories/gama/venv310/lib/python3.10/site-packages/distributed/client.py", line 277, in result
    raise exc.with_traceback(tb)
  File "/Users/pietergijsbers/repositories/gama/dask-test.py", line 12, in do_experiment
    raise ValueError("This is something that occurs after shutdown")
ValueError: This is something that occurs after shutdown

Hi @PGijsbers,

I’ll try to give the answers I can, but maybe some Dask Distributed maintainers will correct me.

  1. I’ve never seen a way of defining memory per cluster. Memory is always specified per worker process as far as I know. One thing you could try, if your code is compatible (e.g. not GIL bound), is to use a few worker processes with several threads in each worker. Using LocalCluster, you can even go fully threaded.
  2. Scaling down workers dynamically will not change the amount of memory each worker has available. You need to configure each worker’s memory at the beginning, when building your Cluster object.
  3. I think that with retries=0, if one task fails, the entire computation will fail, but maybe you can achieve what you want with other options.
  4. I don’t know of a way to specify a task time limit in Dask, but maybe there is one. A rough workaround is sketched after this list.
  5. I’m not sure about your code; are you sure you’re correctly waiting for and gathering all results/futures before closing the client object?
  6. I’m not sure about the enumerate() in the for loop. As stated above, I’m not sure your code is waiting for all tasks to end. Also, I’m not sure what happens when you call LocalCluster with the n_workers kwarg along with processes=False.
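For point 4, a rough workaround, assuming it is acceptable to enforce the deadline inside the submitted function itself: run the actual work in a helper thread and give up on it after a fixed period. This is only a sketch; do_experiment_with_deadline and DEADLINE_SECONDS are made-up names, and the underlying cross_validate call is not actually interrupted, it just no longer counts.

from concurrent.futures import ThreadPoolExecutor, TimeoutError

import numpy as np
from sklearn.model_selection import cross_validate

DEADLINE_SECONDS = 60  # hypothetical per-job deadline

def do_experiment_with_deadline(est, X, y) -> float:
  pool = ThreadPoolExecutor(max_workers=1)
  inner = pool.submit(cross_validate, est, X, y)
  try:
    results = inner.result(timeout=DEADLINE_SECONDS)
  except TimeoutError:
    # Raising makes Dask record the future as failed, just like the ValueError
    # in the snippet above; the helper thread keeps running until
    # cross_validate finishes, it cannot be forcibly stopped.
    pool.shutdown(wait=False)
    raise RuntimeError("experiment exceeded its deadline")
  pool.shutdown(wait=False)
  return float(np.mean(results["test_score"]))

Submitting do_experiment_with_deadline instead of do_experiment would then make every job that overruns its deadline show up as a failed future, without any Dask-level configuration.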

Thanks for the help!

  1. So, to the best of your knowledge, there is no way to change the memory of a worker at runtime?

  2. I don’t know, as I am a Dask novice. I did figure out how to cancel the remaining jobs (futures.clear()), but the warnings still arise. I don’t want to wait on and gather those last jobs; after a certain time I no longer care about any jobs still being processed or sitting in the queue. I just want them to be silently aborted/cleaned up (one explicit way of doing that is sketched after this list).

  3. I assumed n_workers is equivalent to threads_per_worker when processes=False, but perhaps I should explicitly use threads_per_worker with n_workers=1?
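One explicit way of doing the cleanup mentioned in point 2, just a sketch reusing the names from my snippet above (client, do_experiment, Xf, yf); the pending list is plain bookkeeping on my side, not a Dask construct:

from dask.distributed import as_completed
from sklearn.ensemble import RandomForestClassifier

pending = [
  client.submit(do_experiment, RandomForestClassifier(), Xf, yf, 0)
  for _ in range(5)
]
completed = as_completed(pending)

for i, future in enumerate(completed):
  pending.remove(future)
  # ... analyze the result, submit follow-up work into both pending and completed ...
  if i >= 9:  # e.g. a deadline was reached
    break

# Ask the scheduler to drop everything we no longer care about, then shut down.
client.cancel(pending)
client.close()

Note that jobs which already started on a worker still run to completion, so this alone does not prevent the warnings.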

  1. Not that I’m aware of. I’ve never tried it, though. But you can try digging into the Worker class and see if you find something.
  2. There is certainly a better way, but you can always ignore the warnings if the cancellation is ok for you.
  3. I think you should verify this. With only one process and threads, you should have only one memory limit, shared between all the threads.

I’m ignoring the warnings now, but the only way to do that seems to be silence_logs=logging.ERROR, and I would much prefer to silence only this specific type of warning and not all warnings.

Just tested this. I confirm this creates 5 workers, each with just a fraction of the total memory.

Using:

cluster = LocalCluster(processes=False, threads_per_worker=5, memory_limit='auto')

fixes that, and allows memory to be shared across threads.

Running your code, I also notice that it actually never reaches the break, and so never reaches the client.close() and shutdown() section. The snippet here is cancelled because one of the experiments throws the ValueError before the shutdown!

With the code below, I got no error (notice the condition to stop).

# Start with an initial set of experiments:
experiments = [RandomForestClassifier() for _ in range(5)]
futures = as_completed(
  [client.submit(do_experiment, experiment, Xf, yf, 0) for experiment in experiments]
)

for i, future in enumerate(futures):
  # Analyze result:
  print(f'Indice: {i}, result: {future.result()}')
  # Design and submit a new experiment based on results:
  futures.add(
    client.submit(
      do_experiment, RandomForestClassifier(), Xf, yf, i + 5
    )
  )
  print(f'Submitted indice: {i + 5}')
  
  if i >= 9:
    print(f'Stopping experiments: {i + 5}')
    break  # Normally would be interrupted after a certain time by a third party module
    
client.close()
client.shutdown()

Whelp, good catch! On the other hand, I think your code simply does not throw an error because it never processes a future which would throw an error.
Consider replacing the entire inner loop with just:

  # Design and submit a new experiment based on results:
  futures.add(
    client.submit(
      do_experiment, RandomForestClassifier(), Xf, yf, i + 5
    )
  )
  print(f'Submitted indice: {i + 5}')
  
  if i >= 15:
    print(f'Stopping experiments: {i + 5}')
    break  # Normally would be interrupted after a certain time by a third party module

In this case, the program shuts down as expected. Results are never even truly processed! But the warnings about the errors are still produced, even though future.result() is never called on a future that had an exception.

The full updated snippet:

from random import Random
import numpy as np
from sklearn.datasets import load_iris, fetch_covtype
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_validate
import time

def do_experiment(est: Pipeline, X, y, i) -> float:
  results = cross_validate(est, X, y)
  if i > 10:
    raise ValueError("This is something that occurs after shutdown")
  return np.mean(results["test_score"])

X, y = np.random.rand(10_000, 10), np.random.randint(0, 4, size=(10_000,))

# Dask setup
from dask.distributed import Client, LocalCluster, as_completed

cluster = LocalCluster(processes=False, threads_per_worker=5, memory_limit='auto')
client = Client(cluster)
client.cluster.scheduler.allowed_failures = 0

# Dask Work
# Distribute the data to the workers once
Xf = client.scatter(X, broadcast=True)
yf = client.scatter(y, broadcast=True)


experiments = [RandomForestClassifier() for _ in range(5)]
futures = as_completed(
  [client.submit(do_experiment, experiment, Xf, yf, 0) for experiment in experiments]
)

for i, future in enumerate(futures):
  # Analyze result:
  # if future.status != "error":
  #   print(f"Fetching result for {i}")
  #   print(f'Indice: {i}, result: {future.result()}')
  # else:
  #   print(f'Indice: {i} did not finish')

  # Design and submit a new experiment based on results:
  futures.add(
    client.submit(
      do_experiment, RandomForestClassifier(), Xf, yf, i + 5
    )
  )
  print(f'Submitted indice: {i + 5}')
  
  if i >= 15:
    print(f'Stopping experiments: {i + 5}')
    break  # Normally would be interrupted after a certain time by a third party module
    
client.close()
client.shutdown()

You’re right: my code submits futures which should throw an error, but the client and cluster are stopped, and thus those futures cancelled, before the ValueError is actually raised.

I’m not sure what you expect here. Once you submit a future and wait for it with the as_completed loop, the do_experiment function will be executed on a worker. If it raises an Exception, the exception is logged and stored in the future’s state. The Exception will be raised on the Client side only if you try to get the result; see Futures — Dask documentation:

If a future fails, then Dask will raise the remote exceptions and tracebacks if you try to get the result
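For instance (only a sketch), you can look at a completed future’s stored state and exception without re-raising it on the Client:

for future in futures:  # e.g. the as_completed iterator from your snippet
  if future.status == "error":
    print("job failed:", future.exception())
  else:
    print("score:", future.result())

future.exception() returns the stored exception, and future.result() is the only call that re-raises it.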

You don’t want to see those logs because they are generated by your code?

I’d expect to be able to silence warnings printed to the console that come from my code (those I will process when handling the futures anyway, or simply ignore). But I want to do so without silencing all warnings coming from Dask (i.e., without resorting to silence_logs=logging.ERROR). For example, if there are warnings about connection issues, resources, or anything other than the execution of my code, I would like to be aware of them.

Exactly, because I already know these jobs might fail (in particular towards the end of the lifetime). But from experience, my users will complain or think something is wrong when the warnings are printed to the console.

In this case, I don’t think there is an easy way to silence only this log.

See distributed/worker.py at main · dask/distributed · GitHub.

Two things I would suggest:

  • Play with the logging module and try to filter only those logs (I don’t know how complicated that can be).
  • Handle your failures differently: not by throwing an Exception, but, for example, by returning a specific value/type from do_experiment. A rough sketch of this second option is shown below.
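For the second option, a rough sketch; the ExperimentResult container is only an example, not something from Dask or GAMA:

from dataclasses import dataclass
from typing import Optional

import numpy as np
from sklearn.model_selection import cross_validate

@dataclass
class ExperimentResult:
  score: Optional[float] = None
  error: Optional[str] = None

def do_experiment_safe(est, X, y, i) -> ExperimentResult:
  # i is kept only to match the signature of do_experiment in the snippet above.
  try:
    results = cross_validate(est, X, y)
    return ExperimentResult(score=float(np.mean(results["test_score"])))
  except Exception as e:
    # Expected failures come back as ordinary values, so the worker never
    # logs a "Compute Failed" warning for them.
    return ExperimentResult(error=repr(e))

On the Client side you would then check result.error instead of catching exceptions, while genuinely unexpected problems (killed workers, connection issues) still surface through Dask’s own logging.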

Thank you for the pointers and all the help! While not all questions have a definitive answer now, I’ll mark your initial post as the answer 🙂

I ignore the warnings now by configuring the log directly, after importing from dask.distributed:

import logging

def warning_filter(record) -> int:
  # Only suppress the "Compute Failed" warnings caused by our own, expected exception.
  if record.filename == "worker.py" and "Compute Failed" in record.msg and isinstance(record.args, tuple):
    # print("Ignoring:", record.msg % record.args)
    return 'This is something that occurs after shutdown' not in record.args[-1]
  return 1

logging.getLogger("distributed.worker").addFilter(warning_filter)

I’ll just note here: question 1 in the OP doesn’t appear to have been resolved, and it would be very useful to me as well. Managing memory at the worker level means I have to be quite conservative, because I never know whether many workers will be near their memory limits/thresholds at the same time. Since it is a LocalCluster, the operative question is how much memory the total cluster is using. I would be happy to let one worker go over its memory limit, avoiding spilling, if other workers are substantially below theirs.

Hi @zmbc, welcome to Dask Discourse,

Unfortunately, there is no way of managing Worker memory dynamically. Either you are able to tell which of your tasks will need a lot of memory, and start a dedicated Worker for those using resources (see the sketch below), or you’ll have to be conservative. This is true when using several Worker processes, at least. You can try to set a memory limit for each Worker such that the total exceeds the memory you actually have, but Dask will not be able to schedule tasks based on available memory.
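To illustrate the resources idea (all names and sizes here are made up, big_pipeline stands for one of your memory-hungry pipelines, and do_experiment, Xf and yf come from the snippets above): start the large worker with an abstract resource tag, for example

dask-worker <scheduler-address> --memory-limit 64GB --resources "MEMORY=64e9"

and then route only the memory-hungry jobs to it:

from dask.distributed import Client

client = Client("<scheduler-address>")  # placeholder address

# Only workers advertising at least this much of the abstract MEMORY resource
# will run this task; other tasks can still run anywhere.
big_future = client.submit(
  do_experiment, big_pipeline, Xf, yf, 0,
  resources={"MEMORY": 32e9},
)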

My issue here isn’t that Dask doesn’t schedule tasks based on available memory. My issue is that the memory management features – spilling and pausing – can only be triggered based on individual worker memory and not overall cluster memory. What I would want is to have no worker-level limit on memory, but have the scheduler tell workers to start spilling when they collectively reach some threshold, and pause all workers at some other threshold. Since for a LocalCluster memory is effectively shared, managing memory separately at the process level isn’t very meaningful.

LocalCluster is just a specific implementation of a distributed cluster; the Scheduler here is not really aware of the fact that all worker processes are on the same node. So yes, the memory management features are handled at the process level.
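For completeness, the memory-management knobs that do exist are the per-worker thresholds in the Dask configuration. A sketch, with example fractions and sizes only:

import dask
from dask.distributed import Client, LocalCluster

dask.config.set({
  "distributed.worker.memory.target": 0.60,     # start spilling managed data to disk
  "distributed.worker.memory.spill": 0.70,      # spill based on process memory
  "distributed.worker.memory.pause": 0.80,      # stop accepting new tasks
  "distributed.worker.memory.terminate": 0.95,  # the nanny kills the worker
})

cluster = LocalCluster(n_workers=4, memory_limit="8GB")  # the limit is per worker
client = Client(cluster)

All of these fractions are evaluated against each worker process’s own memory_limit, not against the memory of the machine as a whole.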