I’d like to replace the work stealing extension with a modified version. Is there a recommended way to configure the set of extensions that the Scheduler uses?
Answering my own question after a bit of fiddling: I can disable the default work stealing with a config setting (`distributed.scheduler.work-stealing`) and then add its replacement from a preloaded module.
```python
import click
import dask
from distributed.cli.dask_scheduler import main
from distributed.stealing import WorkStealing


class OrganisationAwareWorkStealing(WorkStealing):
    # Relies on the hacked WorkStealing mentioned in the note below.

    def potential_thieves_for(self, ts, idle, sat):
        return [ws for ws in idle if self.org_aware_can_steal(ws, ts, sat)]

    def org_aware_can_steal(self, thief, ts, victim):
        # Never steal across organisations; otherwise defer to the normal check.
        if _org_of(thief) != _org_of(victim):
            return False
        return self.can_steal(thief, ts, victim)


def _org_of(worker):
    # The organisation is the first worker resource whose name starts with 'org'.
    orgs = [item for item in worker.resources if item.startswith('org')]
    return orgs[0] if orgs else None


@click.command()
def dask_setup(scheduler):
    # Called by the scheduler when this file is loaded as a preload module.
    scheduler.add_plugin(OrganisationAwareWorkStealing(scheduler))


if __name__ == '__main__':
    # Turn off the built-in work stealing and preload this file in its place.
    with dask.config.set({
        'distributed.scheduler.work-stealing': False,
        'distributed.scheduler.preload': __file__,
    }):
        main()
```
Note that this example needs a hacked version of WorkStealing: see "Remove duplication from stealing (was Allow subclasses to control work stealing)" by dmcg, Pull Request #5787 on dask/distributed (GitHub).
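To make the organisation check easier to follow without a running cluster, here is a minimal standalone sketch of the same lookup and filter. It assumes each worker is tagged with a resource whose name starts with `org` (for instance via the worker's resources setting); the names `org_of`, `same_org_thieves`, and the `SimpleNamespace` stand-ins are hypothetical and only model the `.resources` mapping, not real scheduler worker state.

```python
from types import SimpleNamespace


def org_of(worker):
    """Return the first resource name starting with 'org', or None."""
    orgs = [name for name in worker.resources if name.startswith("org")]
    return orgs[0] if orgs else None


def same_org_thieves(idle, victim):
    """Keep only idle workers tagged with the same org as the victim."""
    return [ws for ws in idle if org_of(ws) == org_of(victim)]


# Stand-ins for worker state objects; only `.resources` is modelled (assumption).
a1 = SimpleNamespace(name="a1", resources={"org-a": 1})
a2 = SimpleNamespace(name="a2", resources={"org-a": 1, "GPU": 2})
b1 = SimpleNamespace(name="b1", resources={"org-b": 1})
untagged = SimpleNamespace(name="untagged", resources={})

thieves = same_org_thieves([a2, b1, untagged], victim=a1)
print([ws.name for ws in thieves])  # ['a2']
```

One consequence worth knowing: two workers with no `org` resource both map to `None`, so they compare equal and can still steal from each other, which matches the behaviour of `_org_of` in the example above.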