Using da.map_overlap in unstructured interpolation

I am using the depth argument of da.map_overlap to provide a buffer around each block for my interpolation method. However, I ran into a couple of situations I don't know how to solve. Could anyone help me with this?

Question 01: I cannot directly call .compute()
Question 02: I cannot trim the buffer

Please see my code below.

Code:

Step 01: Convert to dask.array

import dask.array as da

# lon, lat, data are the source 2-D longitude, latitude and value arrays
slon = da.from_array(lon, chunks=(400, 400))
slat = da.from_array(lat, chunks=(400, 400))
sdat = da.from_array(data, chunks=(400, 400))

Step 02: Define a function for map_overlap use

import numpy as np
import pandas as pd
import h3
from shapely.geometry import box, mapping
from shapely.ops import transform
from scipy.interpolate import LinearNDInterpolator as lNDI  # assumed; the original post only shows the alias lNDI

def tantan2(lon, lat, data, hex_res=10):

    #-- Generate the parameters for h3 and shapely.geometry usage.
    hex_col = 'hex' + str(hex_res)
    lon_max, lon_min = lon.max(), lon.min()
    lat_max, lat_min = lat.max(), lat.min()

    #-- Use shapely.geometry.box to generate domain bounds for h3
    b = box(lon_min, lat_min, lon_max, lat_max, ccw=True)
    b = transform(lambda x, y: (y, x), b)
    b = mapping(b)

    #-- Use pandas to generate the h3 hexagonal grid with associated lon/lat
    target_df = pd.DataFrame(h3.polyfill(b, hex_res), columns=[hex_col])

    target_df['lat'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x)[0])
    target_df['lon'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x)[1])
    tlon, tlat = target_df[['lon', 'lat']].values.T

    #-- Use the input lon/lat and the generated h3 lon/lat for interpolation
    abc = lNDI(points=(lon.ravel(), lat.ravel()),
               values=data.ravel())(tlon, tlat)

    #-- Stack the target h3 lon/lat and the interpolation output and return it.
    dec = np.stack([tlon, tlat, abc], axis=1)
    return dec

Step 03: Apply da.map_overlap

c = da.map_overlap(tantan2,
                   slon,
                   slat,
                   sdat,
                   depth=10,
                   trim=True,
                   boundary=None,
                   align_arrays=False,
                   dtype='float64',
                  )

It looks like my output changes shape compared with the input array; as a result, with trim=True the output always comes back as an empty array, and because of that empty array I keep getting errors when calling .compute().

In addition, if I set trim=False, .compute() still fails, which I believe is also because of the changed shape:

ValueError: could not broadcast input array from shape (1090,3) into shape (1065,3)

Is there any possible way to trim the points in the buffer region?

Hi @cyhsu,

Have you tried passing the keyword parameter chunks to map_overlap()?

The docs describe it as follows:

chunks: Chunk shape of resulting blocks if the function does not preserve shape. If not provided, the resulting array is assumed to have the same block structure as the first input array.

Though originally a map_blocks() keyword parameter, I expect that it can be used in map_overlap() too, since map_overlap()'s docstring says so about its kwargs input parameter:

**kwargs: Other keyword arguments valid in map_blocks
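For illustration, here is a minimal sketch (made-up function and array, not your data) of passing chunks= through map_overlap() when the mapped function does not preserve the block shape:

import numpy as np
import dask.array as da

# The mapped function collapses every block to a fixed (1, 3) row of statistics,
# so it does not preserve shape and `chunks=` must declare the per-block output shape.
def block_stats(block):
    return np.array([[block.min(), block.mean(), block.max()]])

x = da.random.random((800, 800), chunks=(400, 400))

out = da.map_overlap(
    block_stats, x,
    depth=10, trim=False, boundary='reflect',
    chunks=(1, 3),          # shape of each output block
    dtype=x.dtype,
)
print(out.shape)            # (2, 6): one (1, 3) block per block of the 2 x 2 input grid
print(out.compute())

Note that trim=False is needed here because the output blocks no longer contain the buffer that trimming would try to remove.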


Hi @ParticularMiner

Thanks for the response. Oh yeah, I read about it but haven't tried it. I didn't quite understand what it means when testing with trim=True, but it does sound like it should work if I turn trim=False. Let me try it. :slight_smile:

This reply describes the test results after adding the chunks argument.

I reconstructed the function as tantan3 with the block_info argument so it prints out the block information.

def tantan3(lon,lat,data, hex_res=12, block_info=None):
    print(block_info)
    hex_col = 'hex' + str(hex_res)
    lon_max, lon_min = lon.max(), lon.min()
    lat_max, lat_min = lat.max(), lat.min()
    
    b = box(lon_min, lat_min, lon_max, lat_max, ccw=True)
    b = transform(lambda x, y: (y, x), b)
    b = mapping(b)
    
    target_df = pd.DataFrame(h3.polyfill( b, hex_res), columns=[hex_col])    

    target_df['lat'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x)[0])
    target_df['lon'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x)[1])
    tlon, tlat = target_df[['lon','lat']].values.T    

    abc = lNDI(points=(lon.ravel(), lat.ravel()), 
               values= data.ravel())(tlon,tlat)
    target_df['interp'] = abc
    dec = np.stack([tlon, tlat, abc],axis=1)   
    return dec

Here I added chunks=(400, 1), since my outputs are in the shape of (xxxxx, 3):

z1 = da.map_overlap(tantan3,
                    slon[:1200,:1200], slat[:1200,:1200], data[:1200,:1200],
                    depth=10, trim=False, dtype=slon.dtype, align_arrays=False, chunks=(400,1)
                   )

Here is the printed output for the first partition:

z1.to_delayed().flatten()[0].compute()

{0: {'shape': (1260, 1260), 'num-chunks': (3, 3), 'array-location': [(0, 420), (0, 420)], 'chunk-location': (0, 0)}, 1: {'shape': (1260, 1260), 'num-chunks': (3, 3), 'array-location': [(0, 420), (0, 420)], 'chunk-location': (0, 0)}, 2: {'shape': (1260, 1260), 'num-chunks': (3, 3), 'array-location': [(0, 420), (0, 420)], 'chunk-location': (0, 0)}, None: {'shape': (1200, 3), 'num-chunks': (3, 3), 'array-location': [(0, 400), (0, 1)], 'chunk-location': (0, 0), 'chunk-shape': (400, 1), 'dtype': dtype('float64')}}
array([[ -47.81871841, -18.99560999, 2497.00400696],
[ -47.82417923, -18.97345468, 1983.57848172],
[ -47.83918928, -18.98161255, 1496.25165668],
…,
[ -47.8321107 , -18.98878508, 1819.65346035],
[ -47.82419657, -18.99904944, 1918.54184067],
[ -47.84360658, -18.98887746, 2446.75140221]])

The block_info dict is reprinted here in a prettier format:
{0: {'shape': (1560, 1560),
'num-chunks': (3, 3),
'array-location': [(0, 520), (0, 520)],
'chunk-location': (0, 0)},

1: {'shape': (1560, 1560),
'num-chunks': (3, 3),
'array-location': [(0, 520), (0, 520)],
'chunk-location': (0, 0)},

2: {'shape': (1560, 1560),
'num-chunks': (3, 3),
'array-location': [(0, 520), (0, 520)],
'chunk-location': (0, 0)},

None: {'shape': (1200, 1200),
'num-chunks': (3, 3),
'array-location': [(0, 400), (0, 400)],
'chunk-location': (0, 0),
'chunk-shape': (400, 400),
'dtype': dtype('float64')}}

And here are the errors (full traceback attached).

@ParticularMiner I still cannot make it work.

@cyhsu

  • If your outputs are in the shape of (xxxxx, 3) then the chunks parameter for map_overlap() should read chunks=(xxxxx, 3). Why did you put chunks=(400, 1)?

  • Thank you for the full traceback error log. Now I understand the origin of the error. It means that your chunk-function tantan3() is returning chunks of incompatible shapes: in one instance tantan3() returned a chunk of shape (52188, 3), and in another instance it returned a chunk (to the right of the former) of shape (53433, 3). Obviously, one cannot concatenate these two chunks into a 2-D array (see the small illustration after this list). So you need to amend your chunk-function tantan3() so that along any row of blocks shape[0] is the same for all the blocks, and along any column of blocks shape[1] is the same.

I hope this helps.
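To illustrate with synthetic arrays of exactly those shapes:

import numpy as np

# Two horizontally neighbouring blocks with different shape[0] cannot be joined.
left = np.zeros((52188, 3))
right = np.zeros((53433, 3))

try:
    np.hstack([left, right])                            # concatenation along axis 1
except ValueError as err:
    print(err)                                          # dimension mismatch on axis 0

# With matching shape[0] the same operation succeeds:
print(np.hstack([left, np.zeros((52188, 3))]).shape)    # (52188, 6)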

I see. My issue is that I cannot get the same size along axis 0. I guess I should just go with delayed(da.concatenate)([inputs for inputs in z1.to_delayed().flatten()]).

In addition to this, I would like to ask one more question. When I pass the argument depth=10, does that mean each partition will pull 5 data points from each boundary? For example, if a partition is in the shape of [10, 10], will it pull 5 more points on the left boundary, 5 more points on the right boundary, and the same for the upper and lower boundaries? And if the partition is in the upper-left corner (in the position of [0, 1]), are the 5 extra points on the left and upper boundaries filled with NaN by default (since there are no more data points at that location)?

Good idea. But it is better not to wrap da.concatenate() in delayed, since it is itself lazy. You can instead do

z1 = da.concatenate(z1.blocks.ravel())

Unfortunately, the map_overlap() function is not sufficiently documented. But the following code-snippet and its output should answer your questions:

import numpy as np
import dask.array as da


def func(x, block_info=None):
    loc = 'chunk-location'
    print(f'\nx{block_info[0][loc]} =\n {x}')
    return x


x = da.arange(8).reshape(2, 4).rechunk((1, 2))
print(f'x = \n{x.compute()}')
da.map_overlap(func, x, depth=1, meta=np.array([[]]), dtype=x.dtype).compute()
Output:

x = 
[[0 1 2 3]
 [4 5 6 7]]

x(0, 0) =
 [[0 0 1 2]
 [0 0 1 2]
 [4 4 5 6]]

x(1, 0) =
 [[0 0 1 2]
 [4 4 5 6]
 [4 4 5 6]]

x(0, 1) =
 [[1 2 3 3]
 [1 2 3 3]
 [5 6 7 7]]

x(1, 1) =
 [[1 2 3 3]
 [5 6 7 7]
 [5 6 7 7]]

Is using

z1 = da.concatenate(z1.blocks.ravel())

faster than the

delayed(da.concatenate)([ z1.to_delayed().flatten()[idx] for idx in range(z1.npartitions)])

It is somewhat weird to me that recently, when I used the delayed function for the concatenation, it took a lot of time to complete. That does not make sense to me.

chunks = {'x': 100, 'y': 100}
file = './Sentinel2_23KKU_Barest_B08_BSI.tif'
df = open_tiff(file, chunks=chunks).isel(x=slice(6000,10000), y=slice(4000,8000))

X, Y = np.meshgrid(df['x'], df['y'])
xloc = da.from_array(X, chunks=(100,100))
yloc = da.from_array(Y, chunks=(100,100))
data = df.isel(band=0).data

e = da.map_overlap(tantan4, xloc, yloc, data, crs=df.crs,
                   depth=10, trim=False, dtype=xloc.dtype,
                   align_arrays=False)

e2 = delayed(da.concatenate)([e.to_delayed().flatten()[idx] for idx in range(e.npartitions)])

@cyhsu

da.concatenate() returns a dask array, which is a lazy object (that is, its computation is delayed). So using it as an argument to delayed is not recommended since that would create a twice-delayed result (see the docs on best practices). It would also mean you have to call compute() twice: the first time to return the dask array from da.concatenate(), and a second time to compute the dask array.

Using da.concatenate(e.blocks.ravel()) avoids all these problems and yes, it is much faster.
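Here is a small side-by-side sketch of the two approaches, using a synthetic z1 in place of your map_overlap() result:

import dask.array as da
from dask import delayed

z1 = da.ones((1200, 3), chunks=(400, 3))          # stand-in for the map_overlap result

# Recommended: the result stays a single lazy dask array and needs one compute().
result = da.concatenate(z1.blocks.ravel()).compute()

# Not recommended: da.concatenate() is itself lazy, so wrapping it in delayed
# creates a twice-delayed object that needs compute() twice.
twice_delayed = delayed(da.concatenate)(list(z1.to_delayed().flatten()))
result2 = twice_delayed.compute().compute()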

@ParticularMiner

I cannot do it, since e.blocks doesn't have a ravel() attribute:

e.blocks.ravel()

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_412/225947827.py in <module>
----> 1 e.blocks.ravel()

AttributeError: 'IndexCallable' object has no attribute 'ravel'

Thanks for the information. But somehow I feel that dask.dataframe processing is faster than dask.array. Am I wrong?

@cyhsu

Hmm … that is strange. I checked this myself where e was the return value of map_overlap(), and it works very well. Perhaps you’re using an older version of dask?

The dask.dataframe module typically addresses different issues from those addressed by dask.array. And those issues that dask.array addresses can usually be done orders of magnitude faster by dask.array than by dask.dataframe. See for example this post and those before it. It takes a bit of experience, but I’m sure you’ll soon see that to be true for yourself. :slight_smile:

@ParticularMiner

Interestingly, my version is '2020.12.0'. Not very new, but I believe it's new enough to have this feature.

@cyhsu

It turns out your version is indeed too old. I've just checked and found that version 2020.12.0 does not have the .ravel() feature. I hope you can install the latest version.


@ParticularMiner

I updated to the new library version and tried to run the blocks approach, but I got an error message like the one below.

Do you know what happened here?

PS01: data/xloc/yloc are as shown in the attached screenshot.

PS02: If I chunk the array into (800,800), it is working.

@cyhsu

Ah yes, xloc.shape[1] = 4000 in your data is not divisible by a chunk size of 1600, but it is divisible by 800 (see the short illustration below); and concatenate() needs .shape[1] to be the same for all the blocks if it is concatenating along axis 0.

You have to be aware that if .shape[1] is not the same for all the blocks, then the result of concatenate() is strictly not an array. Perhaps you would want to use a different data structure for e1, like dask.bag for instance.

Still, if you want to force concatenate() to work despite the fact that the result will not be an array, then perhaps you can pass the parameter chunks=1600 into map_overlap(). But even if that works, the consequences could be "disastrous" later on, depending on how e1 is used afterwards, because of the internal inconsistency between expected and actual chunk sizes.
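A quick way to see the divisibility point (with made-up arrays of the same sizes):

import dask.array as da

# A length-4000 axis split into chunks of 1600 leaves a ragged last block,
# whereas chunks of 800 divide it evenly.
x = da.ones((4000, 4000), chunks=(1600, 1600))
print(x.chunks[1])    # (1600, 1600, 800) -> blocks with different shape[1]

y = da.ones((4000, 4000), chunks=(800, 800))
print(y.chunks[1])    # (800, 800, 800, 800, 800) -> every block has the same width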


@ParticularMiner

I have no experience with dask.bag. I guess I will just stick with blocks or delayed and look into bag in the future. :slight_smile:

Hi @ParticularMiner

Sorry to revisit this issue after a couple of months. In the previous discussion we solved the issue for the 2-dimensional dataset, that is, xloc – 2d, yloc – 2d, and data – 2d.

However, when I tried to unstack the xarray into an "unstructured dataset", I received similar errors.

This error is really interesting, because when I go back to version 2020.12.0, it works.

So now, if I want to use the latest version, 2022.5.1, the unstructured dataset does not work, while if I use version 2020.12.0, the structured dataset does not work (because the ravel() function does not exist there).

Do you have any idea why this happens?

Following is the code I used.

def interp(xloc, yloc, data, hex_res=10, projection: Union[CRS, str] = None, method='linear'):  # , block_info=None, block_id=None):
#     block_0 = block_info[0]
#     print(block_info,'\n\n')
#     print(block_0,'\n\n')
#     print(block_id,'\n\n\n')

    interpolator = check_structured(xloc, yloc, data, )
    
    hex_col = 'hex' + str(hex_res)
    
    lon, lat = projection.to_lonlat(xloc, yloc)
    lon_max, lon_min = lon.max(), lon.min()
    lat_max, lat_min = lat.max(), lat.min()
    
    b = box(lon_min, lat_min, lon_max, lat_max, ccw=True)
    b = transform(lambda x, y: (y, x), b)
    b = mapping(b)
    
    target_df = pd.DataFrame(h3.polyfill( b, hex_res), columns=[hex_col])    

    target_df['lat'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x)[0])
    target_df['lon'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x)[1])
    tlon, tlat = target_df[['lon','lat']].values.T    

    xtag, ytag = projection.to_xy(tlon,tlat)
    f = interpolator(xloc, yloc, data, method=method)
    abc = np.squeeze([f(x, y) for x, y in zip(xtag, ytag)])
#     abc = f(xtag, ytag)    

    return np.squeeze(np.stack([xtag, ytag, tlon, tlat, abc], axis=1))


def parallel_interp(df: Union[pd.DataFrame, xr.DataArray], projection=None,
                    method: str = 'bilinear', hex_res: int = 12, chunks: tuple = (100, 100),
                    depth: int = 10, trim: bool = False, align_arrays: bool = False):
    """
        Parallel Interpolation By Using Dask Array
        Usage:
            interp_out = parallel(df)
    """
    columns = df.columns.tolist()
    columns = [item for item in columns if item not in ['x', 'y']]
    if not projection:
        raise ValueError(f'CRS/projection is not set properly.\n' +
                         f'Please add the argument.')

    if ('data' in columns) or (len(columns) == 1):
        xloc = da.from_array(df['x'].values, chunks=chunks)
        yloc = da.from_array(df['y'].values, chunks=chunks)
        data = da.from_array(df[columns[0]].values, chunks=chunks)
        projection = projection
    else:
        raise ValueError(f'Cannot distinguish which column is the data column.\n' +
                         f'Please specify the name of the data column as "data".')

    e1 = da.map_overlap(interp,
                        xloc, yloc, data, projection=projection, hex_res=hex_res,
                        method=method, depth=depth, trim=trim,
                        align_arrays=align_arrays, dtype='float64').compute()


    e1 = pd.DataFrame(e1, columns=['x', 'y', 'lon', 'lat', 'output'])
    return e1[e1['output'] > 0].drop_duplicates(subset=['x', 'y']).sort_values(by=['x', 'y'])


df = xr.open_rasterio(data_file, chunks = {'x':1000, 'y':1000}).isel(x=slice(8000,10000), y=slice(8000,10000)).sortby(['x','y'])
projection =  get_crs_projection(df)

method= 'bilinear'
hex_res = 12
depth, trim, align_arrays = 10, False, False

df = df.isel(band=0).to_pandas().unstack().reset_index()
chunks=(40000)
xloc, yloc, data = df['x'].values, df['y'].values, df[0].values
xloc = da.from_array(xloc, chunks=chunks)
yloc = da.from_array(yloc, chunks=chunks)
data = da.from_array(data, chunks=chunks)
interp_output = da.map_overlap(interp,
                               xloc, yloc, data, projection=projection, hex_res=hex_res,
                               method='linear', depth=depth, trim=trim,
                               align_arrays=align_arrays, dtype=np.array(()).dtype)
e1 = da.concatenate(interp_output.blocks.ravel()).compute()