Large graph warning when pulling data from Google Earth Engine

# 1. Authenticate to Google Earth Engine
import ee

ee.Authenticate()

# 2. Create a Dask cluster

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=4,                  # number of Python processes
    threads_per_worker=1,         # single-threaded workers
    memory_limit="3GB",           # per worker; can be "auto"
    scheduler_port=0,             # 0 = choose a free port
    dashboard_address=":8787",    # open the dashboard
    processes=True,               # True = separate processes (default)
)
client = Client(cluster)

# 3. Initialize Dask workers with Google Earth Engine
import ee
from distributed import WorkerPlugin

class EEPlugin(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker
        try:
            # Assume credentials already exist at the default location
            ee.Initialize(opt_url='https://earthengine-highvolume.googleapis.com')
        except ee.EEException as e:
            raise RuntimeError(
                "Earth Engine initialization failed. Run ee.Authenticate() "
                "once on the client before starting the cluster."
            ) from e

ee_plugin = EEPlugin()
client.register_plugin(ee_plugin)

# 4. Run some Earth Engine code! This does a basic grab of Landsat data with standard filtering.
import ee
ee.Initialize(opt_url='https://earthengine-highvolume.googleapis.com')

# Basic cloud masking algorithm
def prep_sr_l8(image):
    # Bit 0 - Fill
    # Bit 1 - Dilated Cloud
    # Bit 2 - Cirrus
    # Bit 3 - Cloud
    # Bit 4 - Cloud Shadow
    qa_mask = image.select('QA_PIXEL').bitwiseAnd(int('11111', 2)).eq(0)
    saturation_mask = image.select('QA_RADSAT').eq(0)

    # Apply the scaling factors to the appropriate bands.
    optical_bands = image.select('SR_B.*').multiply(0.0000275).add(-0.2)
    thermal_bands = image.select('ST_B.*').multiply(0.00341802).add(149.0)

    # Replace the original bands with the scaled ones and apply the masks.
    return (image.addBands(optical_bands, None, True)
                 .addBands(thermal_bands, None, True)
                 .updateMask(qa_mask)
                 .updateMask(saturation_mask))

US_Boundaries = ee.FeatureCollection("projects/robust-raster/assets/boundaries/US")
ic = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
        .filterDate('2018-01-01', '2018-12-31')
        .map(prep_sr_l8)
        .select(['SR_B4', 'SR_B5']))

# 5. Open the ImageCollection as an xarray Dataset
import xarray as xr

ds = xr.open_dataset(
    ic,
    engine='ee',
    geometry=US_Boundaries.geometry(),
    crs='EPSG:5070',
    scale=1000,
    chunks='auto',
)

# 6. Compute NDVI using xarray
import numpy as np

def add_ndvi(ds: xr.Dataset) -> xr.Dataset:
    # Bands: B5 = NIR, B4 = Red
    nir = ds["SR_B5"].astype("float32")
    red = ds["SR_B4"].astype("float32")

    # Compute NDVI, guard against divide-by-zero
    denom = nir + red
    ndvi = xr.where(denom != 0, (nir - red) / denom, np.nan).astype("float32")

    # Add attributes for clarity
    ndvi = ndvi.assign_attrs(
        {
            "long_name": "Normalized Difference Vegetation Index",
            "standard_name": "NDVI",
            "description": "NDVI = (SR_B5 - SR_B4) / (SR_B5 + SR_B4)",
            "units": "1",
            "source_bands": "SR_B5 (NIR), SR_B4 (Red)",
        }
    )

    # Return a new dataset containing only the NDVI variable
    return xr.Dataset({"ndvi": ndvi})

# 7. Run the computation, which should replicate the warning.
results = xr.map_blocks(func=add_ndvi, obj=ds)
results.compute()

r:\Users\adrianom.UNR.conda\envs\rreditmode\lib\site-packages\distributed\client.py:3362: UserWarning: Sending large graph of size 368.26 MiB. This may cause some slowdown. Consider loading the data with Dask directly or using futures or delayed objects to embed the data into the graph without repetition. See also for more information. warnings.warn(

I am running an NDVI calculation using xee, a Python package that integrates Google Earth Engine with xarray. It's a hefty computation, and I'm getting the large graph warning shown at the end of the code snippet. Sometimes it takes a while for the computation to start; other times, when I ask for even more data than this code requests, it doesn't start at all. Reading some similar posts here in the Dask Discourse group, I noticed people stating that this issue has to do with loading the data locally, and that the fix is to read the data into the workers directly. I'm assuming that's the case here as well, but I'm not sure. If you have the Earth Engine Python API as well as the packages I import in the above snippet, you should be able to replicate this warning yourself.

Hi @adrianom-gh, welcome to the Dask community!

You are right, the warning here doesn't come from reading the data on the client side. I was able to install the xee package and run your code, although I only get the warning with a graph of 88 MiB.

This huge graph is the result of working with a huge collection. With your parameters, each variable of the Xarray Dataset ends up as a Dask array of 97320 chunks. At the end of this simple computation, Dask sends a graph with 585560 keys, which is huge.

Running results.compute() will try to build a 7.5 TiB array in your client process, so I don't advise doing this. However, I tried running it in order to observe your problem. Indeed, because the graph is huge and complex, it takes one or two minutes before the computation really starts on the Scheduler/Workers. This is expected when the Scheduler has to deal with this number of tasks. Dask can handle up to about 1 million tasks reasonably well, but it delays the start of the computation. If possible, I often advise keeping computations under 100k tasks.
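For illustration, you can gauge the graph size before calling compute(); here is a sketch with a synthetic array standing in for the xee-backed dataset (whose Dask array you could inspect the same way via ds['SR_B4'].data):

```python
import dask.array as da

# Synthetic stand-in for the xee-backed data
x = da.ones((10_000, 10_000), chunks=(500, 500))   # 400 chunks
y = ((x - 1) / (x + 1)).mean(axis=0)

print(x.npartitions)            # number of chunks in the input array
print(len(y.__dask_graph__()))  # task keys Dask would send to the scheduler
```

If the second number is in the hundreds of thousands, expect a long pause before work starts.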

Note that xee is also complaining a bit; I get a lot of:
WARNING:root:Xee is indexing into the ImageCollection beyond 10000 images. This operation can be slow. To improve performance, consider filtering the ImageCollection prior to using Xee or enabling fast_time_slicing.

So what’s the solution here? I can think of two:

  • Try to define bigger chunks when using xr.open_dataset. 'auto' will result in Dask/Xarray trying to get chunks of about 100 MiB, which is good for many use cases, but not necessarily with big collections. You can, for example, divide the number of tasks by 4 with:
ds = xr.open_dataset(ic, engine='ee', geometry=US_Boundaries.geometry(), crs='EPSG:5070', scale=1000, chunks={'time':96, 'X':1024, 'Y':1024})
  • Launch several computations each on a subset of the date range.
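The second option (splitting by date range) could be sketched like this; the ee/xee calls are left as comments since they require Earth Engine credentials, and the quarterly split is just an example:

```python
import pandas as pd

# Split the year into quarterly sub-ranges (an arbitrary example split).
ranges = pd.date_range('2018-01-01', '2019-01-01', freq='QS')
pairs = list(zip(ranges[:-1], ranges[1:]))

for start, end in pairs:
    print(start.date(), '->', end.date())
    # In the real workflow (needs Earth Engine credentials):
    # ic_sub = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
    #             .filterDate(str(start.date()), str(end.date()))
    #             .map(prep_sr_l8)
    #             .select(['SR_B4', 'SR_B5']))
    # ds_sub = xr.open_dataset(ic_sub, engine='ee', ...)
    # ...compute and write ds_sub before moving to the next range...
```

Each sub-range produces a much smaller graph, so the scheduler starts work quickly.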

Finally, keep in mind that you probably don't want to call results.compute() at the end.

Hi @guillaumeeb ! Thank you so much for responding! I do know that Google Earth Engine has limitations on how much data you can pull per request. The xee developers suggest not going beyond about 48 MiB of data per chunk, otherwise you get an Earth Engine error stating you reached a limit. Unfortunately, I use the max possible chunk size Earth Engine allows (48 MiB), so I can’t make it any bigger without getting the error I mentioned earlier. Looks like this is just a limitation I can’t do anything about. Doing the computation in subsets, as you suggested, is probably the best course of action. As for results.compute(), do you suggest I use .persist() instead to avoid the client-side reading of data?
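For reference, the chunk-size arithmetic looks roughly like this (assuming 4 bytes per value; xee's exact per-request accounting may differ, so treat it only as an approximation):

```python
def chunk_mib(n_time, n_x, n_y, itemsize=4):
    # Uncompressed size in MiB of one band's chunk of float32 values.
    return n_time * n_x * n_y * itemsize / 2**20

# e.g. a 48 x 512 x 512 chunk of float32 values sits exactly at the 48 MiB cap
print(chunk_mib(48, 512, 512))
```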

Okay, I didn’t know. Makes sense for them.

What do you want to do with this final data? Perform some analysis on it? Or save it to disk? If so, you should do this directly; this is a huge dataset to keep in memory!

I guess both, to answer your question. add_ndvi() is a basic testing function I am using just to debug the large graph warning. It will be replaced with another, more practical use case for analysis. The results are written to disk; I already have code that does that, so results.compute() does free chunks once their tasks are completed. If you have any other suggestions, I'm all ears! I'm not sure if results.compute() is the right way to use it in this case.

Well, you need to make sure that compute() is not called on a big collection, because otherwise it will bring your entire dataset back into client memory.

If you want to modify some data and write it to disk, you usually use something like to_zarr().

For my use-case, I need to export results to a raster data format and to my knowledge no such method in xarray.Dataset exists outside of dataset.rio.to_raster(). But I need a bit more control than what dataset.rio.to_raster() provides. So I wrote my own wrapper function that exports an xarray.Dataset chunk to a raster. This wrapper function wraps over the user function that gets passed into xr.map_blocks and runs when results.compute() gets called. So, I guess in this case, I need some way to trigger my computation and export the results. I’m assuming to_zarr() does both for you (starts the computation and exports results). I don’t believe there is any other way to do what I’m suggesting without using results.compute(). .persist() maybe? That would move away from loading stuff on the client side.
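Roughly, the pattern I'm describing looks like this, with np.save standing in for the raster export (the real wrapper would call something like rio.to_raster), and a small per-block summary returned so compute() gathers almost nothing:

```python
import os
import numpy as np
import xarray as xr

def export_block(block: xr.Dataset, out_dir: str = "blocks") -> xr.Dataset:
    # Hypothetical stand-in for the raster export: write this chunk to its
    # own file, keyed by the chunk's first time coordinate.
    os.makedirs(out_dir, exist_ok=True)
    key = str(block["time"].values[0])
    np.save(os.path.join(out_dir, f"ndvi_{key}.npy"), block["ndvi"].values)
    # Return only a tiny per-block summary; the useful work is the write.
    return block.mean(dim=("y", "x"))

# Toy stand-in for the NDVI dataset.
ds = xr.Dataset(
    {"ndvi": (("time", "y", "x"), np.random.rand(4, 8, 8))},
    coords={"time": np.arange(4)},
).chunk({"time": 2})

template = ds.mean(dim=("y", "x"))          # small dataset, same chunking
summary = xr.map_blocks(export_block, ds, template=template)
result = summary.compute()                  # triggers the per-chunk writes
```

Here compute() still triggers everything, but only the tiny summaries come back to the client.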

Okay, that makes sense, and you are right! Just be careful not to return anything large from the function you pass to map_blocks!