@guillaumeeb,
I have actually simplified my problem and realised that the issue starts even before using preprocess (although there may also have been an issue because I was creating NaN arrays with NumPy rather than dask.array).
Anyway, I have tried countless things and still observe the same behaviour: data is downloaded back to the machine launching the remote cluster, and local memory fills up.
Here is a simplified script. I have tried it with a Fargate cluster and with an EC2 cluster (by creating a Docker image) and get the same behaviour, so this is not a Coiled cluster issue. Maybe s3fs, or the h5netcdf engine? Who knows.
#!/usr/bin/env python3
from functools import partial
import s3fs
import xarray as xr
from coiled import Cluster
from dask.distributed import Client
import numpy as np
def create_fileset(s3_paths, s3_fs=None):
    """
    Create a fileset from S3 objects specified by a list of full S3 paths.

    Args:
        s3_paths (str or list[str]): Either a single full S3 path
            (e.g., 's3://bucket_name/object_key') or a list of full S3 paths.

    Returns:
        list[file-like object]: List of file-like objects representing each object in the fileset.
    """
    if s3_fs is None:
        s3_fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)

    if isinstance(s3_paths, str):
        s3_paths = [s3_paths]
    if not isinstance(s3_paths, list):
        raise ValueError("Invalid input format. Expecting either str or list[str].")

    # Create a fileset by opening each file
    fileset = [s3_fs.open(file) for file in s3_paths]
    return fileset
s3_file_uri_list = [
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211101111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211102111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211103111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211104111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211105111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211106111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211107111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211108111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211109111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211110111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211111111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211112111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211113111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211114111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211115111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
's3://imos-data/IMOS/SRS/SST/ghrsst/L3SM-1dS/dn/2021/20211116111000-ABOM-L3S_GHRSST-SSTfnd-MultiSensor-1d_dn_Southern.nc',
]
s3_file_handle_list = create_fileset(s3_file_uri_list)
cluster_options = {
    'n_workers': [1, 120],
    'scheduler_vm_types': 't3.2xlarge',
    'worker_vm_types': 't3.2xlarge',
    'allow_ingress_from': 'me',
    'compute_purchase_option': 'spot_with_fallback',
    'worker_options': {
        'nthreads': 8,
        'memory_limit': '32GB'
    },
    'name': 'thisIsATest'
}
cluster = Cluster(**cluster_options)
client = cluster.get_client()
# client = Client(cluster)
open_mfdataset_params = {
    "engine": "h5netcdf",
    "parallel": True,
    "data_vars": "all",
    "concat_characters": True,
    "mask_and_scale": True,
    "decode_cf": False,
    "decode_times": False,
    "use_cftime": False,
    "decode_coords": True,
    "compat": "override",
    "coords": "minimal",
    "chunks": {
        'time': 5,
        'latitude': 500,
        'longitude': 500
    }
}
ds = xr.open_mfdataset(s3_file_handle_list, **open_mfdataset_params)
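
To make the behaviour easier to see, here is a rough diagnostic sketch I would append after the open_mfdataset call: it prints the RSS of the local process and then triggers a small reduction that should run on the cluster. The variable name 'sea_surface_temperature' is my assumption for these GHRSST files, adjust it to whatever is actually in the dataset. If local RSS already grows during the open itself, the transfer is happening at open time rather than at compute time.

import os
import psutil  # only used locally for this diagnostic

proc = psutil.Process(os.getpid())
print(f"local RSS after open_mfdataset: {proc.memory_info().rss / 1e9:.2f} GB")

# Small reduction that should be executed entirely on the workers.
# 'sea_surface_temperature' is an assumed variable name, not confirmed.
result = ds['sea_surface_temperature'].mean().compute()
print(result)

print(f"local RSS after compute:        {proc.memory_info().rss / 1e9:.2f} GB")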