Scheduler not saturating workers

Hello,

I have been having serious issues with the Dask scheduler. I recently upgraded from dask/distributed 2022.6.1 (py3.8) to 2023.7.0 (py3.9) (while solving "Workers constantly dying"). I have been using Dask to call subprocesses for long-running .exe simulations, which worked quite well. After upgrading, it no longer behaves well at all.

No matter what settings I change, I cannot seem to keep all workers busy. The scheduler appears to pile jobs onto specific workers and leaves other workers completely idle until I resubmit my workload. It starts off okay, but eventually most workers sit idle. I have tried tuning the scheduler configuration to various values with no luck. Some of these probably have zero effect, but these are the values I initially started varying:

os.environ['DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY_MANAGER__START'] = 'True'
os.environ['DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY_MANAGER__INTERVAL'] = '1s'
os.environ['DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY_MANAGER__MEASURE'] = 'optimistic'
os.environ['DASK_DISTRIBUTED__COMM__RETRY__COUNT'] = '0' # Default == 0
os.environ['DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN'] = '1s' # default == '1s'
os.environ['DASK_DISTRIBUTED__DEPLOY__LOST-WORKER-TIMEOUT'] = '15s' # default == '15s'
os.environ['DASK_DISTRIBUTED__SCHEDULER__ALLOWED-FAILURES'] = '3' # default == 3
os.environ['DASK_DISTRIBUTED__SCHEDULER__WORKER-TTL'] = '5 minutes' # default == '5 minutes'
os.environ['DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING'] = "True"
os.environ['DASK_DISTRIBUTED__SCHEDULER__DEFAULT_TASK_DURATIONS__SWEEP'] = '5min'
os.environ['DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION'] = '1.1'
os.environ['DASK_DISTRIBUTED__SCHEDULER__UNKNOWN_TASK_DURATION'] = '1s'
os.environ['DASK_DISTRIBUTED__SCHEDULER__TRANSITION_LOG_LENGTH'] = '10000'
os.environ['DASK_DISTRIBUTED__SCHEDULER__EVENTS_LOG_LENGTH'] = '10000'
os.environ['DASK_DISTRIBUTED__SCHEDULER__DASHBOARD__STATUS__TASK_STREAM_LENGTH'] = '500'
os.environ['DASK_DISTRIBUTED__SCHEDULER__DASHBOARD__TASKS__TASK_STREAM_LENGTH'] = '50000'

Currently: [dashboard screenshot]

The past: [dashboard screenshot]

I have come across a lot of issues/documentation that might be helpful:

- Similar to my problem, I think: [link]
- Came across, but unsure if related: [link]
- https://distributed.dask.org/en/latest/work-stealing.html
- https://distributed.dask.org/en/latest/scheduling-policies.html

Any tips would be greatly appreciated!

Hi @nickvazz,

The main question is: could you come up with some minimal reproducer?

One of the big changes between the two versions that might affect you is the new scheduler-side queuing of root-ish tasks, controlled by distributed.scheduler.worker-saturation (its default changed from inf to 1.1).

You could try to restore the previous behavior with:

with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}):
    ...  # create your cluster/client inside this context, so the scheduler starts with the setting applied
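Since you are already setting config through environment variables, another option (assuming you can set this in the environment of the scheduler process before it starts) would be:

import os

# must be set where (and before) the scheduler process starts
os.environ['DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION'] = 'inf'  # instead of '1.1'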

Hi @guillaumeeb,

Thanks for the quick response! I can't seem to come up with a minimal reproducer, but this looks like it roughly does the same thing: Root-ish tasks all schedule onto one worker · Issue #6573 · dask/distributed · GitHub, but with long-running tasks (roughly 3-45 minutes).

I tried

with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}):
    client = Client('tcp://scheduler')

But the issue still persists.

Something that might be helpful is a little more information about how I am using the workers. I am starting N workers on each machine, each with one thread and 4GB of memory. The workers themselves never run into memory issues, but I think that is because each simulation is called as a subprocess from within the worker, so the worker does not seem to be aware of the memory used by the .exe it calls. Might this be partially to blame for why backlogged work isn't moving to idle workers?
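For reference, this is roughly how I could at least watch the .exe's memory from inside the task (just a sketch, assuming psutil is available on the workers; run_sim_with_memory_check is a hypothetical variant of my run_sim):

import subprocess
import time

import psutil  # assumed available on the workers

def run_sim_with_memory_check(sim_file):
    p = subprocess.Popen(["sim.exe", sim_file])
    child = psutil.Process(p.pid)
    peak_rss = 0
    while p.poll() is None:
        try:
            # track the .exe's resident memory, which the worker's own
            # memory monitor never sees
            peak_rss = max(peak_rss, child.memory_info().rss)
        except psutil.NoSuchProcess:
            break
        time.sleep(5)
    return p.returncode, peak_rss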

Another thought I had was that maybe I could force queuing for all tasks beyond the number of workers * threads, i.e. 3 workers with 1 thread each on 5 machines → 15 workers: start tasks 1-15 and queue tasks 16-M. Does that seem possible?
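Something like the following is what I have in mind, just a sketch with placeholder file names and a made-up concurrency limit, throttling submissions with distributed.as_completed:

import os
from distributed import Client, as_completed

client = Client('tcp://scheduler')

sim_files = ['a.sim', 'b.sim']   # placeholder file list
sim_priorities = [1, 2]          # placeholder priorities

def run_sim(sim_file):
    # stand-in for the real subprocess call
    return sim_file

max_in_flight = 15  # workers * threads
work = list(zip(sim_files, sim_priorities))

# submit only the first batch, keep the rest in a backlog
ac = as_completed([
    client.submit(run_sim, f, key=os.path.basename(f), priority=p)
    for f, p in work[:max_in_flight]
])
backlog = work[max_in_flight:]

results = []
for finished in ac:
    results.append(finished.result())
    if backlog:  # top up a new task as each slot frees
        f, p = backlog.pop(0)
        ac.add(client.submit(run_sim, f, key=os.path.basename(f), priority=p))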

This sort of thing happens: [dashboard screenshot]

The first set of jobs finishes, but then they all get thrown onto a specific worker for some reason: [dashboard screenshot]

You shouldn’t have to do this.

Could you share at least some code snippet that shows how you are submitting tasks to Dask?

Roughly, what I am doing is:

import glob
import subprocess
import os

from distributed import Client

client = Client('tcp://scheduler')

def get_priority_of_sim_file(sim_file):
    # derive a scheduling priority from the sim file (details omitted)
    return priority

def run_sim(sim_file):
    # call the long-running external simulation and capture its output
    p = subprocess.Popen(["sim.exe", sim_file],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    result, error = p.communicate()
    return result

sim_files = glob.glob("/some/directory/*.py")
sim_priorities = list(map(get_priority_of_sim_file, sim_files))

futures = []
for sim_file, priority in zip(sim_files, sim_priorities):
    key = os.path.basename(sim_file)
    future = client.submit(run_sim, sim_file, key=key, priority=priority)
    futures.append(future)

results = client.gather(futures)

Hi @nickvazz,

Your pseudocode looks very simple. Do you think you could mock the output of get_priority_of_sim_file so that it produces a realistic distribution, and mock run_sim with a time.sleep whose durations mimic your actual tasks? Since I'm seeing <100 tasks, you could easily measure both in your real code and hardcode them in a list.
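Something along these lines would do (the priorities and durations here are random placeholders just to illustrate the shape; you would drop in your measured values instead, and probably scale the sleeps down for a quick test):

import random
import time

from distributed import Client

client = Client('tcp://scheduler')

# placeholders: replace with the measured priorities and durations (seconds)
n_tasks = 100
sim_priorities = [random.randint(0, 10) for _ in range(n_tasks)]
sim_durations = [random.uniform(180, 2700) for _ in range(n_tasks)]  # 3-45 min

def run_sim_mock(duration):
    # stand-in for the long-running external simulation
    time.sleep(duration)
    return duration

futures = [
    client.submit(run_sim_mock, d, key=f"sim-{i}", priority=p)
    for i, (d, p) in enumerate(zip(sim_durations, sim_priorities))
]
results = client.gather(futures)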

You definitely should not need to tamper with the dask config or absolutely anything else for this algorithm to work efficiently.

Hi @crusaderky,

Usually we run anywhere from 1k-30k simulations like that, each taking anywhere from 2 to 30 minutes, over about 250 cores (single threaded).

I tried just force-restarting the cluster and resubmitting the jobs in its own subprocess over and over, and got about 50% throughput.

I tried switching to a client.map rather than submitting each simulation individually (losing their key info and the ability to prioritize them), and it didn't quite work.
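(Looking at the docs again, client.map does seem to accept an explicit list of keys, though the priority apparently applies to the whole batch; something like this, if I revisit it:)

futures = client.map(
    run_sim,
    sim_files,
    key=[os.path.basename(f) for f in sim_files],  # keep the readable keys
    priority=5,  # one priority for the whole batch, unlike per-task submit
)
results = client.gather(futures)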

Our simulation leaves some files behind and, if force-killed, does not clean up after itself (and I had hard-killed the Dask workers a fair number of times). After manually cleaning up these files, things generally work as expected now, which is very confusing after a week and a half of misbehavior. So I have no idea what was wrong or why it would seemingly hang the cluster. Perhaps the dask-worker folder holds information about task duration? Anyway, thanks for checking in. Hopefully things stay smooth!
