Change Number of Workers During Runtime

Hello Dask Community,

TLDR: I was wondering if there is a way to change the number of workers during runtime when you are using a client/scheduler/worker setup? Preferably using the client to start up one or more new workers.

I came across this stack overflow post talking about adaptive cluster sizing that looked promising but it seems client.cluster == None when the client is instantiated with a scheduler address so this approach will not work.

Why:
TLDR: I would like to have an exact number of workers used in one part of my process then either add a specific number of workers or change the min and max number of workers and let dask autoscale on it’s own for the rest of my process.

I am querying data in parallel using trino and dask delayed to load the data into a dask dataframe. When querying data I want to have a limited number of workers so that it doesn’t overwhelm trino which will cause all my queries to fail if the # of queries in the queue goes past it’s limit. Once the data is queried and persisted into memory, I would like to scale up the number of workers to complete the rest of the work faster.

Current Non-Ideal Workaround:
I am currently starting up the dask cluster with extra workers and then using client.scheduler_info() to get a list of the available workers and only using a limited number of them in the persist operation that will trigger the querying of my data from trino:

selected_workers = list(client.scheduler_info()['workers'])[:4]
ddf = dask.persist(ddf, workers=selected_workers)[0]

This works because I can have lets say 6 workers total but the querying will only use 4 of them. This isn’t ideal though because there are a lot of workers sitting idle and taking up resources when they aren’t needed yet.

I have looked over as much documentation as possible but I cannot seem to find the answer to this question. I don’t see any available methods on the client class that seem like they would accomplish this task. I am hoping dask gives us the customizability to do something like this.

Thank you in advance to anyone who is able to help or point me in the right direction!

Hi @Kaegan,

How are you starting the Dask Cluster at first. It sound like you’re not using a cluster manager of some kind. But if this is the case, how would you want the Client to know how it can start new workers?

Hello @guillaumeeb,

Thank you for the response! I am still pretty new to dask so I don’t understand the underlying configurations (using a cluster manager vs. connecting to a scheduler etc.) entirely. I am using dask as a managed service as apart of an analytics application from an outside company. Right now I start up a cluster using a python API that they provided us. I am able to configure the cluster however I want, set min/max replicas, cpu/memory allocated to each worker, etc. then when I start up the cluster I get the scheduler address that I can use to connect to a client.

I was hoping the client could communicate with the scheduler to increase or decrease the number of workers as needed but based on your response it seems the schedulers only ability is to delegate work. I was hoping that because you can auto scale using min/max replicas there would be a way to manually say I want this many workers for the first part of my process then increase the number for a separate part of the process.

Is there a different setup or configuration that you would recommend for this use case? Sorry I know the information I have provided is limited. Because they have abstracted the cluster startup process into a python API I am not sure exactly what they are using underneath but I could ask them to get more information or go through the code myself.

I seriously appreciate any advice you can offer! Thank you!

Does this Python API provide some kind of way to modify the size of the cluster once it is created? Usually with Dask distributed, you use cluster objects and have a scale method you can use to modify the number of workers (see https://github.com/dask/distributed/blob/main/distributed/deploy/cluster.py#L268). This is what is using the Autoscaling feature of Dask for adapting to the incoming number of tasks to process.

Maybe this would be the best approach to better understand if there are limitations or if all you need is already there (e.g. they implemented the Cluster API).

Another solution that comes to my mind reading your post again: if you can configure min/max workers, can you just configure the minimum to 4, use your workaround to submit work to only 4 workers at first, and then let it scale when reaching the second part of your workflow?

Hey @guillaumeeb

Thank you so much for your explanations and suggestions!! I will look into the python API more and dive into the source code or contact the company to figure out what is going on. In the meantime I am testing out how well using the min/max replicas approach will work with my current workaround. I’ll post an update here if it would be relevant to others.

Thank you again for your help!

1 Like