I am using Dask Gateway to manage remote Dask clusters on Kubernetes. I have run into the following problem: I need to submit multiple tasks to the Dask cluster and gather the results back from it. I tried using client.submit and client.gather as described in the Futures page of the Dask documentation, but the result is a Future object instead of the actual result. I would be grateful if someone could help. An example from my code is below.
from dask_gateway import Gateway
gateway = Gateway(url)
cluster = gateway.new_cluster(shutdown_on_close=False)
client = cluster.get_client()
def calculate(item):
    return item + 1
x = client.submit(calculate, 1)
x.result() # the result of the line is a list of Futures
L = [client.submit(calculate, i) for i in range(100)]
futures = client.map(calculate, L)
results = client.gather(futures, asynchronous=True) # the result of the line is a list of Futures
results.result() # this is also a Future object instead of the actual result
I have a few thoughts here:
- You’re using both the delayed API and futures API, you only need one of these
- You are doing a list comprehension of submits and then a map which is basically doing the same thing
- You’re setting asynchronous=True but not calling await on the result (I’m seeing TypeError: 'coroutine' object is not subscriptable on the last line)
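To illustrate the last point without needing a cluster, here is a minimal sketch using only the standard library. The function gather_results is a hypothetical stand-in for an asynchronous call such as client.gather(..., asynchronous=True); it is not part of Dask. It shows that an un-awaited call gives back a coroutine object, and that indexing it raises the same TypeError.

```python
import asyncio


async def gather_results():
    # Hypothetical stand-in for an asynchronous gather call.
    await asyncio.sleep(0)
    return [i + 1 for i in range(3)]


def first_without_await():
    # No await: this is a coroutine object, not a list.
    coro = gather_results()
    try:
        return coro[0]
    except TypeError as exc:
        return str(exc)
    finally:
        # Close it so Python does not warn that it was never awaited.
        coro.close()


async def first_with_await():
    # Awaiting the call yields the actual list.
    results = await gather_results()
    return results[0]


error_message = first_without_await()
first_value = asyncio.run(first_with_await())
print(error_message)
print(first_value)
```

So either await the result inside an async function, or drop asynchronous=True and use the blocking call.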
I would get rid of the delayed call and just stick with the futures API.
# Ignoring the dask-gateway stuff as it isn't relevant to the question
In : from dask.distributed import Client
...: client = Client()
In : def calculate(item):
...:     return item + 1
In : x = client.submit(calculate, 1)
Out: <Future: pending, key: calculate-4d84f3b5d0f5f542fc6848e43dd783e2>
In : x.result()
For the map you don’t need the list comprehension, or the asynchronous=True argument.
In : futures = client.map(calculate, range(100))
In : results = client.gather(futures)
In : results
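The same steps can be put into a single script. This is only a sketch under a few assumptions: it uses a local in-process cluster (Client(processes=False)) instead of dask-gateway, and run_example is just a name picked for illustration.

```python
from dask.distributed import Client


def calculate(item):
    return item + 1


def run_example():
    # Assumption: a local in-process cluster stands in for the gateway cluster.
    with Client(processes=False) as client:
        # submit returns a Future; result() blocks until the value is ready.
        x = client.submit(calculate, 1)
        single = x.result()

        # map creates one Future per input; gather collects the values.
        futures = client.map(calculate, range(100))
        results = client.gather(futures)
    return single, results


single, results = run_example()
print(single)
print(results[:5])
```

With a gateway cluster, the client would come from cluster.get_client() instead, and the submit, map and gather calls would stay the same.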
I think the problem is related to dask-gateway, as it always returns pointers to the remote objects. Could you please check your code with dask-gateway?
I don’t agree, dask-gateway uses the same API. I think the problem is that you’re using delayed on the function and then submitting it via the futures API, so you get a delayed object inside a concurrent future. Can you try the examples I gave with your dask-gateway cluster?
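As a rough illustration of why the delayed wrapper gets in the way: calling a delayed function does not run it, it returns a lazy Delayed object that still needs compute(). The sketch below uses only dask.delayed with the synchronous scheduler, so it runs without any cluster; the variable names are made up for the example.

```python
import dask
from dask.delayed import Delayed


def calculate(item):
    return item + 1


# Wrapping the function makes every call lazy.
lazy_calculate = dask.delayed(calculate)

# This is a Delayed object, not the number 2.
lazy = lazy_calculate(1)
print(type(lazy).__name__)

# The value only appears once compute() is called.
value = lazy.compute(scheduler="synchronous")
print(value)
```

With the futures API the plain, undecorated function is enough, since client.submit already schedules the call for you.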
I think I need to create a client instance (as I did), as mentioned in the documentation (Usage — Dask Gateway 2022.10.0 documentation), otherwise I cannot use the resources of the remote cluster.
Yeah, you definitely need a client instance. Your example does this via cluster.get_client(). In my example I just create a Client directly (which starts a scheduler and workers automatically on my laptop).
The StackOverflow example you linked is showing lost data on the scheduler, are you also seeing that error message?
Could you try the example I shared but using dask-gateway and post the results here?
Thanks a lot, the problem was solved.