Cannot fetch the data from a remote Dask cluster

Hello everyone.
I am using dask-gateway on a Kubernetes cluster to manage remote Dask clusters. I am currently facing the following problem: I need to submit multiple tasks to the Dask cluster and gather the results back. I tried to use client.submit and client.gather as described here (Futures — Dask documentation), but the result is a Future object instead of the actual result. I would be happy if someone could help me. Below is an example from my code.

import dask
from dask_gateway import Gateway
gateway = Gateway(url)
cluster = gateway.new_cluster(shutdown_on_close=False)
cluster.scale(nodes_number)
client = cluster.get_client()

@dask.delayed
def calculate(item):
    return item + 1

x = client.submit(calculate, 1)
x.result() # the result of the line is a list of Futures

L = [client.submit(calculate, i) for i in range(100)]
futures = client.map(calculate, L)
results = client.gather(futures, asynchronous=True)  # the result of the line is a list of Futures
results[0].result()  # this is also a Future object instead of the actual result

I have a few thoughts here:

  • You’re using both the delayed API and the futures API; you only need one of them
  • You are doing a list comprehension of submit calls and then a map, which basically does the same thing twice
  • You’re setting asynchronous=True but not awaiting the result (I’m seeing TypeError: 'coroutine' object is not subscriptable on the last line)
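To illustrate the first point: if you would rather keep the delayed API, you call the decorated function directly and compute all the lazy results in one go, with no client.submit involved. This is a minimal sketch; it assumes that when a distributed Client is active in the process (for example one created with cluster.get_client()), dask.compute uses it by default, and otherwise it falls back to Dask's local threaded scheduler.

```python
import dask


@dask.delayed
def calculate(item):
    # Calling the decorated function only builds a lazy task; nothing runs yet.
    return item + 1


# Build 100 lazy tasks, then execute them all in a single call.
lazy_results = [calculate(i) for i in range(100)]

# dask.compute returns a tuple with one concrete value per argument.
# An active distributed Client is used as the scheduler if one exists;
# otherwise the local threaded scheduler runs the tasks.
results = dask.compute(*lazy_results)

print(results[0])  # 1
```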

I would get rid of the delayed call and just stick with the futures API.

# Ignoring the dask-gateway stuff as it isn't relevant to the question
In [1]: from dask.distributed import Client
   ...: client = Client()

In [2]: def calculate(item):
   ...:     return item + 1
   ...:

In [3]: x = client.submit(calculate, 1)
   ...: x
Out[3]: <Future: pending, key: calculate-4d84f3b5d0f5f542fc6848e43dd783e2>

In [4]: x.result()
Out[4]: 2

For the map, you need neither the list comprehension nor the asynchronous kwarg.

In [10]: futures = client.map(calculate, range(100))

In [11]: results = client.gather(futures)

In [12]: results[0]
Out[12]: 1
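If you do want the asynchronous API, the value returned by gather has to be awaited inside a coroutine instead of indexed directly. A minimal sketch, using an in-process local cluster (processes=False) as a stand-in for the dask-gateway cluster; the asynchronous setup for dask-gateway itself is not shown here.

```python
import asyncio

from dask.distributed import Client


def calculate(item):
    return item + 1


async def main():
    # asynchronous=True makes blocking methods such as gather return
    # awaitables; processes=False keeps the stand-in cluster in-process.
    async with Client(asynchronous=True, processes=False) as client:
        futures = client.map(calculate, range(100))  # returns Futures immediately
        results = await client.gather(futures)  # awaiting yields the concrete values
    return results


results = asyncio.run(main())
print(results[0])  # 1
```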

I think the problem is related to dask-gateway, since it always returns pointers to the remote objects. Could you please check your code with dask-gateway?

I don’t agree: LocalCluster and dask-gateway are used through the same Client API. I think the problem is that you’re using delayed on the function and then submitting it via the futures API, so you get a delayed object inside a concurrent future. Can you try the examples I gave with your dask-gateway cluster?
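Because the client API is the same, one way to run the examples above against both setups is to put the futures code in a function that takes whatever client you have. A minimal sketch; run_batch is a hypothetical helper name, and the local Client(processes=False) below stands in for the client you would get from cluster.get_client() on the gateway cluster.

```python
from dask.distributed import Client, Future


def calculate(item):
    return item + 1


def run_batch(client, n=100):
    """Hypothetical helper: submit n tasks via the futures API and gather them.

    Intended to work the same whether `client` comes from Client() on a
    laptop or from cluster.get_client() on a dask-gateway cluster.
    """
    futures = client.map(calculate, range(n))
    results = client.gather(futures)
    # Sanity check: gather should hand back plain values, not Futures.
    assert not any(isinstance(r, Future) for r in results)
    return results


# Local stand-in; with dask-gateway, pass the client from cluster.get_client().
with Client(processes=False) as client:
    results = run_batch(client)

print(results[:3])  # [1, 2, 3]
```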

Here is something similar to my question (python - .map followed by .result() and/or client.gather() results in a crash if workers are remote - Stack Overflow). Please take a look.

I think I need to create a client instance (as I did), as mentioned in the documentation (Usage — Dask Gateway 2022.10.0 documentation); otherwise I cannot use the resources of the remote cluster.

Yeah, you definitely need a client instance. Your example does this via cluster.get_client(). In my example I just create a Client directly, which automatically starts a scheduler and workers on my laptop.

The StackOverflow example you linked shows lost data on the scheduler. Are you also seeing that error message?

Could you try the example I shared but using dask-gateway and post the results here?

Thanks a lot, the problem was solved.