Converting worker --listen-address to python method args

I am running a worker node on my laptop and connecting to a scheduler node running in the cloud; I have a TCP tunnel set up via ngrok. My current process looks like this:

  • Set up an ngrok TCP endpoint via a Python script. This creates a TCP endpoint (tcp://x.tcp.xx-xxx-x.ngrok.io:16446) that serves as the contact_address and forwards traffic to localhost:13370.
  • Start the Dask worker via the CLI with:
    dask worker --contact-address tcp://x.tcp.xx-xxx-x.ngrok.io:16446 --listen-address tcp://localhost:13370 tcp://xx.xxx.xx.xxx:8786
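The two addresses in that command play different roles: --listen-address is the local socket the worker binds to (here, the ngrok forwarding target), while --contact-address is the address the worker advertises to the scheduler. As a small standard-library sketch (the `split_address` helper and `Address` type are hypothetical names, not part of Dask), this is how each URL breaks down into the protocol, host and port pieces that the Python keyword arguments take:

```python
from typing import NamedTuple
from urllib.parse import urlsplit


class Address(NamedTuple):
    protocol: str
    host: str
    port: int


def split_address(address: str) -> Address:
    """Split an address such as 'tcp://localhost:13370' into its parts."""
    parts = urlsplit(address)
    if not parts.scheme or parts.hostname is None or parts.port is None:
        raise ValueError(f"expected protocol://host:port, got {address!r}")
    return Address(parts.scheme, parts.hostname, parts.port)


# Placeholder addresses copied from the command above.
listen = split_address("tcp://localhost:13370")
contact = split_address("tcp://x.tcp.xx-xxx-x.ngrok.io:16446")
print(listen)  # Address(protocol='tcp', host='localhost', port=13370)
print(contact)
```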

This works beautifully: I’m able to complete tasks, and gather() from the cloud scheduler/client works with no issues.

I am now trying to spin up the worker programmatically in the same Python script that I’m already using to start the ngrok TCP tunnel, but Worker doesn’t seem to accept a listen_address argument or keyword argument. The snippet below does not hook the worker into the TCP tunnel the way the CLI does.

```python
import asyncio

from dask.distributed import Worker

scheduler_address = 'tcp://xx.xxx.xx.xxx:8786'

async def start_dask_worker(scheduler_address, ngrok_url):
    worker = Worker(
        scheduler_address,
        name="Worker-local-ngrok",
        contact_address=ngrok_url,
        host="localhost",
        protocol="tcp",
        port=13370,
    )
    await worker.start()

# start ngrok tunnel and grab the ngrok_url from the listener.url()

asyncio.run(start_dask_worker(scheduler_address, ngrok_url))

# keep the script alive and running
```
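One thing worth checking in this snippet: asyncio.run() closes its event loop as soon as the coroutine returns, so a Worker started inside it stops servicing connections even if the script is kept alive afterwards. For a plain Worker, host, port and protocol are the Python counterparts of the listen address. A minimal sketch, assuming the scheduler address and ngrok URL are obtained beforehand (`run_worker` is a hypothetical name), keeps the worker inside one long-lived coroutine:

```python
from dask.distributed import Worker


async def run_worker(scheduler_address: str, contact_address: str) -> None:
    # Bind locally on tcp://localhost:13370 (the ngrok forwarding target)
    # and advertise the public ngrok address to the scheduler.
    async with Worker(
        scheduler_address,
        name="Worker-local-ngrok",
        contact_address=contact_address,
        protocol="tcp",
        host="localhost",
        port=13370,
    ) as worker:
        # Keep this coroutine, and therefore the event loop, alive until
        # the worker is closed.
        await worker.finished()
```

It would be started once with asyncio.run(run_worker(scheduler_address, ngrok_url)); that call only returns when the worker shuts down, so no separate keep-alive loop is needed.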

Anything I’m missing here? I just need listen_address…

From looking at the source code, it seems listen_address is on the Nanny’s side. Maybe you should try to start a Nanny instead of just a Worker; that is what the command line does.
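For comparison, here is my reading of how the CLI translates the two flags when it starts a Nanny. This is an assumption based on the `dask worker` CLI source, so please verify it against your installed distributed version: the host part of --listen-address becomes `host`, its port becomes the *worker's* port (`worker_port`) rather than the Nanny's own port, and --contact-address is passed through, falling back to the listen address when omitted. A standard-library sketch of that mapping (`nanny_kwargs_from_cli` is a hypothetical helper; tcp is the default protocol, so the scheme is not forwarded):

```python
from typing import Optional
from urllib.parse import urlsplit


def nanny_kwargs_from_cli(listen_address: str, contact_address: Optional[str] = None) -> dict:
    """Hypothetical helper: mirror `dask worker --listen-address/--contact-address`
    as keyword arguments for dask.distributed.Nanny.

    Assumption: the CLI binds the worker process (not the Nanny) to the
    listen address, so the port goes to `worker_port`.
    """
    parts = urlsplit(listen_address)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"expected protocol://host:port, got {listen_address!r}")
    return {
        "host": parts.hostname,
        "worker_port": parts.port,
        # Assumed CLI behaviour: without a contact address, advertise the listen address.
        "contact_address": contact_address or listen_address,
    }
```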


Ahh I see, I didn’t realize the CLI was starting a Nanny. I will give that a go, thanks for the recommendation.

Okay, forward progress! But it still doesn’t seem to hook the worker to listen_address the same way the CLI does.

```python
import asyncio
import time

import ngrok
from dask.distributed import Nanny

scheduler_address = 'tcp://xx.xxx.xx.xxx:8786'

async def start_nanny(ngrok_url):
    await Nanny(
        scheduler_address,
        name="Worker-local-ngrok",
        contact_address=ngrok_url,
        listen_address="tcp://localhost:13370"
    )

# This tunnel forwards correctly to a CLI-started `dask worker --listen-address tcp://localhost:13370`
async def create_ngrok_listener() -> ngrok.Listener:
    # create session
    session: ngrok.Session = (
        await ngrok.SessionBuilder()
            .authtoken_from_env()
            .metadata("Online in One Line")
            .connect()
    )
    # create listener that forwards the public TCP endpoint to localhost:13370
    return (
        await session.tcp_endpoint()
            .metadata("example listener metadata from python")
            .listen_and_forward("tcp://localhost:13370")
    )

if __name__ == "__main__":
    listener = asyncio.run(create_ngrok_listener())
    listener_url = listener.url()
    print(f"NGROK STARTED AT: {listener_url}")

    asyncio.run(start_nanny(listener_url))

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        ngrok.disconnect(listener_url)
```

I see the nanny registered in my cloud scheduler, and I am able to complete tasks on the local worker. At gather(), the cloud scheduler is able to open a TCP connection back to my ngrok tunnel, but the onward connection from the local ngrok agent to tcp://localhost:13370 is then refused with "error connecting to upstream error=Connection refused (os error 61)".
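"Connection refused" (errno 61 is ECONNREFUSED on macOS; it is 111 on Linux) means nothing was accepting connections on localhost:13370 at the moment ngrok forwarded the scheduler's connection. A quick standard-library probe (`is_listening` is a hypothetical helper name) can be run while the script is up, once with the Python-started Nanny and once with the CLI worker, to see whether anything is bound to that port in the failing case:

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port is accepted."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # ConnectionRefusedError, timeouts and name-resolution errors
        # all mean "not reachable" for this check.
        return False


if __name__ == "__main__":
    print("localhost:13370 listening:", is_listening("localhost", 13370))
```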

I believe the problem is still in how I am setting up the Nanny in my code: if I comment out the asyncio.run(start_nanny(listener_url)) line, run the script to start the TCP tunnel, and then start the worker with the dask worker CLI, everything works perfectly…

[EDIT] I updated the Nanny instance to include protocol="tcp" with no change in behavior.
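Two hypotheses would each produce this symptom; what follows is an untested sketch, not a confirmed fix. First, asyncio.run(start_nanny(listener_url)) returns once the Nanny has started and then closes its event loop, so the Nanny itself stops servicing connections from that point on. Second, based on my reading of the `dask worker` CLI source (an assumption; please verify against your distributed version), the CLI binds the worker process, not the Nanny, to the listen address by passing its host as `host` and its port as `worker_port`, so with `listen_address` alone the worker may be listening on some other port while ngrok forwards to 13370. The hypothetical `run_nanny` below keeps the Nanny on one long-lived event loop and uses that mapping:

```python
from dask.distributed import Nanny


async def run_nanny(scheduler_address: str, contact_address: str) -> None:
    # Assumption: mirrors `dask worker --listen-address tcp://localhost:13370
    # --contact-address <ngrok url>`, where the CLI binds the worker process
    # to the listen port via `worker_port`. The Nanny's own port stays random.
    async with Nanny(
        scheduler_address,
        name="Worker-local-ngrok",
        contact_address=contact_address,
        host="localhost",
        worker_port=13370,
    ) as nanny:
        # Keep the event loop, and therefore the Nanny, alive until it closes.
        await nanny.finished()
```

In the script, run_nanny would take the place of start_nanny, and both steps would run inside one coroutine: an async main() that awaits create_ngrok_listener(), passes listener.url() to run_nanny, and is started with a single asyncio.run(main()). The time.sleep loop then becomes unnecessary, because run_nanny only returns once the Nanny closes.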

Hm, I’m not sure what the problem could be here. Maybe it’s worth opening an issue with your example. cc @crusaderky.
