I am getting more and more familiar with Dask and have successfully deployed it with Helm on GKE. It is such a game-changer and we are so excited to be using this.
As a machine learning engineer, I would like to run specific operations on specific node types. For example, it would be nice to pull down zarr/xarray datastores, run transforms on CPU machines, and then transfer to GPU nodes for inference.
I have found this page, Worker Resources — Dask.distributed 2022.8.1+6.gc15a10e8 documentation, which seems to describe how to specify which resources certain tasks should use. Something like:
```python
from distributed import Client

client = Client('scheduler:8786')

data = [client.submit(load, fn) for fn in filenames]
processed = [client.submit(process, d, resources={'GPU': 1}) for d in data]
final = client.submit(aggregate, processed, resources={'MEMORY': 70e9})
```
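If I read that page correctly, this only does something if each worker is started advertising its resources (e.g. with `--resources "GPU=1"`); in the Helm chart I assume that flag would simply be appended to the `dask_worker` command string shown below. A minimal, untested sketch of what I understand the worker side has to look like (the scheduler address and resource name are just placeholders):

```python
# Untested sketch of my understanding of the worker side: resources are
# only honoured if each worker advertises them when it starts, e.g.
#   dask-worker tcp://scheduler:8786 --resources "GPU=1"
# Programmatic equivalent (scheduler address / resource name are placeholders):
import asyncio
from distributed import Worker

async def run_gpu_worker():
    worker = await Worker("tcp://scheduler:8786", resources={"GPU": 1})
    await worker.finished()

asyncio.run(run_gpu_worker())
```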
However, I am not sure how to deploy multiple worker types with Helm. It feels natural to me that worker types could be defined in the `values.yaml` file like:
```yaml
scheduler:
  name: scheduler  # Dask scheduler name.
  enabled: true  # Enable/disable scheduler.
  image:
    repository: "daskdev/dask"  # Container image repository.
    tag: 2022.1.0  # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    pullSecrets:  # Container image [pull secrets](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/).
  ...

worker:
  name: cpu-worker-group  # Dask worker name. # can we make multiple with different node pools?
  image:
    repository: "daskdev/dask"  # Container image repository.
    tag: 2022.1.0  # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    dask_worker: "dask-worker --name cpu-worker"  # Dask worker command. E.g `dask-cuda-worker` for GPU worker.
    pullSecrets:  # Container image [pull secrets](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/).
    # - name: regcred
  ...
  nodeSelector:
    cloud.google.com/gke-nodepool: name-of-cpu-node-pool

worker:
  name: gpu-worker-group  # Dask worker name. # can we make multiple with different node pools?
  image:
    repository: "daskdev/dask"  # Container image repository.
    tag: 2022.1.0  # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    dask_worker: "dask-cuda-worker --name gpu-worker"  # Dask worker command. E.g `dask-cuda-worker` for GPU worker.
    pullSecrets:  # Container image [pull secrets](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/).
    # - name: regcred
  ...
  nodeSelector:
    cloud.google.com/gke-nodepool: name-of-gpu-node-pool
```
Ideally, multiple worker types would be available, but I am having a hard time finding anything about this for a Helm deployment. When I try to deploy with the above `values.yaml`, I only see the first worker; the second seems to be ignored.
Also, it would be great to be able to choose a worker set for a specific task. Above, the second worker has `name: gpu-worker-group` and an option for a name in the worker command: `dask-cuda-worker --name gpu-worker`. I don't think it makes sense to use `--name` here, as from what I understand it names an individual instance of those workers. However, anything like this would be great:
```python
# either pass the worker set name with a pattern
future = client.submit(func, *args, workers=['gpu-worker*'])
# or specify a group name somehow
future = client.submit(func, *args, workers=['gpu-worker-group'])
```
I have seen that we can specify the host:port, but I don't want to interfere with Dask's ability to choose which worker might already have the data, etc. I would just like to specify a worker type for things to run on, to take advantage of differences in hardware and resources. Ideally, I would have worker sets: small CPU resources, large CPU resources, GPU resources.
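For reference, the closest thing I can piece together today is to look up worker addresses myself and pin tasks to them, which is exactly the manual bookkeeping I would like to avoid. A rough sketch (it assumes the GPU workers were started with something like `--name gpu-worker-0` so the name is visible in `scheduler_info()`, and `func`/`args` are as in the snippet above):

```python
# Rough sketch of the manual host:port pinning I would like to avoid.
# Assumes the GPU workers were started with e.g. `--name gpu-worker-0`,
# so "gpu-worker" shows up in their scheduler_info() name.
workers_info = client.scheduler_info()["workers"]  # address -> worker info dict
gpu_addrs = [
    addr for addr, info in workers_info.items()
    if "gpu-worker" in str(info.get("name", ""))
]
future = client.submit(func, *args, workers=gpu_addrs, allow_other_workers=False)
```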
A few questions here:
- Is there a way the `values.yaml` for a Helm deployment can define multiple workers? This would be so nice, and it seems possible with Ray's Helm deployment.
- Is there a way we can select worker types for `client.submit`, `client.map`, etc.? This seems analogous to the `nodeSelector`.
- If there is a way to do this, how would we scale? Something like `cluster.scale(5, worker_type='gpu-worker-group')`? (How I scale today is sketched after this list.)
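For context, the only scaling I know today is a single, global worker count through dask-kubernetes' `HelmCluster` — a sketch, where the release name is just a placeholder:

```python
# How I scale the Helm deployment today: one global worker count.
# "dask" is a placeholder Helm release name.
from dask_kubernetes import HelmCluster

cluster = HelmCluster(release_name="dask")
cluster.scale(5)  # scales the single worker deployment
```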
Thanks for any thoughts!