I am trying to parallelize the evaluation of many machine-learning models on a single shared dataset. Since the dataset is large, it should be distributed to the workers via scatter. I also set the memory_limit parameter in LocalCluster. Sometimes a model will use a large amount of memory and exceed that limit. My desired behavior is for the offending future to simply be cancelled, with my code falling back to a default value. I got this working without scatter, but when I scatter the data first, the client crashes as soon as the first future goes over the memory_limit. What is the correct way to combine scatter with memory_limit and gracefully handle bad futures?
Here is a minimal example that demonstrates my issue:
import numpy as np
import pandas as pd
import dask
from dask.distributed import Client
from dask.distributed import LocalCluster

def generate_numpy_array(num_gb):
    # Allocate roughly num_gb GB of float64 data, do some work on it,
    # and return the requested size so the result can be identified.
    num_bytes = num_gb * 1024 * 1024 * 1024
    num_elements = int(num_bytes // np.dtype('float64').itemsize)
    arr = np.random.rand(num_elements)
    s = np.sum(arr)
    return num_gb

with LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="5GB") as cluster:
    with Client(cluster) as client:
        sizes = [10, 1, 1]
        sizes = [client.scatter(s) for s in sizes]  # if this line is removed, everything works
        futures = [client.submit(generate_numpy_array, s, pure=False) for s in sizes]
        dask.distributed.progress(futures, notebook=True)
        dask.distributed.wait(futures)
        results = []
        for size, future in zip(sizes, futures):
            if not future.done():
                future.cancel()
                print('timeout')
                results.append("TIMEOUT")
            elif future.exception():
                results.append("exception")
                print(future.exception())
            else:
                results.append(future.result())
results
Here, the function being parallelized allocates a random array of the given size in GB, does some work on it, and returns a value. Afterwards we collect all the results in a list. I set a memory limit of 5GB. The desired behavior is that we append a default value instead of the result for every parameter that crashes the task. In this example sizes = [10, 1, 1], so I would like results to be ['exception', 1, 1].
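In other words, continuing from the futures in the example above, the collection step should behave like this sketch (assuming a failed task surfaces as an exception on its future; "DEFAULT" is just a placeholder value of my choosing):

results = []
for future in futures:
    try:
        # result() re-raises whatever error the task (or its worker) hit
        results.append(future.result())
    except Exception as e:
        # e.g. the worker was killed for exceeding memory_limit
        print(f"task failed: {e!r}")
        results.append("DEFAULT")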
If I comment out the line that scatters the data (i.e., the line that turns the parameters into futures), everything works correctly and I get the desired output.
However, if I first scatter the parameters, the client crashes once the first future goes over the memory limit.
I get the following error: CancelledError: ['generate_numpy_array-db61b9bc-6285-4be0-bb2e-7c1683254904']
Some more related questions:
- When a future goes over the memory limit, Dask tries to reschedule it a number of times. How can I limit how many retries a given future is allowed? (See the sketch after this list for what I have in mind.)
- Is it possible to get a more informative error message indicating that the task failed due to memory issues? future.exception() just says that the task failed, but not why.
- When using a local cluster, does scatter create a separate copy of the data in each worker's RAM, or can all workers read the same object?
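For the first two questions, here is roughly what I have in mind, based on my reading of the docs (a sketch, not verified: I am assuming that retries= on client.submit and the distributed.scheduler.allowed-failures config are the relevant knobs, and that a memory kill eventually surfaces as a KilledWorker exception). It reuses generate_numpy_array from the example above:

import dask
from dask.distributed import Client, LocalCluster, KilledWorker

# Assumption: allowed-failures caps how often the scheduler reschedules
# a task whose worker died (default 3); set it before the cluster starts.
dask.config.set({"distributed.scheduler.allowed-failures": 0})

with LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="5GB") as cluster:
    with Client(cluster) as client:
        # retries= covers ordinary exceptions raised inside the task itself.
        future = client.submit(generate_numpy_array, 10, pure=False, retries=0)
        try:
            print(future.result())
        except KilledWorker as e:
            # Assumption: this is how a memory_limit kill shows up on the future.
            print(f"worker killed, presumably for memory use: {e}")

Is that the intended approach, or is there a more direct signal for memory-related failures?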
Thanks for the help!