I have been struggling with this odd situation for quite some time, and I hope I can get some help here.
My issue is quite simple: I have N long-running tasks (12–24 hours each) that I want to parallelize using dask distributed on an HTCondor cluster. The problem is that I cannot keep a large number of workers (N/2) fully occupied, because the dask scheduler assigns two or more tasks to some workers while leaving others idle.
Detailed description of the task: hyperparameter tuning with scikit-learn (110 combinations to fit and score) running on 50 workers.
I have played around with the following parameters/config options:
- joblib's `pre_dispatch` parameter (see the snippet right after this list)
- worker saturation
- work stealing
- worker resources
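Since `pre_dispatch` is the only one of these that does not appear in the code further down: it reaches joblib through scikit-learn's `pre_dispatch` argument. This is only a rough sketch with a placeholder estimator and grid, not my actual call, and the value shown is just one of those I tried:

```python
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV

n_dask = 50  # number of dask workers, as in the cluster setup below

# `pre_dispatch` controls how many jobs joblib dispatches to the backend
# up front; I varied it between "n_jobs" and the default "2*n_jobs".
search = GridSearchCV(
    Ridge(),                      # placeholder estimator
    {"alpha": [0.1, 1.0, 10.0]},  # placeholder grid
    n_jobs=n_dask,
    pre_dispatch="n_jobs",
)
```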
No matter what I do, after running for a couple of minutes I end up with workers that have two or more tasks scheduled while other workers are idle. See image.
Here’s what I figured out so far:
- higher `pre_dispatch` values send more jobs to dask at once, resulting in even worse behaviour than in the image above.
- even though each worker has a single `worker_futures` resource and each delayed call requires one of them, more than one task can still be assigned to a worker.
- disabling work-stealing creates a situation in which all tasks are assigned to a single worker.
- a worker-saturation of 1.0 does not correct the behaviour, and neither does disabling queuing (setting it to `"inf"`); the exact variants I tried are shown in the snippet below.
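For completeness, these are the configuration variants I tried (one at a time); the outcomes are the ones described in the list above:

```python
import dask

# Disabling work-stealing -> all tasks ended up on a single worker.
dask.config.set({"distributed.scheduler.work-stealing": False})

# Limiting each worker to roughly one task's worth of work -> no change.
dask.config.set({"distributed.scheduler.worker-saturation": 1.0})

# Disabling scheduler-side queuing entirely -> no change either.
dask.config.set({"distributed.scheduler.worker-saturation": "inf"})
```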
This is the code I’m using:
```python
import logging
import time
from pathlib import Path

import dask
import joblib
from dask.distributed import Client
from dask_jobqueue import HTCondorCluster

logger = logging.getLogger(__name__)
n_dask = 50  # number of HTCondor jobs / dask workers

dask.config.set(
    {
        "distributed.scheduler.worker-saturation": "inf",
        "distributed.scheduler.work-stealing": True,
    }
)

cluster = HTCondorCluster(
    cores=1,
    memory="16GB",
    disk="4GB",
    processes=1,
    submit_command_extra=["-name", "head2.htc.inm7.de"],
    log_directory=Path() / "htcondor_logs",
    worker_extra_args=["--resources worker_futures=1"],
)
cluster.scale(jobs=n_dask)

# connect a client to the cluster
client = Client(cluster)
logger.info(f"Dask Dashboard at {client.dashboard_link}")

start_dask = time.time()
with joblib.parallel_backend(
    backend="dask", wait_for_workers_timeout=2**30, n_jobs=n_dask
):
    with dask.annotate(resources={"worker_futures": 1}):
        # Run cross_validate with a heavy model and a GridSearchCV
        # with 110 combinations in the grid (see the sketch below)
        ...
```
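The elided part inside the inner `with` block is essentially a nested cross-validation. Here is a minimal sketch of it; the estimator, grid, and synthetic data are placeholders standing in for my actual heavy model, its 110 combinations, and my real dataset:

```python
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV, cross_validate

# Placeholder data and grid; the real grid has 110 combinations on a much heavier model.
X, y = make_regression(n_samples=1000, n_features=20)
param_grid = {"n_estimators": [100, 200, 500], "max_depth": [None, 5, 10]}

search = GridSearchCV(
    RandomForestRegressor(),
    param_grid,
    n_jobs=n_dask,  # fits are dispatched through the dask joblib backend
)

# Each fit dispatched by joblib becomes one long-running dask task.
scores = cross_validate(search, X, y, cv=5)
```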
I remember having similar issues with celery in the past. The problem, as far as I can tell, is in the scheduler, which assigns more than one task to some workers. I understand that this is preferable for small tasks, as it keeps worker throughput high, but in my case wrongly assigning a task to a busy worker can result in days of extra waiting.
Does anyone have an idea of how this behaviour can be changed?
My next step is to write a custom scheduler (or extend the existing one), but I was hoping to get some ideas on this before going down that road.
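In case it helps to illustrate the kind of control I am after: the fallback I am considering before writing a custom scheduler is to bypass joblib and pin each heavy fit to a specific worker myself. This is only a rough, hypothetical sketch; `fit_and_score` and `param_combinations` are placeholders for one grid-point fit and the 110 parameter sets:

```python
from dask.distributed import wait

# Round-robin pinning: each task is bound to exactly one worker, so the
# 110 tasks are spread evenly instead of piling up on a few workers.
worker_addresses = list(client.scheduler_info()["workers"])

futures = []
for i, params in enumerate(param_combinations):
    futures.append(
        client.submit(
            fit_and_score,  # placeholder: one heavy fit + score call
            params,
            workers=[worker_addresses[i % len(worker_addresses)]],
            allow_other_workers=False,  # never move the task to another worker
            pure=False,
        )
    )
wait(futures)
```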