I am running a worker node on my laptop and connecting to a scheduler node running in the cloud; I have a TCP tunnel set up via ngrok. My current process looks like this:
- Set up the ngrok TCP endpoint via a Python script. This creates a TCP endpoint (tcp://x.tcp.xx-xxx-x.ngrok.io:16446) to use as the contact_address and forwards it to localhost:13370 (a simplified sketch of this script is included below).
- Start the dask worker via the CLI with dask worker --contact-address tcp://x.tcp.xx-xxx-x.ngrok.io:16446 --listen-address tcp://localhost:13370 tcp://xx.xxx.xx.xxx:8786
This works beautifully: I'm able to complete tasks, and gather() on the cloud scheduler/client works with no issues.
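For reference, the tunnel side of step 1 is roughly this (a simplified sketch of my actual script, using the ngrok Python SDK):

import asyncio
import time
import ngrok

async def start_tunnel() -> str:
    # Forward a public ngrok TCP endpoint to the local port the worker
    # will listen on (13370 here) and return the public URL.
    session = await ngrok.SessionBuilder().authtoken_from_env().connect()
    listener = await session.tcp_endpoint().listen_and_forward("tcp://localhost:13370")
    return listener.url()  # e.g. tcp://x.tcp.xx-xxx-x.ngrok.io:16446

if __name__ == "__main__":
    url = asyncio.run(start_tunnel())
    print(f"NGROK STARTED AT: {url}")
    # keep the process (and therefore the tunnel) alive
    while True:
        time.sleep(1)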
I am now trying to programmatically spin up the worker instance in the same Python script that I'm already using to start the ngrok TCP tunnel. But there doesn't seem to be a listen_address arg/kwarg on Worker? The snippet below does not hook the worker into the TCP tunnel the same way the CLI does.
import asyncio
from dask.distributed import Worker

scheduler_address = 'tcp://xx.xxx.xx.xxx:8786'

async def start_dask_worker(scheduler_address, ngrok_url):
    worker = Worker(scheduler_address, name="Worker-local-ngrok", contact_address=ngrok_url, host="localhost", protocol="tcp", port=13370)
    await worker.start()

# start the ngrok tunnel and grab ngrok_url from listener.url()
asyncio.run(start_dask_worker(scheduler_address, ngrok_url))
# keep the script alive and running
Anything I’m missing here? I just need listen_address…
From looking at the source code, it seems listen_address is on the Nanny's side. Maybe you should try to start a Nanny instead of just a Worker; that's what the command line does.
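Untested, but I'd expect something roughly like this, with the same contact/listen addresses you were passing on the command line (addresses below are placeholders):

import asyncio
from dask.distributed import Nanny

async def main():
    # Sketch only: contact_address / listen_address are Nanny keyword
    # arguments corresponding to the CLI options of the same names.
    nanny = await Nanny(
        "tcp://xx.xxx.xx.xxx:8786",  # scheduler address
        name="Worker-local-ngrok",
        contact_address="tcp://x.tcp.xx-xxx-x.ngrok.io:16446",
        listen_address="tcp://localhost:13370",
    )
    await nanny.finished()  # keep it running until shutdown

asyncio.run(main())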
Ahh I see, I didn’t realize the CLI was starting a Nanny. I will give that a go, thanks for the recommendation.
Okay, forward progress! But it still doesn't seem to hook the worker up to the listen_address the same way the CLI does.
import asyncio, ngrok
from dask.distributed import Nanny

scheduler_address = 'tcp://xx.xxx.xx.xxx:8786'

async def start_nanny(ngrok_url):
    await Nanny(
        scheduler_address,
        name="Worker-local-ngrok",
        contact_address=ngrok_url,
        listen_address="tcp://localhost:13370",
    )

# This is able to forward to the CLI instance `dask worker --listen-address tcp://localhost:13370`
async def create_ngrok_listener() -> ngrok.Listener:
    # create session
    session: ngrok.Session = (
        await ngrok.SessionBuilder()
        .authtoken_from_env()
        .metadata("Online in One Line")
        .connect()
    )
    # create listener
    return (
        await session.tcp_endpoint()
        .metadata("example listener metadata from python")
        .listen_and_forward("tcp://localhost:13370")
    )

if __name__ == "__main__":
    listener = asyncio.run(create_ngrok_listener())
    listener_url = listener.url()
    print(f"NGROK STARTED AT: {listener_url}")
    asyncio.run(start_nanny(listener_url))
    try:
        import time
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        ngrok.disconnect(listener_url)
I see the nanny registered in my cloud scheduler, and I am able to complete tasks on the local worker. The cloud scheduler is able to create a TCP connection back to my ngrok tunnel at gather(), but then the connection from local ngrok → tcp://localhost:13370 for the nanny gets refused with connecting to upstream error=Connection refused (os error 61).
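(A sanity check of my own, not from the logs: while the script is running, a quick way to see whether anything is actually listening on 13370, since a refused upstream connection usually means nothing is bound to that port.)

import socket

# If nothing is listening on localhost:13370 this raises ConnectionRefusedError,
# which matches the "Connection refused" ngrok reports for the upstream.
try:
    with socket.create_connection(("localhost", 13370), timeout=2):
        print("something is listening on 13370")
except ConnectionRefusedError:
    print("nothing is listening on 13370")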
I believe the problem is still in how I am setting up the Nanny in my code, because if I comment out the asyncio.run(start_nanny(listener_url)) line, run the script to start the TCP tunnel, and then use the dask worker CLI, everything works perfectly…
[EDIT] I updated the Nanny instance to include protocol="tcp" with no change in behavior.
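One more thing I plan to try (an untested sketch, so treat it as a guess rather than a known fix): running everything on a single long-lived event loop instead of two separate asyncio.run() calls. asyncio.run() closes its loop as soon as the coroutine returns, so the Nanny may be left without a running loop to accept connections on 13370. I'm also assuming Nanny supports the same await-the-instance / finished() pattern that the Worker docs show:

import asyncio
import ngrok
from dask.distributed import Nanny

scheduler_address = "tcp://xx.xxx.xx.xxx:8786"

async def main():
    # Create the tunnel and the Nanny on the same long-lived event loop.
    session = await ngrok.SessionBuilder().authtoken_from_env().connect()
    listener = await session.tcp_endpoint().listen_and_forward("tcp://localhost:13370")
    print(f"NGROK STARTED AT: {listener.url()}")

    nanny = await Nanny(
        scheduler_address,
        name="Worker-local-ngrok",
        contact_address=listener.url(),
        listen_address="tcp://localhost:13370",
    )
    # Block here so the loop (and the Nanny's listener) stays alive until
    # shutdown; finished() is assumed to behave as it does for Worker.
    await nanny.finished()

if __name__ == "__main__":
    asyncio.run(main())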
Hm, I'm not sure what could be the problem here. Maybe it's worth opening an issue with your example. cc @crusaderky.