Limit number of queued tasks per worker

I have been struggling with this odd situation for quite some time, and I hope I can get some help here.

My issue is quite simple: I have N long-running tasks (12-24 hours each) which I want to parallelize using Dask distributed on an HTCondor cluster. The problem is that I cannot keep a large number (N/2) of workers fully occupied, as the Dask scheduler assigns two or more tasks to some workers while leaving others idle.

Detailed description of the task: hyperparameter tuning using scikit-learn (110 combinations to fit and score) running on 50 workers.

I have played around with the following parameters/config options (the pre_dispatch knob is sketched right after this list; the others appear in the code further down):

  • joblib’s pre_dispatch parameter
  • worker’s saturation
  • work-stealing
  • worker’s resources
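
For context, this is roughly where the pre_dispatch knob lives: it is joblib's pre-dispatching control, exposed directly on scikit-learn's search/CV helpers. The estimator and grid below are hypothetical placeholders for illustration, not the actual pipeline:

    from sklearn.model_selection import GridSearchCV
    from sklearn.svm import SVC  # placeholder estimator, for illustration only

    search = GridSearchCV(
        SVC(),
        param_grid={"C": [0.1, 1, 10], "gamma": [1e-3, 1e-2, 1e-1]},  # placeholder grid
        n_jobs=-1,
        # Dispatch only n_jobs tasks at a time instead of the default "2*n_jobs",
        # so the backend is not handed more work than there are workers.
        pre_dispatch="n_jobs",
    )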

No matter what I do, after running for a couple of minutes I end up with workers that have two or more tasks scheduled while other workers sit idle. See image.

Screenshot 2024-08-27 at 13.42.11

Here’s what I figured out so far:

  • higher pre_dispatch values send more jobs to Dask at once, resulting in even worse behaviour than in the image above.
  • even though each worker has a single “worker_futures” resource and each delayed call requires one, more than one task can still be assigned to a worker.
  • disabling work-stealing creates a situation in which all tasks are assigned to a single worker.
  • a worker-saturation of 1.0 does not correct the behaviour, and neither does disabling queuing (“inf”).

This is the code I’m using:


    import logging
    import time
    from pathlib import Path

    import dask
    import joblib
    from dask.distributed import Client
    from dask_jobqueue import HTCondorCluster

    logger = logging.getLogger(__name__)
    n_dask = 50  # number of worker jobs (50 workers, as described above)

    dask.config.set(
        {
            "distributed.scheduler.worker-saturation": "inf",
            "distributed.scheduler.work-stealing": True,
        }
    )

    cluster = HTCondorCluster(
        cores=1,
        memory="16GB",
        disk="4GB",
        processes=1,
        submit_command_extra=["-name", "head2.htc.inm7.de"],
        log_directory=Path() / "htcondor_logs",
        worker_extra_args=["--resources worker_futures=1"],
    )
    cluster.scale(jobs=n_dask)

    # Connect a client to the cluster
    client = Client(cluster)

    logger.info(f"Dask Dashboard at {client.dashboard_link}")

    start_dask = time.time()

    with joblib.parallel_backend(
        backend="dask", wait_for_workers_timeout=2**30, n_jobs=n_dask
    ):
        with dask.annotate(resources={"worker_futures": 1}):
            # Run cross_validate with a heavy model and a GridSearchCV with 110 combinations in the grid
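
For completeness, here is a minimal sketch of what the elided body could look like; the estimator, grid and synthetic data are placeholders for illustration only, not the actual workload:

    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier  # placeholder "heavy" model
    from sklearn.model_selection import GridSearchCV, cross_validate

    # Synthetic data standing in for the real dataset.
    X, y = make_classification(n_samples=1000, n_features=20)

    # A GridSearchCV wrapped in cross_validate: run inside the joblib/dask
    # context managers above, each fit becomes one long-running task
    # submitted to the cluster.
    search = GridSearchCV(
        RandomForestClassifier(),
        param_grid={"n_estimators": [100, 500], "max_depth": [None, 10, 20]},  # placeholder grid
        n_jobs=n_dask,
    )
    scores = cross_validate(search, X, y, cv=5, n_jobs=n_dask)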

I remember having similar issues with Celery in the past. The problem for me is in the scheduler, which is assigning more than one task to each worker. I understand that this is preferable with small tasks, as it keeps worker throughput high, but in my case wrongly assigning a task to a busy worker can result in days of waiting.

Does anyone have any idea on how this behaviour can be changed?

My next step is to extend the scheduler with custom logic, but I was hoping to get some ideas on this first.

Hi @fraimondo, welcome to the Dask Discourse forum!!

Interesting problem. Dask is optimized for short tasks by default, so it’s possible work-stealing has some limitations here. But maybe there is some way to make it work better.

Does it change anything if you set distributed.scheduler.unknown-task-duration to something like several hours?
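
For example, something along these lines before creating the cluster (the 12h value is just illustrative):

    import dask

    # Tell the scheduler to assume tasks with no timing history take hours,
    # instead of assuming they are very short, so it is less eager to pack
    # several of them onto one worker.
    dask.config.set({"distributed.scheduler.unknown-task-duration": "12h"})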

cc @crusaderky