Implementing chunk-wise gufuncs

How do I go about creating a ufunc that takes chunks as an input and returns chunk-wise statistics as an output?

import numpy as np
import dask.array as da
import dask a = da.random.normal(size=(10,20,30), chunks=(5, 10, 30)) @da.as_gufunc("(i,j,k)->(i,j,k),()", output_dtypes=(a.dtype, float), allow_rechunk=False)
def stats(x):
print(x.shape)
return x+1, np.sum(x) b = stats(a)
c = b[0].compute()
d = b[1].compute()
print(c)
print(d)

For instance, here I want to c==a+1, but I want d to be of shape (2, 2, 1)

My particular use case also prevents me from rechunking “a” into a single chunk

@jasonkena Welcome!

Does it need to be a ufunc? If yes, I’ll keep looking into this!

You can also use Dask Array’s map_blocks to keep track of chunk information:

import dask.array as da

my_arr = da.random.normal(size=(10,20,30), chunks=(5, 10, 30))

def func(block, block_info=None):
    print(f"chunk location = {block_info[0]['chunk-location']}")
    print(f"chunk shape = {block_info[None]['chunk-shape']}\n")
    return block

x = my_arr.map_blocks(func, dtype='float64').compute()
# chunk location = (0, 0, 0)
# chunk shape = (5, 10, 30)

# chunk location = (0, 1, 0)
# chunk shape = (5, 10, 30)

# chunk location = (1, 0, 0)
# chunk shape = (5, 10, 30)

# chunk location = (1, 1, 0)
# chunk shape = (5, 10, 30)

Would this help?

Thank you @pavithraes! I ended up using numpy object arrays to handle ragged outputs like so:

def ragged_func(x, block_info=None):
    print(block_info)
    a = np.empty(1,dtype=object)
    a[0] = np.arange(np.random.randint(1, 7))
    return a.reshape(1,1)
1 Like