Run dask inside dask

Hi!

I have some code that uses satpy, which in turn uses xarray with dask for computations. The satpy reader only works with local files and insists on using xarray with dask arrays (I didn’t find an option to turn this off), so my reading function looks something like this:

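```python
import os
import tempfile

import obstore
import satpy
import xarray as xr

store: obstore.store.ObjectStore  # some S3 store or similar


def read_file(name: str) -> xr.Dataset:
    # download the file to a local temporary directory
    with tempfile.TemporaryDirectory() as d:
        get = store.get(name)
        # use only the base name, so object keys containing "/" do not
        # point into sub-directories that don't exist locally
        local_path = os.path.join(d, os.path.basename(name))
        with open(local_path, 'wb') as f:
            for bts in get:  # write the object chunk by chunk
                f.write(bts)

        # open with satpy as xarray with lazy dask arrays
        scene = satpy.Scene([local_path], reader=...)
        scene.load(...)
        ds = scene.resample(...).to_xarray()

        # load into memory before the temporary files are deleted
        return ds.compute()
```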
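The important detail in read_file is that the lazy arrays still point at the temporary file, so compute() has to run inside the with block. A self-contained sketch of the same lifecycle with plain NumPy and Dask, where a .npy file stands in for the satpy input (the helper name read_via_temp_file and the file name are hypothetical, not satpy API):

```python
import os
import tempfile

import dask
import dask.array as da
import numpy as np


def read_via_temp_file(data: np.ndarray) -> np.ndarray:
    """Hypothetical helper: write `data` to a temporary file, wrap it in a
    lazy dask array, and load it into memory before the directory is removed."""
    with tempfile.TemporaryDirectory() as d:
        local_path = os.path.join(d, "granule.npy")  # stand-in for the downloaded file
        np.save(local_path, data)

        # Nothing is read yet: np.load only runs when the graph is computed.
        lazy = da.from_delayed(
            dask.delayed(np.load)(local_path),
            shape=data.shape,
            dtype=data.dtype,
        )

        # Compute while the file still exists, on a local scheduler so the
        # tasks run in this process, i.e. where the file actually lives.
        return lazy.compute(scheduler="threads")


result = read_via_temp_file(np.arange(6).reshape(2, 3))
print(result.sum())  # 15
```

If lazy were returned and computed after the with block, np.load would raise FileNotFoundError, because the temporary directory has already been deleted.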

Now I would like to map this function over a distributed cluster on Kubernetes, followed by a .to_zarr(...) write.

If I do this naively, each call of read_file submits about 200 very small tasks to the same distributed client, which is bad for two reasons:

  • there are simply too many small tasks, and the distributed scheduler chokes
  • the internal tasks of read_file relate to a local file that resides on only one worker
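To check roughly how many tasks a lazy object would submit before computing it, you can count the keys in its task graph. A minimal sketch with a plain dask array standing in for the satpy dataset (the shape and chunk sizes are arbitrary assumptions); dask-backed xarray objects expose the same __dask_graph__() method:

```python
import math

import dask.array as da

# A 1000x1000 array split into 100x100 blocks: 10 x 10 = 100 chunks.
x = da.ones((1000, 1000), chunks=(100, 100))
y = (x + 1).mean(axis=0)

n_chunks = math.prod(x.numblocks)
# Number of keys in the unoptimised task graph of y; optimisation at
# compute time (e.g. fusing blockwise steps) may reduce this somewhat.
n_tasks = len(y.__dask_graph__())
print(n_chunks, n_tasks)
```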

So a simple solution would be to use one scheduler for the outer parallelisation (the distributed cluster) and a different one for the inner satpy tasks. My code looks something like this:

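```python
import dask
from dask.distributed import Client, wait


def read_task(name: str) -> None:
    # run the inner satpy/xarray computations on a local thread pool
    # with a single thread instead of sending them to the cluster
    with dask.config.set({'scheduler': 'threads', 'num_workers': 1}):
        read_file(name).to_zarr(...)


client = Client('tcp://...kubernetes')
files: list[str]  # object keys to process
futures = client.map(read_task, files)
wait(futures)  # keep references and block until every file is written
```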

I was thinking that this way the dask arrays inside read_file would automatically use an ad hoc 'threads' scheduler managed by the task itself, but it seems that the tasks are still submitted to the outer Kubernetes Dask cluster.
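The mechanism relied on here, choosing the scheduler inside the task function so that every inner compute() picks it up, can be checked locally with plain Dask. A minimal sketch (current_thread_id and task_body are hypothetical names); it uses the "synchronous" scheduler only because its behaviour, running tasks in the calling thread, is easy to assert, while "threads" is selected through the same configuration key:

```python
import threading

import dask


@dask.delayed
def current_thread_id() -> int:
    # Records which thread executed this (inner) task.
    return threading.get_ident()


def task_body() -> int:
    # Analogue of read_task: the scheduler is chosen inside the task itself,
    # so every inner compute() in this block uses it.
    with dask.config.set(scheduler="synchronous"):
        return current_thread_id().compute()


# With the synchronous scheduler the inner task runs in the calling thread.
print(task_body() == threading.get_ident())  # True
```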

Do you have any tips on how to do this correctly, i.e. how to let the inner tasks use their own threaded Dask scheduler?

Any advice on how to do this in a different way would be appreciated as well.

Hi,

It seems I solved it; there was only a minor issue in my code.

I think the scheduler setting in dask.config is ignored when DASK_SCHEDULER_ADDRESS is set (as it is in my Kubernetes cluster).

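So it was enough to change ds.compute() to dask.base.compute(ds, scheduler='threads') (note that this returns a one-element tuple, so ds.compute(scheduler='threads') is the drop-in equivalent).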
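Passing the scheduler per call avoids depending on configuration state at all. A minimal sketch with a plain dask array (xarray's Dataset.compute forwards the same keyword arguments to Dask); it also shows that dask.compute always returns a tuple:

```python
import dask
import dask.array as da

x = da.arange(10, chunks=5)

# Explicit scheduler for this call only; num_workers is forwarded to the
# threaded scheduler.
a = x.sum().compute(scheduler="threads", num_workers=1)

# dask.compute accepts any number of collections and returns a tuple,
# so unpack it when passing a single object.
(b,) = dask.compute(x.sum(), scheduler="threads")
print(a, b)  # 45 45
```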

EDIT: Sorry for the confusion… It seems that my original code works; I was just setting dask.config in a different function that I was not calling.
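One caveat if you keep the dask.config.set approach: the setting is process-wide rather than per task. On a worker that runs several tasks in parallel threads, they all share it, and when one task leaves its with block the previous value is restored for every thread, which may send a still-running task's inner computations back to the cluster. Passing scheduler='threads' to the compute call itself, as in the dask.base.compute(...) variant, sidesteps this. A minimal sketch showing that a value set in one thread is visible in another (read_config_from_other_thread is a hypothetical name):

```python
import threading

import dask

seen = {}


def read_config_from_other_thread() -> None:
    # Runs in a separate thread, like a second task on the same worker.
    seen["scheduler"] = dask.config.get("scheduler", None)


with dask.config.set(scheduler="threads"):
    t = threading.Thread(target=read_config_from_other_thread)
    t.start()
    t.join()

# The value set in the main thread was visible in the other thread.
print(seen["scheduler"])  # threads
```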