Hello everyone.
I am using Dask Gateway on a Kubernetes cluster to manage Dask clusters remotely. I have run into the following problem: I need to submit multiple tasks to the Dask cluster and gather the results back from it. I tried client.submit and client.gather as described in the Futures page of the Dask documentation, but the result is a Future object instead of the actual value. I would be grateful if someone could help. Below is an example from my code.
import dask
from dask_gateway import Gateway
gateway = Gateway(url)
cluster = gateway.new_cluster(shutdown_on_close=False)
cluster.scale(nodes_number)
client = cluster.get_client()
@dask.delayed
def calculate(item):
    return item + 1
x = client.submit(calculate, 1)
x.result() # the result of the line is a list of Futures
L = [client.submit(calculate, i) for i in range(100)]
futures = client.map(calculate, L)
results = client.gather(futures, asynchronous=True) # the result of the line is a list of Futures
results[0].result() # this is also a Future object instead of the actual result
I have a few thoughts here:
- You’re using both the delayed API and the futures API; you only need one of these.
- You’re doing a list comprehension of submits and then a map, which does basically the same thing twice.
- You’re setting asynchronous=True but not calling await on the result (I’m seeing TypeError: 'coroutine' object is not subscriptable on the last line).
I would get rid of the delayed call and just stick with the futures API.
# Ignoring the dask-gateway stuff as it isn't relevant to the question
In [1]: from dask.distributed import Client
   ...: client = Client()
In [2]: def calculate(item):
   ...:     return item + 1
   ...:
In [3]: x = client.submit(calculate, 1)
   ...: x
Out[3]: <Future: pending, key: calculate-4d84f3b5d0f5f542fc6848e43dd783e2>
In [4]: x.result()
Out[4]: 2
For the map you don’t need the list comprehension or the asynchronous kwarg.
In [10]: futures = client.map(calculate, range(100))
In [11]: results = client.gather(futures)
In [12]: results[0]
Out[12]: 1
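If you do want the asynchronous route, the gather has to be awaited, and as far as I know that needs a client created with asynchronous=True. A minimal sketch, assuming an in-process local client (processes=False is my own choice to keep the example light, not something from your setup):

```python
import asyncio

from dask.distributed import Client


def calculate(item):
    return item + 1


async def main():
    # Assumption: a local, in-process client stands in for the real cluster.
    async with Client(asynchronous=True, processes=False) as client:
        futures = client.map(calculate, range(100))
        # With an asynchronous client, gather hands back an awaitable,
        # so it must be awaited before the results can be indexed.
        return await client.gather(futures)


results = asyncio.run(main())
print(results[0], results[-1])
```

Unless the rest of your code is already async, the plain synchronous gather above is the simpler option.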
I think the problem is related to dask-gateway, as it always returns pointers to the remote objects. Could you please check your code with dask-gateway?
I don’t agree; LocalCluster and dask-gateway use the same API. I think the problem is that you’re using delayed on the function and then submitting it via the futures API, so the Future resolves to a lazy Delayed object rather than to the number you expect. Can you try the examples I gave with your dask-gateway cluster?
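To illustrate what I mean, here is a small sketch with an in-process local client standing in for the gateway one (that substitution is an assumption; the decorated function is the one from your post). I would expect the future to resolve to a Delayed object, which then needs its own compute() call:

```python
import dask
from dask.delayed import Delayed
from dask.distributed import Client


@dask.delayed
def calculate(item):
    return item + 1


# Assumption: local in-process client instead of cluster.get_client().
client = Client(processes=False)

future = client.submit(calculate, 1)
inner = future.result()   # the worker called the delayed function, so this is lazy
value = inner.compute()   # a second evaluation step is needed to get the number

print(isinstance(inner, Delayed), value)
client.close()
```

Dropping the @dask.delayed decorator removes the extra layer, and future.result() then returns the number directly.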
I think I need to create a client instance (as I did), as mentioned in the Usage page of the Dask Gateway 2022.10.0 documentation; otherwise I cannot use the resources of the remote cluster.
Yeah, you definitely need a client instance. Your example does this via cluster.get_client(). In my example I just create Client directly (which starts a scheduler and workers automatically on my laptop).
The StackOverflow example you linked shows lost data on the scheduler. Are you also seeing that error message?
Could you try the example I shared but using dask-gateway and post the results here?
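Roughly what I have in mind is a helper that only depends on the client, so the same code can be pointed at either kind of cluster. This is a sketch: run_calculation is a name I made up, and the local client below is just a stand-in for the one returned by cluster.get_client().

```python
from dask.distributed import Client


def calculate(item):
    return item + 1


def run_calculation(client, n_items):
    """Map calculate over range(n_items) and return the concrete values."""
    futures = client.map(calculate, range(n_items))
    return client.gather(futures)


# Assumption: an in-process local client replaces the gateway client here.
with Client(processes=False) as local_client:
    results = run_calculation(local_client, 100)

print(results[:3])
```

With dask-gateway, the call would presumably become run_calculation(cluster.get_client(), 100), with cluster created by gateway.new_cluster() as in your original post.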
Thanks a lot, the problem was solved.