Distributed.nanny - ERROR - Error in Nanny killing Worker subprocess

The code works in USA region scale, but get error running at a lager region.
The code can finish at 100%, but gave me following error at last moment.

2023-07-21 22:07:12,771 - distributed.nanny - ERROR - Error in Nanny killing Worker subprocess
Traceback (most recent call last):
File “/gpfs/data1/vclgp/xiongl/env/ih3/lib/python3.9/asyncio/tasks.py”, line 452, in wait_for
return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File “/gpfs/data1/vclgp/xiongl/env/ih3/lib/python3.9/site-packages/distributed/nanny.py”, line 597, in close
await self.kill(timeout=timeout, reason=reason)
File “/gpfs/data1/vclgp/xiongl/env/ih3/lib/python3.9/site-packages/distributed/nanny.py”, line 380, in kill
await self.process.kill(reason=reason, timeout=0.8 * (deadline - time()))
File “/gpfs/data1/vclgp/xiongl/env/ih3/lib/python3.9/site-packages/distributed/nanny.py”, line 853, in kill
await process.join(max(0, deadline - time()))
File “/gpfs/data1/vclgp/xiongl/env/ih3/lib/python3.9/site-packages/distributed/process.py”, line 330, in join
await wait_for(asyncio.shield(self._exit_future), timeout)
File “/gpfs/data1/vclgp/xiongl/env/ih3/lib/python3.9/site-packages/distributed/utils.py”, line 1924, in wait_for
return await asyncio.wait_for(fut, timeout)
File “/gpfs/data1/vclgp/xiongl/env/ih3/lib/python3.9/asyncio/tasks.py”, line 454, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

Hi @linxiongumd, welcome to Dask community!

So your problem sounds like a network performance (or maybe configuration?) issue.

How large is the future value you are trying to get back? Do you have some sample of your code?

Did you try to use the timeout kwarg in the Future.result() function (if you are using that one)?

Thanks for the reply! I didn’t use the Future.result(). Codes work well at country scale. Maybe the future value is too large if I query globally. How can I get the future value?

I think we will need at least a snippet of your code to understand what you are doing, which Dask API you are using and so one.

#! python
import argparse, os, sys, re
os.environ['USE_PYGEOS'] = '0'

ALRORITHM_SETTINGS = [1,2,5,10]

