Hi!
I have some code that uses satpy, which in turn uses xarray with dask for computations. The satpy reader works only with local files and insists on using xarray with dask arrays (I didn’t find an option to turn it off). So my reading function looks something like this:
import tempfile

import obstore.store
import satpy
import xarray as xr

store: obstore.store.ObjectStore  # some s3 or something

def read_file(name: str) -> xr.Dataset:
    # download the file to a local temporary directory
    with tempfile.TemporaryDirectory() as d:
        get = store.get(name)
        local_path = f'{d}/{name}'
        with open(local_path, 'wb+') as f:
            for bts in get:
                f.write(bts)
        # open with satpy as xarray with lazy dask arrays
        scene = satpy.Scene([local_path], reader=...)
        scene.load(...)
        ds = scene.resample(...).to_xarray()
        # load into memory; the temporary files are deleted when the with block exits
        return ds.compute()
Now, I would like to map this function over a distributed cluster on Kubernetes, followed by some .to_zarr.
If I do it naively, then each call of read_file issues about 200 very small tasks to the same distributed client, which is very bad for the following reasons (see the sketch after this list):
- there are just too many small tasks, and the distributed scheduler chokes
- the internal tasks of read_file relate to a local file which resides on only one worker
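Concretely, by "naively" I mean something like this (just a sketch; the exact .to_zarr wiring does not matter here):

client = dask.distributed.Client('tcp://...kubernetes')
files: list[str]
# read_file runs as a task on the cluster, and each call's ~200 internal
# satpy tasks go back to the same distributed client
futures = client.map(read_file, files)
# ... gather the results and write them with .to_zarr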
So a very simple solution would be to use the distributed client only for the outer parallelisation and a separate local scheduler for the inner satpy tasks. My code looks something like this:
import dask
import dask.distributed

def read_task(name: str):
    with dask.config.set({'scheduler': 'threads', 'num_workers': 1}):
        read_file(name).to_zarr(...)

client = dask.distributed.Client('tcp://...kubernetes')
files: list[str]
client.map(read_task, files)
I was thinking that this way, the dask arrays inside read_file would automatically use an ad-hoc ‘threads’ scheduler managed by the task itself, but it seems that the inner tasks are still submitted to the outer Kubernetes cluster.
Do you have any tips on how to do this correctly? Essentially, how to let the inner tasks use their own threaded scheduler?
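For example, would forcing the scheduler at the compute() call itself (instead of relying on the ambient config) be the right mechanism? Just a sketch of what I mean:

# inside read_file, instead of relying on dask.config.set in the caller:
# xarray forwards these keyword arguments to dask.compute, so the graph
# should run on a local threaded scheduler regardless of the active client
return ds.compute(scheduler='threads', num_workers=1)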
Any advice on how to do this in a different way would be appreciated as well.