Understanding Work Stealing

Yes, this is part of some work on distributing data across datacenters, regions or organisations, while keeping the compute as close as possible to where the data resides.

Instead of targeting individual workers, we are (ab)using resources. Each worker in an organisation is given that organisation as a resource at startup:

dask-worker $scheduler_host --resources "org-$org_name=1"
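With this convention, a task that requires `{'org-metoffice': 1}` is only eligible to run on a worker that supplies at least that much `org-metoffice`. The snippet below is a simplified, plain-Python model of that eligibility rule, not Dask's own code; the helpers `org_resource` and `satisfies` are hypothetical. It also ignores the fact that Dask counts a resource as in use while a task holds it, which limits how many such tasks a worker runs at the same time.

```python
def org_resource(org_name: str) -> str:
    """Resource name for an organisation, following the "org-$org_name" convention."""
    return f"org-{org_name}"


def satisfies(worker_resources: dict, required: dict) -> bool:
    """True if the worker supplies at least the required amount of every resource.

    Hypothetical helper: a simplified model of a resource restriction check.
    """
    return all(worker_resources.get(name, 0) >= amount for name, amount in required.items())


# A worker started with --resources "org-metoffice=1"
metoffice_worker = {org_resource("metoffice"): 1}

print(satisfies(metoffice_worker, {org_resource("metoffice"): 1}))  # True
print(satisfies(metoffice_worker, {org_resource("eumetsat"): 1}))   # False
```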

Then we can pin computations to an organisation by annotating them with that organisation's resource:

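import xarray
from dask import annotate


def in_org(name):
    # Require one unit of the organisation's resource, e.g. {'org-metoffice': 1}
    return annotate(resources={f'org-{name}': 1})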


with in_org('metoffice'):
    predictions = xarray.open_dataset('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.nc').chunk({"latitude": 10})

with in_org('eumetsat'):
    measurement = xarray.open_dataset('/data/eumetsat/measurements.nc').chunk({"latitude": 10})

averages = predictions.mean('realization', keep_attrs=True)

diff = (averages.height[0] - measurement).to_array()[0, ..., 0]

# optimize_graph=False avoids low-level optimizations (e.g. task fusion) that can discard annotations
diffed = diff.compute(optimize_graph=False)

This works well (provided all the workers can reach each other), but it doesn’t prevent work stealing between organisations, which is costly in terms of data transfer. We could disable work stealing altogether, but that makes computation within an organisation less efficient. So…

… I’d like to gain more control over work stealing. Something like:

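from distributed.stealing import WorkStealing, _can_steal


class OrganisationAwareWorkStealing(WorkStealing):
    def potential_thieves_for(self, ts, idle, sat):
        # Only idle workers in the same organisation as the saturated worker may steal
        return [ws for ws in idle if can_steal(ws, ts, sat)]


def can_steal(thief, ts, victim):
    # Refuse cross-organisation steals; otherwise fall back to the default checks
    if org_of(thief) != org_of(victim):
        return False
    return _can_steal(thief, ts, victim)


def org_of(worker):
    # First resource named "org-<name>", or None if the worker has no organisation
    orgs = [item for item in worker.resources if item.startswith('org-')]
    return orgs[0] if orgs else None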
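To check the filtering logic without a running scheduler, the sketch below uses hypothetical stand-ins: a `FakeWorker` dataclass that has only the `resources` dict that `org_of` reads, and a stub `base_can_steal` that always allows stealing, used in place of `distributed.stealing._can_steal`. `org_of`, `can_steal` and the filter mirror the functions above.

```python
from dataclasses import dataclass, field


@dataclass
class FakeWorker:
    """Hypothetical stand-in for the scheduler's WorkerState (only `resources` is used)."""
    name: str
    resources: dict = field(default_factory=dict)


def base_can_steal(thief, ts, victim):
    """Stub for distributed.stealing._can_steal: assume no other restrictions apply."""
    return True


def org_of(worker):
    orgs = [item for item in worker.resources if item.startswith("org-")]
    return orgs[0] if orgs else None


def can_steal(thief, ts, victim):
    # Never steal across organisations; otherwise defer to the base check.
    if org_of(thief) != org_of(victim):
        return False
    return base_can_steal(thief, ts, victim)


def potential_thieves_for(ts, idle, sat):
    return [ws for ws in idle if can_steal(ws, ts, sat)]


victim = FakeWorker("met-1", {"org-metoffice": 1})
idle = [
    FakeWorker("met-2", {"org-metoffice": 1}),
    FakeWorker("eum-1", {"org-eumetsat": 1}),
    FakeWorker("plain"),  # no organisation resource
]
print([ws.name for ws in potential_thieves_for(None, idle, victim)])  # ['met-2']
```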

where potential_thieves_for would be a new method on WorkStealing:

    def potential_thieves_for(self, ts, idle, sat):
        if _has_restrictions(ts):
            return [ws for ws in idle if _can_steal(ws, ts, sat)]
        else:
            return idle

designed as an extension point for this and other applications.
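The point of such a hook is that the stealing loop only asks `potential_thieves_for` which workers may take a task and then picks among them, so a subclass can change the policy without copying the loop. The sketch below is a heavily simplified, hypothetical `ToyStealing` class, not the real `WorkStealing`; the `Worker` and `Task` dataclasses, the name-based `restrictions` set and the "pick the least occupied worker" rule are assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Worker:
    name: str
    occupancy: float
    resources: dict = field(default_factory=dict)


@dataclass
class Task:
    name: str
    restrictions: set = field(default_factory=set)  # allowed worker names; empty means any


class ToyStealing:
    def potential_thieves_for(self, ts, idle, sat):
        # Default policy: restricted tasks go only to allowed workers, others to any idle worker.
        if ts.restrictions:
            return [ws for ws in idle if ws.name in ts.restrictions]
        return idle

    def choose_thief(self, ts, idle, sat):
        # The loop never changes: filter via the hook, then take the least busy worker.
        thieves = self.potential_thieves_for(ts, idle, sat)
        return min(thieves, key=lambda ws: ws.occupancy, default=None)


class OrgAwareToyStealing(ToyStealing):
    def potential_thieves_for(self, ts, idle, sat):
        # Override only the hook: keep the default checks, then require the victim's organisation.
        candidates = super().potential_thieves_for(ts, idle, sat)
        return [ws for ws in candidates if self._org(ws) == self._org(sat)]

    @staticmethod
    def _org(ws):
        return next((r for r in ws.resources if r.startswith("org-")), None)


sat = Worker("met-1", occupancy=10.0, resources={"org-metoffice": 1})
idle = [
    Worker("eum-1", occupancy=0.5, resources={"org-eumetsat": 1}),
    Worker("met-2", occupancy=2.0, resources={"org-metoffice": 1}),
]
task = Task("mean-0")

print(ToyStealing().choose_thief(task, idle, sat).name)          # eum-1
print(OrgAwareToyStealing().choose_thief(task, idle, sat).name)  # met-2
```

The default policy sends the unrestricted task to the least busy worker even though it sits in another organisation. The subclass changes only the hook and keeps the task inside the victim's organisation.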

Does this make sense? If so, it would be helpful to know how to install OrganisationAwareWorkStealing in place of the default work-stealing extension (see "Configuring Scheduler Extensions").