Migrating from Classic to Operator: cross-K8s cluster schedulers

Hey all,

So, we have implemented a layer on top of the classic KubeCluster that lets us run a heterogeneous adaptive scheduler backed by several different KubeClusters.
One of the cool things it supports is scheduling jobs in AWS and on prem (or on another K8s cluster) within the same dask scheduler (let’s say for easy and flexible access to fancy GPUs or to large RAM, or …)

It seems that the new operator API will be a nice way to support multiple worker groups, and maybe with a bit of customisation, adaptive scaling as well, but I’m not sure what the best way to handle cross-cluster scheduling would be.
Any advice?

Hi @champialex, welcome to this forum!

So if I understand correctly, you have at least two separate Kubernetes clusters, and you developed a layer to address both clusters with a single Dask Scheduler.

I don’t think the new operator API can support multiple clusters currently. I think we will need @jacobtomlinson’s advice here.


Yup, that’s correct. We also use adaptive scheduling, so that we only pay for EC2 hardware when needed (and get good resource utilisation in general).
It does seem a bit hard to get that to work with the operator.

Sorry for the delay in responding here. This is not a use case I had considered before.

Most folks create/destroy KubeCluster instances as and when they need them. I’m curious what the benefit of spanning one Dask cluster over multiple Kubernetes clusters is vs having many Dask clusters each in a single Kubernetes cluster?

No worries, thanks for your reply :slight_smile:

The main benefit is submitting a single dask graph where different parts of the graph can execute in different K8s clusters, while still allowing tasks to depend on each other.
Let’s say, fancy ML GPUs in AWS for some nodes in the graph, and regular workers for data collection & cleaning, all in a single dask graph.
Or let’s say the on-premise K8s cluster fills up: some workers can then be transparently moved to AWS so that the job can still execute.
It also makes it super easy to transparently retry, on a bigger worker, a task that failed because it ran out of memory.
Essentially, it gives more flexibility in where a particular task runs, depending on resource needs & availability, transparently for the end user, while leveraging dask’s Scheduler.
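
To make the placement and retry idea concrete, here is a minimal standalone sketch. It is not our actual layer and does not use the dask API: the `WorkerPool` class, the pool names and sizes, and both helper functions are hypothetical, chosen only to illustrate "smallest pool that fits, spill over when a pool is full, escalate on out-of-memory".

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerPool:
    """A hypothetical pool of identical workers living in one K8s cluster."""
    name: str
    k8s_cluster: str
    memory_gb: int
    gpus: int = 0


# Illustrative pools only; names and sizes are made up for this sketch.
POOLS = [
    WorkerPool("onprem-small", "on-prem", memory_gb=32),
    WorkerPool("onprem-large", "on-prem", memory_gb=128),
    WorkerPool("eks-highmem", "eks", memory_gb=500),
    WorkerPool("eks-gpu", "eks", memory_gb=64, gpus=1),
]


def pick_pool(memory_gb, gpus=0, full=frozenset()):
    """Return the smallest pool that fits the request and is not full."""
    candidates = [
        p for p in POOLS
        if p.memory_gb >= memory_gb and p.gpus >= gpus and p.name not in full
    ]
    if not candidates:
        raise LookupError("no pool can satisfy this request")
    # Prefer pools without GPUs, then the one with the least memory.
    return min(candidates, key=lambda p: (p.gpus, p.memory_gb))


def run_with_escalation(task, memory_gb, gpus=0):
    """Run task(pool); on MemoryError, retry on the next bigger pool."""
    while True:
        pool = pick_pool(memory_gb, gpus)
        try:
            return task(pool)
        except MemoryError:
            # Ask for strictly more memory than the pool that just failed.
            memory_gb = pool.memory_gb + 1
```

For example, `pick_pool(16, full={"onprem-small", "onprem-large"})` falls through to the EKS pool, which is the "on prem is full, move to AWS" case. In real dask, the per-task requirement would more likely be expressed through worker resources than through a wrapper like this.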

It’s very possible that I missed some fancy dask features as well, so if it is possible to implement that easily in native dask that would be perfect!

Also, to clarify, we currently have several KubeCluster instances, each adapting its worker count based on the resources required by the Scheduler. Currently each KubeCluster has a single type of resources and a single K8s cluster (e.g. all workers in this KubeCluster have 500G of memory, 120 cores, in EKS).
Though that may need to change when we add the ability to transparently allocate workers to EKS or on-premise.
So the main benefit we get is having several KubeCluster instances, all managed by a single Scheduler.

Thanks for writing that up. All that motivation makes total sense.

What you’re describing is why we implemented DaskWorkerGroup resources in the operator. So a single DaskCluster can have multiple groups of workers with different node selectors, annotations, affinities, resources, etc.
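
As a rough illustration, a worker group manifest can be built as plain data. This is a sketch, not a reference: the field layout follows my recollection of the DaskWorkerGroup examples in the dask-kubernetes docs, so check it against the CRD version you have installed. The helper name `worker_group_manifest`, the group name, image tag and node selector label are all made up for the example, and the container args are trimmed to the bare minimum.

```python
import json


def worker_group_manifest(name, cluster, replicas, image, resources,
                          node_selector=None):
    """Build a DaskWorkerGroup manifest as a plain dict (hypothetical helper)."""
    pod_spec = {
        "containers": [
            {
                "name": "worker",
                "image": image,
                # Minimal args; real manifests usually add dashboard options.
                "args": ["dask-worker", "--name", "$(DASK_WORKER_NAME)"],
                "resources": resources,
            }
        ]
    }
    if node_selector:
        pod_spec["nodeSelector"] = node_selector
    return {
        "apiVersion": "kubernetes.dask.org/v1",
        "kind": "DaskWorkerGroup",
        "metadata": {"name": name},
        "spec": {
            # Name of the parent DaskCluster this group attaches to.
            "cluster": cluster,
            "worker": {"replicas": replicas, "spec": pod_spec},
        },
    }


highmem = worker_group_manifest(
    name="simple-highmem",
    cluster="simple",
    replicas=2,
    image="ghcr.io/dask/dask:latest",
    resources={"requests": {"memory": "32Gi"}, "limits": {"memory": "32Gi"}},
    node_selector={"example.com/pool": "highmem"},  # illustrative label
)
manifest_json = json.dumps(highmem, indent=2)  # JSON is valid input for kubectl
```

Note that the `cluster` field is what ties the group to a DaskCluster in the same Kubernetes cluster.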


However, it sounds like you have multiple discrete Kubernetes clusters with some kind of network and DNS bridging between them, rather than a single federated Kubernetes control plane spanning multiple clouds using something like Karmada?

Right now the way a DaskWorkerGroup in the operator connects the workers to a scheduler is using a cluster selector to find the scheduler Service. The limitation of this is that the scheduler is assumed to be running in the same namespace and managed by a DaskCluster object.

Perhaps we also need an alternative way to configure things so you can create a DaskWorkerGroup without a parent DaskCluster and configure the address of the scheduler manually?

No, we don’t use Karmada. That seems like it could be nice, though it is another can of worms :slight_smile:

What about allowing people to configure the ApiClient set in daskworkergroup_replica_update from the spec (or from somewhere else), and pass in their own K8s configuration rather than relying on global configuration, so that a user can customize the Kubernetes context used by a worker?
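
Something along these lines, as a rough sketch only. The `kubeContext` spec field and both helper names are hypothetical (nothing like this exists in the operator today as far as I know), and I’m assuming the synchronous `kubernetes` Python client here, which may not be the client the operator actually uses.

```python
def resolve_kube_context(worker_group_spec, default=None):
    """Read a hypothetical 'kubeContext' field from a DaskWorkerGroup spec."""
    return worker_group_spec.get("kubeContext", default)


def api_client_for(worker_group_spec):
    """Build an ApiClient for the spec's context, or None for global config."""
    context = resolve_kube_context(worker_group_spec)
    if context is None:
        # No override requested: the caller keeps using the global configuration.
        return None
    # Imported lazily so the helper above works without the client installed.
    from kubernetes import config

    # Loads the kubeconfig file and selects the named context.
    return config.new_client_from_config(context=context)
```

The handler would then pass the returned client to its API objects when it is not `None`, and fall back to the current behaviour otherwise.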