Hello,
I’m fairly new to Dask, although I’ve been using xarray for a few months now. Currently I’m struggling to process a 10.5 GB dataset of half-hourly data that I need to sum to hourly totals:
Opening the dataset...
Total number of time steps: 346224
Size of each processing chunk: 346 time steps
<xarray.Dataset>
Dimensions: (time: 346224, lat: 90, lon: 90)
Coordinates:
* time (time) datetime64[ns] 2002-01-01T00:15:00 ... 2021-09-3...
* lat (lat) float32 -18.53 -18.51 -18.49 ... -16.74 -16.72
* lon (lon) float32 -150.4 -150.4 -150.4 ... -148.5 -148.5
Data variables:
precipitationCal (time, lat, lon) float32 dask.array<chunksize=(346224, 10, 10), meta=np.ndarray>
In short, I need to run something like:
hourly_data = data.resample(time="1H").sum()
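To make the intent concrete, here is a minimal sketch of what I would ideally run in one go (the time chunk size of 17520, roughly one year of half-hourly steps, is just an illustrative guess; the paths are the same as in the script further down):

import xarray as xr

ds = xr.open_dataset(
    "/media/pmauger/LaCie/01_DATA/IMERG/IMERG_2002_2021_INTERPOLATED_RAW.nc",
    chunks={"time": 17520},  # chunk along time only, keep lat/lon whole (illustrative value)
)
hourly = ds.resample(time="1H").sum()  # sum each pair of half-hourly steps into one hour
hourly.to_netcdf("/media/pmauger/LaCie/01_DATA/IMERG/final_dataset.nc")  # this triggers the computation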
I have tried so many scripts over the last three days that it would be too long to list them all, but in short:
- chunking over time only (1000, then 100), then over time/lat/lon, then over lat/lon only (30, then 10) (see the sketch right after this list)
- automatic LocalCluster / explicitly defined cluster: 2, 4 or 9 workers, with 4, 2 or 1 GiB of memory per worker
- looping over time indexes via .isel(time=slice(start_idx, end_idx)), saving each result as a NetCDF file, and rolling over the whole dataset
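To illustrate the chunk layouts from the first point (roughly; the exact values varied between runs, so treat these dictionaries as approximate):

# chunking over time only (1000, then 100)
chunks_time_only = {"time": 1000}
# chunking over time, lat and lon together
chunks_all_dims = {"time": 1000, "lat": 30, "lon": 30}
# chunking over lat/lon only (30, then 10)
chunks_latlon_only = {"lat": 10, "lon": 10}
imerg = xr.open_dataset(os.path.join(imerg_path, file_name), chunks=chunks_time_only)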
My PC isn’t that bad: 16 GB of RAM, 20 cores, running Ubuntu 22.04.
Nevertheless, I can’t get the RAM freed after each iteration of the loop despite using .close(), so the RAM keeps filling up and the script eventually crashes.
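To be explicit about what I mean by “using .close()”, the cleanup at the end of each loop iteration looks roughly like this (the del / gc.collect() lines are illustrative variants of the same idea; not all of them appear in the exact script below):

import gc

# at the end of each iteration, after writing the chunk to disk:
imerg_chunk.to_netcdf(output_file_path)
imerg_chunk.close()     # close the per-chunk dataset
del imerg_chunk         # drop the Python reference
gc.collect()            # garbage-collect in the main process
client.run(gc.collect)  # and on the Dask workers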
I’m really out of options; I can’t see why I cannot get xarray & Dask to handle this dataset and process it as intended.
I would very gladly accept a little help. Thank you.
Would you like more information? Here is the script I am currently trying to run:
from dask.distributed import Client, LocalCluster
# Create a Dask cluster with specific settings
cluster = LocalCluster(
    n_workers=9,          # Number of workers
    memory_limit='1GiB'   # Memory limit per worker
)
# Connect to the Dask cluster
client = Client(cluster)
# Print the Dask dashboard link
print(f"Dask dashboard: {client.dashboard_link}")
import os
import xarray as xr
import numpy as np
# Define chunk size (adjust based on available memory and dataset size)
chunk_size = {
    'lat': 10,
    'lon': 10
}
# Define IMERG file path and prefix
imerg_path = '/media/pmauger/LaCie/01_DATA/IMERG'
file_name = "IMERG_2002_2021_INTERPOLATED_RAW.nc"
# Output directory for processed chunks
output_dir = '/media/pmauger/LaCie/01_DATA/IMERG/chunks'
# Open the dataset
print("Opening the dataset...")
imerg = xr.open_dataset(os.path.join(imerg_path, file_name), chunks=chunk_size)
# Test
#imerg = imerg.isel(time=slice(0, 480))
# Calculate the total number of time steps
num_time_steps = len(imerg.time)
print(f"Total number of time steps: {num_time_steps}")
# Calculate the size of each processing chunk (approximately one-thousandth of the total time steps)
chunk_size_proc = num_time_steps // 1000
print(f"Size of each processing chunk: {chunk_size_proc} time steps")
# Loop through the dataset in chunks
for i in range(0, num_time_steps, chunk_size_proc):
    # Define the start and end indices for the current chunk
    start_idx = i
    end_idx = min(i + chunk_size_proc, num_time_steps)
    print(f"Processing chunk from index {start_idx} to {end_idx - 1}...")
    # Generate output file name based on chunk index
    output_file_path = os.path.join(output_dir, f'chunk_{start_idx}_{end_idx}.nc')
    print(f"Saving processed chunk to: {output_file_path}")
    # Resample to hourly data (sum 2 * 1/2 hourly)
    imerg_chunk = imerg.isel(time=slice(start_idx, end_idx)).resample(time="1H").sum()
    # Save the processed chunk to a NetCDF file
    imerg_chunk.to_netcdf(output_file_path)
# List all processed chunk files
processed_files = sorted(
    [os.path.join(output_dir, f) for f in os.listdir(output_dir) if f.startswith('chunk_')],
    key=lambda f: int(os.path.basename(f).split('_')[1])  # sort numerically by start index, not lexicographically
)
print(f"Processed {len(processed_files)} chunks.")
# Concatenate processed chunks along the time dimension using Dask
final_dataset = xr.open_mfdataset(processed_files, combine='nested', concat_dim='time')
# Save the final concatenated dataset to a NetCDF file
output_final_path = os.path.join(imerg_path, 'final_dataset.nc')
print(f"Saving final dataset to: {output_final_path}")
final_dataset.to_netcdf(output_final_path)
print(f'Final dataset saved: {output_final_path}')
It runs perfectly if I un-mute the ‘slicing’ of the original imerg dataset to the first 480 time steps. Nevertheless, I can see the memory increasing at each iteration, so I cannot run it over almost a thousand loops, which would be ridiculous.
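For reference, a simple way to quantify that per-iteration growth would be something like this (psutil is not used in the script above; this is just a sketch added for illustration):

import psutil

# printed at the end of each loop iteration to track the main process memory
rss_gib = psutil.Process().memory_info().rss / 2**30
print(f"Memory after chunk {start_idx}-{end_idx}: {rss_gib:.2f} GiB")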