def getCmdArgs():
    p = argparse.ArgumentParser(description = "Filter and export spatially indexed GEDI shots from multiple products using the h3 (hexagons) or egi (pixels) system")
    
    p.add_argument("-o", "--output", dest="output", required=True, type=str, help="output directory or file path")
    p.add_argument("-r", "--region", dest="region", required=False, type=str, default=None, help="path to vector (.shp, .gpkg, .kml etc.) or raster (.tif) file with region of interest to extract shots from OR iso3 country code - if not provided, all land surface data will be queried")
    p.add_argument("-l2a", "--l2a", dest="l2a", nargs='+', type=str, default=None, required=False, help="GEDI L2A variables to export")
    p.add_argument("-l2b", "--l2b", dest="l2b", nargs='+', type=str, default=None, required=False, help="GEDI L2B variables to export")
    p.add_argument("-l4a", "--l4a", dest="l4a", nargs='+', type=str, default=None, required=False, help="GEDI L4A variables to export")
    p.add_argument("-a", "--anci", dest="anci", type=str, default=None, required=False, help="quoted dictionary of ancillary variables to export - e.g. \"{'glad_forest_loss':['loss','lossyear']}\"")
    p.add_argument("-rh", "--rh", dest="rh", nargs='+', type=int, default=None, required=False, help="RH metrics to extract from the selected algorithm setting for each shot [space separated list of percentiles or -1 for all rh metrics]")
    p.add_argument("-t0", "--time_start", dest="time_start", type=str, default=None, required=False, help="start date to filter shots [YYYY-MM-DD]")
    p.add_argument("-t1", "--time_end", dest="time_end", type=str, default=None, required=False, help="end date to filter shots [YYYY-MM-DD]")
    p.add_argument("-q", "--query", dest="query", required=False, type=str, default=None, help="single string with custom filters upon listed variables - use pandas.DataFrame.query notation")    
    p.add_argument("-y", "--quality", dest="quality", required=False, action='store_true', help="apply latest quality filter recipe")
    p.add_argument("-e", "--egi", dest="egi", required=False, action='store_true', help="export shots with EGI spatial index (exact pixels) instead of H3 (approximate hexagons) - much slower!")
    p.add_argument("-m", "--merge", dest="merge", required=False, action='store_true', help="merge outputs and export to single file")
    p.add_argument("-g", "--geo", dest="geo", required=False, action='store_true', help="export file as georreferenced points")
    p.add_argument("-f", "--format", dest="format", required=False, type=str, default='parquet', help="output files format [default = parquet]")
    p.add_argument("-D", "--debug", dest="debug", required=False, action='store_true', help="allow dask to display error messages")

    n = os.cpu_count() // 4
    p.add_argument("-n", "--cores", dest="cores", required=False, type=int, default=n, help=f"number of cpu cores to use [default = {n}]")
    p.add_argument("-s", "--threads", dest="threads", required=False, type=int, default=1, help="number of threads per cpu [default = 1]")
    p.add_argument("-A", "--ram", dest="ram", required=False, type=int, default=20, help="maximum RAM usage per cpu - in Giga Bytes [default = 20]")
    p.add_argument("-p", "--port", dest="port", required=False, type=int, default=10000, help="port where to open dask dashboard [default = 10000]")
    
    cmdargs = p.parse_args()
    return cmdargs

def parse_algorithm_selection(var_list, prod='l2a'):
    has_star = any(['_a*' in i for i in var_list])
    
    if not has_star: 
        return var_list, []
    
    gedi_vars = []
    algo_mapper = []
    for i in var_list:
        if '_a*' not in i:
            gedi_vars.append(i)
            continue
        
        q = i.replace('_a*', '_a\d*')
        out_name = i.replace('_a*', '')
        q_vars = gh3.list_gedi_variables(q).loc[prod].column.to_list()
        gedi_vars += q_vars
        
        obj = {'in': i, 'out': out_name, 'merge': q_vars}
        algo_mapper.append(obj)
    
    return gedi_vars, algo_mapper
    

