OutOfBoundsDatetime error when running Dask distributed on SLURM

Hello, I hope this is an appropriate venue for this question!

I am currently running a workflow that converts coupled GCM output “history fields” in netCDF format into time-series zarr stores on cloud storage (Amazon S3, specifically). I’ve successfully run the code on a fairly large dataset on a JupyterHub and was hoping to move the work into SLURM batch jobs. However, I’m running into seemingly random “OutOfBoundsDatetime” errors in some of the jobs. Does anyone know how to diagnose this sort of error when using Dask/Zarr/xarray/fsspec?

Thank you!

ERROR MESSAGE:

2024-05-20 19:09:32,823 - distributed.worker - WARNING - Compute Failed
Key:       ('open_dataset-concatenate-ce1061766bc0525459c914c7603c93fe', 4, 0, 0)
Function:  getter
args:      (ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=_ElementwiseFunctionArray(_ElementwiseFunctionArray(LazilyIndexedArray(array=<xarray.backends.netCDF4_.NetCDF4ArrayWrapper object at 0x14f9d1809e80>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None)))), func=functools.partial(<function _apply_mask at 0x14f9e8c21940>, encoded_fill_values={1e+36}, decoded_fill_value=nan, dtype=dtype('float32')), dtype=dtype('float32')), func=functools.partial(<function decode_cf_timedelta at 0x14f9e8c24f70>, units='seconds'), dtype=dtype('<m8[ns]')), key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None)))))), (slice(0, 1, None), slice(0, 192, None), slice(0, 288, None)))
kwargs:    {}
Exception: 'OutOfBoundsDatetime("cannot convert input 85286404096.0 with the unit \'s\'")'

Below is the code section that produces the error:

import dask
import fsspec
import xarray as xr
from dask.distributed import Client

# Load data
data = xr.open_mfdataset(f'{path}/{prefix}', combine='nested', concat_dim=timedim)

## Sets up Dask Delayed objects
## Creates empty zarr stores on S3 at this stage
l_pool_delayed = []
for var, zarr_store, mode, t0, varlist in self.to_do_dict[f'{domain}.{model}.{freq}']:
    if self.verbose: print(f'Set up DASK delayed: {var}')
    in_data = data[varlist + timevars].chunk(chunk).sel({timedim: slice(t0, '9999-12-31')})

    mapper = fsspec.get_mapper(zarr_store,
                               client_kwargs={'region_name': self.s3_region},
                               check=False)

    writeobj = in_data.to_zarr(store=mapper, mode=mode, compute=False)
    l_pool_delayed.append(writeobj)

## Starts the dask processes!
print('BEGINNING DASK DISTRIBUTED COMPUTE')
with Client(threads_per_worker=self.nthread, n_workers=self.nworker) as client:
    dask.compute(*l_pool_delayed)

A few additional comments:

  1. I am setting up the SLURM job by calling the above Python script from a bash script.
  2. When I run the same script on the JupyterHub, it completes without issue.
  3. The error occurs at different points in the computation when I re-run the jobs, i.e., it fails while processing different variables each time.
  4. “85286404096.0” does not appear as a value anywhere in the dataset, as far as I can tell (a rough sketch of the kind of check I ran is below this list), so I’m not sure where this value is coming from.
  5. Each of the workers throws the same error, but the float value in the exception message differs between workers (e.g., another error message says Exception: 'OutOfBoundsDatetime("cannot convert input 85333753856.0 with the unit \'s\'")').
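To expand on point 4, this is roughly the kind of check I ran (only a sketch: path, prefix, and timedim are the same placeholder names as in the code above, and the filter on units is just a guess at which fields carry time-like values):

# Sketch: open the files without CF decoding so the raw on-disk values are visible,
# then print the min/max of anything with time-like units to compare against the
# value in the exception. (Placeholder names; not the exact script I ran.)
import xarray as xr

raw = xr.open_mfdataset(f'{path}/{prefix}', combine='nested',
                        concat_dim=timedim, decode_cf=False)

for name in list(raw.coords) + list(raw.data_vars):
    units = str(raw[name].attrs.get('units', ''))
    if 'since' in units or 'seconds' in units or 'days' in units:
        print(name, units, float(raw[name].min()), float(raw[name].max()))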

Hi @hirasawaharuki, welcome to Dask community!

Of course!

It’s hard to understand the cause of this error for now, but I don’t think this is related to Slurm since you are just using a LocalCluster inside jobs. Are you launching several jobs in parallel?

The problem with your post is that your code is not reproducible and looks very specific in how you’re writing individual chunks, though that does not necessarily mean it is the source of the error.

I’ve got several questions:

  • You say the same script (LocalCluster included?) is running fine on a JupyterHub. Which JupyterHub, and on which environment?
  • Are your Python environments on the JupyterHub and on your HPC system the same?
  • Finally, would you be able to build a reproducible example with fake generated data (something along the lines of the sketch below)?
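To be clear about what I mean, a minimal skeleton could look something like this (purely hypothetical fake data, made-up dimension sizes, and a local zarr store instead of S3):

import numpy as np
import pandas as pd
import xarray as xr
import dask
from dask.distributed import Client

# Fake dataset with a time axis, roughly the same spatial shape as one of your chunks
times = pd.date_range('2000-01-01', periods=48, freq='D')
ds = xr.Dataset(
    {'tas': (('time', 'lat', 'lon'),
             np.random.rand(48, 192, 288).astype('float32'))},
    coords={'time': times,
            'lat': np.linspace(-90, 90, 192),
            'lon': np.linspace(0, 359, 288)},
).chunk({'time': 12})

# Same pattern as your script: delayed to_zarr, then a single compute inside a LocalCluster
with Client(threads_per_worker=1, n_workers=2) as client:
    delayed = ds.to_zarr('fake_store.zarr', mode='w', compute=False)
    dask.compute(delayed)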

Hi @guillaumeeb,

Thank you for the reply! I am launching several jobs at the same time, but they are processing different datasets on different nodes, so they should be completely independent.

  1. I successfully ran the same code on JupyterLab v3.5.0 hosted on an AWS HPC6a cluster. In principle, the SLURM clusters should be the same type.
  2. I’m using different versions of conda (conda 22.9.0 vs conda 23.7.4); a quick check of the key library versions is sketched after this list. The diff for the conda environments is as follows (/home/ec2-user/anaconda3/envs/awsproc is from the cluster I’m having trouble with):
< # packages in environment at /home/ec2-user/anaconda3/envs/awsproc:
---
> # packages in environment at /home/jupyter-haruki/.conda/envs/hhenv:
18d17
< asynctest                 0.13.0                     py_0    conda-forge
100d98
< heapdict                  1.0.1                      py_0    conda-forge

  3. I am having some trouble finding a reproducible example, since the code seems to work on other clusters. I’ll follow up once I’ve found something that reproduces this error.
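On point 2, a quick way to compare the library versions that matter most here (datetime decoding and the zarr/S3 write path) would be something like this sketch, run in both environments:

# Print the versions of the libraries most likely involved in datetime decoding
# and in the zarr/S3 writing path; compare the output between environments.
import importlib

for pkg in ('xarray', 'pandas', 'numpy', 'dask', 'distributed',
            'zarr', 'fsspec', 's3fs', 'netCDF4'):
    try:
        mod = importlib.import_module(pkg)
        print(pkg, getattr(mod, '__version__', 'unknown'))
    except ImportError:
        print(pkg, 'not installed')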

Thanks,
Haruki
