Generally, the workers have only a private IP and WAN access, and sit on a separate network from the scheduler. (They are spawned by a separate process, but let's ignore that for now.)
The scheduler lives on a node with public IP and one open port to the outside world.
Looking at the Journey of a Task, everything works well until Step 7: Gather. Since the scheduler has no way to contact the workers directly, this step fails.
I’ve been looking at dask-gateway, and running multiple schedulers (one per cluster) but AFAIK, this also requires connectivity from gateway to scheduler, doesn’t it?
Finally, I had a look at plugins and came across the RabbitMQ example (distributed.dask.org/en/stable/plugins.html#rabbitmq-example).
Having a message queue on the open port could solve the issue if, for the gather step:

1. the scheduler leaves a message for the workers,
2. the worker that has the result picks the message up and pushes the result, and
3. the scheduler picks the result up from the message queue.
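The three steps above can be sketched with plain Python queues standing in for the broker. This is only an illustration of the message flow, not a real implementation: all names here (`request_queue`, `result_queue`, the task key) are hypothetical, and an actual setup would talk to RabbitMQ on the scheduler's open port via a client library such as pika.

```python
import queue

# Stand-ins for two broker queues reachable on the scheduler's open port.
request_queue = queue.Queue()  # scheduler -> workers
result_queue = queue.Queue()   # workers -> scheduler

# Step 1: the scheduler leaves a message asking for a task's result.
request_queue.put({"op": "gather", "key": "task-123"})

# Step 2: the worker that holds the result picks the message up
# and pushes the result back. (worker_data is this worker's local store.)
msg = request_queue.get()
worker_data = {"task-123": 42}
if msg["key"] in worker_data:
    result_queue.put({"key": msg["key"], "result": worker_data[msg["key"]]})

# Step 3: the scheduler picks the result up from the queue.
reply = result_queue.get()
print(reply["result"])  # -> 42
```

Note that only the broker needs to be reachable from both sides; the scheduler never opens a connection to a worker.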
Before I go down that Rabbit hole, I wanted to make sure this is not a solved problem.
I might be simply missing the right keywords to look for or someone solved the problem in a different way.
I know of github.com/comp-dev-cms-ita/dask-remote-jobqueue, but would prefer not getting SSH involved.
Apologies for the improperly formatted links, but new users are only allowed 2 links.
To be honest, I’m even surprised it goes this far! Do you happen to have some error stacktrace to share?
Yes, I think so.
I’m not sure you can achieve what you want with a WorkerPlugin. The example you are referring to only lets you handle task status changes differently. I don’t know whether there is also a way to handle data transfer differently through a WorkerPlugin, but this might be a solution.
Anyway, I also don’t think there is currently a solution to this: the Scheduler and Workers are supposed to be able to talk to each other, and to the Client too.
I’m sorry for all the uncertainties in my answer; this is a complex topic, and I hope others with better knowledge can chime in.
The scheduler needs to be able to contact the workers, and vice versa.
It is generally understood that they should be on the same LAN.
The only connection that is guaranteed to work through a firewall is client->scheduler.
Given that the network communication flow is a black box, anything is possible ;).
Once a connection is established from the worker side, the scheduler and workers can communicate in both directions (TCP). However, once the connection is dropped, the scheduler has no way to contact the workers.
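The point above can be demonstrated with a toy socket example: the "scheduler" only listens, the "worker" dials out, and yet traffic flows both ways over that single worker-initiated connection. This is a generic TCP sketch, not Dask's actual comm layer; once the worker closes the socket, the scheduler side has no route back.

```python
import socket
import threading

def scheduler(server_sock, results):
    conn, _ = server_sock.accept()            # worker dialed in
    results["from_worker"] = conn.recv(1024)  # worker -> scheduler
    conn.sendall(b"gather please")            # scheduler -> worker, same socket
    conn.close()

server = socket.socket()
server.bind(("127.0.0.1", 0))  # ephemeral port; scheduler never dials out
server.listen(1)
port = server.getsockname()[1]

results = {}
t = threading.Thread(target=scheduler, args=(server, results))
t.start()

worker = socket.socket()
worker.connect(("127.0.0.1", port))  # the only outbound connection
worker.sendall(b"heartbeat")
results["from_scheduler"] = worker.recv(1024)
worker.close()
t.join()
server.close()

print(results)  # both directions worked over one worker-initiated socket
```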
Does anyone know how the communication in Dask works (in the form of an Alice ↔ Bob graph)?
I had a look at the gateway proxy code - in theory, if there is a way to keep the connections open until the cluster is shut down, this issue could be fixed.
Network comms in dask use a connection pool. When an actor initiates an RPC call, it picks a connection from the pool: if there are free, already-established connections, they are reused - regardless of who initiated them. Otherwise, a new one is opened and, once the RPC call is done, it is returned to the pool for reuse.
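That reuse logic can be sketched as a minimal pool (this is a toy model, not dask's actual `ConnectionPool` API): an RPC call grabs any idle established connection to the address, and only opens a new one when none is free.

```python
class ToyConnectionPool:
    """Toy model of pooled-connection reuse (not dask's real API)."""

    def __init__(self):
        self.free = {}   # address -> list of idle connections
        self.opened = 0  # how many real connections were ever created

    def connect(self, address):
        idle = self.free.get(address, [])
        if idle:
            return idle.pop()  # reuse, regardless of who opened it
        self.opened += 1
        return f"conn-{self.opened}->{address}"

    def release(self, address, conn):
        # RPC call done: return the connection to the pool.
        self.free.setdefault(address, []).append(conn)

pool = ToyConnectionPool()
c1 = pool.connect("worker-1")  # nothing idle: open a new connection
pool.release("worker-1", c1)   # RPC done: back into the pool
c2 = pool.connect("worker-1")  # reused, not reopened
assert c1 == c2 and pool.opened == 1

# A second concurrent RPC (c2 still checked out) forces a second
# connection - which is exactly what fails when the scheduler
# cannot dial the worker.
c3 = pool.connect("worker-1")
assert pool.opened == 2
```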
When I say “surprised you got this far” I mean that you were extremely lucky not to hit a race condition where more than one RPC call is necessary at the same time - thus causing the scheduler to initiate a second TCP connection to the worker - until you reach gather.