After deploying Dask Kubernetes, I ran into an issue with a user because of high memory usage at the “gathering” step. We realised that, in this Dask Kubernetes deployment, the scheduler gathers all the data into its pod's memory. After several OOMKilled scheduler pods, we tried setting the direct_to_workers parameter. However, this parameter does not seem to work in this context (a deployment with the Dask Kubernetes Helm chart).
I would like to know whether I made a mistake or whether this feature is simply not available for Dask Kubernetes?
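In case it helps, this is roughly the shape of what we tried (a minimal sketch only; the scheduler address below is a placeholder for the service our Helm release exposes):

from dask.distributed import Client

# Placeholder address: adjust to your Helm release / namespace.
client = Client("tcp://dask-scheduler:8786", direct_to_workers=True)  # client-wide setting

# ... submit work that produces `futures` ...
# results = client.gather(futures, direct=True)  # or request it per gather() call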
There is no difference between Dask Kubernetes and any other Dask deployment mechanism. So it’s likely a problem with their use of Dask Distributed, rather than anything Kubernetes specific.
It would help to see a code example of what the user is trying to do.
I am almost sure the code in the student's notebook is far from optimised (you are right). That is why I wanted to give his JupyterHub session a pod with enough memory for it to work while we review the code. However, what I am trying to find out is whether the direct_to_workers parameter is supposed to work with a Dask client using a Kubernetes cluster configuration?
@Hugo54 is a colleague of mine at CNES; we were testing an OVH cloud deployment with some (non-optimised and complex) user code that performs a huge gather at some point. Even when using the direct=True kwarg in gather (or so we were told), we were seeing the scheduler pod restart due to memory. At some point we supposed that this kwarg was not working, maybe because networking complexity in Kubernetes prevents the Jupyter user pod from directly accessing the Dask workers.
However, I found time to try a much simpler example with the Dask latest-py3.12 Docker image:
from dask_kubernetes.operator import KubeCluster
from dask.distributed import Client
import numpy as np

cluster = KubeCluster(
    namespace="dask-operator",
    image='ghcr.io/dask/dask:latest-py3.12',
    name="test-direct",
    n_workers=4,
    resources={
        "requests": {
            "cpu": "1",
            "memory": "2Gi",
        },
        "limits": {
            "cpu": "1",
            "memory": "2Gi",
        },
    },
)
client = cluster.get_client()

# Build some sizeable NumPy arrays (each one is about 800 MB of float64)
def create_numpy():
    return np.random.random((10000, 10000))

futures = [client.submit(create_numpy, pure=False) for _ in range(6)]

results = client.gather(futures, direct=True)  # Works
results = client.gather(futures)  # Network error due to the scheduler pod restarting
This example seems to indicate that, at least with recent versions, the direct=True kwarg works well with Dask Kubernetes. (Each array is about 800 MB, so the plain gather routes roughly 4.8 GB through the scheduler pod, which would explain the restart.)
If the Jupyter pod is in the same Kubernetes cluster as the Dask cluster, then there's no reason why it shouldn't be able to connect directly to the workers, unless some network policy is restricting it. But in that case I would expect to see errors.
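If you want to rule out a network restriction, one quick diagnostic is to read the worker addresses from the scheduler and try opening a TCP connection to each one from the Jupyter pod. The snippet below is only a sketch and assumes an already-connected `client`:

import socket
from urllib.parse import urlparse

# Assumes `client` is an existing dask.distributed.Client connected to the cluster.
workers = client.scheduler_info()["workers"]  # worker address -> metadata

for address in workers:
    parsed = urlparse(address)  # e.g. "tcp://10.42.0.12:37455"
    try:
        # If a NetworkPolicy blocks direct access, this should fail with a timeout or refusal.
        socket.create_connection((parsed.hostname, parsed.port), timeout=5).close()
        print(f"{address}: reachable")
    except OSError as exc:
        print(f"{address}: NOT reachable ({exc})")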