How can I print a progress bar and also utilize a timeout when evaluating dask futures?

I have a list of futures to evaluate. After a given time limit, I would like to cancel all remaining futures and return a default value. Ideally, I would also like a progress bar. However, it looks like dask.distributed.progress waits until all futures are complete before continuing, which prevents me from timing out the evaluation.

Here is a minimal example:

import time

import dask
from dask.distributed import Client, LocalCluster

def slow_function(s):
    time.sleep(s)
    return s

def parallel_eval(arr, timeout):
    with LocalCluster(n_workers=5, threads_per_worker=1, memory_limit="5GB") as cluster:
        with Client(cluster) as client:
            futures = [client.submit(slow_function, s, pure=False) for s in arr]
            
            dask.distributed.progress(futures, notebook=False)

            try: 
                dask.distributed.wait(futures, timeout=timeout)
            except dask.distributed.TimeoutError:
                print("terminating parallel evaluation due to timeout")

            results = []
            for future in futures:
                if not future.done():
                    future.cancel()
                    print("timeout")
                    results.append("TIMEOUT")
                elif future.exception():
                    results.append("exception")
                    print(future.exception())
                else:
                    results.append(future.result())

            return results

if __name__ == "__main__":
    results_1 = parallel_eval([1,1,2,2,3], 10)
    print()
    results_2 = parallel_eval([1,1,2,2,3,10], 5)

    print()
    print(results_1)
    print(results_2)

Desired output:

[1, 1, 2, 2, 3]
[1, 1, 2, 2, 3, 'TIMEOUT']

Actual output:

[1, 1, 2, 2, 3]
[1, 1, 2, 2, 3, 10]

I get the desired output correctly when removing the line dask.distributed.progress(futures, notebook=False); however, with the progress bar, all futures run to completion.

Questions:

  1. How can I have a progress bar and a global timeout?
  2. A related question: is it possible to set a time limit for an individual future? Currently, in my application, I enforce this inside the parallelized function itself, but I am wondering whether Dask has a built-in method for this.

Thanks for the help!

Hi @zechiel,

Unfortunately, I don't think there is a built-in solution for either of your questions. You'll have to build your own in order to achieve what you want.
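For the first question, one way to build it yourself is to skip progress entirely and poll the futures until a deadline, drawing a simple text bar as you go. As far as I can tell from the documentation, progress returns immediately in a notebook but blocks until completion in the console, which would explain why your wait timeout is never reached. The following is only a rough sketch, not a Dask feature: gather_with_progress and its default, poll_interval and width parameters are names I made up, and it assumes nothing more than the done(), cancel(), exception() and result() methods that Dask futures expose.

```python
import sys
import time


def slow_function(s):
    time.sleep(s)
    return s


def gather_with_progress(futures, timeout, default="TIMEOUT",
                         poll_interval=0.1, width=30):
    """Hypothetical helper: poll futures until all are done or `timeout`
    seconds have passed, redrawing a one-line text progress bar."""
    futures = list(futures)
    total = len(futures)
    deadline = time.monotonic() + timeout

    while True:
        n_done = sum(f.done() for f in futures)
        filled = width * n_done // total if total else width
        bar = "#" * filled + "." * (width - filled)
        sys.stdout.write(f"\r[{bar}] {n_done}/{total}")
        sys.stdout.flush()
        if n_done == total or time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)
    sys.stdout.write("\n")

    results = []
    for f in futures:
        if not f.done():
            # Ask for cancellation and substitute the default value.
            f.cancel()
            results.append(default)
        elif f.exception() is not None:
            results.append("exception")
        else:
            results.append(f.result())
    return results


def parallel_eval(arr, timeout):
    # Imported here so the helper above does not depend on Dask itself.
    from dask.distributed import Client, LocalCluster

    with LocalCluster(n_workers=5, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            futures = [client.submit(slow_function, s, pure=False) for s in arr]
            return gather_with_progress(futures, timeout)
```

You would call parallel_eval(arr, timeout) the same way as in your example. Note that the deadline only starts once the futures have been submitted, and that a cancelled task which is already running on a worker may not stop right away; here that does not matter much because the cluster is shut down when the with block exits.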

You could also have a look at the Dask source code and see if you can contribute something!

Thanks for the confirmation! I may look into it.