#! python
import argparse, os, sys, re
# Disable the deprecated PyGEOS backend before (geo)pandas is imported, so
# shapely 2.x is used instead (must be set prior to the geopandas import below).
os.environ['USE_PYGEOS'] = '0'
# GEDI waveform-processing algorithm setting groups selectable per shot.
# NOTE(review): the name is misspelled ("ALRORITHM") but is referenced
# throughout this file - rename only together with all of its usages.
ALRORITHM_SETTINGS = [1,2,5,10]
def getCmdArgs():
    """Parse command-line arguments for the GEDI shot filter/export tool.

    Returns:
        argparse.Namespace: parsed options; see each option's help string.
    """
    p = argparse.ArgumentParser(description="Filter and export spatially indexed GEDI shots from multiple products using the h3 (hexagons) or egi (pixels) system")
    p.add_argument("-o", "--output", dest="output", required=True, type=str, help="output directory or file path")
    p.add_argument("-r", "--region", dest="region", required=False, type=str, default=None, help="path to vector (.shp, .gpkg, .kml etc.) or raster (.tif) file with region of interest to extract shots from OR iso3 country code - if not provided, all land surface data will be queried")
    p.add_argument("-l2a", "--l2a", dest="l2a", nargs='+', type=str, default=None, required=False, help="GEDI L2A variables to export")
    p.add_argument("-l2b", "--l2b", dest="l2b", nargs='+', type=str, default=None, required=False, help="GEDI L2B variables to export")
    p.add_argument("-l4a", "--l4a", dest="l4a", nargs='+', type=str, default=None, required=False, help="GEDI L4A variables to export")
    p.add_argument("-a", "--anci", dest="anci", type=str, default=None, required=False, help="quoted dictionary of ancillary variables to export - e.g. \"{'glad_forest_loss':['loss','lossyear']}\"")
    p.add_argument("-rh", "--rh", dest="rh", nargs='+', type=int, default=None, required=False, help="RH metrics to extract from the selected algorithm setting for each shot [space separated list of percentiles or -1 for all rh metrics]")
    p.add_argument("-t0", "--time_start", dest="time_start", type=str, default=None, required=False, help="start date to filter shots [YYYY-MM-DD]")
    p.add_argument("-t1", "--time_end", dest="time_end", type=str, default=None, required=False, help="end date to filter shots [YYYY-MM-DD]")
    p.add_argument("-q", "--query", dest="query", required=False, type=str, default=None, help="single string with custom filters upon listed variables - use pandas.DataFrame.query notation")
    p.add_argument("-y", "--quality", dest="quality", required=False, action='store_true', help="apply latest quality filter recipe")
    p.add_argument("-e", "--egi", dest="egi", required=False, action='store_true', help="export shots with EGI spatial index (exact pixels) instead of H3 (approximate hexagons) - much slower!")
    p.add_argument("-m", "--merge", dest="merge", required=False, action='store_true', help="merge outputs and export to single file")
    p.add_argument("-g", "--geo", dest="geo", required=False, action='store_true', help="export file as georeferenced points")
    p.add_argument("-f", "--format", dest="format", required=False, type=str, default='parquet', help="output files format [default = parquet]")
    p.add_argument("-D", "--debug", dest="debug", required=False, action='store_true', help="allow dask to display error messages")
    # Default to a quarter of the machine's cores, but never fewer than one:
    # os.cpu_count() may return None, and machines with < 4 cores would
    # otherwise produce an invalid n_workers=0 default for the dask Client.
    n = max(1, (os.cpu_count() or 1) // 4)
    p.add_argument("-n", "--cores", dest="cores", required=False, type=int, default=n, help=f"number of cpu cores to use [default = {n}]")
    p.add_argument("-s", "--threads", dest="threads", required=False, type=int, default=1, help="number of threads per cpu [default = 1]")
    p.add_argument("-A", "--ram", dest="ram", required=False, type=int, default=20, help="maximum RAM usage per cpu - in Giga Bytes [default = 20]")
    p.add_argument("-p", "--port", dest="port", required=False, type=int, default=10000, help="port where to open dask dashboard [default = 10000]")
    cmdargs = p.parse_args()
    return cmdargs
def parse_algorithm_selection(var_list, prod='l2a'):
    """Expand '_a*' wildcard variable names into per-algorithm-setting columns.

    Args:
        var_list: requested variable names; names containing '_a*' are expanded
            to every matching per-algorithm column (e.g. one per setting a1,
            a2, a5, a10).
        prod: product key used to look up matching columns ('l2a', 'l2b', 'l4a').

    Returns:
        tuple (gedi_vars, algo_mapper):
            gedi_vars - the concrete column names to load;
            algo_mapper - one dict per wildcard with keys 'in' (original
            wildcard name), 'out' (output column name) and 'merge' (the
            expanded columns to be merged back after loading). Empty list
            when no wildcard was present.
    """
    # Fast path: nothing to expand.
    if not any('_a*' in v for v in var_list):
        return var_list, []
    gedi_vars = []
    algo_mapper = []
    for name in var_list:
        if '_a*' not in name:
            gedi_vars.append(name)
            continue
        # Build a regex matching any algorithm-setting suffix (e.g. rh_a1_050,
        # rh_a10_050). Raw string avoids the invalid '\d' escape warning.
        pattern = name.replace('_a*', r'_a\d*')
        out_name = name.replace('_a*', '')
        # NOTE: gh3 is imported inside the __main__ guard; this function is
        # only called after that import has run.
        matched = gh3.list_gedi_variables(pattern).loc[prod].column.to_list()
        gedi_vars += matched
        algo_mapper.append({'in': name, 'out': out_name, 'merge': matched})
    return gedi_vars, algo_mapper
if __name__ == '__main__':
    # Parse CLI arguments first so --help works without importing the heavy
    # geospatial/dask stack below.
    args = getCmdArgs()
    from datetime import datetime
    import pandas as pd
    import geopandas as gpd
    import gedidriver as gdr
    import gedih3 as gh3
    import dask, logging, ast
    import dask_geopandas as dkg
    from dask.distributed import Client, progress
    # Resource guard on the shared machine: refuse to start when the user's
    # existing processes already consume too much.
    is_greedy = gh3.safe.tell_user(os.cpu_count() // 2, verbose=False)
    is_greedy = gh3.safe.tell_user(os.cpu_count() // 2) # 2nd time gets cpu percent correctly
    if is_greedy and not gh3.safe.clear_user():
        print("## -- too many resources used by your current spawned processes - try again once those are finished")
        sys.exit("## -- EXIT")
    else:
        print("## -- user cleared - opening distributed backend")
    # Spin up a local dask distributed cluster sized from the CLI options.
    client_args = dict(n_workers=args.cores, threads_per_worker=args.threads, dashboard_address=f':{args.port}', memory_limit=f'{args.ram}GB')
    if not args.debug:
        client_args['silence_logs'] = logging.ERROR
    client = Client(**client_args)
    print("## -- dask dashboard available at:", client.dashboard_link)
    # literal_eval only accepts Python literals, so -a cannot execute code.
    anci = None if args.anci is None else ast.literal_eval(args.anci)
    preq = args.query
    reg = None
    # Expand '_a*' wildcards per product; selection_mapper remembers how to
    # merge the expanded per-algorithm columns back after loading.
    selection_mapper = []
    has_select = False
    if args.l2a:
        args.l2a, l2a_select = parse_algorithm_selection(args.l2a, 'l2a')
        selection_mapper += l2a_select
        has_select |= ('selected_algorithm' in args.l2a)
    if args.l2b:
        # NOTE(review): unlike l2a/l4a, l2b does not feed has_select here -
        # confirm 'selected_algorithm' is never an L2B variable.
        args.l2b, l2b_select = parse_algorithm_selection(args.l2b, 'l2b')
        selection_mapper += l2b_select
    if args.l4a:
        args.l4a, l4a_select = parse_algorithm_selection(args.l4a, 'l4a')
        selection_mapper += l4a_select
        has_select |= ('selected_algorithm' in args.l4a)
    # 'selected_algorithm' is required to resolve wildcards; pull it in via
    # L2A when the user did not request it explicitly.
    if len(selection_mapper) > 0 and not has_select:
        if args.l2a:
            args.l2a.append('selected_algorithm')
        else:
            args.l2a = ['selected_algorithm']
    # Region of interest: a file path (vector/raster/parquet) or an iso3 code.
    if args.region is not None:
        print("## -- loading region of interest")
        if not os.path.isfile(args.region):
            # Not a file on disk: treat the argument as an iso3 country code
            # and look it up in the postgres-backed countries table.
            # NOTE(review): args.region is interpolated directly into the SQL
            # string - consider a parameterized query if execute_query allows.
            # NOTE(review): sys.exit raises SystemExit, which the bare except
            # below also catches, so an unknown iso3 code reports "no access
            # to the postgres database" instead of "file not found" - verify.
            try:
                countries = gdr.execute_query(f"select iso3, geometry as geom from fia.countries where iso3 = '{ args.region.upper() }'", True)
                if len(countries) > 0:
                    reg = countries
                else:
                    sys.exit(f"## -- file not found: {args.region}")
            except:
                sys.exit("## -- no access to the postgres database")
        elif args.region.lower().endswith('parquet') or args.region.lower().endswith('pq') or args.region.lower().endswith('parq'):
            reg = gpd.read_parquet(args.region)
        elif args.region.lower().endswith('kml'):
            import fiona
            # KML support must be enabled explicitly in fiona's driver table.
            fiona.drvsupport.supported_drivers['KML'] = 'rw'
            reg = gpd.read_file(args.region, driver='KML')
        elif args.region.lower().endswith('tif'):
            import rioxarray
            from shapely.geometry import box
            # For rasters only the bounding box is used as the region.
            with rioxarray.open_rasterio(args.region) as img:
                reg = gpd.GeoSeries([box(*img.rio.bounds())], crs=img.rio.crs)
        else:
            reg = gpd.read_file(args.region)
    # Quality recipe: add 'quality_flag' to both the row filter (preq) and the
    # ancillary variables so the column is actually loaded.
    if args.quality:
        if preq is None:
            preq = 'quality_flag'
        else:
            preq += ' and quality_flag'
        if anci is None:
            anci = {"quality_flags":["quality_flag"]}
        elif 'quality_flags' not in anci:
            anci['quality_flags'] = ['quality_flag']
        elif 'quality_flags' in anci and 'quality_flag' not in anci['quality_flags']:
            anci['quality_flags'] += ['quality_flag']
    # Georeferencing needs lon/lat columns; add them to the first product that
    # was requested if no product already carries both.
    if args.geo:
        has_geo = False
        if args.l2a:
            has_geo |= ('lat_lowestmode' in args.l2a and 'lon_lowestmode' in args.l2a)
        if args.l2b:
            has_geo |= ('lat_lowestmode' in args.l2b and 'lon_lowestmode' in args.l2b)
        if args.l4a:
            has_geo |= ('lat_lowestmode' in args.l4a and 'lon_lowestmode' in args.l4a)
        if not has_geo:
            if args.l2a:
                args.l2a += ['lon_lowestmode', 'lat_lowestmode']
            elif args.l2b:
                args.l2b += ['lon_lowestmode', 'lat_lowestmode']
            elif args.l4a:
                args.l4a += ['lon_lowestmode', 'lat_lowestmode']
    # Time filtering: make sure a delta_time column is loaded, then translate
    # the YYYY-MM-DD bounds into seconds relative to the GEDI mission start.
    if args.time_start or args.time_end:
        has_time = False
        l2b_time = False
        tvar = 'delta_time'
        # NOTE(review): elif chain means only the first requested product is
        # checked for an existing time column - confirm that is intended.
        if args.l2a:
            has_time |= ('delta_time' in args.l2a)
        elif args.l2b:
            l2b_time = ('geolocation/delta_time' in args.l2b)
            has_time |= l2b_time
        elif args.l4a:
            has_time |= ('delta_time' in args.l4a)
        if not has_time:
            if args.l2a:
                args.l2a += ['delta_time']
            elif args.l2b:
                l2b_time = True
                args.l2b += ['geolocation/delta_time']
            elif args.l4a:
                args.l4a += ['delta_time']
        if l2b_time:
            # Backtick-quote the column for pandas.DataFrame.query, since the
            # L2B name contains a '/'.
            tvar = '`geolocation/delta_time`'
        t_query = None
        if args.time_start:
            t0 = datetime.strptime(args.time_start, '%Y-%m-%d').timestamp() - gdr.START_DATE.timestamp()
            t_query = f"{tvar} > {t0}"
        if args.time_end:
            t1 = datetime.strptime(args.time_end, '%Y-%m-%d').timestamp() - gdr.START_DATE.timestamp()
            if t_query:
                t_query += f" and {tvar} < {t1}"
            else:
                t_query = f"{tvar} < {t1}"
        if preq is None:
            preq = t_query
        else:
            preq += ' and ' + t_query
    # RH metrics: request every per-algorithm RH column now; the selected
    # algorithm's values are filtered out after loading (rh_filter below).
    if args.rh is not None:
        if args.rh[0] < 0:
            # -1 means "all percentiles" (rh_000 .. rh_100).
            args.rh = list(range(101))
        rh_cols = [f'geolocation/rh_a{i}_{j:03d}' for i in ALRORITHM_SETTINGS for j in args.rh]
        # NOTE(review): this checks args.l4a but appends to rh_cols, which is
        # merged into args.l2a below - confirm the l4a check is intended.
        if args.l4a is None or 'selected_algorithm' not in args.l4a:
            rh_cols.append('selected_algorithm')
        args.l2a = list(rh_cols) if args.l2a is None else list(set(args.l2a + rh_cols))
    print("## -- reading distributed data frame")
    gdf = gh3.load_gedi(l2a=args.l2a, l2b=args.l2b, l4a=args.l4a, anci=anci, square_tiles=args.egi, region=reg)
    # Collapse the expanded per-algorithm columns into a single output column
    # by picking, per row, the value from its selected algorithm setting.
    if len(selection_mapper) > 0:
        print("## -- applying algorithm selection on flagged variables")
        # Fall back to setting 10 for shots whose selected_algorithm is not a
        # known setting.
        gdf['algo'] = gdf.selected_algorithm.apply(lambda x: x if x in ALRORITHM_SETTINGS else 10)
        for i in selection_mapper:
            gdf[i['out']] = 0
            for a in ALRORITHM_SETTINGS:
                a_var = i['in'].replace('*', str(a))
                # Boolean mask * value keeps only the matching setting's value.
                gdf[i['out']] += (gdf.algo == a) * (gdf[a_var])
            gdf = gdf.drop(columns=i['merge'])
    # Reduce the per-algorithm RH columns to the selected algorithm's metrics.
    if args.rh is not None:
        print("## -- selecting RH metrics")
        keep_vars = [i for i in gdf.columns if i not in rh_cols]
        if 'algo' not in gdf.columns:
            gdf['algo'] = gdf.selected_algorithm.apply(lambda x: x if x in ALRORITHM_SETTINGS else 10)
        # Run once on a small head() sample to derive dask's output schema.
        meta = gh3.rh_filter(gdf.head(), keep=keep_vars, rh_list=args.rh, algo_column='algo')
        gdf = gdf.map_partitions(gh3.rh_filter, keep=keep_vars, rh_list=args.rh, algo_column='algo', meta=meta)
    # 'algo' is an internal helper column - drop it before export.
    if 'algo' in gdf.columns:
        gdf = gdf.drop(columns='algo')
    # Apply the accumulated pandas-query filter (user query + quality + time).
    if preq is not None:
        print("## -- applying data filters")
        gdf = gdf.query(preq)
    # Turn the frame into a dask-geopandas GeoDataFrame and clip to the region.
    if args.geo:
        print("## -- scheduling georreferencing")
        gdf = gdf.assign(geometry = dkg.points_from_xy(gdf, x='lon_lowestmode', y='lat_lowestmode', crs=4326))
        gdf = dkg.from_dask_dataframe(gdf)
        if reg is not None:
            gdf = gdf.clip(reg.to_crs(4326))
    print("## -- loading and exporting data")
    opath = args.output
    if args.merge:
        # Single-file export: compute the whole frame on the client, then
        # write one file with the requested extension.
        opath = re.sub('/*$', '', opath)  # strip any trailing slashes
        gdf = gdf.persist()
        progress(gdf)
        print('')
        gdf = gdf.compute()
        if not opath.endswith(f'.{args.format}'):
            opath += f'.{args.format}'
        # Non-parquet geographic formats go through GeoPandas' to_file.
        if args.geo and args.format not in ['parq', 'parquet', 'pq']:
            gdf.to_file(opath)
        else:
            gdf.to_parquet(opath, engine='pyarrow')
    else:
        # Partitioned export: write one file per partition on the workers.
        files = gh3.export_parts(gdf, opath, fmt=args.format)
        files = files.persist()
        progress(files)
        print('')
    # Best-effort shutdown of the dask client; failures here are harmless.
    try:
        client.close()
    except:
        pass
    sys.exit("## -- DONE")