Deploying Multiple Worker Types with Helm

I am getting more and more familiar with Dask and have successfully deployed it with Helm on GKE. It is such a game-changer, and we are so excited to be using it.

As a machine learning engineer, I would like to run specific operations on specific node types. For example, it would be nice to pull down zarr/xarray datastores, run transforms on CPU machines, and then transfer the results to GPU nodes for inference.

I found the "Worker Resources" page in the Dask.distributed documentation (2022.01.0+2.g922d9872), which seems to describe how to use resources to pick workers for certain tasks. Something like:

from distributed import Client
client = Client('scheduler:8786')

data = [client.submit(load, fn) for fn in filenames]
processed = [client.submit(process, d, resources={'GPU': 1}) for d in data]
final = client.submit(aggregate, processed, resources={'MEMORY': 70e9})
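
As far as I understand, resources are just labels: a task that asks for `{'GPU': 1}` only runs on workers that were started advertising that resource, for example with `dask-worker scheduler:8786 --resources "GPU=1"`. Here is a minimal sketch of the same pipeline wrapped in a function. The name `run_pipeline` and the idea of passing `load`, `process` and `aggregate` in as arguments are my own placeholders, not part of the Dask API.

```python
def run_pipeline(client, filenames, load, process, aggregate):
    """Submit a load -> process -> aggregate graph with resource constraints.

    `client` is expected to be a distributed.Client (assumption: the workers
    were started with matching --resources labels); the three callables are
    hypothetical user functions.
    """
    # Loading has no constraint, so any worker may run it.
    data = [client.submit(load, fn) for fn in filenames]
    # Each process task needs one unit of the resource labelled "GPU".
    processed = [client.submit(process, d, resources={"GPU": 1}) for d in data]
    # The aggregation only runs on a worker advertising enough "MEMORY".
    return client.submit(aggregate, processed, resources={"MEMORY": 70e9})
```

With a real cluster this would be called as `run_pipeline(Client('scheduler:8786'), filenames, load, process, aggregate).result()`.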

However, I am not sure how to deploy multiple worker types with helm. It feels natural to me that worker types could be defined in the values.yaml file like:

scheduler:
  name: scheduler  # Dask scheduler name.
  enabled: true  # Enable/disable scheduler.
  image:
    repository: "daskdev/dask"   # Container image repository.
    tag: 2022.1.0 # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    pullSecrets:  # Container image [pull secrets](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/).
  ...
  
worker:
  name: cpu-worker-group  # Dask worker name. # can we make multiple with different node pools?
  image:
    repository: "daskdev/dask"  # Container image repository.
    tag: 2022.1.0  # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    dask_worker: "dask-worker --name cpu-worker"  # Dask worker command. E.g `dask-cuda-worker` for GPU worker.
    pullSecrets:  # Container image [pull secrets](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/).
    #  - name: regcred
  ...
  nodeSelector:
    cloud.google.com/gke-nodepool: name-of-cpu-node-pool

  
worker:
  name: gpu-worker-group  # Dask worker name. # can we make multiple with different node pools?
  image:
    repository: "daskdev/dask"  # Container image repository.
    tag: 2022.1.0  # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    dask_worker: "dask-cuda-worker --name gpu-worker"  # Dask worker command. E.g `dask-cuda-worker` for GPU worker.
    pullSecrets:  # Container image [pull secrets](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/).
    #  - name: regcred
  ...
  nodeSelector:
    cloud.google.com/gke-nodepool: name-of-gpu-node-pool

Ideally, multiple worker types would be available, but I am having a hard time finding anything about this with a helm deployment. When I try to deploy with the above values.yaml, I only see the first worker and the second seems to be ignored.

Also, it would be great to be able to choose a worker set for a specific task. Above, the second worker has name: gpu-worker-group and an option for a name in the worker command: dask-cuda-worker --name gpu-worker. I don’t think it makes sense to use --name here as it is used to name an individual instance of those workers from what I understand. However, anything like this would be great:

# either pass the worker set name with a pattern
future = client.submit(func, *args, workers=['gpu-worker*'])
# or specify a group name somehow
future = client.submit(func, *args, workers=['gpu-worker-group'])
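
In the meantime, the pattern idea could probably be emulated on the client side. The sketch below assumes each worker has a unique name with a group prefix, and that the dictionary passed in is shaped like `client.scheduler_info()["workers"]` (worker address mapped to an info dict with a `"name"` key) and lists every worker. The helper `workers_matching` and the example addresses are made up for illustration.

```python
from fnmatch import fnmatch


def workers_matching(workers_info, pattern):
    """Return the addresses of workers whose name matches a shell-style pattern."""
    return sorted(
        address
        for address, info in workers_info.items()
        if fnmatch(str(info.get("name", "")), pattern)
    )


# Hypothetical snapshot, shaped like client.scheduler_info()["workers"].
example_workers = {
    "tcp://10.0.0.1:40001": {"name": "cpu-worker-0"},
    "tcp://10.0.0.2:40001": {"name": "gpu-worker-0"},
    "tcp://10.0.0.3:40001": {"name": "gpu-worker-1"},
}

gpu_addresses = workers_matching(example_workers, "gpu-worker*")
print(gpu_addresses)
```

The resulting list could then be passed as `client.submit(func, *args, workers=gpu_addresses)`, which restricts the task to that set while still letting the scheduler choose among those workers.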

I have seen that we can specify the host:port, but I don’t want to mess with dask’s ability to choose which worker might already have the data, etc. I would just like to specify a worker type for things to run on, to take advantage of differences in hardware and resources. Ideally, I would have three worker sets: small CPU resources, large CPU resources, and GPU resources.

A few questions here:

  1. Is there a way values.yaml for a helm deployment can define multiple workers? This would be so nice and seems possible with Ray’s helm deployment.
  2. Is there a way we can select worker types for client.submit, client.map, etc.? This seems analogous to the nodeSelector.
  3. If there is a way to do this, how would we scale? cluster.scale(5, worker_type='gpu-worker-group')?

Thanks for any thoughts!

Some relevant issues:

Thanks for opening this @ljstrnadiii.

This isn’t possible today, but is a great idea. I’ve opened an issue on GitHub to track this request.


This is now possible! The Helm Chart can have additional worker groups configured with different resources.

Check out the full writeup here: "How to run different worker types with the Dask Helm Chart".
