Getting heartbeats from unknown workers

Hello, I’m getting a bunch of these errors running Dask:

2022-07-20 10:39:51,552 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:44235'. 
2022-07-20 10:39:51,552 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:44235'. Shutting down.

And then the process ends – it’s fine with one thread, and I’ve ensured there are no other python processes listening on the network at all. The Dask version I’m using is 2022.7.0.

My code is basically just:

if __name__ == "main":
    with Client() as client:
        fire_and_forget(client.map(really_long_process, list_of_inputs)))

Okay, I got a bit further with my EC2 cluster.

  extra_bootstrap = [
      "DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt-get install -y awscli s3fs",
      "aws ecr get-login-password --region [region] | docker login --username AWS --password-stdin [registry]",
  ]
  with EC2Cluster(extra_bootstrap=extra_bootstrap, n_workers=mp_num, docker_image="[registry]", docker_args="--device /dev/fuse --cap-add SYS_ADMIN --security-opt apparmor:unconfined",  # For mounting S3 buckets
                  instance_type="[instance]", security=False,  # To work around https://github.com/dask/dask-cloudprovider/issues/249
                  iam_instance_profile={"Name": "[iam_profile]"}, worker_options={"nthreads": 4}) as cluster:
      with Client(cluster) as client:
          client.register_worker_plugin(AddProcessPool)  # From https://www.youtube.com/watch?v=vF2VItVU5zg&t=515s
          with dask.annotate(executor="processes"):
              client.gather(client.map(my process, my_input))

It can’t seem to find the "processes" executor on the worker, which means that the register_worker_plugin call isn’t propagated in this case.

@hameer-spire Welcome to Discourse! Could you please share a minimal+reproducible example, and some more details on how you’re setting up your cluster in the first example? It’ll allow us to help you better.

I’m also findind something similar (see traceback below). Sadly it’s not easy to create an MCVE probably because it’s a complex function and often happens on remote machines e.g. an EC2 machine and in my case it’s a shared jupyterhub instance.

Would it be possible to elaborate in Why did my worker die? — Dask.distributed 2022.8.1 documentation for CommClosedError?

2022-08-19 02:50:30,752 - distributed.worker - WARNING - Heartbeat to scheduler failed
Traceback (most recent call last):
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/worker.py", line 1159, in heartbeat
    response = await retry_operation(
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/utils_comm.py", line 383, in retry_operation
    return await retry(
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/utils_comm.py", line 368, in retry
    return await coro()
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/core.py", line 1154, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/core.py", line 919, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:35462 remote=tcp://127.0.0.1:40265>: Stream is closed
2022-08-19 02:50:30,805 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:34603'.
2022-08-19 02:50:30,805 - distributed.worker - WARNING - Heartbeat to scheduler failed
Traceback (most recent call last):
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/worker.py", line 1159, in heartbeat
    response = await retry_operation(
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/utils_comm.py", line 383, in retry_operation
    return await retry(
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/utils_comm.py", line 368, in retry
    return await coro()
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/core.py", line 1154, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/core.py", line 919, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/opt/userenvs/ray.bell/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:35484 remote=tcp://127.0.0.1:40265>: Stream is closed
2022-08-19 02:50:30,812 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:34603'. Shutting down.

I found a retry mostly fixes it.