Hi @guillaumeeb, thanks for the quick reply. I tried to sort out the error, and it turns out I was operating the tunnel in a different event loop than the one Dask opens for the SSHCluster class.
After some research I managed to set up a nest_asyncio loop where I could run everything and link it to the SSHCluster class with asynchronous=True.
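For reference, this is roughly the setup I ended up with (a minimal sketch; `VM_IPS` and `connect_options` come from my own config):

```python
import asyncio

import nest_asyncio
from dask.distributed import SSHCluster

nest_asyncio.apply()  # allow re-entering the already-running loop (e.g. inside Jupyter)

async def start_cluster():
    # asynchronous=True keeps the cluster on this loop instead of a background one
    return await SSHCluster(
        hosts=VM_IPS,
        connect_options=connect_options,
        asynchronous=True,
    )

cluster = asyncio.get_event_loop().run_until_complete(start_cluster())
```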
I then managed to manually register my SSH key on the gateway, so now the simple string format works too!
Now I am running into an issue where Dask tries to connect to the scheduler directly at tcp://10.67.22.89:8787, but of course the VM is not directly reachable from my network.
Is there any way to change the address Dask tries to connect to, so that I could set up a port forward from localhost:8787 to VM_IP:8787?
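For example, with the scheduler port pinned via `scheduler_options={"port": 8786}`, I could open a tunnel through the gateway with something like `ssh -N -L 8786:10.67.22.89:8786 gateway_user@gateway_host` and then point the client at the forwarded port. This is just a sketch of what I have in mind, not something I've gotten working:

```python
from dask.distributed import Client

# Hypothetical workaround: connect to the local end of the SSH tunnel
# instead of letting the cluster object dial the scheduler's VM address.
client = Client("tcp://localhost:8786")
```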
In the code below I was trying to set dashboard_address to localhost; I don't know whether I can also set the scheduler address to localhost:port, which would save me from setting up the port forward manually.
A simple list of the keywords accepted by scheduler_options would also be great: the only reference I found is the class implementation of distributed.scheduler itself, and digging through it is a bit tedious. Thanks a lot!
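In the meantime, the quickest way I found to see the accepted keywords is to inspect the Scheduler signature directly, since scheduler_options is forwarded to it:

```python
import inspect

from distributed import Scheduler

# scheduler_options is passed through to distributed.Scheduler,
# so its __init__ signature lists the usable keywords
print(inspect.signature(Scheduler.__init__))
```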
Here is my code for reference:
```python
from dask.distributed import Client, SSHCluster

connect_options = {
    "username": VM_USER,
    # tunnel through the gateway host to reach the VMs
    "tunnel": f"{GATEWAY_USER}@{GATEWAY_HOST}",
}

cluster = SSHCluster(
    # the first host becomes the scheduler, the rest become workers
    hosts=VM_IPS,
    connect_options=connect_options,
    worker_options={"nthreads": 4},
    scheduler_options={
        # "port": 8786,
        "dashboard_address": "localhost:8787",
        "jupyter": True,
        "protocol": False,
    },
    remote_python="/usr/bin/python3",
)

client = Client(cluster)
print("Dask cluster started.")
print(client)
```
And here is the full stack trace I get:

```
2024-08-16 14:35:48,279 - distributed.deploy.ssh - INFO - /home/tuscano/.local/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
2024-08-16 14:35:48,283 - distributed.deploy.ssh - INFO - Perhaps you already have a cluster running?
2024-08-16 14:35:48,286 - distributed.deploy.ssh - INFO - Hosting the HTTP server on port 40313 instead
2024-08-16 14:35:48,288 - distributed.deploy.ssh - INFO - warnings.warn(
2024-08-16 14:35:48,409 - distributed.deploy.ssh - INFO - [W 2024-08-16 12:35:48.410 ServerApp] ServerApp.token config is deprecated in 2.0. Use IdentityProvider.token.
2024-08-16 14:35:48,434 - distributed.deploy.ssh - INFO - [I 2024-08-16 12:35:48.422 ServerApp] jupyter_server_proxy | extension was successfully linked.
2024-08-16 14:35:48,437 - distributed.deploy.ssh - INFO - [I 2024-08-16 12:35:48.425 ServerApp] jupyter_server_terminals | extension was successfully linked.
2024-08-16 14:35:48,463 - distributed.deploy.ssh - INFO - [W 2024-08-16 12:35:48.435 ServerApp] All authentication is disabled. Anyone who can connect to this server will be able to run code.
2024-08-16 14:35:48,466 - distributed.deploy.ssh - INFO - [I 2024-08-16 12:35:48.442 ServerApp] jupyter_server_proxy | extension was successfully loaded.
2024-08-16 14:35:48,468 - distributed.deploy.ssh - INFO - [I 2024-08-16 12:35:48.443 ServerApp] jupyter_server_terminals | extension was successfully loaded.
2024-08-16 14:35:48,471 - distributed.deploy.ssh - INFO - 2024-08-16 12:35:48,445 - distributed.scheduler - INFO - State start
2024-08-16 14:35:48,474 - distributed.deploy.ssh - INFO - 2024-08-16 12:35:48,446 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-bs3bdmt4', purging
2024-08-16 14:35:48,476 - distributed.deploy.ssh - INFO - 2024-08-16 12:35:48,448 - distributed.scheduler - INFO - Scheduler at: tcp://10.67.22.89:45733
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:1952, in wait_for(fut, timeout)
1951 async with asyncio.timeout(timeout):
-> 1952 return await fut
File ~/3.12venv/lib64/python3.12/site-packages/distributed/comm/tcp.py:546, in BaseTCPConnector.connect(self, address, deserialize, **connection_args)
545 else:
--> 546 stream = await self.client.connect(
547 ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
548 )
550 # Under certain circumstances tornado will have a closed connection with an
551 # error and not raise a StreamClosedError.
552 #
553 # This occurs with tornado 5.x and openssl 1.1+
File ~/3.12venv/lib64/python3.12/site-packages/tornado/tcpclient.py:279, in TCPClient.connect(self, host, port, af, ssl_options, max_buffer_size, source_ip, source_port, timeout)
270 connector = _Connector(
271 addrinfo,
272 functools.partial(
(...)
277 ),
278 )
--> 279 af, addr, stream = await connector.start(connect_timeout=timeout)
280 # TODO: For better performance we could cache the (af, addr)
281 # information here and re-use it on subsequent connections to
282 # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
CancelledError:
The above exception was the direct cause of the following exception:
TimeoutError Traceback (most recent call last)
File ~/3.12venv/lib64/python3.12/site-packages/distributed/comm/core.py:342, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
341 try:
--> 342 comm = await wait_for(
343 connector.connect(loc, deserialize=deserialize, **connection_args),
344 timeout=min(intermediate_cap, time_left()),
345 )
346 break
File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:1951, in wait_for(fut, timeout)
1950 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1951 async with asyncio.timeout(timeout):
1952 return await fut
File /usr/lib64/python3.12/asyncio/timeouts.py:115, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb)
112 if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
113 # Since there are no new cancel requests, we're
114 # handling this.
--> 115 raise TimeoutError from exc_val
116 elif self._state is _State.ENTERED:
TimeoutError:
The above exception was the direct cause of the following exception:
OSError Traceback (most recent call last)
File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/spec.py:331, in SpecCluster._start(self)
326 self.scheduler_comm = rpc(
327 getattr(self.scheduler, "external_address", None)
328 or self.scheduler.address,
329 connection_args=self.security.get_connection_args("client"),
330 )
--> 331 await super()._start()
332 except Exception as e: # pragma: no cover
File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/cluster.py:133, in Cluster._start(self)
132 async def _start(self):
--> 133 comm = await self.scheduler_comm.live_comm()
134 comm.name = "Cluster worker status"
File ~/3.12venv/lib64/python3.12/site-packages/distributed/core.py:1130, in rpc.live_comm(self)
1129 if not open or comm.closed():
-> 1130 comm = await connect(
1131 self.address,
1132 self.timeout,
1133 deserialize=self.deserialize,
1134 **self.connection_args,
1135 )
1136 comm.name = "rpc"
File ~/3.12venv/lib64/python3.12/site-packages/distributed/comm/core.py:368, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
367 else:
--> 368 raise OSError(
369 f"Timed out trying to connect to {addr} after {timeout} s"
370 ) from active_exception
372 local_info = {
373 **comm.handshake_info(),
374 **(handshake_overrides or {}),
375 }
OSError: Timed out trying to connect to tcp://10.67.22.89:45733 after 30 s
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
Cell In[2], line 10
1 from dask.distributed import Client, SSHCluster
4 connect_options = {
5 "username": VM_USER,
6 "tunnel": f"{GATEWAY_USER}@{GATEWAY_HOST}",
7 }
---> 10 cluster = SSHCluster(
11 hosts=VM_IPS,
12 connect_options=connect_options,
13 worker_options={"nthreads": 4},
14 scheduler_options={
15 #"port": 8786,
16 "dashboard_address": "localhost:8787",
17 "jupyter" : True,
18 "protocol" : False
19 },
20 remote_python="/usr/bin/python3"
21 )
24 client = Client(cluster)
25 print("Dask cluster started.")
File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/ssh.py:463, in SSHCluster(hosts, connect_options, worker_options, scheduler_options, worker_module, worker_class, remote_python, **kwargs)
433 scheduler = {
434 "cls": Scheduler,
435 "options": {
(...)
444 },
445 }
446 workers = {
447 i: {
448 "cls": Worker,
(...)
461 for i, host in enumerate(hosts[1:])
462 }
--> 463 return SpecCluster(workers, scheduler, name="SSHCluster", **kwargs)
File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/spec.py:284, in SpecCluster.__init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
282 if not self.called_from_running_loop:
283 self._loop_runner.start()
--> 284 self.sync(self._start)
285 try:
286 self.sync(self._correct_state)
File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:363, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
361 return future
362 else:
--> 363 return sync(
364 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
365 )
File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:439, in sync(loop, func, callback_timeout, *args, **kwargs)
436 wait(10)
438 if error is not None:
--> 439 raise error
440 else:
441 return result
File ~/3.12venv/lib64/python3.12/site-packages/distributed/utils.py:413, in sync.<locals>.f()
411 awaitable = wait_for(awaitable, timeout)
412 future = asyncio.ensure_future(awaitable)
--> 413 result = yield future
414 except Exception as exception:
415 error = exception
File ~/3.12venv/lib64/python3.12/site-packages/tornado/gen.py:767, in Runner.run(self)
765 try:
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
772 exc: Optional[Exception] = e
File ~/3.12venv/lib64/python3.12/site-packages/distributed/deploy/spec.py:335, in SpecCluster._start(self)
333 self.status = Status.failed
334 await self._close()
--> 335 raise RuntimeError(f"Cluster failed to start: {e}") from e
RuntimeError: Cluster failed to start: Timed out trying to connect to tcp://10.67.22.89:45733 after 30 s
```