Hello!
import dask.array as da
import dask.distributed as dd
import numpy as np
arr = da.random.normal(size=(100_000, 100_000), chunks=(2000, 2000))
def is_neg(chunk):
    dd.print('running')
    # True if this chunk contains at least one negative value
    return (chunk < 0).sum() > 0

arr.map_blocks(is_neg, drop_axis=(0, 1)).any().compute()
I would expect this to be able to run in any environment, but it doesn't. Basically, I want a non-negativity check for my data, and in theory this operation should be doable in a way that bails on the first chunk that contains a negative value. But this gives a
MemoryError: Task ('any-is_neg-any-aggregate-24e0b3e9b637b5719096548e643f64b7',) has 74.51 GiB worth of input dependencies, but worker tcp://127.0.0.1:42023 has memory_limit set to 25.60 GiB.
If I reduce the size to (10_000, 10_000), dask will run the computation and bail out as expected, so it seems like a memory check issue.
P.S. The drop_axis business is funny but makes things work… I'm not sure how else to describe it. I will look into that.
Something “correct” (hopefully) like
arr.map_blocks(is_neg, meta=np.array([], dtype=bool), chunks=tuple(tuple(1 for _ in chunk) for chunk in arr.chunks)).any().compute()
or
arr.map_blocks(is_neg, drop_axis=(0, 1), new_axis=(0, 1), meta=np.array([], dtype=bool), chunks=tuple(tuple(1 for _ in chunk) for chunk in arr.chunks)).any().compute()
seems to make things worse in that it tries to run everything.
Hi @ilan-gold,
I have to admit I still don't really get this point, but map_blocks does not seem to work well when you return something other than an array from the function. I think this has been discussed several times, for example here. Using drop_axis seems to make things even worse by trying to load everything into memory.
Good news is, doing something like:
def is_neg(chunk):
    dd.print('running')
    # wrap the result in a 2-D (1, 1) array so each block maps to an array again
    return np.array([[(chunk < 0).sum() > 0]])

arr.map_blocks(is_neg).any().compute()
seems to work.
However, this does not return as soon as a negative value is detected in a chunk, which seems to be what you want?
I think for this, you should go through the to_delayed and as_completed pattern.
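Something along these lines might do it (an untested sketch, assuming a distributed Client on a local cluster; has_negative is just a placeholder name for the per-chunk check):

import dask
import dask.array as da
from dask.distributed import Client, as_completed

client = Client()

arr = da.random.normal(size=(100_000, 100_000), chunks=(2000, 2000))

def has_negative(chunk):
    # a plain Python bool per chunk, so no array shape bookkeeping is needed
    return bool((chunk < 0).any())

# one Delayed per chunk, each wrapped so the task returns a single bool
tasks = [dask.delayed(has_negative)(block) for block in arr.to_delayed().ravel()]
futures = client.compute(tasks)

found = False
for future, result in as_completed(futures, with_results=True):
    if result:
        found = True
        break

# drop whatever has not finished yet
client.cancel(futures)
print(found)

as_completed yields futures in completion order, so you can stop and cancel the rest as soon as one chunk reports a negative value.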
Hi @guillaumeeb,
I will write up that route, but I also tried it and didn't have good results (it seemed tough to cancel the jobs properly, if I remember correctly). But yes, I would want immediate cancellation.
You should be able to cancel any Delayed (at least any that is not yet running). Let me know if you build a reproducer and have trouble with it!
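For reference, a minimal sketch of cancelling work that has not started yet (the slow() task and the cluster settings here are made up for illustration):

import time
import dask
from dask.distributed import Client

client = Client(n_workers=1, threads_per_worker=1)

@dask.delayed
def slow(i):
    time.sleep(10)
    return i

futures = client.compute([slow(i) for i in range(100)])
client.cancel(futures)     # pending tasks are dropped before they ever run
print(futures[-1].status)  # should report 'cancelled'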