# 1. Authenticate to Google Earth Engine
# One-time interactive login; stores credentials at the default location
# so ee.Initialize() can run non-interactively afterwards.
import ee
ee.Authenticate()
# 2. Create a Dask cluster
from dask.distributed import Client, LocalCluster

# Local multi-process cluster: four single-threaded worker processes,
# 3 GB memory budget each, with the diagnostic dashboard enabled.
cluster = LocalCluster(
    n_workers=4,                # number of Python processes
    threads_per_worker=1,       # single-threaded workers
    memory_limit="3GB",         # per worker; can be "auto"
    scheduler_port=0,           # 0 = choose a free port
    dashboard_address=":8787",  # open the dashboard
    processes=True,             # True = separate processes (default)
)
client = Client(cluster)
# 3. Initialize Dask workers with Google Earth Engine
import ee
from distributed import WorkerPlugin


class EEPlugin(WorkerPlugin):
    """Dask worker plugin that initializes the Earth Engine client library
    in every worker process when it joins the cluster.

    Assumes EE credentials already exist at the default location (created by
    running ee.Authenticate() once on the client machine).
    """

    def setup(self, worker):
        """Run once per worker, inside the worker process.

        Raises:
            RuntimeError: if Earth Engine cannot be initialized (e.g. missing
                or expired credentials), chaining the original EEException.
        """
        self.worker = worker
        try:
            # The high-volume endpoint is intended for automated /
            # concurrent request patterns like a Dask cluster.
            ee.Initialize(opt_url='https://earthengine-highvolume.googleapis.com')
        except ee.EEException as e:
            # Chain the original error so the root cause is preserved.
            raise RuntimeError(
                "Earth Engine initialization failed. "
                "Run ee.Authenticate() once on the client before starting the cluster."
            ) from e


ee_plugin = EEPlugin()
client.register_plugin(ee_plugin)
# 4. Run some Earth Engine code! This will do a basic grab of some Landsat data, with some standard filtering.
import ee
# Initialize EE on the client process as well (the workers were initialized
# via the EEPlugin); same high-volume endpoint as the workers use.
ee.Initialize(opt_url='https://earthengine-highvolume.googleapis.com')
# Basic cloud masking algorithm
def prep_sr_l8(image):
    """Mask clouds/saturation and apply scale factors to a Landsat 8 C2 L2 image.

    Args:
        image: ee.Image from the LANDSAT/LC08/C02/T1_L2 collection.

    Returns:
        ee.Image with scaled SR_B*/ST_B* bands replacing the originals and
        with the QA and saturation masks applied.
    """
    # QA_PIXEL bits 0-4:
    # Bit 0 - Fill
    # Bit 1 - Dilated Cloud
    # Bit 2 - Cirrus
    # Bit 3 - Cloud
    # Bit 4 - Cloud Shadow
    # Keep only pixels where all five bits are clear.
    qa_mask = image.select('QA_PIXEL').bitwiseAnd(int('11111', 2)).eq(0)
    # QA_RADSAT == 0 means no band is saturated.
    saturation_mask = image.select('QA_RADSAT').eq(0)

    # Apply the scaling factors to the appropriate bands.
    optical_bands = image.select('SR_B.*').multiply(0.0000275).add(-0.2)
    thermal_bands = image.select('ST_B.*').multiply(0.00341802).add(149.0)

    # Replace the original bands with the scaled ones and apply the masks.
    return (image.addBands(optical_bands, None, True)
            .addBands(thermal_bands, None, True)
            .updateMask(qa_mask)
            .updateMask(saturation_mask))
# US boundary asset used to bound the request spatially.
US_Boundaries = ee.FeatureCollection("projects/robust-raster/assets/boundaries/US")
# Landsat 8 C2 L2 for 2018, cloud-masked and scaled, Red (SR_B4) + NIR (SR_B5) only.
# NOTE(review): ee filterDate end dates are exclusive, so Dec 31 itself is
# presumably not included — confirm this is intended.
ic = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2').filterDate('2018-01-01', '2018-12-31').map(prep_sr_l8).select(['SR_B4', 'SR_B5'])
import xarray as xr
# Open the collection lazily through the xee backend, 1000 m pixels in EPSG:5070.
# NOTE(review): chunks='auto' lets the backend pick chunk sizes; this may be
# related to the oversized task graph reported below — TODO confirm by setting
# explicit chunks.
ds = xr.open_dataset(ic, engine='ee', geometry=US_Boundaries.geometry(), crs='EPSG:5070', scale=1000, chunks='auto')
# 6. Compute NDVI using the xarray
import numpy as np


def add_ndvi(ds: xr.Dataset) -> xr.Dataset:
    """Compute NDVI from a dataset containing Landsat SR_B4 and SR_B5 bands.

    Args:
        ds: Dataset with 'SR_B4' (Red) and 'SR_B5' (NIR) variables.

    Returns:
        A *new* Dataset holding a single float32 'ndvi' variable; the input
        dataset is not modified.
    """
    # Bands: B5 = NIR, B4 = Red
    nir = ds["SR_B5"].astype("float32")
    red = ds["SR_B4"].astype("float32")

    # Compute NDVI, guard against divide-by-zero by emitting NaN there.
    denom = nir + red
    ndvi = xr.where(denom != 0, (nir - red) / denom, np.nan).astype("float32")

    # Add attributes for clarity
    ndvi = ndvi.assign_attrs(
        {
            "long_name": "Normalized Difference Vegetation Index",
            "standard_name": "NDVI",
            "description": "NDVI = (SR_B5 - SR_B4) / (SR_B5 + SR_B4)",
            "units": "1",
            "source_bands": "SR_B5 (NIR), SR_B4 (Red)",
        }
    )
    # Return a fresh Dataset (the original comment said "mutate directly",
    # but nothing here mutates the input).
    return xr.Dataset({"ndvi": ndvi})
# 7. Run the computation, which should replicate the warning.
# map_blocks lazily applies add_ndvi to each chunk of ds.
results = xr.map_blocks(func=add_ndvi, obj=ds)
# NOTE(review): compute() submits the entire task graph at once; the
# "large graph" warning below suggests data is being embedded in the graph
# rather than loaded on the workers — presumably a consequence of how the
# xee-backed chunks are materialized. TODO confirm.
results.compute()
r:\Users\adrianom.UNR.conda\envs\rreditmode\lib\site-packages\distributed\client.py:3362: UserWarning: Sending large graph of size 368.26 MiB. This may cause some slowdown. Consider loading the data with Dask directly, or using futures or delayed objects to embed the data into the graph without repetition. See https://docs.dask.org/en/stable/best-practices.html for more information. warnings.warn(
I am running an NDVI calculation using xee, a Python package that integrates Google Earth Engine with xarray. It's a hefty computation, and I'm getting the large-graph warning shown at the end of the code snippet. Sometimes it takes a while for the computation to start; other times, when I ask for even more data than this code requests, it doesn't start at all. Reading similar posts here in the Dask Discourse group, I noticed people saying this issue has to do with the data being loaded locally, and that the fix is to read the data into the workers directly. I'm assuming that's the case here as well, but I'm not sure. If you have the Earth Engine Python API along with the other packages I import in the snippet above, you should be able to replicate this warning yourself.
