Configuring Scheduler Extensions

I’d like to replace the work stealing extension with a modified version. Is there a recommended way to configure the set of extensions that Scheduler uses?

Answering my own question after a bit of fiddling: I can disable the default work stealing with a config setting and then add its replacement from a preloaded module.

import click
import dask
from distributed.cli.dask_scheduler import main
from distributed.stealing import WorkStealing


class OrganisationAwareWorkStealing(WorkStealing):
    def potential_thieves_for(self, ts, idle, sat):
        return [ws for ws in idle if self.org_aware_can_steal(ws, ts, sat)]

    def org_aware_can_steal(self, thief, ts, victim):
        # Never steal across organisations; otherwise defer to the normal check.
        return _org_of(thief) == _org_of(victim) and self.can_steal(thief, ts, victim)


def _org_of(worker):
    # A worker's organisation is its first resource whose name starts with 'org'.
    orgs = [item for item in worker.resources if item.startswith('org')]
    return orgs[0] if orgs else None


@click.command()
def dask_setup(scheduler):
    scheduler.add_plugin(OrganisationAwareWorkStealing(scheduler))


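if __name__ == '__main__':
    # Turn off the built-in work stealing and preload this file so that
    # dask_setup registers the replacement when the scheduler starts.
    with dask.config.set({
        'distributed.scheduler.work-stealing': False,
        'distributed.scheduler.preload': __file__,
    }):
        main()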

Note that this example needs a patched version of WorkStealing; see "Remove duplication from stealing (was Allow subclasses to control work stealing)" by dmcg, Pull Request #5787 in dask/distributed on GitHub.
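For anyone who wants to check the organisation matching on its own, here is a minimal sketch that runs without a scheduler. It assumes a hypothetical FakeWorker stand-in that only models the `resources` mapping; `org_of` mirrors the `_org_of` helper above, and `same_org_thieves` is an illustrative name, not part of distributed.

```python
from dataclasses import dataclass, field


@dataclass
class FakeWorker:
    """Hypothetical stand-in for a scheduler worker; only `resources` is modelled."""
    name: str
    resources: dict = field(default_factory=dict)


def org_of(worker):
    # Same rule as _org_of above: first resource key starting with 'org', else None.
    orgs = [item for item in worker.resources if item.startswith("org")]
    return orgs[0] if orgs else None


def same_org_thieves(idle, victim):
    # Keep only idle workers whose organisation matches the victim's.
    return [ws for ws in idle if org_of(ws) == org_of(victim)]


victim = FakeWorker("w1", {"org-a": 1})
idle = [
    FakeWorker("w2", {"org-a": 1}),
    FakeWorker("w3", {"org-b": 1}),
    FakeWorker("w4"),  # no org resource at all
]
thief_names = [ws.name for ws in same_org_thieves(idle, victim)]
print(thief_names)
```

One thing this makes visible: workers with no 'org' resource all map to None, so under this rule they count as belonging to the same (unnamed) organisation and can steal from each other.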
