Dask stops abruptly while sending data between actors, with no error logs

Consider an SSHCluster of 58 workers on 8 nodes, where each worker is created as an actor. At some stage, actors send portions of their data to each other (consider it state information synchronization).

I am having a problem where, after some time, the program stops abruptly, but I have no error logs whatsoever. I do know, however, that it most likely happened while actor 30 was sending its data to the other workers.

  • I have a “send_results” method and a “receive_results” method;
  • send_results is called from the client “n” times, where “n” is the number of workers;
  • each send_results call in turn calls receive_results “n-1” times, i.e. once for every worker apart from itself (a sketch of this pattern follows the note below).

(This might be inefficient; in fact, each worker spends an average of 3 seconds sending its data to the other workers. It was necessary, however, because of the single-threaded nature of Dask actors: an actor cannot send data and wait for the response, i.e. completion, while at the same time processing the data coming from another actor.)
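A minimal sketch of the pattern; the class name, the peer-handle plumbing and the state merge are illustrative placeholders, and only send_results/receive_results correspond to what I described:

import traceback

class StateSyncActor:
    def __init__(self, worker_id, log_path="actor_errors.log"):
        self.worker_id = worker_id
        self.log_path = log_path        # where the stack trace is dumped on failure
        self.peers = []                 # actor handles to the other n-1 workers
        self.local_state = {}           # the state information being synchronized

    def set_peers(self, peers):
        self.peers = peers

    def receive_results(self, sender_id, payload):
        try:
            self.local_state[sender_id] = payload     # merge the incoming state
        except Exception:
            with open(self.log_path, "w") as fi:
                traceback.print_exc(file=fi)
            raise

    def send_results(self):
        try:
            # n-1 blocking remote calls, one per peer actor
            for peer in self.peers:
                peer.receive_results(self.worker_id, self.local_state).result()
        except Exception:
            with open(self.log_path, "w") as fi:
                traceback.print_exc(file=fi)
            raise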

Both the send and receive methods are wrapped in a try/except block which looks something like this:

import traceback   # needed for print_exc below

try:
    # logic goes here
except Exception:
    # stack_trace_log_file_name is defined elsewhere in my code
    with open(stack_trace_log_file_name, "w") as fi:
        traceback.print_exc(file=fi)
    raise

which, in my opinion, should always log to a file locally as well as propagate up the calling chain:

  • if receive_results fails, it would log an error on the local actor on which it crashed;
  • the exception would also propagate to send_results, which would in turn log an error on the local actor that called that remote instance of receive_results;
  • and it would propagate to the client. The client has a similar mechanism to log any caught error generically, only it doesn't raise again; it simply logs the error to file (sketched below).
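On the client side, the mechanism looks roughly like this (the loop and the log file name are illustrative):

import traceback

client_log_file = "client_errors.log"

for actor in actors:
    try:
        actor.send_results().result()     # ActorFuture; blocks until the remote call finishes
    except Exception:
        with open(client_log_file, "w") as fi:
            traceback.print_exc(file=fi)
        # unlike the actor methods, the client only logs and does not re-raise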

Is there anything wrong with the error handling in this case? Does the absence of any error logs make a network overload issue more likely? That is my only plausible explanation so far. And if it wasn't a network overload, what else could it be? Without any error logs I feel quite lost.

Here you mean you submit an Actor to each Worker? I'm not sure how you do this; do you use another mechanism, like plugins?

I don't see anything wrong with the error handling, but your use case seems really complex, so it is hard to guess anything. Are you able to test with a LocalCluster? Could you try to build a reproducer, or at least a simple example of what you are doing?

Hi @guillaumeeb, thanks for your reply. Yes indeed, I submit an actor to each worker; my initial wording was wrong.

This is how you create an Actor on a worker based on the documentation:

from dask.distributed import Client

client = Client()

actors = []
for worker_url in worker_urls:
    # pin the actor to one specific worker and pass its (large) parameters
    future = client.submit(ActorClass, params, workers=worker_url, actor=True)
    actors.append(future.result())   # block until the Actor handle is returned

Unfortunately there is no simple way to provide a better example of what I am doing; it is quite complex. The snippet above does, however, show the concept of actor creation and is relevant to my new findings.

Since I asked the question I have dug deeper and identified a potential problem: it might be related to memory usage after all. I have analysed the memory usage, and there are two points at which the scheduler's memory increases considerably. In one case it automatically cleans up once finished; in the other it doesn't.

I am using client.register_worker_callbacks to load some static data. While the data is being loaded on each worker at the beginning, I can see the scheduler gradually increasing in memory; once all the workers stabilise and are ready to start receiving tasks, the scheduler goes back to its normal memory usage.
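The callback looks roughly like this (the loader body is a placeholder for the real load of static data):

def load_static_data(dask_worker):
    # Runs once on each current and future worker; the real version loads
    # the static data and keeps it on the worker for later use.
    dask_worker.static_data = {"table": list(range(1000))}   # placeholder

client.register_worker_callbacks(load_static_data)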

Then, when creating an actor on each worker as shown above (note the “params” argument, which includes a number of very large data structures that the actor in turn holds on to as state, e.g. self.prop1, self.prop2, etc.), a similar behaviour is observed: as each actor appears on the Dask dashboard, I can see the memory increasing on the scheduler at the same time. However, once all the actors are created, the memory is not released, unlike in the previous case. Consequently, before the heavy lifting starts, there is only a little memory left. It is therefore likely that if memory usage increases slightly while the processing is ongoing, the run crashes because it goes out of memory.

I am conducting some tests on a single node with 3 workers just to be able to report here:

  • the client holds 3.8 GB of memory;
  • the scheduler holds 4.1 GB of memory;
  • each worker holds 2.7 GB of memory.

Of the worker memory, I suspect 1.8 GB is static data that is pre-initialised on each worker, and around 0.9 GB is data that is sent along with each actor as parameters. The scheduler was at 2.9 GB before creating the actors, and then increased by 0.9 GB. This is exacerbated as the number of workers grows: in the main test, the main node, which has 38 GB of memory and was hosting the client, the scheduler, and 2 workers, was already close to 90% memory usage after starting the 58 actors.
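For reference, numbers like the ones above can be collected with something along these lines (using psutil; this is just one way to measure resident memory per process):

import psutil

def process_rss_gb():
    # resident set size of the current process, in GB
    return psutil.Process().memory_info().rss / 1e9

client_gb = process_rss_gb()                      # the client process itself
scheduler_gb = client.run_on_scheduler(process_rss_gb)
worker_gb = client.run(process_rss_gb)            # dict keyed by worker address
print(client_gb, scheduler_gb, worker_gb)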

I get that we are keeping a reference to the actor, but what exactly does Dask keep in the scheduler? It is definitely not just a reference. The scheduler needs to “temporarily” store the parameters to send along with each actor's creation, but does it have to keep them (or keep that much of them)? Is it failing to clean up this memory (i.e. to properly dispose of the “working memory”), or is this intended? Is there a way to force it to release this extra memory?

Honestly, Actors are not a widely used feature, and your use case, with one Actor per Worker each handling and exchanging heavy information, was probably never thought of before. It is quite possible that you are running into some limitations.

Maybe you could try to use Client(..., direct_to_workers=True), but I’m not sure this will solve all of your problems.
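Something along these lines, for example (untested; the scheduler address is a placeholder):

from dask.distributed import Client

# direct_to_workers makes the client talk to the workers directly where possible,
# instead of routing data through the scheduler
client = Client("tcp://scheduler-address:8786", direct_to_workers=True)

future = client.submit(ActorClass, params, workers=worker_url, actor=True)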