Setting Worker Ports

Hey y’all, I’m looking for some guidance. Let me start with a bit of background.

I’m new to Dask and to HPC in general, and I’m running Python scripts on an IBM Spectrum LSF cluster using LocalCluster. I launch the script with:

bsub -G compute-general -q general-interactive -n 8 -R 'rusage[mem=12GB]' -Is -a "docker([dockerhub_path])"  python3 ~/git/study/process_files.py 8

This follows the example in my institution’s HPC documentation.

That mostly works. However, I seem to be running into a port-forwarding issue: the cluster only has ports 8000:8999 open, and I get a handshake timeout error when workers are launched on ports outside that range.

I’m aware of the dask_jobqueue package for working with LSF clusters, but unless I’m mistaken it lacks the group option I need (-G, https://www.ibm.com/docs/en/spectrum-lsf/10.1.0?topic=o-g-1). I’m also not sure it would fix the port issue I’m running into.

So my question is: how can I set the ports for workers launched by a LocalCluster? This GitHub issue appears to have added that functionality, but I’m having trouble with the syntax: https://github.com/dask/distributed/issues/899. If it is possible, could someone show me an example?

If it isn’t possible, does anyone have other suggestions? Should I instead launch the workers from the Dockerfile and then run my scripts?

For additional context, here is the exception output:

2023-03-31 02:06:13,231 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 329, in connect
    handshake = await wait_for(comm.read(), time_left())
  File "/usr/local/lib/python3.10/site-packages/distributed/utils.py", line 1849, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1244, in heartbeat
    response = await retry_operation(
  File "/usr/local/lib/python3.10/site-packages/distributed/utils_comm.py", line 434, in retry_operation
    return await retry(
  File "/usr/local/lib/python3.10/site-packages/distributed/utils_comm.py", line 413, in retry
    return await coro()
  File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1262, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1506, in connect
    return await connect_attempt
  File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1427, in _connect
    comm = await connect(
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 334, in connect
    raise OSError(
OSError: Timed out during handshake while connecting to tcp://127.0.0.1:40883 after 30 s
2023-03-31 02:06:44,230 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  ...
OSError: Timed out during handshake while connecting to tcp://127.0.0.1:40883 after 30 s
2023-03-31 02:07:15,229 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  ...
OSError: Timed out during handshake while connecting to tcp://127.0.0.1:40883 after 30 s
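The last line of each traceback names the scheduler address the worker was heartbeating to. A small, stdlib-only check (hypothetical helpers, not part of Dask) can tell whether an address from a log like this falls inside the 8000:8999 range that the cluster leaves open:

```python
from urllib.parse import urlsplit

# Open port range on the cluster, as stated in the question.
OPEN_LOW, OPEN_HIGH = 8000, 8999


def port_of(address: str) -> int:
    """Return the port of a Dask-style address such as 'tcp://127.0.0.1:40883'."""
    port = urlsplit(address).port
    if port is None:
        raise ValueError(f"no port in address {address!r}")
    return port


def in_open_range(address: str, low: int = OPEN_LOW, high: int = OPEN_HIGH) -> bool:
    """True if the address's port lies within [low, high], inclusive."""
    return low <= port_of(address) <= high


if __name__ == "__main__":
    scheduler = "tcp://127.0.0.1:40883"  # address taken from the traceback above
    print(port_of(scheduler), in_open_range(scheduler))
```

For tcp://127.0.0.1:40883 this prints 40883 False, so the scheduler was listening outside the open range. The "after 30 s" in each error matches the default of Dask's distributed.comm.timeouts.connect setting.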

Hi @wmcelhenney93, welcome here!

Did you try:

cluster = LocalCluster(port="8000:8999")
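Building on that answer, here is a minimal sketch that keeps every listener inside the open range, not only the workers. The scheduler matters as well: by default (scheduler_port=0) it binds a random free port, like the 40883 in the traceback. The sketch assumes a recent distributed release in which LocalCluster forwards extra keyword arguments to each worker's Nanny, Nanny accepts port and worker_port, and "start:end" strings are read as port ranges. The helper names and the way the range is split are hypothetical choices, not from the thread:

```python
def cluster_port_kwargs(low: int = 8000, high: int = 8999) -> dict:
    """Hypothetical helper: LocalCluster keyword arguments that keep the
    scheduler, dashboard, nanny and worker listeners inside [low, high]."""
    if not (1 <= low and high <= 65535 and high - low >= 3):
        raise ValueError(f"need a range of at least 4 ports, got {low}:{high}")
    mid = (low + 2 + high) // 2  # split the remaining ports between nannies and workers
    return {
        "scheduler_port": low,               # fixed scheduler port
        "dashboard_address": f":{low + 1}",  # scheduler dashboard
        "port": f"{low + 2}:{mid}",          # passed through to each Nanny's own listener
        "worker_port": f"{mid + 1}:{high}",  # passed through to each Nanny for its worker process
    }


def start_cluster(n_workers: int, low: int = 8000, high: int = 8999):
    """Start a LocalCluster plus Client with all ports drawn from [low, high]."""
    # Imported here so cluster_port_kwargs() is usable without Dask installed.
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(
        n_workers=n_workers,
        processes=True,  # worker_port is a Nanny option, so keep process-based workers
        **cluster_port_kwargs(low, high),
    )
    return cluster, Client(cluster)
```

In process_files.py this could be called as start_cluster(int(sys.argv[1])), assuming the trailing 8 in the bsub command is meant as the worker count. If port 8000 or 8001 is already taken on the node, pick different fixed values for the scheduler and dashboard.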

I tried that yesterday and it still gave me trouble, but today it seems to be working.

I keep running into these intermittent issues with the cluster, and I can’t tell if it’s me or the cluster.

Anyway, I think that works. Thank you for the answer.