Submit future with timeout

With a future you can do Future.result([timeout]) which will limit the amount of time the future can run. Is there a way to also do this with submit (ie. submitting a function application to the scheduler Client.submit(func, *args[, key, workers, ...]))?

@jadeidev Welcome to Discourse! Perhaps you can use distributed.wait?

from dask.distributed import Client, wait
from time import sleep

client = Client()

def add(x, y):
    sleep(10)
    return x + y

a = client.submit(add, 30, 20)

wait(a, timeout=5) # Will timeout because add() takes 10 seconds

Also see, Documentation – Waiting on Futures

Thank you @pavithraes.
Question
(1) when I do wait On multiple futures, does the time apply per future or on all of them together (ie all futures would need to finish by this amount of time)
(2) I wanted to use wait on multiple futures and then get results from futures that were able to complete within the allowed timeframe.is that possible?

@jadeidev Good questions!

when I do wait On multiple futures, does the time apply per future or on all of them together (ie all futures would need to finish by this amount of time)

It would apply to all futures together!

I wanted to use wait on multiple futures and then get results from futures that were able to complete within the allowed timeframe.is that possible?

So, Future.result(timeout=...) is meant to do exactly this – you can call result on all your futures with appropriate timeouts to match your outcome. That said, maybe you’re looking for something else? A minimal example and some more details+context about your workflow can help us answer your questions better.

Also, note that wait returns the futures that were completed in the given time if you set return_when='FIRST_COMPLETED', which might be helpful:

from dask.distributed import Client, wait
from time import sleep

# Ensure only two tasks run simultaneously
client = Client(n_workers=2, threads_per_worker=1)
client

def inc(x):
    sleep (10)
    return x + 1

a = client.submit(inc, 10)
b = client.submit(inc, 20)
c = client.submit(inc, 30)
d = client.submit(inc, 40)

# It will take 10s to complete 2 tasks, another 10s for other two.
# Hence, this returns 2 completed futures and 2 pending, after 15s
futures = wait([a,b,c,d], timeout=15, return_when='FIRST_COMPLETED')

futures
# Output: DoneAndNotDoneFutures(done={<Future: finished, type: int, key: inc-xxx>, <Future: finished, type: int, key: inc-xxx>}, not_done={<Future: pending, key: inc-xxx>, <Future: pending, key: inc-xxx>})

Let me know if this helps!

1 Like

thank you, that works! I appreciate you and the time you take to answer my question.
@pavithraes
Future.result() wouldn’t really work out for me because I don’t want it to be blocking.
For my use case, I want to cancel futures that didn’t complete within the allotted time. I have opted for using the 'ALL_COMPLETED' and handling the exception should it timeout. This way wait will return when either all completed or when it times out, whichever is first.
Based on your suggestion I came up with this:

from dask.distributed import Client, wait
from time import sleep

# Ensure only two tasks run simultaneously
client = Client(n_workers=2, threads_per_worker=1)
client
def inc(x):
    sleep(10)
    return x + 1


a = client.submit(inc, 10)
b = client.submit(inc, 20)
c = client.submit(inc, 30)
d = client.submit(inc, 40)
futures = [a, b, c, d]

# It will take 10s to complete 2 tasks, another 10s for other two.
# Hence, this returns 2 completed futures and 2 canceled futures
try:
    wait(futures, timeout=15, return_when="ALL_COMPLETED")
except Exception:
    for f in futures:
        if f.status != "finished":
            f.cancel()
# at this point all tasks are finished or canceled, using the as_completed for convenience only.
# could have iterated on futures directly
for f, result in as_completed(futures, with_results=True):
    # do something with the result
    print(f)
    print(result)
1 Like

Glad I could help, and thank you for sharing your example!