Thanks all.
On further investigation we don’t see resource restrictions being ignored for the initial data-loading tasks (these are not stolen). We could annotate downstream tasks to ensure that they are not stolen either, but our trials show that with our modest change to the stealing they don’t have to be, which seems like a cheap usability win for very little effort.
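For concreteness, the kind of downstream annotation we mean is sketched below; it reuses the org-* worker resources set up in the notebook, and dataset stands in for the xarray dataset opened there, so treat it as illustrative rather than something we ended up needing:

import dask

# Hypothetical: pin the downstream mean() tasks with a resource annotation, so they
# would only run on workers advertising the org-metoffice resource.
with dask.annotate(resources={'org-metoffice': 1}):
    averages = dataset.mean('realization', keep_attrs=True)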
The following Jupyter notebook shows our results.
Understanding Work Stealing
This notebook aims to show the problems we have with the current work stealing, and the behaviour of a system running our revised version.
It uses Docker containers to simulate 3 locations:
- This computer, where the client is running
- Metoffice
- Eumetsat
The idea is that tasks accessing data available in metoffice should be run there, and similarly for eumetsat. The algorithm should be defined, and results rendered, here.
We use Docker to host a scheduler and workers in such a way that all data in /data is visible to the client machine (this one) so it can read metadata, but the metoffice container can only see /data/metoffice, and the eumetsat container only /data/eumetsat. This keeps our data separation honest.
Setup
First some imports
from time import sleep
import dask
from dask.distributed import performance_report, get_task_stream
import pytest
import ipytest
import xarray
import matplotlib.pyplot as plt
ipytest.autoconfig()
Clear out the cluster (a "docker stop" requires at least 1 argument error just means that nothing was running)
%%bash
docker stop $(docker ps -a -q)
docker container prune --force
e395ef09a505
b0c29d9421c8
63f33eaf89f4
Deleted Containers:
e395ef09a50506098a46a88feb476de9317db9400db7ac73191236962e038e7e
b0c29d9421c8e1f11b65f039e30e749e21921509ae1248ef2367c2973b24397c
63f33eaf89f43f96ac2cb6b6fe7d4925fb75a8766ee0faf33067bc84c25f1cf2
Total reclaimed space: 66.78kB
Our Vision
Here we show an xarray calculation run on metoffice and eumetsat workers.
Run Up a Cluster
First run a scheduler. We’re going to run our version that prevents cross-organisation work-stealing.
The Docker container is available on Docker Hub. It is basically conda install -c conda-forge python=3.10 dask distributed iris xarray bottleneck
with some patches, detailed later, applied on top.
%%bash --bg
docker run --network host metoffice/irisxarray resource-aware-scheduler.py
Start 4 metoffice workers, in separate processes. These have a resource org-metoffice=1
%%bash --bg
scheduler_host=localhost:8786
org_name=metoffice
worker_cmd="dask-worker $scheduler_host --nprocs 4 --nthreads 1 --resources org-$org_name=1 --name $org_name"
docker run --network host --mount type=bind,source="$(pwd)"/$org_name-data,target=/data/$org_name --env ORG_NAME=$org_name metoffice/irisxarray $worker_cmd
Start 4 eumetsat workers, in separate processes, and with org-eumetsat=1
%%bash --bg
scheduler_host=localhost:8786
org_name=eumetsat
worker_cmd="dask-worker $scheduler_host --nprocs 4 --nthreads 1 --resources org-$org_name=1 --name $org_name"
docker run --network host --mount type=bind,source="$(pwd)"/$org_name-data,target=/data/$org_name --env ORG_NAME=$org_name metoffice/irisxarray $worker_cmd
And a client to talk to them - you can click through to the Dashboard on http://localhost:8787/status
from dask.distributed import Client
import dask
client = Client('localhost:8786')
/home/ec2-user/miniconda3/envs/irisxarray/lib/python3.10/site-packages/distributed/client.py:1096: VersionMismatchWarning: Mismatched versions found
+-------------+----------------+----------------+----------------+
| Package | client | scheduler | workers |
+-------------+----------------+----------------+----------------+
| dask | 2022.01.0 | 2022.01.1 | 2022.01.1 |
| distributed | 2022.01.0 | 2022.01.1 | 2022.01.1 |
| python | 3.10.2.final.0 | 3.10.0.final.0 | 3.10.0.final.0 |
+-------------+----------------+----------------+----------------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
Define Some Conveniences
Here are some utilities to work with workers in organisations
import os.path
from dask import annotate, delayed
def in_org(name):
    return annotate(resources={f'org-{name}': 1})

@delayed
def my_org():
    return os.environ['ORG_NAME']

@delayed
def tree(dir):
    result = []
    for path, dirs, files in os.walk(dir):
        result = result + [path]
        result = result + [os.path.join(path, file) for file in files]
    return result
We can use them to see which workers can see which data
with in_org('metoffice'):
    metoffice_data = tree('/data')
metoffice_data.compute()
['/data',
'/data/metoffice',
'/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.nc']
with in_org('eumetsat'):
    eumetsat_data = tree('/data')
eumetsat_data.compute()
['/data', '/data/eumetsat', '/data/eumetsat/observations.nc']
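Note that without the in_org annotation a task such as tree('/data') could be scheduled onto either organisation's workers, so its result would depend on where it happened to run; that is exactly the ambiguity the pinning avoids. For example (a sketch, not run here):

unpinned = tree('/data')
unpinned.compute()   # may list metoffice data or eumetsat data, depending on the worker chosen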
And we can identify each worker on the task stream observable at http://localhost:8787/status
def show_all_workers():
    my_org().compute(workers='metoffice-0')
    my_org().compute(workers='metoffice-1')
    my_org().compute(workers='metoffice-2')
    my_org().compute(workers='metoffice-3')
    my_org().compute(workers='eumetsat-0')
    my_org().compute(workers='eumetsat-1')
    my_org().compute(workers='eumetsat-2')
    my_org().compute(workers='eumetsat-3')

show_all_workers()
Run A Computation
Here we pin the loading of metoffice data to metoffice workers, and eumetsat data to eumetsat workers. After that we let Dask work things out.
%%time
with in_org('metoffice'):
    dataset = xarray.open_dataset('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.nc').chunk({"latitude": 10})

with in_org('eumetsat'):
    comparison_dataset = xarray.open_dataset('/data/eumetsat/observations.nc').chunk({"latitude": 10})

averages = dataset.mean('realization', keep_attrs=True)
diff = averages.isel(height=5) - comparison_dataset

show_all_workers()
diffed = diff.compute(optimize_graph=False)

fig = plt.figure(figsize=(6, 6))
plt.imshow(diffed.to_array()[0,...,0], origin='lower')
CPU times: user 327 ms, sys: 66.6 ms, total: 393 ms
Wall time: 4.38 s
<matplotlib.image.AxesImage at 0x7fdd5016ab30>
We know that, at the very least, only the right workers can load the data from file. After that, Dask’s preference for running tasks on workers that already hold the data means the mean
calculation will tend to run on the metoffice workers. But once they get busy, work stealing might kick in.
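One quick way to eyeball where Dask has actually placed things, independently of the dashboard, is the client's has_what() call; a sketch (using the client connected above, not something we time or rely on below):

placement = client.has_what()   # maps each worker address to the keys it currently holds
for worker, keys in placement.items():
    print(worker, len(keys), 'keys held')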
Let’s see whether we can observe any work stealing.
Work Stealing With Our Revisions
Remember that we are currently running our revised scheduler.
We’ll run just the averages bit. That should not be run on eumetsat, as the data isn’t there.
With the mildly hacked scheduler we’re running at the moment, that works fine - you can see it only runs on the first 4 workers in the task stream at http://localhost:8787/status
%%time
with in_org('metoffice'):
    dataset = xarray.open_dataset('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.nc').chunk({"latitude": 10})

averages = dataset.mean('realization', keep_attrs=True)

show_all_workers()
with get_task_stream() as ts:
    averaged = averages.compute(optimize_graph=False)
CPU times: user 313 ms, sys: 148 ms, total: 461 ms
Wall time: 3.45 s
Here are the workers it ran on
workers = set((each['worker'] for each in ts.data))
assert len(workers) == 4
workers
{'tcp://127.0.0.1:35315',
'tcp://127.0.0.1:38781',
'tcp://127.0.0.1:39893',
'tcp://127.0.0.1:41725'}
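To double-check that those addresses really are the metoffice workers, the scheduler records each worker's name, which we set to the organisation when starting the containers; a quick sketch:

worker_info = client.scheduler_info()['workers']   # keyed by worker address
{address: worker_info[address]['name'] for address in workers if address in worker_info}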
Work Stealing Without Our Revisions
If we don’t use our hacked scheduler though, it goes wonky. Let’s see that by killing everything
!./docker-reset.sh
a40d54a3a528
b49f01a92f1f
a361494a9e20
Deleted Containers:
a40d54a3a528c9a0cd6b8b3e8af2defe1d81f58980af0a1fe3aaeccf1b8f83f6
b49f01a92f1f6f6e79c07b837cc247c55480f09e97ecdbb8fff15f1f4871c240
a361494a9e209850f9e514ec2e604122a6bf30498a95f132bca3647342ea4f75
Total reclaimed space: 571.5MB
and run up with the standard scheduler
%%bash --bg
docker run --network host metoffice/irisxarray dask-scheduler
/home/ec2-user/miniconda3/envs/irisxarray/lib/python3.10/site-packages/distributed/client.py:1096: VersionMismatchWarning: Mismatched versions found
+-------------+-----------+-----------+---------+
| Package | client | scheduler | workers |
+-------------+-----------+-----------+---------+
| dask | 2022.01.0 | 2022.01.1 | None |
| distributed | 2022.01.0 | 2022.01.1 | None |
+-------------+-----------+-----------+---------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
Start 4 metoffice workers, in separate processes
%%bash --bg
scheduler_host=localhost:8786
org_name=metoffice
worker_cmd="dask-worker $scheduler_host --nprocs 4 --nthreads 1 --resources org-$org_name=1 --name $org_name"
docker run --network host --mount type=bind,source="$(pwd)"/$org_name-data,target=/data/$org_name --env ORG_NAME=$org_name metoffice/irisxarray $worker_cmd
Start 4 eumetsat workers, in separate processes
%%bash --bg
scheduler_host=localhost:8786
org_name=eumetsat
worker_cmd="dask-worker $scheduler_host --nprocs 4 --nthreads 1 --resources org-$org_name=1 --name $org_name"
docker run --network host --mount type=bind,source="$(pwd)"/$org_name-data,target=/data/$org_name --env ORG_NAME=$org_name metoffice/irisxarray $worker_cmd
And a client to talk to them
from dask.distributed import Client
import dask
client = Client('localhost:8786')
/home/ec2-user/miniconda3/envs/irisxarray/lib/python3.10/site-packages/distributed/client.py:1096: VersionMismatchWarning: Mismatched versions found
+-------------+----------------+----------------+----------------+
| Package | client | scheduler | workers |
+-------------+----------------+----------------+----------------+
| dask | 2022.01.0 | 2022.01.1 | 2022.01.1 |
| distributed | 2022.01.0 | 2022.01.1 | 2022.01.1 |
| python | 3.10.2.final.0 | 3.10.0.final.0 | 3.10.0.final.0 |
+-------------+----------------+----------------+----------------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
Now run the calculation, viewing the task stream
with in_org('metoffice'):
    dataset = xarray.open_dataset('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.nc').chunk({"latitude": 10})

averages = dataset.mean('realization', keep_attrs=True)

show_all_workers()
with get_task_stream() as ts:
    averaged = averages.compute(optimize_graph=False)
That turns out to have been run on every worker
workers = set((each['worker'] for each in ts.data))
assert len(workers) == 8
workers
{'tcp://127.0.0.1:35075',
'tcp://127.0.0.1:36303',
'tcp://127.0.0.1:37557',
'tcp://127.0.0.1:38653',
'tcp://127.0.0.1:39673',
'tcp://127.0.0.1:39953',
'tcp://127.0.0.1:40385',
'tcp://127.0.0.1:46845'}
Our Revisions
To be honest, I don’t know why our revisions have such an effect on work stealing.
Given our PR, resource-aware-scheduler.py
looks like this
#!/usr/bin/env python
import click
import dask

from distributed.cli.dask_scheduler import main as scheduler_main
from distributed.stealing import WorkStealing


# Needs the hacked version of stealing.py to be installed in the conda environment to work
class OrganisationAwareWorkStealing(WorkStealing):
    def potential_thieves_for(self, ts, idle, sat):
        if ts.annotations.get('cross_org_stealing_allow', False):
            return super().potential_thieves_for(ts, idle, sat)
        else:
            return [ws for ws in idle if self.org_aware_can_steal(ws, ts, sat)]

    def org_aware_can_steal(self, thief, ts, victim):
        return False if _org_of(thief) != _org_of(victim) else self.can_steal(thief, ts, victim)


def _org_of(worker):
    orgs = [item for item in worker.resources if item.startswith('org')]
    return orgs[0] if orgs else None


@click.command()
def dask_setup(scheduler):
    scheduler.add_plugin(OrganisationAwareWorkStealing(scheduler))


if __name__ == '__main__':
    with dask.config.set({
        'distributed.scheduler.work-stealing': False,
        'distributed.scheduler.preload': __file__,
    }):
        scheduler_main()
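For completeness, the cross_org_stealing_allow check in potential_thieves_for is an escape hatch: annotating a computation with it would opt those tasks back into cross-organisation stealing. We don't use it in this notebook, but a sketch of how we'd expect it to be applied:

import dask

# Hypothetical opt-in: these tasks may be stolen across organisations even under
# the organisation-aware scheduler.
with dask.annotate(cross_org_stealing_allow=True):
    averages = dataset.mean('realization', keep_attrs=True)

averaged = averages.compute(optimize_graph=False)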
I have yet to work out why this prevents work stealing when the standard resource constraint in stealing.py
for resource, value in ts.resource_restrictions.items():
    try:
        supplied = thief.resources[resource]
    except KeyError:
        return False
    else:
        if supplied < value:
            return False
does not.