Creating a Dask SSHCluster with a tunnel

Hello, I am trying to create a Dask cluster via SSH.
I have 3 VMs on a subnet that I can only reach through a gateway host.
I read the Dask documentation and then the asyncssh.connect documentation, and discovered the “tunnel” keyword that can be passed in connect_options when initializing the SSHCluster class.

I tried both passing a string in the f’gate_user@gate_host’ format and passing the asyncssh.connect call itself with host, username, and password.

The problem is that I cannot install my public key to access the gateway without a password, and passing the asyncssh object results in this error:

AttributeError: '_ACMWrapper' object has no attribute 'create_connection'

Here is my code for reference:

import asyncssh

# GATEWAY_USER, GATEWAY_PASS, GATEWAY_KEY, GATEWAY_HOST and VM_USER are defined elsewhere
gateway_options = {
    "username": GATEWAY_USER,
    "password": GATEWAY_PASS,
    "client_host_keys": GATEWAY_KEY
}

dask_connect_options = {
    "username": VM_USER,
    # pass the asyncssh.connect object itself as the tunnel
    "tunnel": asyncssh.connect(GATEWAY_HOST, connect_options=gateway_options)
}

from distributed import Client, SSHCluster

worker_addresses = ['10.67.22.89', '10.67.22.241', '10.67.22.171']

cluster = SSHCluster(
    hosts=worker_addresses,
    connect_options=dask_connect_options,
    worker_options={
        "nthreads": 4, 
        "memory_limit": "8GB",  
    },
    scheduler_options={
        "port": 8786,
    }
)


client = Client(cluster)

Hi @tusca99, welcome to the Dask Discourse forum!

Could you post the complete stack trace of your error? It’s difficult to tell whether it comes from Dask or asyncssh…

Hi @guillaumeeb, thanks for the quick reply. I tried to sort out the error; it turns out I was creating the tunnel in a different event loop than the one Dask opens for the SSHCluster class.
After some research I managed to set up a nest_asyncio loop where I could run everything, and created the SSHCluster with asynchronous=True, roughly as sketched below.
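
A minimal sketch of that kind of setup, assuming GATEWAY_HOST, GATEWAY_USER, GATEWAY_PASS, VM_USER and VM_IPS are defined elsewhere (the exact script differs, but the key point is that the tunnel connection is awaited on the same loop the cluster uses):

import asyncio

import asyncssh
import nest_asyncio
from dask.distributed import Client, SSHCluster

nest_asyncio.apply()  # allow re-entering the notebook's already-running event loop


async def main():
    # Open the tunnel to the gateway on this loop, then hand the live
    # connection to SSHCluster through connect_options.
    tunnel = await asyncssh.connect(
        GATEWAY_HOST, username=GATEWAY_USER, password=GATEWAY_PASS
    )
    cluster = await SSHCluster(
        hosts=VM_IPS,
        connect_options={"username": VM_USER, "tunnel": tunnel},
        worker_options={"nthreads": 4},
        asynchronous=True,
    )
    client = await Client(cluster, asynchronous=True)
    return cluster, client


cluster, client = asyncio.get_event_loop().run_until_complete(main())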

I then managed to manually register my SSH key on the gateway, so now the simple string format works too!

Now I am encountering an issue where Dask tries to connect to the scheduler directly through tcp://10.67.22.89:8787, but of course the VM is not directly reachable from my network.

Is there any way I can change the address that Dask tries to access, so that I can set up a port forward from localhost:8787 to VM_IP:8787?
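
(A minimal sketch of such a manual forward with asyncssh, assuming the gateway credentials from before and that 10.67.22.89 hosts the dashboard — equivalent to ssh -L 8787:10.67.22.89:8787 gate_user@gate_host:)

import asyncio

import asyncssh


async def forward_dashboard():
    # Forward localhost:8787 through the gateway to the VM's dashboard port.
    async with asyncssh.connect(
        GATEWAY_HOST, username=GATEWAY_USER, password=GATEWAY_PASS
    ) as conn:
        listener = await conn.forward_local_port("localhost", 8787, "10.67.22.89", 8787)
        await listener.wait_closed()


# asyncio.run(forward_dashboard())  # keeps running until the listener is closed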

Here I was trying to set the dashboard_address to localhost; I don’t know if I can set the scheduler address to localhost:port so that I won’t need to set up a port forward manually.

A simple reference for the keywords I can use in scheduler_options would also be great, as I only found the distributed.Scheduler class implementation itself, and checking there is a bit tedious. Thanks a lot.
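
(For what it’s worth, a quick way to list them, assuming scheduler_options is forwarded to distributed.Scheduler as the SSHCluster docstring suggests:)

import inspect

from distributed import Scheduler

# Print the keyword arguments the Scheduler accepts; these are the names
# that can go into SSHCluster's scheduler_options.
print(inspect.signature(Scheduler.__init__))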

Here is my code for reference and the whole stack trace I get:

from dask.distributed import Client, SSHCluster

connect_options = {
    "username": VM_USER,
    "tunnel": f"{GATEWAY_USER}@{GATEWAY_HOST}",
}


cluster = SSHCluster(
    hosts=VM_IPS,
    connect_options=connect_options,
    worker_options={"nthreads": 4},
    scheduler_options={
        #"port": 8786, 
        "dashboard_address": "localhost:8787",
        "jupyter" : True,
        "protocol" : False
    },
    remote_python="/usr/bin/python3"
)


client = Client(cluster)
print("Dask cluster started.")
print(client)

Here is the whole stack trace:

2024-08-16 14:35:48,279 - distributed.deploy.ssh - INFO - /home/tuscano/.local/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
2024-08-16 14:35:48,283 - distributed.deploy.ssh - INFO - Perhaps you already have a cluster running?
2024-08-16 14:35:48,286 - distributed.deploy.ssh - INFO - Hosting the HTTP server on port 40313 instead
2024-08-16 14:35:48,288 - distributed.deploy.ssh - INFO - warnings.warn(
2024-08-16 14:35:48,409 - distributed.deploy.ssh - INFO - [W 2024-08-16 12:35:48.410 ServerApp] ServerApp.token config is deprecated in 2.0. Use IdentityProvider.token.
2024-08-16 14:35:48,434 - distributed.deploy.ssh - INFO - [I 2024-08-16 12:35:48.422 ServerApp] jupyter_server_proxy | extension was successfully linked.
2024-08-16 14:35:48,437 - distributed.deploy.ssh - INFO - [I 2024-08-16 12:35:48.425 ServerApp] jupyter_server_terminals | extension was successfully linked.
2024-08-16 14:35:48,463 - distributed.deploy.ssh - INFO - [W 2024-08-16 12:35:48.435 ServerApp] All authentication is disabled.  Anyone who can connect to this server will be able to run code.
2024-08-16 14:35:48,466 - distributed.deploy.ssh - INFO - [I 2024-08-16 12:35:48.442 ServerApp] jupyter_server_proxy | extension was successfully loaded.
2024-08-16 14:35:48,468 - distributed.deploy.ssh - INFO - [I 2024-08-16 12:35:48.443 ServerApp] jupyter_server_terminals | extension was successfully loaded.
2024-08-16 14:35:48,471 - distributed.deploy.ssh - INFO - 2024-08-16 12:35:48,445 - distributed.scheduler - INFO - State start
2024-08-16 14:35:48,474 - distributed.deploy.ssh - INFO - 2024-08-16 12:35:48,446 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-bs3bdmt4', purging
2024-08-16 14:35:48,476 - distributed.deploy.ssh - INFO - 2024-08-16 12:35:48,448 - distributed.scheduler - INFO -   Scheduler at:   tcp://10.67.22.89:45733

---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:1952, in wait_for(fut, timeout)
   1951 async with asyncio.timeout(timeout):
-> 1952     return await fut

File ~/3.12venv/lib64/python3.12/site-packages/distributed/comm/tcp.py:546, in BaseTCPConnector.connect(self, address, deserialize, **connection_args)
    545 else:
--> 546     stream = await self.client.connect(
    547         ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
    548     )
    550 # Under certain circumstances tornado will have a closed connection with an
    551 # error and not raise a StreamClosedError.
    552 #
    553 # This occurs with tornado 5.x and openssl 1.1+

File ~/3.12venv/lib64/python3.12/site-packages/tornado/tcpclient.py:279, in TCPClient.connect(self, host, port, af, ssl_options, max_buffer_size, source_ip, source_port, timeout)
    270 connector = _Connector(
    271     addrinfo,
    272     functools.partial(
   (...)
    277     ),
    278 )
--> 279 af, addr, stream = await connector.start(connect_timeout=timeout)
    280 # TODO: For better performance we could cache the (af, addr)
    281 # information here and re-use it on subsequent connections to
    282 # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)

CancelledError: 

The above exception was the direct cause of the following exception:

TimeoutError                              Traceback (most recent call last)
File ~/3.12venv/lib64/python3.12/site-packages/distributed/comm/core.py:342, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    341 try:
--> 342     comm = await wait_for(
    343         connector.connect(loc, deserialize=deserialize, **connection_args),
    344         timeout=min(intermediate_cap, time_left()),
    345     )
    346     break

File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:1951, in wait_for(fut, timeout)
   1950 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1951     async with asyncio.timeout(timeout):
   1952         return await fut

File /usr/lib64/python3.12/asyncio/timeouts.py:115, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb)
    112     if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
    113         # Since there are no new cancel requests, we're
    114         # handling this.
--> 115         raise TimeoutError from exc_val
    116 elif self._state is _State.ENTERED:

TimeoutError: 

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/spec.py:331, in SpecCluster._start(self)
    326     self.scheduler_comm = rpc(
    327         getattr(self.scheduler, "external_address", None)
    328         or self.scheduler.address,
    329         connection_args=self.security.get_connection_args("client"),
    330     )
--> 331     await super()._start()
    332 except Exception as e:  # pragma: no cover

File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/cluster.py:133, in Cluster._start(self)
    132 async def _start(self):
--> 133     comm = await self.scheduler_comm.live_comm()
    134     comm.name = "Cluster worker status"

File ~/3.12venv/lib64/python3.12/site-packages/distributed/core.py:1130, in rpc.live_comm(self)
   1129 if not open or comm.closed():
-> 1130     comm = await connect(
   1131         self.address,
   1132         self.timeout,
   1133         deserialize=self.deserialize,
   1134         **self.connection_args,
   1135     )
   1136     comm.name = "rpc"

File ~/3.12venv/lib64/python3.12/site-packages/distributed/comm/core.py:368, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    367 else:
--> 368     raise OSError(
    369         f"Timed out trying to connect to {addr} after {timeout} s"
    370     ) from active_exception
    372 local_info = {
    373     **comm.handshake_info(),
    374     **(handshake_overrides or {}),
    375 }

OSError: Timed out trying to connect to tcp://10.67.22.89:45733 after 30 s

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Cell In[2], line 10
      1 from dask.distributed import Client, SSHCluster
      4 connect_options = {
      5     "username": VM_USER,
      6     "tunnel": f"{GATEWAY_USER}@{GATEWAY_HOST}",
      7 }
---> 10 cluster = SSHCluster(
     11     hosts=VM_IPS,
     12     connect_options=connect_options,
     13     worker_options={"nthreads": 4},
     14     scheduler_options={
     15         #"port": 8786, 
     16         "dashboard_address": "localhost:8787",
     17         "jupyter" : True,
     18         "protocol" : False
     19     },
     20     remote_python="/usr/bin/python3"
     21 )
     24 client = Client(cluster)
     25 print("Dask cluster started.")

File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/ssh.py:463, in SSHCluster(hosts, connect_options, worker_options, scheduler_options, worker_module, worker_class, remote_python, **kwargs)
    433 scheduler = {
    434     "cls": Scheduler,
    435     "options": {
   (...)
    444     },
    445 }
    446 workers = {
    447     i: {
    448         "cls": Worker,
   (...)
    461     for i, host in enumerate(hosts[1:])
    462 }
--> 463 return SpecCluster(workers, scheduler, name="SSHCluster", **kwargs)

File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/spec.py:284, in SpecCluster.__init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
    282 if not self.called_from_running_loop:
    283     self._loop_runner.start()
--> 284     self.sync(self._start)
    285     try:
    286         self.sync(self._correct_state)

File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:363, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    361     return future
    362 else:
--> 363     return sync(
    364         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    365     )

File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:439, in sync(loop, func, callback_timeout, *args, **kwargs)
    436         wait(10)
    438 if error is not None:
--> 439     raise error
    440 else:
    441     return result

File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:413, in sync.<locals>.f()
    411         awaitable = wait_for(awaitable, timeout)
    412     future = asyncio.ensure_future(awaitable)
--> 413     result = yield future
    414 except Exception as exception:
    415     error = exception

File ~/3.12venv/lib64/python3.12/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/spec.py:335, in SpecCluster._start(self)
    333 self.status = Status.failed
    334 await self._close()
--> 335 raise RuntimeError(f"Cluster failed to start: {e}") from e

RuntimeError: Cluster failed to start: Timed out trying to connect to tcp://10.67.22.89:45733 after 30 s

I think there is an external_address kwarg for most Clusters for this case. I’m not sure it works with SSHCluster, but it’s worth a try.

If it doesn’t, then you’ll need to give the correct address to the Client by hand.
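
Something along these lines (just a sketch; it assumes you first forward the scheduler port, e.g. 8786, from the VM to your localhost):

from dask.distributed import Client

# Connect to the forwarded scheduler address instead of the cluster object.
client = Client("tcp://localhost:8786")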


Hey, sorry for not replying, but I had a strict schedule and had to finish my project.

Sadly there is no external_address argument for SSHCluster. I set up port forwarding for 8787 to forward the dashboard and settled on using one of the machines as the scheduler as well.

Thank you anyway for your time and patience!