if __name__ == '__main__':
    
    args = getCmdArgs()

    from datetime import datetime
    import pandas as pd
    import geopandas as gpd
    import gedidriver as gdr
    import gedih3 as gh3
    import dask, logging, ast
    import dask_geopandas as dkg
    from dask.distributed import Client, progress

    is_greedy = gh3.safe.tell_user(os.cpu_count() // 2, verbose=False)
    is_greedy = gh3.safe.tell_user(os.cpu_count() // 2) # 2nd time gets cpu percent correctly
    if is_greedy and not gh3.safe.clear_user(): 
        print("## -- too many resources used by your current spawned processes - try again once those are finished")
        sys.exit("## -- EXIT")
    else:
        print("## -- user cleared - opening distributed backend")

    client_args = dict(n_workers=args.cores, threads_per_worker=args.threads, dashboard_address=f':{args.port}', memory_limit=f'{args.ram}GB')
    if not args.debug:
        client_args['silence_logs'] = logging.ERROR
    
    client = Client(**client_args)
    print("## -- dask dashboard available at:", client.dashboard_link)

    anci = None if args.anci is None else ast.literal_eval(args.anci)
    preq = args.query
    reg = None
    
    selection_mapper = []
    has_select = False
    if args.l2a:    
        args.l2a, l2a_select = parse_algorithm_selection(args.l2a, 'l2a')
        selection_mapper += l2a_select
        has_select |= ('selected_algorithm' in args.l2a)
    if args.l2b:
        args.l2b, l2b_select = parse_algorithm_selection(args.l2b, 'l2b') 
        selection_mapper += l2b_select
    if args.l4a:
        args.l4a, l4a_select = parse_algorithm_selection(args.l4a, 'l4a')
        selection_mapper += l4a_select
        has_select |= ('selected_algorithm' in args.l4a)        
    if len(selection_mapper) > 0 and not has_select:
        if args.l2a:
            args.l2a.append('selected_algorithm')
        else:
            args.l2a = ['selected_algorithm']
    
               
    if args.region is not None:
        print("## -- loading region of interest")
        if not os.path.isfile(args.region):
            try:
                countries = gdr.execute_query(f"select iso3, geometry as geom from fia.countries where iso3 = '{ args.region.upper() }'", True)
                if len(countries) > 0:
                    reg = countries
                else:
                    sys.exit(f"## -- file not found: {args.region}")
            except:
                sys.exit("## -- no access to the postgres database")
        elif args.region.lower().endswith('parquet') or args.region.lower().endswith('pq') or args.region.lower().endswith('parq'):
            reg = gpd.read_parquet(args.region)
        elif args.region.lower().endswith('kml'):
            import fiona
            fiona.drvsupport.supported_drivers['KML'] = 'rw'
            reg = gpd.read_file(args.region, driver='KML')
        elif args.region.lower().endswith('tif'):
            import rioxarray
            from shapely.geometry import box
            with rioxarray.open_rasterio(args.region) as img:
                reg = gpd.GeoSeries([box(*img.rio.bounds())], crs=img.rio.crs)                        
        else:
            reg = gpd.read_file(args.region)
    
    if args.quality:
        if preq is None:
            preq = 'quality_flag'
        else:
            preq += ' and quality_flag'
            
        if anci is None:
            anci = {"quality_flags":["quality_flag"]}
        elif 'quality_flags' not in anci:
            anci['quality_flags'] = ['quality_flag']
        elif 'quality_flags' in anci and 'quality_flag' not in anci['quality_flags']:
            anci['quality_flags'] += ['quality_flag']
    
    if args.geo:
        has_geo = False
        if args.l2a:
            has_geo |= ('lat_lowestmode' in args.l2a and 'lon_lowestmode' in args.l2a)
        if args.l2b:
            has_geo |= ('lat_lowestmode' in args.l2b and 'lon_lowestmode' in args.l2b)
        if args.l4a:
            has_geo |= ('lat_lowestmode' in args.l4a and 'lon_lowestmode' in args.l4a)
        
        if not has_geo:
            if args.l2a:
               args.l2a += ['lon_lowestmode', 'lat_lowestmode']
            elif args.l2b:
                args.l2b += ['lon_lowestmode', 'lat_lowestmode']
            elif args.l4a:
                args.l4a += ['lon_lowestmode', 'lat_lowestmode']
                
    if args.time_start or args.time_end:
        has_time = False
        l2b_time = False
        tvar = 'delta_time'
        
        if args.l2a:
            has_time |= ('delta_time' in args.l2a)
        elif args.l2b:
            l2b_time = ('geolocation/delta_time' in args.l2b)
            has_time |= l2b_time
        elif args.l4a:
            has_time |= ('delta_time' in args.l4a)
            
        if not has_time:
            if args.l2a:
               args.l2a += ['delta_time']
            elif args.l2b:
                l2b_time = True
                args.l2b += ['geolocation/delta_time']
            elif args.l4a:
                args.l4a += ['delta_time']
                
        if l2b_time:
            tvar = '`geolocation/delta_time`'
        
        t_query = None                
        if args.time_start:
            t0 = datetime.strptime(args.time_start, '%Y-%m-%d').timestamp() - gdr.START_DATE.timestamp()
            t_query = f"{tvar} > {t0}"
        
        if args.time_end:
            t1 = datetime.strptime(args.time_end, '%Y-%m-%d').timestamp() - gdr.START_DATE.timestamp()
            if t_query:
                t_query += f" and {tvar} < {t1}"
            else:
                t_query = f"{tvar} < {t1}"        
            
        if preq is None:
            preq = t_query
        else:
            preq += ' and ' + t_query
    
    if args.rh is not None:
        if args.rh[0] < 0:
            args.rh = list(range(101))

        rh_cols = [f'geolocation/rh_a{i}_{j:03d}' for i in ALRORITHM_SETTINGS for j in args.rh]        
        
        if args.l4a is None or 'selected_algorithm' not in args.l4a:
            rh_cols.append('selected_algorithm')        
        args.l2a = list(rh_cols) if args.l2a is None else list(set(args.l2a + rh_cols))
    
    print("## -- reading distributed data frame")
    gdf = gh3.load_gedi(l2a=args.l2a, l2b=args.l2b, l4a=args.l4a, anci=anci, square_tiles=args.egi, region=reg)
    
    if len(selection_mapper) > 0:
        print("## -- applying algorithm selection on flagged variables")
        gdf['algo'] = gdf.selected_algorithm.apply(lambda x: x if x in ALRORITHM_SETTINGS else 10)
        for i in selection_mapper:
            gdf[i['out']] = 0
            for a in ALRORITHM_SETTINGS:
                a_var = i['in'].replace('*', str(a))
                gdf[i['out']] += (gdf.algo == a) * (gdf[a_var])
            gdf = gdf.drop(columns=i['merge'])
    
    if args.rh is not None:
        print("## -- selecting RH metrics")
        keep_vars = [i for i in gdf.columns if i not in rh_cols]
        if 'algo' not in gdf.columns: 
            gdf['algo'] = gdf.selected_algorithm.apply(lambda x: x if x in ALRORITHM_SETTINGS else 10)
        meta = gh3.rh_filter(gdf.head(), keep=keep_vars, rh_list=args.rh, algo_column='algo')
        gdf = gdf.map_partitions(gh3.rh_filter, keep=keep_vars, rh_list=args.rh, algo_column='algo', meta=meta)
        
    if 'algo' in gdf.columns:
        gdf = gdf.drop(columns='algo')
        
    if preq is not None:
        print("## -- applying data filters")
        gdf = gdf.query(preq)

    if args.geo:
        print("## -- scheduling georreferencing")
        gdf = gdf.assign(geometry = dkg.points_from_xy(gdf, x='lon_lowestmode', y='lat_lowestmode', crs=4326))
        gdf = dkg.from_dask_dataframe(gdf)
        if reg is not None:
            gdf = gdf.clip(reg.to_crs(4326))

    print("## -- loading and exporting data")        
    opath = args.output
    if args.merge:
        opath = re.sub('/*$', '', opath)
        
        gdf = gdf.persist()
        progress(gdf)
        print('')
        
        gdf = gdf.compute()
        if not opath.endswith(f'.{args.format}'):
            opath += f'.{args.format}'
        if args.geo and args.format not in ['parq', 'parquet', 'pq']:
            gdf.to_file(opath)
        else:
            gdf.to_parquet(opath, engine='pyarrow')
    else:
        files = gh3.export_parts(gdf, opath, fmt=args.format)
        files = files.persist()
        progress(files)
        print('')

    try:
        client.close()
    except:
        pass
    
    sys.exit("## -- DONE")

Sorry for late reply. Here are the codes for the main function. Hopefully, this will help us to resolve the issue. The command needs to run one hour for global processing.

Hi @linxiongumd,

Sorry it took so long to reply.

This code snippet is really too complex for us to understand things. Moreover, I don’t have any idea of where in this code the error is raised.

The only advice I can give here is to try to build some minimum reproducible example, or at least trying to remove part of your code to see what is causing the failure when the dataset gets bigger.

Got it. Thanks for the suggestion.