Hey guys.
I use the dask and distributed packages to run automation in my project.
To speed up test execution (the tests cannot run on a single environment in parallel), I spin up multiple VMs (dask workers), create a queue of tests (tasks), and assign them to workers until the tasks are depleted. Each task is unique and runs once unless it fails (then it is rerun).
This works perfectly but I noticed that using priorities may speed up execution significantly.
I tried to use dask priorities but ultimately failed. Here are my findings.
I submit the tasks with client.submit() and set a priority on each one (the priorities are temporarily hardcoded per test).
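Roughly like this (a simplified sketch, not the real test code; the test callable, the scheduler address, and the priority values are placeholders):

from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder address

def run_test(name):
    ...  # stand-in for the real test runner
    return name

# priorities are hardcoded per test for now; a higher value should run sooner
priorities = {"test_login": 10, "test_export": 1}
futures = [
    client.submit(run_test, name, key=name, priority=prio, pure=False)
    for name, prio in priorities.items()
]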
I tested it on a small scale (10 tasks, 2 VMs) and already found a couple of issues:
- All of the tasks are consumed immediately by the first VM created; I would like to have no more than one task assigned to a VM at once - this would ensure that priorities are balanced better.
- The Dask dashboard shows that priorities have been assigned correctly, but only tasks on the first VM run in priority order. The priorities seem to be ignored on the second VM (the one that stole tasks from the first).
- I would prefer assigning only one task per VM instead of stealing, but if I were really to use stealing, then it should grab the highest-priority tasks that are not being processed (right now it seems to grab random tasks)
To resolve the first problem, I set distributed.scheduler.worker-saturation to 1.0, but it didn't help; all of the tasks are still consumed at once.
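For reference, I applied it in the scheduler's process before the scheduler was started, roughly like this (a sketch):

import dask

# has to be set where the scheduler runs, before it is created
dask.config.set({"distributed.scheduler.worker-saturation": 1.0})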
I tried using fifo-timeout to deal with the second one, but it wasn't helpful (I thought fifo-timeout was taken into consideration when stealing).
I wonder if the general setup might be the culprit?
- The VMs are identical (they are created dynamically from the same snapshot).
- I use dask and distributed version 2024.12.1
The process looks as follows:
- The agent (host) VM creates tasks (not dask → asyncio.create_task()) to create VMs from snapshots. Please note that spinning up a VM and then setting it up takes several minutes, so the tasks are created before the workers exist.
1.1. Register the set-up VMs as workers via the CLI (nthreads=1).
- Next, the agent schedules tests asynchronously:
2.1. Create a list of futures
2.2. Append each test (client.submit(test)) to the list of futures
2.3. Run as_completed to gather futures; if any tests fail, re-add them to the futures list (a sketch follows below this list)
- If there are any existing workers, they start to pick up the tasks
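Step 2 in code is roughly the following (a simplified sketch using the same placeholder names as above; the failed-test bookkeeping is much simpler than the real thing):

from dask.distributed import Client, as_completed

client = Client("tcp://scheduler:8786")  # placeholder address

def run_test(name):
    ...  # stand-in for the real test runner
    return name

tests = {"test_login": 10, "test_export": 1}  # test name -> hardcoded priority

# 2.1 / 2.2: build the list of futures
futures = [
    client.submit(run_test, name, key=name, priority=prio, pure=False)
    for name, prio in tests.items()
]

# 2.3: gather results as they complete; re-add failed tests
seq = as_completed(futures)
for future in seq:
    if future.status == "error":
        name = future.key
        seq.add(client.submit(run_test, name, key=f"{name}-retry",
                              priority=tests.get(name, 0), pure=False))
    else:
        print(f"Result {future.result()}")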
With the current behaviour I haven't encountered a single run where the lowest-priority task was executed last. My task lengths vary from 20 seconds to 3 minutes; the whole runtime can increase by up to 50% if the tasks are ordered poorly.
Has anyone experienced anything similar? Is it a problem with setup or is it a bug?
Is there any way to circumvent the problem and make priorities work?
Hi @Neotroglodyte, welcome to Dask community!
As you describe it, this sounds like a bug; however, I wasn't able to reproduce it. I tried on a LocalCluster with the following code:
from dask.distributed import Client
from dask.distributed import as_completed
import time

client = Client(n_workers=2)

def my_func(t):
    time.sleep(t)
    return t

fut = []

# low prio
for i in range(20):
    fut.append(client.submit(my_func, 5, priority=-10, pure=False))

client.cluster.scale(4)

# high prio
for i in range(10):
    fut.append(client.submit(my_func, 10, priority=10, pure=False))

for future, result in as_completed(fut, with_results=True):
    print(f"Result: {result}")
Which outputs:
Result: 5
Result: 5
Result: 5
Result: 5
Result: 10
Result: 10
Result: 10
Result: 10
Result: 10
Result: 10
Result: 10
Result: 10
Result: 10
Result: 10
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
Result: 5
On the dashboard, I can see that workers are assigned at most 2 tasks each, which ensures that the higher-priority tasks run as soon as the first batch of lower-priority ones has finished. This is what the worker-saturation mechanism is intended for.
Could you try to build a reproducer for your problem?
Hey @guillaumeeb !
I took the liberty of reusing your code.
I reproduced the issue with the following piece:
from dask.distributed import Client, LocalCluster
from dask.distributed import as_completed
import time

def my_func(t):
    time.sleep(t)
    return t

def run_cluster():
    cluster = LocalCluster()
    client: Client = cluster.get_client()
    time.sleep(5)  # I used it to have time to hook up to the dashboard
    try:
        futures = []
        for i in [64, 32, 16, 8, 4, 2, 1]:
            futures.append(
                client.submit(my_func, i, key=f"my_func{i}", priority=i, pure=False)
            )
        time.sleep(1)  # wait a second after submitting tasks
        client.cluster.scale(n=1, cores=1)
        time.sleep(1)  # wait after creating the first worker
        client.cluster.scale(n=2, cores=1)
        for future, result in as_completed(futures, with_results=True):
            print(f'Result {result}')
    except:
        pass
    cluster.close()

if __name__ == '__main__':
    run_cluster()
The output was:
Result 8
Result 32
Result 64
Result 1
Result 2
Result 4
Result 16
In your example, how many workers/threads do you have when first creating the LocalCluster?
No workers at all.
This resembles the actual use case: in the pipeline, I create an agent with a scheduler first, then create tasks and VMs (workers) in parallel - so the workers are created several minutes after the tasks are submitted.
In your example, you run LocalCluster() without any args. By default, this will start a number of workers based on the number of CPU cores on your machine. Are you sure you don't have any workers started at the beginning?
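For example, something along these lines should show the difference (a quick sketch):

from dask.distributed import LocalCluster

cluster = LocalCluster()             # by default, infers the number of workers from your CPU count
print(len(cluster.workers))          # typically > 0 on a multi-core machine
cluster.close()

empty = LocalCluster(n_workers=0)    # start with no workers at all
print(len(empty.workers))            # 0
empty.close()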
This is intentional, but I was not aware that LocalCluster might have created a default number of workers.
Better (closer to reality) reproduction steps are below. They don't utilise LocalCluster; instead, I use the CLI to run the scheduler and workers.
See the examples in the posts below.
In this example I use resources. It seems they cause the problem.
Failing sequence:
- Run the scheduler:
dask scheduler --port 8786 --scheduler-file info.txt --dashboard-address :8787 --show
- Run the Python code:
import asyncio
import time

from dask.distributed import Client
from dask.distributed import as_completed

def my_func(t):
    print(f'sleep {t}')
    time.sleep(t)
    return t

async def main():
    client = await Client(address='tcp://10.249.224.32:8786', name="client", asynchronous=True, timeout=180)
    futures = []
    for i in range(1, 21):
        futures.append(
            client.submit(my_func, i, key=f"my_func{i}", priority=i, pure=False,
                          resources={'WIN10': 1, 'CLOUD': 1, 'DB': 1})
        )
    print('Ready for workers to register')
    async for _, result in as_completed(futures, with_results=True):
        print(f'Result {result}')
    client.close()

result = asyncio.run(main())
- Start the first worker:
dask-worker $scheduler_address --name "worker-1" --resources "WIN10=1,CLOUD=1,DB=1" --nthreads 1
- Start the second worker:
dask-worker $scheduler_address --name "worker-2" --resources "WIN10=1,CLOUD=1,DB=1" --nthreads 1
First worker output:
sleep 12
sleep 18
sleep 17
sleep 16
sleep 13
sleep 3
sleep 1
sleep 9
sleep 7
Second worker output:
sleep 19
sleep 4
sleep 14
sleep 8
sleep 10
sleep 11
sleep 2
sleep 6
sleep 5
I cut out the resources here. The prioritization worked correctly:
- Run the scheduler:
dask scheduler --port 8786 --scheduler-file info.txt --dashboard-address :8787 --show
- Run the Python code:
import asyncio
import time

from dask.distributed import Client
from dask.distributed import as_completed

def my_func(t):
    print(f'sleep {t}')
    time.sleep(t)
    return t

async def main():
    client = await Client(address='tcp://10.249.224.32:8786', name="client", asynchronous=True, timeout=180)
    futures = []
    for i in range(1, 21):
        futures.append(
            client.submit(my_func, i, key=f"my_func{i}", priority=i, pure=False)
            # , resources={'WIN10': 1, 'CLOUD': 1, 'DB': 1}
        )
    print('Ready for workers to register')
    async for _, result in as_completed(futures, with_results=True):
        print(f'Result {result}')
    client.close()

result = asyncio.run(main())
- Start the first worker:
dask-worker $scheduler_address --name "worker-1" --nthreads 1
- Start the second worker:
dask-worker $scheduler_address --name "worker-2" --nthreads 1
Output:
First worker:
sleep 18
sleep 17
sleep 14
sleep 13
sleep 10
sleep 9
sleep 6
sleep 5
sleep 2
sleep 1
Second worker:
sleep 16
sleep 15
sleep 12
sleep 11
sleep 8
sleep 7
sleep 4
sleep 3
OK nice, I see you also updated your issue on GitHub, and I guess this is a bug in the interaction between work stealing, priorities, and resources, so you'll get better feedback there! Thanks for raising this.
Hah, hopefully the team will fix that!
Do you think there are any workarounds possible here for the time being?
Well, I guess the simplest workaround, if possible, would be to avoid using resources and priorities together…
- All of the tasks are consumed immediately by the first VM created
This doesn’t sound right. By default in 2024.12.1, each worker should only be sent 1.1x its thread count in tasks, rounded up. If you set dask.config.set({"distributed.scheduler.worker-saturation": 1.0}) you will disable worker-side queueing completely, and with that most of the need for stealing. If you want to completely disable work stealing you can set dask.config.set({"distributed.scheduler.work-stealing": False}).
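For example (a minimal sketch; both keys need to be set in the scheduler's process, before the scheduler is created):

import dask

dask.config.set({
    "distributed.scheduler.worker-saturation": 1.0,   # no extra tasks queued on workers
    "distributed.scheduler.work-stealing": False,     # turn off stealing entirely
})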
Losing priorities on steal sounds like a bug.