Map_blocks unexpectedly adds rows to a dimension when specifying chunks

I have a 3-dimensional array that I pass to map_blocks; the mapped function does some manipulation along the 3rd dimension and changes its length. Accordingly, I specify chunks in map_blocks.

The output’s 1st dimension is rounded up to a multiple of the chunk size, whereas it should remain unchanged from the input. I have also tried adding drop_axis=2 and new_axis=2, but that doesn’t get the job done.

import dask.array as da
import numpy as np

n_cols = 12
n_params = 11
n_rows = 104599

test_arr = da.random.uniform(low=0, high=1, size=(n_rows, n_cols, n_params), chunks=(100, n_cols, n_params)).astype('float32')

def custom_func_dummy(inp):
    i_arrays = []
    for i in range(inp.shape[0]):
        j_arrays = []
        for j in range(inp.shape[1]):
            res = inp[i, j, :] * 5 - 4
            res = res.sum()
            repeat_res = np.repeat(res, 8)  # expand the reduced scalar to length 8
            j_arrays.append(repeat_res)
        j_stack = np.stack(j_arrays, axis=0)  # (n_cols, 8)
        i_arrays.append(j_stack)
    res = np.stack(i_arrays, axis=0)  # (block_rows, n_cols, 8)
    return res
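A quick sanity check on a single NumPy block shaped like one chunk (the block variable below is just for illustration) confirms the per-block output shape that the chunks argument should describe:

block = np.ones((100, 12, 11), dtype='float32')
custom_func_dummy(block).shape
(100, 12, 8)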

test_arr.shape
(104599, 12, 11)

testing = da.map_blocks(custom_func_dummy, test_arr, chunks=(100, 12, 8), dtype='float32')

testing.shape
(104600, 12, 8)
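Inspecting the chunk metadata suggests where the extra row comes from: 104599 rows split into 100-row chunks gives 1045 full blocks plus one block of 99, but the single-tuple chunks=(100, 12, 8) tells Dask that all 1046 output blocks have 100 rows, hence 1046 * 100 = 104600.

test_arr.chunks[0][-3:]
(100, 100, 99)
testing.chunks[0][-3:]
(100, 100, 100)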

custom_func_dummy adequately approximates what I’m doing in my actual code for the purposes of this issue.

How do I get the output shape to be (104599, 12, 8)?

By the way, I have a hunch that if I called compute on this (memory issues aside; in my real code the 3rd dimension is >80K), the computed result would have the right shape for that dimension. The problem is that this is only the first step of the code: later on I stack outputs from different manipulations based on these results before calling compute, and those objects’ shapes are off by one row, so I need to resolve the issue here.

Thanks a lot for the detailed post and the reproducer. I played with it a bit but haven’t found a solution yet.

I’m a bit concerned, because according to the documentation you should be able to specify the full list of output chunk shapes using the chunks kwarg. But I did not manage to make it work, and looking at the source code, I’m not sure this still works.

For reference, in https://docs.dask.org/en/stable/generated/dask.array.map_blocks.html:

If the function changes shape of the blocks then you must provide chunks explicitly.

y = x.map_blocks(lambda x: x[::2], chunks=((2, 2),))

You have a bit of freedom in specifying chunks. If all of the output chunk sizes are the same, you can provide just that chunk size as a single tuple.

And in the Overlapping Computations page of the Dask documentation:

If your function does not preserve the shape of the block, then you will need to provide a chunks keyword argument. If your block size is regular, then this argument can take a block shape of, for example, (1000, 1000). In case of irregular block sizes, it must be a tuple with the full chunks shape like ((1000, 700, 1000), (200, 300)).
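Note that when the row count divides evenly into the chunk size, the single-tuple form does behave as documented (a quick check, reusing the dummy function from the reproducer):

even = da.random.uniform(low=0, high=1, size=(1000, 12, 11), chunks=(100, 12, 11)).astype('float32')
out = da.map_blocks(custom_func_dummy, even, chunks=(100, 12, 8), dtype='float32')
out.shape
(1000, 12, 8)

So the problem seems specific to the ragged last chunk, which is exactly what the irregular form should cover.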

So I’ve tried:

testing = da.map_blocks(custom_func_dummy, test_arr, chunks=((100,12, 8),)*1045+((99,12, 8),), dtype='float32')

But I got an error:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In [24], line 1
----> 1 testing = da.map_blocks(custom_func_dummy, test_arr, chunks=((100,12, 8),)*1045+((99,12, 8),), dtype='float32')
      2 testing

File /softs/rh7/conda-envs/pangeo_stable/lib/python3.9/site-packages/dask/array/core.py:873, in map_blocks(func, name, token, dtype, chunks, drop_axis, new_axis, enforce_ndim, meta, *args, **kwargs)
    857     out = blockwise(
    858         apply_and_enforce,
    859         out_ind,
   (...)
    870         **kwargs,
    871     )
    872 else:
--> 873     out = blockwise(
    874         func,
    875         out_ind,
    876         *concat(argpairs),
    877         name=name,
    878         new_axes=new_axes,
    879         dtype=dtype,
    880         concatenate=True,
    881         align_arrays=False,
    882         adjust_chunks=adjust_chunks,
    883         meta=meta,
    884         **kwargs,
    885     )
    887 extra_argpairs = []
    888 extra_names = []

File /softs/rh7/conda-envs/pangeo_stable/lib/python3.9/site-packages/dask/array/blockwise.py:269, in blockwise(func, out_ind, name, token, dtype, adjust_chunks, new_axes, align_arrays, concatenate, meta, *args, **kwargs)
    267 elif isinstance(adjust_chunks[ind], (tuple, list)):
    268     if len(adjust_chunks[ind]) != len(chunks[i]):
--> 269         raise ValueError(
    270             f"Dimension {i} has {len(chunks[i])} blocks, adjust_chunks "
    271             f"specified with {len(adjust_chunks[ind])} blocks"
    272         )
    273     chunks[i] = tuple(adjust_chunks[ind])
    274 else:

ValueError: Dimension 1043 has 1046 blocks, adjust_chunks specified with 3 blocks

I’m not sure whether this is expected behavior (in which case the docs should be updated), or whether it is a bug.
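Re-reading the quoted example, ((1000, 700, 1000), (200, 300)) looks like one tuple per output dimension rather than one shape per block, so another spelling worth trying (a sketch I haven’t verified) would be:

testing = da.map_blocks(
    custom_func_dummy,
    test_arr,
    chunks=((100,) * 1045 + (99,), (12,), (8,)),
    dtype='float32',
)

Here (100,) * 1045 + (99,) lists the row count of each of the 1046 output blocks along dimension 0.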


I would recommend opening an issue on the Dask GitHub with the reproducer and what I’ve tried in this answer.
