# Using da.map_overlap in unstructured interpolation

I am using depth argument in the function da.map_overlap to provide the buffer boundary for my interpolation method usage. However, some situations happened that I didn’t know how to solve. Could anyone help me with this?

Question 01: cannot directly using .compute()
Question 02: cannot trim the buffer

Please see my code below.

Code:

Step 01: Convert to dask.array

``````slon = da.from_array(lon, chunks=(400,400))
slat = da.from_array(lat, chunks=(400,400))
sdat = da.from_array(data,chunks=(400,400))
``````

Step 2: define a function for map_overlap use

``````def tantan2(lon,lat,data,  hex_res=10):

#-- Generate the parameters for h3 and shapely.geometry usage.
hex_col = 'hex' + str(hex_res)
lon_max, lon_min = lon.max(), lon.min()
lat_max, lat_min = lat.max(), lat.min()

#-- Using shapely.geometry.box to generate domain bounds for h3
b = box(lon_min, lat_min, lon_max, lat_max, ccw=True)
b = transform(lambda x, y: (y, x), b)
b = mapping(b)

#-- Using Pandas to generate h3 hexagonal grid with associated long/lat
target_df = pd.DataFrame(h3.polyfill( b, hex_res), columns=[hex_col])

target_df['lat'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x))
target_df['lon'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x))
tlon, tlat = target_df[['lon','lat']].values.T

#-- Using the input lon/lat and generated h3-lon/lat for interpoaltion
abc = lNDI(points=(lon.ravel(), lat.ravel()),
values= data.ravel())(tlon,tlat)

#-- stack target h3-lon/lat and interpolation output and send it to return.
dec = np.stack([tlon, tlat, abc],axis=1)
return dec
``````

Step 3: apply da.map_overlap

``````c = da.map_overlap(tantan2,
slon,
slat,
data,
depth=10,
trim=True,
boundary=None,
align_arrays=False,
dtype='float64',
)
``````

It looks like my output is changing the shape compared with the input array; therefore, my output always returns an empty array to me when trim=True. As such, because of the empty array, I kept receiving errors when using .compute().

In addition, if I turn the trim=False, the .compute() function is still not working. What I believe is because of the changing shape.

ValueError: could not broadcast input array from shape (1090,3) into shape (1065,3)

Any possible solution to trim the points in the buffer region?

Hi @cyhsu,

Have you tried using the keyword parameter `chunks` to `map_overlap()`?

The docs describe it as follows:

`chunks`: Chunk shape of resulting blocks if the function does not preserve shape. If not provided, the resulting array is assumed to have the same block structure as the first input array.

Though originally, a `map_blocks()` keyword parameter, I expect that it can be used in `map_overlap()` too, since its docstring says so about the `kwargs` input parameter:

**kwargs: Other keyword arguments valid in `map_blocks`

1 Like

Thanks for the response. Oh, yeah, I read it but haven’t tried it. I am not quite understood the meaning of that when testing the trim=True. Well, it sounds working if I turned the trim=False. Let me try it. The reply here described the testing result when adding chunks argument.

Reconstruct the function tantan3 with block_info argument to print out the block information.

``````def tantan3(lon,lat,data, hex_res=12, block_info=None):
print(block_info)
hex_col = 'hex' + str(hex_res)
lon_max, lon_min = lon.max(), lon.min()
lat_max, lat_min = lat.max(), lat.min()

b = box(lon_min, lat_min, lon_max, lat_max, ccw=True)
b = transform(lambda x, y: (y, x), b)
b = mapping(b)

target_df = pd.DataFrame(h3.polyfill( b, hex_res), columns=[hex_col])

target_df['lat'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x))
target_df['lon'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x))
tlon, tlat = target_df[['lon','lat']].values.T

abc = lNDI(points=(lon.ravel(), lat.ravel()),
values= data.ravel())(tlon,tlat)
target_df['interp'] = abc
dec = np.stack([tlon, tlat, abc],axis=1)
return dec
``````

Here, I added chunks=(400,1), since my outputs is in the shape of (xxxxx, 3)

``````z1 = da.map_overlap(tantan3,
slon[:1200,:1200], slat[:1200,:1200], data[:1200,:1200],
depth=10,trim=False,dtype=slon.dtype, align_arrays=False, chunks=(400,1)
)
``````

here is the printing output for first partition.

``````z1.to_delayed().flatten().compute()
``````

{0: {‘shape’: (1260, 1260), ‘num-chunks’: (3, 3), ‘array-location’: [(0, 420), (0, 420)], ‘chunk-location’: (0, 0)}, 1: {‘shape’: (1260, 1260), ‘num-chunks’: (3, 3), ‘array-location’: [(0, 420), (0, 420)], ‘chunk-location’: (0, 0)}, 2: {‘shape’: (1260, 1260), ‘num-chunks’: (3, 3), ‘array-location’: [(0, 420), (0, 420)], ‘chunk-location’: (0, 0)}, None: {‘shape’: (1200, 3), ‘num-chunks’: (3, 3), ‘array-location’: [(0, 400), (0, 1)], ‘chunk-location’: (0, 0), ‘chunk-shape’: (400, 1), ‘dtype’: dtype(‘float64’)}}
array([[ -47.81871841, -18.99560999, 2497.00400696],
[ -47.82417923, -18.97345468, 1983.57848172],
[ -47.83918928, -18.98161255, 1496.25165668],
…,
[ -47.8321107 , -18.98878508, 1819.65346035],
[ -47.82419657, -18.99904944, 1918.54184067],
[ -47.84360658, -18.98887746, 2446.75140221]])

The pretty dict info of block is reprint here.
{0: {‘shape’: (1560, 1560),
‘num-chunks’: (3, 3),
‘array-location’: [(0, 520), (0, 520)],
‘chunk-location’: (0, 0)},

1: {‘shape’: (1560, 1560),
‘num-chunks’: (3, 3),
‘array-location’: [(0, 520), (0, 520)],
‘chunk-location’: (0, 0)},

2: {‘shape’: (1560, 1560),
‘num-chunks’: (3, 3),
‘array-location’: [(0, 520), (0, 520)],
‘chunk-location’: (0, 0)},

None: {‘shape’: (1200, 1200),
‘num-chunks’: (3, 3),
‘array-location’: [(0, 400), (0, 400)],
‘chunk-location’: (0, 0),
‘chunk-shape’: (400, 400),
‘dtype’: np.array(, dtype=np.float64)}}

and the errors

@ParticularMiner still cannot do it.

@cyhsu

• If your outputs are in the shape of `(xxxxx, 3)` then the `chunks` parameter for `map_overlap()` should read `chunks=(xxxxx, 3)`. Why did you put `chunks=(400, 1)`?

• Thank you for the full traceback error log. Now I understand the origin of the error. The error means that your chunk-function `tantan3()` is returning chunks of incompatible shapes. In one instance, `tantan3()` returned a chunk shape of `(52188, 3)`, and in another instance it returned a chunk (to the right of the former) with a shape of `(53433, 3)`. Obviously, one cannot concatenate these two chunks into a 2D-array. So you need to amend your chunk function `tantan3()` so that along any row of blocks, `shape` is the same for all the blocks; and along any column of blocks, `shape` is the same.

I hope this helps.

I see. my issue is that I cannot have the same size of the axis-0. I guess I should just go with delayed( da.concatenate)([ inputs for inputs in z1.to_delayed().flatten()]).

In addition to this, I would like to ask one more question. When I put the argument depth=10, is this meaning that each partition will pull 5 data points from each boundary? for example, if array 10 is in the shape of [10, 10], it will pull 5 more points on the left boundary, and 5 more points on the right boundary, and the same for upper and lower boundaries? And if the partition is on the left-upper corner (in the position of [0,1]), the 5 more points on the left and upper boundaries, are filling up with NaN instead by default (since there are no more data points at that location)?

Good idea. But it is better not to delay `da.concatenate()` since it is itself delayed. You can rather do

``````z1 = da.concatenate(z1.blocks.ravel())
``````

Unfortunately, the `map_overlap()` function is not sufficiently documented. But the following code-snippet and its output should answer your questions:

``````import dask.array as da

def func(x, block_info=None):
loc = 'chunk-location'
print(f'\nx{block_info[loc]} =\n {x}')
return x

x = da.arange(8).reshape(2, 4).rechunk((1, 2))
print(f'x = \n{x.compute()}')
da.map_overlap(func, x, depth=1, meta=np.array([[]]), dtype=x.dtype).compute()
``````
``````x =
[[0 1 2 3]
[4 5 6 7]]

x(0, 0) =
[[0 0 1 2]
[0 0 1 2]
[4 4 5 6]]

x(1, 0) =
[[0 0 1 2]
[4 4 5 6]
[4 4 5 6]]

x(0, 1) =
[[1 2 3 3]
[1 2 3 3]
[5 6 7 7]]

x(1, 1) =
[[1 2 3 3]
[5 6 7 7]
[5 6 7 7]]
``````

Is using

z1 = da.concatenate(z1.blocks.ravel())

faster than the

delayed(da.concatenate)([ z1.to_delayed().flatten()[idx] for idx in range(z1.npartitions)])

It is somewhat weird to me that when I used delayed function for concat, it takes lots of time to complete the mission recently. This is not making sense to me.

chunks={‘x’:100, ‘y’:100}
file = ‘./Sentinel2_23KKU_Barest_B08_BSI.tif’
df = open_tiff(file, chunks=chunks).isel(x=slice(6000,10000), y = slice(4000,8000))

X, Y = np.meshgrid(df[‘x’], df[‘y’])
xloc = da.from_array(X, chunks=(100,100))
yloc = da.from_array(Y, chunks=(100,100))
data = df.isel(band=0).data

e = da.map_overlap(tantan4, xloc, yloc, data, crs=df.crs,
depth=10, trim=False, dtype=xloc.dtype,
align_arrays=False)

e2 = delayed(da.concatenate)([e.to_delayed().flatten()[idx] for idx in range(e.npartitions)])

@cyhsu

`da.concatenate()` returns a `dask` array, which is a lazy object (that is, its computation is delayed). So using it as an argument to `delayed` is not recommended since that would create a twice-delayed result (see the docs on best practices). It would also mean you have to call `compute()` twice: the first time to return the `dask` array from `da.concatenate()`, and a second time to compute the `dask` array.

Using `da.concatenate(e.blocks.ravel())` avoids all these problems and yes, it is much faster.

@ParticularMiner

I cannot do it since e doesn’t have the ravel() attribute.

e.blocks.ravel()

``````---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_412/225947827.py in <module>
----> 1 e.blocks.ravel()

AttributeError: 'IndexCallable' object has no attribute 'ravel'
``````

Thanks for your information. But somehow why do I feel dask.dataframe processing is faster than the dask.array. Am I wrong?

@cyhsu

Hmm … that is strange. I checked this myself where `e` was the return value of `map_overlap()`, and it works very well. Perhaps you’re using an older version of `dask`?

The `dask.dataframe` module typically addresses different issues from those addressed by `dask.array`. And those issues that `dask.array` addresses can usually be done orders of magnitude faster by `dask.array` than by `dask.dataframe`. See for example this post and those before it. It takes a bit of experience, but I’m sure you’ll soon see that to be true for yourself. @ParticularMiner

Interestingly saying that, my version is ‘2020.12.0’. Not very new, but it’s new enough to have this feature I believe.

@cysu

It turns out your version is too old. I’ve just checked and found out that version `2020.12.20` does not have the `.ravel()` feature. I hope you have access to the latest version to install it.

2 Likes

@ParticularMiner

I updated the new library and tried to run the blocks. But I met an error message like below.

Do you know what happened here?

PS01: data/xloc/yloc are like this PS02: If I chunk the array into (800,800), it is working.

@cyhsu

Ah yes, `xloc.shape = 4000` of your data is not is not divisible by a chunk-size of 1600 but is divisible by 800; and `concatenate()` needs `.shape` to be the same for all the blocks if it is concatenating along axis 0.

You have to be aware that if `.shape` is not the same for all the blocks then the result of `concatenate()` is strictly not an array. Perhaps you would want to use a different data-structure for `e1`, like `dask.bag` for instance.

Still if you want to force `concatenate()` to work despite the fact that the result will not be an array, then perhaps you can pass the parameter `chunks=1600` into `map_overlap()`. But if that works, the consequences could be “disastrous” later on depending on how `e1` is used after this because of the internal inconsistency between expected and actual chunk sizes.

1 Like

@ParticularMiner

Never have experience in bag function. I guess I will just stick at blocks or delayed and look for bag in the future. Sorry I readdressed this issue after coupled months. In the previous discussion, we already solved the issue that is based on the 2-dimensional dataset, saying xloc – 2d, yloc – 2d, and data – 2d.

However, when I tried to unstack the xarray into “unstructured dataset”, I received the similar errors

This error is really interesting because when I got back to 2020.12.0 version, it is working.

So, now,
if I wanna use latest version, 2022.5.1, then the unstructured dataset is not working, while if I wanna use version 2020.12.0, then the structured dataset is not working (because there is no ravel() function existed.

Do you have any idea why this is happened?

Following is the code I used.

``````def interp(xloc, yloc ,data, hex_res=10, projection =  Union[CRS, str], method='linear'): #, block_info=None, block_id=None):
#     block_0 = block_info
#     print(block_info,'\n\n')
#     print(block_0,'\n\n')
#     print(block_id,'\n\n\n')

interpolator = check_structured(xloc, yloc, data, )

hex_col = 'hex' + str(hex_res)

lon, lat = projection.to_lonlat(xloc, yloc)
lon_max, lon_min = lon.max(), lon.min()
lat_max, lat_min = lat.max(), lat.min()

b = box(lon_min, lat_min, lon_max, lat_max, ccw=True)
b = transform(lambda x, y: (y, x), b)
b = mapping(b)

target_df = pd.DataFrame(h3.polyfill( b, hex_res), columns=[hex_col])

target_df['lat'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x))
target_df['lon'] = target_df[hex_col].apply(lambda x: h3.h3_to_geo(x))
tlon, tlat = target_df[['lon','lat']].values.T

xtag, ytag = projection.to_xy(tlon,tlat)
f = interpolator(xloc, yloc, data, method=method)
abc = np.squeeze([f(x, y) for x, y in zip(xtag, ytag)])
#     abc = f(xtag, ytag)

return np.squeeze(np.stack([xtag, ytag, tlon, tlat, abc],axis=1))
``````
``````def parallel_interp(df: Union[pd.DataFrame, xr.DataArray], projection=None,
method: str = 'bilinear', hex_res: int = 12, chunks: tuple = (100, 100),
depth: int = 10, trim: bool = False, align_arrays: bool = False):
"""
Parallel Interpolation By Using Dask Array
Usage:
interp_out = parallel(df)
"""
columns = df.columns.tolist()
columns = [item for item in columns if item not in ['x', 'y']]
if not projection:
raise ValueError(f'CRS/projection is not settle well.\n' +

if ('data' in columns) or (len(columns) == 1):
xloc = da.from_array(df['x'].values, chunks=chunks)
yloc = da.from_array(df['y'].values, chunks=chunks)
data = da.from_array(df[columns].values, chunks=chunks)
projection = projection
else:
raise ValueError(f'Cannot distinguish which column is data column.\n' +
f'Please specify the name of the data column as "data"')

e1 = da.map_overlap(interp,
xloc, yloc, data, projection=projection, hex_res=hex_res,
method=method, depth=depth, trim=trim,
align_arrays=align_arrays, dtype='float64').compute()

e1 = pd.DataFrame(e1, columns=['x', 'y', 'lon', 'lat', 'output'])
return e1[e1['output'] > 0].drop_duplicates(subset=['x', 'y']).sort_values(by=['x', 'y'])
``````
``````df = xr.open_rasterio(data_file, chunks = {'x':1000, 'y':1000}).isel(x=slice(8000,10000), y=slice(8000,10000)).sortby(['x','y'])
projection =  get_crs_projection(df)

method= 'bilinear'
hex_res = 12
depth, trim, align_arrays = 10, False, False

df = df.isel(band=0).to_pandas().unstack().reset_index()
chunks=(40000)
xloc, yloc, data = df['x'].values, df['y'].values, df.values
xloc = da.from_array(xloc, chunks=chunks)
yloc = da.from_array(yloc, chunks=chunks)
data = da.from_array(data, chunks=chunks)
interp_output = da.map_overlap(interp,
xloc, yloc, data, projection=projection, hex_res=hex_res,
method='linear', depth=depth, trim=trim,
align_arrays=align_arrays, dtype=np.array(()).dtype)
e1 = da.concatenate(interp_output.blocks.ravel()).compute()
``````