Xarray operations (e.g., preprocess) running locally (post open_mfdataset) instead of on Dask distributed cluster

FYI, I’ve also posted the same question at Pangeo and Coiled (as a new user I’m limited to 2 links per post). I will update this discussion if I get a positive answer from those places.

I’m using dask.distributed and a Coiled cluster to open multiple NetCDF files with xarray. Nothing fancy here.

But there is also a preprocess_xarray function running during the opening of the files. This preprocess_xarray function is completely “empty” in my 1st showcase example. This example is a very simplified version of a bigger script and should be runnable by anyone since the data is openly accessible.
In this example, a cluster is started on the Coiled infra and all the NetCDF files are opened with xr.open_mfdataset, but during the preprocessing phase the files are downloaded to the machine launching the cluster (I’m monitoring my network usage). If this function actually did anything, it would use my local memory… Not part of this example, but afterwards I’m using ds.to_zarr(), which does run on the cluster!
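To make the setup concrete, here is a rough sketch of the pattern (simplified; the input and output paths below are placeholders, the real files are listed further down this thread):

import s3fs
import xarray as xr

def preprocess_xarray(ds):
    # completely "empty" in the showcase example; the real script modifies variables here
    return ds

# placeholder paths; the gist uses the IMOS GHRSST files listed later in this thread
s3_fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)
fileset = [s3_fs.open(path) for path in [
    "s3://imos-data/path/to/file_1.nc",
    "s3://imos-data/path/to/file_2.nc",
]]

ds = xr.open_mfdataset(
    fileset,
    engine="h5netcdf",
    parallel=True,                 # the file opening itself does run on the cluster
    preprocess=preprocess_xarray,  # <-- this is the step that appears to run locally
)
ds.to_zarr("s3://some-bucket/output.zarr", mode="w")  # placeholder output; this runs on the cluster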

I’ve tried many things:

  • adding a @delayed decorator to force preprocess_xarray to run on the cluster…
  • moving preprocess_xarray outside of open_mfdataset and forcing it onto the cluster with client.submit(partial_preprocess_xarray, ds)
  • …and so on, with no success (both attempts are sketched below).
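For reference, those two attempts looked roughly like this (a sketch only; fileset stands for the list of s3fs file objects from the sketch above):

from functools import partial

import dask
import xarray as xr
from dask.distributed import Client

def preprocess_xarray(ds):
    # placeholder body; empty in the showcase example
    return ds

client = Client()  # in the real script this is the Coiled cluster's client

# attempt 1: force the function onto the cluster by wrapping it in dask.delayed
preprocess_delayed = dask.delayed(preprocess_xarray)

# attempt 2: drop preprocess= from open_mfdataset and submit the function explicitly
ds = xr.open_mfdataset(fileset, engine="h5netcdf", parallel=True)
partial_preprocess_xarray = partial(preprocess_xarray)
ds = client.submit(partial_preprocess_xarray, ds).result()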

Also, the reason preprocess_xarray is not a method of the class in my example is that such a function can’t be serialized and used with partial if it is too complicated.

Here is another version of that gist, slightly different: in it, the preprocess function creates a dummy variable, and all of the memory on my local machine gets used: 2nd example

Thanks a lot for your help!

Loz

Hi @lbesnard, welcome to the Dask community!

Before I try to reproduce your code, a few questions/remarks:

  • According to the Xarray source code, if you use the parallel=True kwarg (which it seems you do, sure about that?), the preprocess function should be wrapped inside Delayed and be run on the Dask cluster side.
  • So there is no need to wrap it on your side. Another thing you might want to try is to use map_blocks after loading the data (see the untested sketch after this list), but you shouldn’t have to do that.
  • Why don’t you use the chunks kwarg of the opening function instead of rechunking afterwards?
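Something like this, for instance (an untested sketch, assuming your preprocess_xarray takes and returns a Dataset with the same structure, and fileset is your list of s3fs file objects):

import xarray as xr

# chunk directly at open time instead of rechunking afterwards
ds = xr.open_mfdataset(
    fileset,
    engine="h5netcdf",
    parallel=True,
    chunks={"time": 5, "latitude": 500, "longitude": 500},
)

# apply the preprocessing lazily, block by block, on the workers;
# template= tells xarray what to expect back from each call
ds = ds.map_blocks(preprocess_xarray, template=ds)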

Hi @guillaumeeb,

Thanks for your reply.

… if you use the parallel=True kwarg (which it seems you do, sure about that?), the preprocess function should be wrapped inside Delayed and be run on the Dask cluster side.

Yes, I use parallel=True. I can see on the cluster’s Dask dashboard that the files are being opened on the cluster. If I then do a ds.to_zarr, that task also runs on the cluster. Only the preprocess part is sent back to my local machine.

Another thing you might want to try is to use map_blocks after loading the data, but you shouldn’t have to do that.

I’ve tried that. Initially I was pretty excited about it because everything seemed to work on the cluster. However, it was much slower, as it does the preprocessing at the chunk level, and it is also way more expensive to use on a remote cluster. It seemed to work successfully since I didn’t get any error message, but I then found some extremely weird data corruption, with entire variables being replaced by NaNs. I’m not sure if a setting was wrong (maybe "data_vars": "all" was missing), but I didn’t have the issue if I ran ds = preprocess_xarray(ds) rather than ds = ds.map_blocks(partial_preprocess_xarray, ds), so I lost confidence in map_blocks.
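For the record, if I retry map_blocks, I would pass an explicit template so xarray knows exactly which variables and dtypes to expect back (a sketch only, not the call from my gist; a mismatch between what the function returns and what map_blocks infers is one plausible way to end up with NaN-filled variables):

# build the expected output lazily and hand it to map_blocks as the template
template = preprocess_xarray(ds)  # cheap as long as preprocess_xarray is fully lazy
ds_processed = ds.map_blocks(preprocess_xarray, template=template)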

  • Why don’t you use the chunks kwarg of the opening function instead of rechunking afterwards?

I’m not using this because on some occasions I have to open NetCDF files with different engines (netCDF4, scipy…), since the input files I’m dealing with can be quite old. I have also occasionally hit xarray issues with chunks='auto' within open_mfdataset (erroneous chunk sizes, if I remember correctly). This is why I put the chunk setting in a try/except, roughly as sketched below.
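A simplified sketch of that fallback (the engines and fallback chunk sizes are placeholders for what the real script does):

# try dask chunking at open time first, fall back to rechunking afterwards
try:
    ds = xr.open_mfdataset(fileset, engine="h5netcdf", parallel=True, chunks="auto")
except Exception:
    # some older files need a different engine and have tripped up chunks="auto"
    ds = xr.open_mfdataset(fileset, engine="scipy", parallel=True)
    ds = ds.chunk({"time": 5, "latitude": 500, "longitude": 500})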


The example should run easily as the input files are publicly available; the only thing to change would be the cluster details, I believe.

Cheers!

This is really strange.

I’m not sure I’m following. Using open_mfdataset, I think you get one chunk per file by default? Why would the preprocessing step be more expensive?

I’ll try to find time to try your example.

@guillaumeeb,
I have actually simplified my problem and realised that the issue starts even before using preprocess (although there may also have been an issue because I was creating NaN arrays with numpy rather than dask.array).

Anyway, I have tried countless things and still observe the same behaviour: data is being downloaded back to the machine launching the remote cluster, and local memory is filling up.

Here is a simplified version of the code. I have tried with a Fargate cluster and with an EC2 cluster by creating a Docker image, with the same behaviour, so this is not a Coiled cluster issue. Maybe s3fs? Or the h5netcdf engine? Who knows…

#!/usr/bin/env python3
from functools import partial

import s3fs
import xarray as xr
from coiled import Cluster
from dask.distributed import Client
import numpy as np

def create_fileset(s3_paths, s3_fs=None):
    """
    Create a fileset from S3 objects specified by a list of full S3 paths.

    Args:
        s3_paths (str or list[str]): Either a single full S3 path (e.g., 's3://bucket_name/object_key')
                                     or a list of full S3 paths.
        s3_fs (s3fs.S3FileSystem, optional): Filesystem to reuse; an anonymous one is created if omitted.

    Returns:
        list[file-like object]: List of file-like objects representing each object in the fileset.
    """
    if s3_fs is None:
        s3_fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)

    if isinstance(s3_paths, str):
        s3_paths = [s3_paths]

    if not isinstance(s3_paths, list):
        raise ValueError("Invalid input format. Expecting either str or list[str].")

    # Create a fileset by opening each file
    fileset = [s3_fs.open(file) for file in s3_paths]

    return fileset


s3_file_uri_list = [
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211101111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211102111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211103111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211104111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211105111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211106111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211107111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211108111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211109111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211110111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211111111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211112111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211113111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211114111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211115111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
    's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211116111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
]

s3_file_handle_list = create_fileset(s3_file_uri_list)

cluster_options = {
    'n_workers': [1, 120],
    'scheduler_vm_types': 't3.2xlarge',
    'worker_vm_types': 't3.2xlarge',
    'allow_ingress_from': 'me',
    'compute_purchase_option': 'spot_with_fallback',
    'worker_options': {
        'nthreads': 8,
        'memory_limit': '32GB',
    },
    'name': 'thisIsATest',
}

cluster = Cluster(**cluster_options)

client = cluster.get_client()
# client = Client(cluster)

open_mfdataset_params = {
    "engine": "h5netcdf",
    "parallel": True,
    "data_vars": "all",
    "concat_characters": True,
    "mask_and_scale": True,
    "decode_cf": False,
    "decode_times": False,
    "use_cftime": False,
    "decode_coords": True,
    "compat": "override",
    "coords": "minimal",
    "chunks": {
        "time": 5,
        "latitude": 500,
        "longitude": 500,
    },
}

ds = xr.open_mfdataset(s3_file_handle_list, **open_mfdataset_params)

Do you mean that the line

ds = xr.open_mfdataset(s3_file_handle_list, **open_mfdataset_params)

is downloading all data to your local machine? Are you doing anything else?

It turns out that the issue is related to s3fs and has already been lodged here:

The solution is to use this obscure option in s3fs:
default_file_cache=None
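Concretely, applied to the create_fileset function above, the change looks roughly like this (a sketch; default_cache_type="none" is the s3fs keyword that disables the per-file read cache, assumed here to be the option meant above):

import s3fs

def create_fileset(s3_paths, s3_fs=None):
    """Open each S3 object without any client-side read cache."""
    if s3_fs is None:
        s3_fs = s3fs.S3FileSystem(
            anon=True,
            default_fill_cache=False,
            default_cache_type="none",  # assumed spelling of the option quoted above
        )
    if isinstance(s3_paths, str):
        s3_paths = [s3_paths]
    return [s3_fs.open(path) for path in s3_paths]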

It sounds a bit insane that not many people are experiencing this issue, as it means that using a Dask cluster becomes useless: the bottleneck is the machine that starts the code.

Well, glad you found that, and it’s interesting. Maybe it depends on the NetCDF file version or layout?