`dask.array.any` memory behavior

Hello!

import dask.array as da
import dask.distributed as dd
import numpy as np

arr = da.random.normal(size=(100_000, 100_000), chunks=(2000, 2000))
def is_neg(chunk):
    dd.print('running')
    return (chunk < 0).sum() > 0
arr.map_blocks(is_neg, drop_axis=(0, 1)).any().compute()

I would expect this to run in any environment, but it doesn’t. Essentially, I want a non-negativity check for my data, and I believe this operation should be doable in a way that bails out on the first chunk that contains a negative value. Instead, it gives:

MemoryError: Task ('any-is_neg-any-aggregate-24e0b3e9b637b5719096548e643f64b7',) has 74.51 GiB worth of input dependencies, but worker tcp://127.0.0.1:42023 has memory_limit set to 25.60 GiB.

If I reduce the size to (10_000, 10_000), Dask runs the computation and bails out as expected, so this looks like a memory-check issue.

P.S. The drop_axis business is odd but makes things work; I’m not sure how else to describe it. I will look into that.

Something “correct” (hopefully) like

arr.map_blocks(is_neg, meta=np.array([], dtype=bool), chunks=tuple(tuple(1 for _ in chunk) for chunk in arr.chunks)).any().compute()

or

arr.map_blocks(is_neg, drop_axis=(0, 1), new_axis=(0, 1), meta=np.array([], dtype=bool), chunks=tuple(tuple(1 for _ in chunk) for chunk in arr.chunks)).any().compute()

seems to make things worse in that it tries to run everything.

Hi @ilan-gold,

I have to admit I still don’t fully understand this point, but map_blocks does not seem to work properly when the mapped function returns something other than an array. I think this has been discussed several times, for example here. Using drop_axis seems to make things even worse by trying to load everything into memory.

The good news is that doing something like:

def is_neg(chunk):
    dd.print('running')
    # return a 2-D, single-element array so each block keeps a (1, 1) shape
    return np.array([[(chunk < 0).sum() > 0]])

arr.map_blocks(is_neg).any().compute()

seems to work.

However, this does not return as soon as a negative value is detected in a chunk, which seems to be what you want?

I think for this you should go through the to_delayed and as_completed pattern.
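
For reference, a minimal sketch of that pattern (assuming a dask.distributed Client is available; chunk_has_negative is a hypothetical helper, not from the original snippet) might look like:

import dask
import dask.array as da
from dask.distributed import Client, as_completed

client = Client()  # assumes a local or remote distributed cluster

arr = da.random.normal(size=(100_000, 100_000), chunks=(2000, 2000))

def chunk_has_negative(chunk):
    # reduce one chunk to a single Python bool
    return bool((chunk < 0).any())

# one Delayed per chunk, each reduced to a bool, then submitted as futures
delayed_checks = [
    dask.delayed(chunk_has_negative)(block)
    for block in arr.to_delayed().ravel()
]
futures = client.compute(delayed_checks)

found_negative = False
for future in as_completed(futures):
    if future.result():
        found_negative = True
        break

Each chunk becomes its own future, so the loop can stop as soon as one of them reports a negative value.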

Hi @guillaumeeb,
I will write up that route, but I also tried it and didn’t have good results (it seemed tough to cancel the jobs properly, if I remember correctly). But yes, I would want immediate cancellation.

You should be able to cancel any Delayed (at least one that is not yet running). Let me know if you build a reproducer and have trouble with it!
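
A sketch of the cancellation step, continuing from the snippet above (client and futures are assumed from there): once the first True result arrives, client.cancel stops the remaining tasks from being scheduled.

for future in as_completed(futures):
    if future.result():
        # tasks that have not started yet are no longer scheduled
        client.cancel(futures)
        break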