I am looking to integrate dask.distributed to run the jobs that evaluate machine learning pipelines in GAMA. I coded an MWE abstraction below and have a few questions:
1. Jobs are very heterogeneous in memory usage. Is it possible to define a memory limit per cluster instead of per worker? When the limit would be exceeded, the cluster would kill off the job using the most memory.
2. If the workers frequently run into memory issues, I would like to scale down the number of workers so that each may use more memory. Is there a built-in for something like this, or should I keep track of workers being killed (and how?) and call `cluster.scale` manually? (A sketch of what I have in mind follows this list.)
3. If a job fails because the nanny kills the worker process for exceeding its memory limit, I do not want the job to be retried. It seems like `client.cluster.scheduler.allowed_failures = 0` should achieve this, but might it also unexpectedly affect other failure cases? Or is this already covered by the `retries=0` default of `client.submit`? (See the second sketch below.)
4. Jobs have a time deadline (in principle the same for each job). If a job is not finished within that period, it qualifies as a failure to the system. Is there a way to have Dask handle this as such, or do I have to build it into the function I submit? (See the third sketch below.)
5. Sometimes I get errors after I shut down the client (see the traceback below the code snippet). 5a. Why are there errors for jobs on a cluster that has been shut down? 5b. What is the intended way to avoid these errors?
6. Is the general usage of Dask functions in the MWE below appropriate?
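For question 2, this is roughly what I have in mind if there is no built-in. `handle_result` is a hypothetical helper of mine, and I am assuming that a worker killed for exceeding its memory limit surfaces on the client as a `KilledWorker` exception:

```python
from dask.distributed import KilledWorker

def handle_result(future, cluster):
    """Hypothetical helper: shrink the worker pool whenever a worker gets killed."""
    try:
        return future.result()
    except KilledWorker:
        # One worker fewer, so each remaining worker gets a larger share of memory.
        cluster.scale(max(1, len(cluster.workers) - 1))
        return None
```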
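For question 3, the alternative I am considering is to set the allowed-failures value through the Dask config before creating the cluster, and/or to pass `retries=0` explicitly per task. A minimal sketch (the `evaluate` function is just a stand-in for `do_experiment` from the MWE below):

```python
import dask
from dask.distributed import Client, LocalCluster

# I assume this config key is the same setting as the scheduler's
# allowed_failures attribute, only applied before the cluster exists.
dask.config.set({"distributed.scheduler.allowed-failures": 0})

cluster = LocalCluster(processes=False, n_workers=5, memory_limit="auto")
client = Client(cluster)

def evaluate(x):
    return x * x  # stand-in for do_experiment

# retries=0 passed explicitly, even though it should already be the default
future = client.submit(evaluate, 3, retries=0)
print(future.result())
client.shutdown()
```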
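For question 4, the only client-side workaround I can think of is to put a timeout on collecting each result and cancel the future when it expires. A minimal sketch, assuming `Future.result(timeout=...)` raises `asyncio.TimeoutError` when the deadline passes:

```python
import asyncio

DEADLINE = 60  # seconds; hypothetical per-job evaluation budget

def result_or_fail(future, deadline=DEADLINE):
    """Hypothetical helper: treat a job that misses its deadline as a failure."""
    try:
        return future.result(timeout=deadline)
    except asyncio.TimeoutError:
        future.cancel()  # count the evaluation as failed
        return None
```

I am not sure this actually interrupts the computation on the worker, though, which is why I would prefer a built-in. The full MWE: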
from random import Random
import numpy as np
from sklearn.datasets import load_iris, fetch_covtype
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_validate
import time
def do_experiment(est: Pipeline, X, y, i) -> float:
    results = cross_validate(est, X, y)
    if i > 10:
        raise ValueError("This is something that occurs after shutdown")
    return np.mean(results["test_score"])
X, y = np.random.rand(10_000, 10), np.random.randint(0, 4, size=(10_000,))
# Dask setup
from dask.distributed import Client, LocalCluster, as_completed
import dask
cluster = LocalCluster(processes=False, n_workers=5, memory_limit='auto')  # as cluster:
client = Client(cluster)  # as client:
client.cluster.scheduler.allowed_failures = 0
# Dask Work
# Distribute the data to the workers once
Xf = client.scatter(X, broadcast=True)
yf = client.scatter(y, broadcast=True)
# Start with an initial set of experiments:
experiments = [RandomForestClassifier() for _ in range(5)]
futures = as_completed(
[client.submit(do_experiment, experiment, Xf, yf, 0) for experiment in experiments]
)
for i, future in enumerate(futures):
    # Analyze result:
    print(future.result())
    # Design and submit a new experiment based on results:
    futures.add(
        client.submit(
            do_experiment, RandomForestClassifier(), Xf, yf, i + 5
        )
    )
    if i > 10:
        break  # Normally would be interrupted after a certain time by a third-party module
client.close()
client.shutdown()
output:
0.25539999999999996
0.2606
0.2548
0.2543
0.25529999999999997
0.25399999999999995
0.2569
0.2542
0.2485
0.25520000000000004
0.25070000000000003
2022-08-11 14:42:16,621 - distributed.worker - WARNING - Compute Failed
Key: do_experiment-2666225130121b0bd3ec6c02e7be91e8
Function: do_experiment
args: (RandomForestClassifier(), array([[0.16894852, 0.44864922, 0.74075361, ..., 0.03255224, 0.59704995,
0.97329783],
[0.14537666, 0.83880844, 0.86717988, ..., 0.42677332, 0.20920538,
0.04401537],
[0.48149274, 0.25890699, 0.45397284, ..., 0.11396569, 0.99095189,
0.99550115],
...,
[0.44279767, 0.97881847, 0.10597641, ..., 0.04717878, 0.34330635,
0.63875415],
[0.20210841, 0.487176 , 0.38618452, ..., 0.62213457, 0.31806978,
0.52941727],
[0.99302234, 0.86366038, 0.54327158, ..., 0.07484623, 0.93126837,
0.26536044]]), array([2, 0, 2, ..., 1, 1, 0]), 11)
kwargs: {}
Exception: "ValueError('This is something that occurs after shutdown')"
Traceback (most recent call last):
File "/Users/pietergijsbers/repositories/gama/dask-test.py", line 38, in <module>
print(future.result())
File "/Users/pietergijsbers/repositories/gama/venv310/lib/python3.10/site-packages/distributed/client.py", line 277, in result
raise exc.with_traceback(tb)
File "/Users/pietergijsbers/repositories/gama/dask-test.py", line 12, in do_experiment
raise ValueError("This is something that occurs after shutdown")
ValueError: This is something that occurs after shutdown