When using scatter and memory_limit together, the Dask client crashes if any future goes above the limit. This does not happen when using just one or the other. How can I use both while also canceling futures that fail, without crashing the entire client?

I am trying to parallelize the evaluation of many machine-learning models on a single provided dataset. As the dataset is large, it should be distributed to the workers via scatter. Additionally, I set the memory_limit parameter in LocalCluster. Sometimes a model will want to use a large amount of memory and go over the limit. My desired behavior is that such a future is simply canceled and my function returns a default value. I got this working without scatter, but when I scatter the data first, the client crashes as soon as the first future goes over the set memory_limit. What is the correct way to use both scatter and memory_limit and gracefully handle bad futures?

Here is a minimal example that demonstrates my issue:

import numpy as np
import dask
from dask.distributed import Client
from dask.distributed import LocalCluster

def generate_numpy_array(num_gb):
    # Allocate a random float64 array of roughly num_gb gigabytes,
    # touch it with a reduction, then return the requested size.
    num_bytes = num_gb * 1024 * 1024 * 1024
    num_elements = int(num_bytes // np.dtype('float64').itemsize)
    arr = np.random.rand(num_elements)
    s = np.sum(arr)
    return num_gb

with LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="5GB") as cluster:
    with Client(cluster) as client:

        sizes = [10, 1, 1]
        # Scatter the parameters to the workers first.
        sizes = [client.scatter(s) for s in sizes]  # if this line is removed, everything works
        futures = [client.submit(generate_numpy_array, s, pure=False) for s in sizes]
        dask.distributed.progress(futures, notebook=True)
        dask.distributed.wait(futures)

        # Collect results, substituting a default value for failed futures.
        results = []
        for future in futures:
            if not future.done():
                future.cancel()
                print('timeout')
                results.append("TIMEOUT")
            elif future.exception():
                results.append("exception")
                print(future.exception())
            else:
                results.append(future.result())

results

Here, the function being parallelized generates a random array of a given size in GB, does some work with it, then returns a value. Afterwards we collect all the results in a list. I set a memory limit of 5GB. The desired behavior is that we append a default value instead of the result for each parameter that crashes. In this example, sizes=[10,1,1], so I would like results to be ['exception', 1, 1].

If I comment out the scatter line (the one that turns the parameters into futures), everything works correctly and I get the desired output.

However, if I first scatter the parameters, the client crashes once the first future goes over the memory limit.

I get the following error: CancelledError: ['generate_numpy_array-db61b9bc-6285-4be0-bb2e-7c1683254904']

Some more related questions:

  1. When a future goes over the memory limit, Dask tries to reschedule it a number of times. How can I limit how many retries a given future is allowed?
  2. Is it possible to get a more informative error message that indicates that the task failed due to memory issues? The future.exception just returns that it failed, but not why.
  3. When using a local cluster, does scatter produce multiple copies of the data in RAM for each worker, or can all workers read the same object reference?

Thanks for the help!

Hi,
The scatter is inconsequential.

You are observing a race condition between your user function and the memory monitor.
The memory monitor checks the process memory every 100 ms. If your user function manages to ramp up from below 4.75 GB (5 GB × 0.95, the terminate threshold) to the maximum and then release the memory in less than 100 ms, the memory monitor won't notice. Otherwise the process will be killed.

Please read: Worker Memory Management — Dask.distributed 2023.5.0 documentation
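For reference, both the polling interval and the termination threshold are configurable. A minimal sketch, assuming the standard distributed.worker.memory.* keys described on that page:

import dask

# Sketch: tune the nanny's memory monitor (values shown are the defaults).
dask.config.set({
    "distributed.worker.memory.monitor-interval": "100ms",  # how often process memory is polled
    "distributed.worker.memory.terminate": 0.95,            # fraction of memory_limit at which the worker is killed
})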

When a future goes over the memory limit, Dask tries to reschedule it a number of times. How can I limit how many retries a given future is allowed?

You can configure the number of retries through the Dask config: Configuration — Dask documentation
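For example, a sketch using the allowed-failures key, which caps how many times the scheduler reschedules a task after its worker dies before marking it as erred:

import dask

# Sketch: permit only one rescheduling attempt after a worker death
# (the default is 3).
dask.config.set({"distributed.scheduler.allowed-failures": 1})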

Is it possible to get a more informative error message that indicates that the task failed due to memory issues? The future.exception just returns that it failed, but not why.

If a worker dies, all tasks on it fail with KilledWorker. I agree that MemoryError would be better. This is a frequently requested feature.
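In the meantime, you can at least handle it explicitly when collecting results. A sketch, assuming KilledWorker is importable from dask.distributed (it is defined in distributed.scheduler):

from dask.distributed import KilledWorker

results = []
for future in futures:
    try:
        results.append(future.result())
    except KilledWorker:
        # The worker running this task died, most likely killed by the
        # memory monitor; fall back to a default value.
        results.append("exception")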

when using a local cluster, does scatter produce multiple copies of the data in RAM for each worker, or can all workers read the same object reference?

Multiple copies. Pass processes=False, n_workers=1 to use multithreading and share references.
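A sketch of a threaded local cluster where scattered objects are shared by reference instead of copied:

from dask.distributed import Client, LocalCluster

# Sketch: one worker process with several threads. All threads live in
# the same process, so scatter hands out references to the same object
# rather than serialized copies.
with LocalCluster(processes=False, n_workers=1, threads_per_worker=4) as cluster:
    with Client(cluster) as client:
        ...

Be aware that memory_limit behaves differently here, since (as far as I know) the terminate mechanism is handled by the nanny process, which does not exist in threaded mode.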

Thanks for the help. I tried looking into the documentation as you suggested, but I am still stuck on how to modify my code to prevent the initial crash. What would you suggest?

Here’s an example that is a bit closer to what I actually want to do:

import numpy as np
import dask
from dask.distributed import Client
from dask.distributed import LocalCluster
from dask import config as cfg
import sklearn.linear_model

def set_dask_settings():
    # Disable the scheduler's worker liveness timeout, permit only one
    # rescheduling attempt after a worker death, and run workers as
    # non-daemonic processes.
    cfg.set({'distributed.scheduler.worker-ttl': None})
    cfg.set({'distributed.scheduler.allowed-failures': 1})
    dask.config.set({"distributed.worker.daemon": False})
set_dask_settings()

def generate_xy(num_gb):
    # Build a random feature matrix of roughly num_gb gigabytes with
    # 1024 columns, plus a matching binary label vector.
    num_bytes = num_gb * 1024 * 1024 * 1024
    num_elements = int(num_bytes // np.dtype('float64').itemsize)
    X = np.random.rand(num_elements)
    n_cols = len(X) // 1024
    n_rows = len(X) // n_cols
    total_size = n_rows * n_cols
    X = X[:total_size]  # trim so the array reshapes cleanly
    X = X.reshape(-1, 1024)
    y = np.random.randint(0, 2, size=X.shape[0])
    return X, y

def fit_estimator(est, X, y):
    # est is an estimator class: instantiate it, fit on the data, and
    # return the training score.
    est = est()
    est.fit(X, y)
    score = est.score(X, y)
    return score

with LocalCluster(n_workers=3, threads_per_worker=1, memory_limit="1GB", processes=True) as cluster:
    with Client(cluster) as client:

        # A dataset that pushes a worker over its 1GB limit when fitted...
        X, y = generate_xy(.5)
        X = client.scatter(X)
        y = client.scatter(y)

        # ...and a small one that fits comfortably.
        X2, y2 = generate_xy(0.001)
        X2 = client.scatter(X2)
        y2 = client.scatter(y2)

        futures = [
            client.submit(fit_estimator, sklearn.linear_model.LogisticRegression, X, y, pure=False),
            client.submit(fit_estimator, sklearn.linear_model.LogisticRegression, X2, y2, pure=False),
        ]
        dask.distributed.progress(futures, notebook=True)
        dask.distributed.wait(futures)

        results = []
        for future in futures:
            if not future.done():
                future.cancel()
                print('timeout')
                results.append("TIMEOUT")
            elif future.exception():
                results.append("exception")
                print(future.exception())
            else:
                results.append(future.result())

results

This goes over the memory limit when the logistic regression is fitted, and crashes the client. I want to handle this case so that only that model's future is terminated while the rest keep running.
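For completeness, this is roughly the error handling I would like to end up with, assuming KilledWorker and CancelledError are the right exceptions to catch (which is exactly what I am unsure about):

from dask.distributed import KilledWorker
from concurrent.futures import CancelledError

results = []
for future in futures:
    try:
        results.append(future.result())
    except (KilledWorker, CancelledError) as e:
        # One model failed or was cancelled; record a default value and
        # keep the remaining futures alive.
        print(e)
        results.append("exception")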