I have a list of futures to evaluate. After a given time limit, I would like to cancel all remaining futures and return a default value for each of them. Ideally, I would also like a progress bar. However, it looks like dask.distributed.progress blocks until all futures are complete, which prevents me from timing out the evaluation.
Here is a minimal example:
import numpy as np
import pandas as pd
import dask
from dask.distributed import Client
from dask.distributed import LocalCluster
import time


def slow_function(s):
    time.sleep(s)
    return s


def parallel_eval(arr, timeout):
    with LocalCluster(n_workers=5, threads_per_worker=1, memory_limit="5GB") as cluster:
        with Client(cluster) as client:
            futures = [client.submit(slow_function, s, pure=False) for s in arr]
            dask.distributed.progress(futures, notebook=False)
            try:
                dask.distributed.wait(futures, timeout=timeout)
            except dask.distributed.TimeoutError:
                print("terminating parallel evaluation due to timeout")
            results = []
            for future in futures:
                if not future.done():
                    future.cancel()
                    print("timeout")
                    results.append("TIMEOUT")
                elif future.exception():
                    results.append("exception")
                    print(future.exception())
                else:
                    results.append(future.result())
            return results


if __name__ == "__main__":
    results_1 = parallel_eval([1,1,2,2,3], 10)
    print()
    results_2 = parallel_eval([1,1,2,2,3,10], 5)
    print()
    print(results_1)
    print(results_2)
Desired output:
[1, 1, 2, 2, 3]
[1, 1, 2, 2, 3, 'TIMEOUT']
Actual output:
[1, 1, 2, 2, 3]
[1, 1, 2, 2, 3, 10]
I get the desired output when I remove the line dask.distributed.progress(futures, notebook=False); however, with the progress bar, all futures run to completion.
Questions:
- How can I have a progress bar and a global timeout?
- A related question: is it possible to set a time limit for an individual future? Currently, in my application, I enforce this inside the function being parallelized. I am wondering whether Dask has a built-in method for this.
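To make the second question concrete, here is a minimal sketch of what enforcing the limit inside the function itself can look like. It uses only the standard library, and every name in it (TimeLimitExceeded, slow_function_with_limit, safe_call, the limit and step parameters) is hypothetical and chosen for illustration; none of it is a Dask API. It is a cooperative check, so it only works when the function can poll a deadline between small units of work.

```python
import time


class TimeLimitExceeded(Exception):
    """Hypothetical exception raised when the per-call time budget runs out."""


def slow_function_with_limit(s, limit, step=0.01):
    """Sleep for s seconds in small steps, giving up once limit seconds have passed."""
    deadline = time.monotonic() + limit
    remaining = s
    while remaining > 0:
        # Cooperative check: the function itself polls the deadline.
        if time.monotonic() >= deadline:
            raise TimeLimitExceeded(f"exceeded {limit} s")
        chunk = min(step, remaining)
        time.sleep(chunk)
        remaining -= chunk
    return s


def safe_call(s, limit):
    """Return the result, or the default value "TIMEOUT" if the limit is hit."""
    try:
        return slow_function_with_limit(s, limit)
    except TimeLimitExceeded:
        return "TIMEOUT"
```

In this sketch, safe_call would be the function submitted with client.submit in place of slow_function, so a task that runs too long returns the default value on its own instead of needing to be cancelled from the client.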
Thanks for the help!