Dask Resiliency with as_completed

I am using an SSHCluster with 60 workers distributed across 8 nodes. The cluster is started and managed programmatically. This is a long-running simulation that is expected to take approximately 48 hours. The work is split over the available nodes and assigned to the remote workers via client.submit calls, with the worker specified explicitly for each submit call.

The following is a simplified example. param1, param2, param3 represent the data structures that I am passing; they are different for each submit call, but how they are generated is out of scope, which is why I have simplified them into param1-3.

from dask.distributed import as_completed

futures = []
for worker_index in range(dask_numtasks):
    params = (param1, param2, param3, ...)  # placeholder for the real per-task data
    worker_url = worker_urls[worker_index]
    # pin each task to one specific worker
    future = client.submit(worker_method, params, workers=worker_url)
    futures.append(future)

# process results as soon as they become ready
for future in as_completed(futures):
    result = future.result()
    process_result(result)

The problem is that sometimes one worker gets dropped and the simulation gets stuck. I have seen this happen in real time a couple of times: looking at the list of workers in the Dashboard, one of the workers becomes highlighted with a light red background, with its “last seen” value increasing until it times out. When this happens and I try to access that worker’s logs, the page gets stuck loading and the logs are never displayed. The scheduler logs also don’t show any errors or obviously related entries. I have also checked the faulty node for potential issues: all the worker processes are still running and visible in System Monitor, and there are no error logs whatsoever.

I have had cases where there were errors in the logic, and these are all handled correctly and propagated back to the client to ensure proper logging. This seems to be a network (or Dask) issue or something similar. The fact that it doesn’t always happen at the same stage of the simulation, in my opinion, substantiates my suspicions. Could this be an overload of the network switch? Is there any other way to understand (or hopefully know for sure) what might be happening?

I think the simulation gets stuck because as_completed keeps waiting for a future that never returns.

I have looked at the resilience section in the Dask Distributed documentation, but it does not go into detail with regard to specific contexts (e.g. evaluating the results via as_completed). The as_completed method is particularly useful for my workflow, as it ensures that results are processed as soon as they are ready and returned to the client. How would one handle resilience in my case (i.e. continue gracefully with a missing worker)?
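For completeness, this is roughly how I would guard the result-processing loop, assuming that a lost task eventually surfaces as an exception on its future (for example a KilledWorker error); this is only a sketch, and it does not help with the situation above where the future simply never completes:

from dask.distributed import as_completed

for future in as_completed(futures):
    try:
        result = future.result()
    except Exception as exc:
        # a task may fail instead of returning a result, e.g. with a KilledWorker
        # error if its worker was lost for good; log it and keep going
        print(f"task {future.key} failed: {exc!r}")
        continue
    process_result(result)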

Hello, I might have run into something similar. Is it a specific node? I found that the firewall would block incoming messages, the worker would close itself down, and the scheduler would assume it had died.

hi @nickvazz, thanks for your reply. I am using Ubuntu 22.04.3 LTS, and the firewall is not enabled (it is off by default and I have not enabled it). This issue also doesn’t happen on just one node, and there doesn’t seem to be a pattern (i.e. it does not always happen, and when it does, it is never at the same point).

Did you check the Worker logs on the node directly? Maybe there is some hint inside?

Since you assign each task to a specific Worker, if this Worker fails, you won’t get the result. I’m not sure why it blocks, though, rather than raising an exception.

Are other futures being processed when a Worker is in this state, or does the simulation get completely stuck even if other Futures complete after that?

I imagine the answer is no, but couldn’t you avoid assigning tasks to specific workers? Or at least give several Workers for a given Future?
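As an untested sketch (reusing worker_url from your example; worker_urls_on_node is a hypothetical list of addresses for a single node), you could turn the hard restriction into a soft preference with allow_other_workers, or give the scheduler several candidate workers:

# soft restriction: prefer one worker, but let the scheduler move the task elsewhere
future = client.submit(
    worker_method,
    params,
    workers=[worker_url],
    allow_other_workers=True,
)

# or: restrict the task to several candidate workers, e.g. all those on one node
future = client.submit(worker_method, params, workers=worker_urls_on_node)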

How do you check the logs on the node? And is it possible to check the logs even after the cluster would have been terminated?

Yes. All the other workers would have completed, and their results would have been processed and synced with the main process. The simulation then gets stuck waiting for the hung worker, until everything dies.

This might actually be possible. Would Dask attempt to allocate the task to another worker, even when using the as_completed method (or does as_completed make no difference here)?

I can try this out. Do you think just terminating the process from the remote node would be a close enough replication of the described “hanging” scenario?

Did some experimentation. Very likely, specifying the worker explicitly was what was causing the lack of resiliency. The simulation now proceeds even if I completely disconnect a node (and hence all its workers) from the network. Naturally, I do not yet know the real reason why the worker was becoming invisible to the cluster, but if the cause was indeed network related, this should be a viable solution.


By default the logs are piped to stderr, so you’ll need to redirect them somehow, but I’m not sure how with dask-ssh. You could also try to configure logging.

Yes it should, and obviously it did considering your last message.

Yes!

Yes, it’s a very hard constraint, and Dask has no way to schedule the task anywhere else if this node cannot be contacted, but maybe an error or timeout should be raised.

The changes made after opening this question seem to be working quite nicely, yet inexplicable things do keep happening. Once the cluster drops without the logging information having been persisted, this detail is lost forever. I would like some further information about persisting the logging information.

How can we redirect stderr generically for the nanny, workers, scheduler, etc.? For example, have worker.log, nanny.log, scheduler.log, etc., at least for error cases.
I am already redirecting stderr in some of my custom exception cases: I wrap the code in a try/except and write the error into a log file. How is this done for Dask? I know you said you are not sure for SSHCluster; is there anyone else who might know?

This (from your link) also seems interesting:

logging:
  version: 1
  handlers:
    file:
      class: logging.handlers.RotatingFileHandler
      filename: output.log
      level: INFO
    console:
      class: logging.StreamHandler
      level: INFO
  loggers:
    distributed.worker:
      level: INFO
      handlers:
        - file
        - console
    distributed.scheduler:
      level: INFO
      handlers:
        - file
        - console

Where would this be configured, programmatically?

In the link on Logging, one of the ways is explained:

You can control the logging verbosity in the Configuration, for example, the ~/.config/dask/*.yaml files

So you’ll need to create a YAML file on each of your hosts, which may not be that easy, and it is clearly not programmatic… This is almost surely doable by using the Python logging module correctly, though you might have to run the logging configuration on all workers.
You can also modify the Dask configuration programmatically on your main process side, but I’m not sure if and how this can be forwarded to Workers before they start…
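As a rough sketch of the “run the logging configuration on all workers” idea (untested; the log path and format are just placeholders), you could ship a small function to every worker with Client.run, so that each worker process attaches its own file handler:

import logging
import os

def setup_worker_logging():
    # runs inside the worker process: attach a file handler to the distributed.worker logger
    path = f"/tmp/dask-worker-{os.getpid()}.log"  # placeholder location, one file per process
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
    logger = logging.getLogger("distributed.worker")
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return path

# execute the function once on every currently connected worker
print(client.run(setup_worker_logging))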

You can also try the forward logging method, but this could overwhelm your client machine a bit.
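A minimal sketch, assuming a recent distributed version that provides Client.forward_logging (the handler configuration on the client side is up to you):

import logging

# give the client process somewhere to emit the forwarded records
logging.basicConfig(level=logging.INFO)

# ask every worker to forward records from its root logger (INFO and above) to the
# client, where they are re-emitted through the client's own logging configuration
client.forward_logging(level=logging.INFO)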