Metaflow + Dask over k8s: how to keep the cluster from shutting down?

Hello, first of all thanks for the great job on Dask. I am trying to use Metaflow and Dask on Kubernetes, using the Dask Kubernetes operator. I would like to have an initial step in Metaflow that creates a Dask cluster to be used by all subsequent steps, with the following code:

# Inside a Metaflow step (hence `self`)
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name=self.dask_cluster_name,
    n_workers=2,
    resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "2Gi"}},
    env={"EXTRA_PIP_PACKAGES": "s3fs==2023.12.2"},
    shutdown_on_close=False,
)

It works as intended, but as soon as the pod that ran the start step and created the Dask cluster terminates, the cluster is shut down. I would like the cluster to persist until I explicitly close it in my final Metaflow step. How can I do that?
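In the later steps, the idea would then be to attach to the already-running cluster by name, roughly like this (a sketch; `connect_to_existing_cluster` is just an illustrative helper name, and I am assuming `KubeCluster.from_name` behaves as documented):

```python
def connect_to_existing_cluster(name):
    # Imported lazily so this module loads even without a Kubernetes context.
    from dask_kubernetes.operator import KubeCluster

    # from_name attaches to an existing DaskCluster resource in the current
    # namespace instead of creating a new one.
    cluster = KubeCluster.from_name(name)
    return cluster.get_client()
```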

Hi @Benjamin, welcome to Dask Discourse forum!

I’m not really familiar with Dask Kubernetes, but I would assume this is exactly what shutdown_on_close should achieve; however, it sounds like it doesn’t.

One way could be to start a KubeCluster or a Helm cluster using kubectl directly, but I’m not sure this should be needed.
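For reference, creating the cluster with kubectl would mean applying a DaskCluster manifest along these lines (a sketch based on the operator docs; the name, image, and resource values are placeholders):

```yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: my-dask-cluster
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "ghcr.io/dask/dask:latest"
          args: [dask-worker, --name, $(DASK_WORKER_NAME)]
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "ghcr.io/dask/dask:latest"
          args: [dask-scheduler]
          ports:
            - name: tcp-comm
              containerPort: 8786
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: my-dask-cluster
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          port: 8786
          targetPort: "tcp-comm"
```

A cluster created this way lives until you delete the resource with kubectl, independently of any client process.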

cc @jacobtomlinson.

Hi @guillaumeeb thanks for your answer.

I also thought the shutdown_on_close parameter should achieve this, but indeed it does not seem to be the case. It is as if the KubeCluster’s lifetime were tied to the lifetime of the pod that created it.

Sure, I could start the cluster from outside and pass the scheduler address as a parameter to the Metaflow steps, but I don’t think it would be as nice. I want to expose a Kubernetes cluster to many developers, each able to run independent Metaflow DAGs with independent Dask clusters that are dynamically created, managed and deleted by each DAG.
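For completeness, passing the address around would look something like this (a sketch; the address and the `client_for` helper name are placeholders):

```python
def client_for(scheduler_address):
    # Imported lazily; requires the `distributed` package at call time.
    from distributed import Client

    # Connect to an already-running scheduler, e.g. the ClusterIP service
    # the operator creates, something like "tcp://<name>-scheduler.<namespace>:8786".
    return Client(scheduler_address)
```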

Setting shutdown_on_close should allow the cluster to persist after the process exits. If it isn’t behaving like this then it’s probably a bug.

Out of interest, does it behave correctly if you set the parameter after creation?

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name=self.dask_cluster_name,
    n_workers=2,
    resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "2Gi"}},
    env={"EXTRA_PIP_PACKAGES": "s3fs==2023.12.2"},
)
cluster.shutdown_on_close = False

Either way could you open an issue on GitHub so we can investigate?

Thanks @jacobtomlinson.

I can confirm that the behavior is the same if the parameter is set after creation.

I have created the issue there: