I am trying to parallelize the computation of per-pixel trends on a (T x N) Dask array, where T is the number of years and N is the number of valid pixels:
```python
import numpy as np
import dask.array as da

# Reshape the xarray dataset from geographic space to (T x N)
arr = ds.values.reshape((len(YEARS), mask.size)).astype(np.float32)
# Extract only valid pixels using a mask, then chunk along the pixel axis
stack = da.from_array(arr[:, mask.astype(bool)]).rechunk(
    {0: -1, 1: mask.sum() // (NUM_PROCESSES * 2)},
    block_size_limit=10e3,
)
# compute_trend maps each (T x n) block to n trend values
result = stack.map_blocks(compute_trend, dtype=np.float32, drop_axis=0)
trends = result.compute()
```
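The post does not include `compute_trend`. For anyone trying to reproduce this, a hypothetical stand-in that fits the `map_blocks(..., drop_axis=0)` call is sketched below, assuming the trend is an ordinary least-squares slope per time step; the real function may differ.

```python
import numpy as np


def compute_trend(block: np.ndarray) -> np.ndarray:
    """Hypothetical stand-in for the post's compute_trend.

    Takes a (T x n) block (all time steps for n pixels) and returns an
    (n,) array of least-squares slopes, one per pixel.
    """
    t = np.arange(block.shape[0], dtype=np.float64)
    t -= t.mean()                            # centre the time axis
    anomalies = block - block.mean(axis=0)   # centre each pixel's series
    # OLS slope sum(t * y) / sum(t * t), evaluated for every column at once
    slopes = (t @ anomalies) / (t @ t)
    return slopes.astype(np.float32)
```

Each block arrives as all T time steps for a slice of pixels, and the function returns one value per pixel, which is why axis 0 is dropped in the `map_blocks` call.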
This works fine with a small number of pixels (i.e., when `mask.sum()` is on the order of a few hundred). However, when I apply it to the full dataset, I get the following warning:
```
UserWarning: Sending large graph of size 18.50 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
```
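To check whether the chunking affects the size of the graph at all, one quick diagnostic is to pickle the graph for several chunk sizes and compare it with the size of the data. This is a sketch under the assumption that `da.from_array` keeps the in-memory NumPy array inside the graph; `pickled_graph_size` is a hypothetical helper, the array is synthetic, and pickling only approximates what the distributed client actually serializes.

```python
import pickle

import dask.array as da
import numpy as np


def pickled_graph_size(x: da.Array) -> int:
    """Approximate number of bytes needed to serialize x's task graph."""
    return len(pickle.dumps(dict(x.__dask_graph__())))


# Synthetic stand-in for arr[:, mask] (sizes are arbitrary)
data = np.random.default_rng(0).random((20, 50_000), dtype=np.float32)

# Graph size for a few pixel-axis chunk sizes; if the array travels with
# the graph, every entry should be at least data.nbytes regardless of chunking
sizes = {
    n: pickled_graph_size(da.from_array(data).rechunk({0: -1, 1: n}))
    for n in (500, 5_000, 50_000)
}
print(data.nbytes, sizes)
```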
I believe I have chunked the data carefully into manageable pieces, and I even set an upper limit of 10 kB on the block size (`block_size_limit=10e3`), so it is unclear to me why this warning appears. I have also been unable to find any examples of “scattering” or using futures together with `map_blocks()`. The Dask documentation covers scattering and futures well in general, but it does not show how to combine them with `map_blocks()`, and the most relevant Discourse topic has no code examples.
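The closest I can piece together from the general documentation is the sketch below, which I have not verified on the full dataset. It assumes a `dask.distributed` `Client`, scatters the NumPy array once, and relies on `da.from_delayed` accepting a Future, so that the graph only references the scattered data's key; after that, the usual `rechunk` and `map_blocks` steps apply. `map_blocks_on_scattered` and `n_chunks` are hypothetical names.

```python
import numpy as np
import dask.array as da
from dask.distributed import Client


def map_blocks_on_scattered(client: Client, arr: np.ndarray, func, n_chunks: int):
    """Apply func block-wise to an in-memory (T x N) array without
    embedding the array itself in the task graph (hypothetical helper)."""
    # Send the array to the workers once; the returned Future's key
    # stands in for the data from here on
    [future] = client.scatter([arr])
    # Wrap the Future as a single-chunk dask array
    stack = da.from_delayed(future, shape=arr.shape, dtype=arr.dtype)
    # Keep the full time axis in every block and split the pixel axis
    stack = stack.rechunk({0: -1, 1: max(1, arr.shape[1] // n_chunks)})
    result = stack.map_blocks(func, dtype=np.float32, drop_axis=0)
    return client.compute(result).result()
```

It would be called as, e.g., `map_blocks_on_scattered(client, arr[:, mask.astype(bool)], compute_trend, NUM_PROCESSES * 2)`. One open question with this design is that the scattered array initially lives on a single worker, so the rechunk step may have to move slices to the other workers.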