da.random.permutation on masked array

Hi everyone,
please consider the following code snippet, where I create an array mask and use it to extract the indices of items matching my query. These indices are then permuted to form a random order:

import dask.array as da
import numpy as np

raw = np.arange(4, dtype=np.int32).repeat(4)
arr = da.from_array(raw)
masked_array = da.ma.masked_equal(arr, 1)
permutation = da.random.permutation(masked_array.nonzero()[0])

Running this code results in the following error:

TypeError                                 Traceback (most recent call last)
in <module>
      2 arr = da.from_array(raw)
      3 masked_array = da.ma.masked_equal(arr, 1)
----> 4 permutation = da.random.permutation(masked_array.nonzero()[0])

~\Miniconda3\envs\venv\lib\site-packages\dask\array\random.py in permutation(self, x)
    363             x = arange(x, chunks="auto")
--> 365         index = np.arange(len(x))
    366         self._numpy_state.shuffle(index)
    367         return shuffle_slice(x, index)

TypeError: 'float' object cannot be interpreted as an integer

I get no error if I do not call .nonzero()[0] on my masked_array, but then the entire array is permuted.
The issue is that the array in my original code has ~3 million entries, of which only ~300-500 are valid, so permuting only that small subset is much more efficient.
Also, I do not really understand the resulting error. My initial guess is that Dask cannot perform the operation because the size/type of the selection is unknown until computed.
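Checking the chunk sizes seems to confirm this guess: after .nonzero() the chunk sizes are reported as nan, i.e. unknown. A quick check on the same toy data:

```python
import dask.array as da
import numpy as np

raw = np.arange(4, dtype=np.int32).repeat(4)
arr = da.from_array(raw)
masked_array = da.ma.masked_equal(arr, 1)

idx = masked_array.nonzero()[0]
# The number of matching indices is unknown until computed,
# so the chunk sizes are reported as nan:
print(idx.chunks)
```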
Any help is appreciated, thanks in advance.


Hi @jannk and welcome! Thank you for including the example, I was able to quickly and easily reproduce your problem. Still working on a solution, but there indeed seems to be something up with dask.array.nonzero(). Will update here soon with more info!


After taking a closer look, the issue does indeed come from dask.array.nonzero(); here is a nice explanation of why this happens. Using your example, a solution would be:

import dask.array as da
import numpy as np
import numpy.ma as ma

raw = np.arange(4, dtype=np.int32).repeat(4)
arr = da.from_array(raw)
masked_array = da.ma.masked_equal(arr, 1)
permutation = da.random.permutation(masked_array.nonzero()[0].compute_chunk_sizes())

The tricky part is that the error message is entirely unhelpful! Luckily, a contributor recently made a PR to improve it, so if you’re using Dask 2021.12.0 you’ll get:

ValueError: Cannot call len() on object with unknown chunk size.

A possible solution: https://docs.dask.org/en/latest/array-chunks.html#unknown-chunks
Summary: to compute chunk sizes, use

   x.compute_chunk_sizes()  # for Dask Array `x`
   ddf.to_dask_array(lengths=True)  # for Dask DataFrame `ddf`
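Putting it together, here is a minimal end-to-end sketch of the workaround using the same toy data as above:

```python
import dask.array as da
import numpy as np

raw = np.arange(4, dtype=np.int32).repeat(4)
arr = da.from_array(raw)
masked_array = da.ma.masked_equal(arr, 1)

# Resolve the unknown chunk sizes produced by nonzero(), then permute
idx = masked_array.nonzero()[0].compute_chunk_sizes()
permutation = da.random.permutation(idx).compute()

# permutation is a random ordering of the selected indices
print(sorted(permutation))
```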

Hi @scharlottej13,
thank you very much for your effort and help.
I also had a look at some documentation and read that when a dask.array.Array is passed into a delayed function, the function itself only receives the NumPy equivalent of that array.
So instead of using da.random.permutation (or da in general) inside my dask.delayed method, I am currently using only the numpy module.
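That behavior is easy to verify: Dask collections passed as arguments to a delayed function are computed first, so the function body sees a plain NumPy array. A quick check (kind_of is just a hypothetical name for illustration):

```python
import dask
import dask.array as da
import numpy as np

@dask.delayed
def kind_of(x):
    # x has already been materialized by the time this runs
    return type(x).__name__

arr = da.from_array(np.arange(4), chunks=2)
print(kind_of(arr).compute())  # ndarray
```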
Thanks again.

Hi @jannk! No problem, happy to help! To see if I understand, are you saying you were previously calling delayed on a Dask array? And now you’re able to just use the Dask array (and the associated Dask array methods) directly?

In my use case I have a set of IDs, where each ID corresponds to a person, and a set of rows, where each entry is the row/unique-id of a sample in the dataset. For each sample still in my dataset I want to match n other samples of the same ID and m samples of other IDs.
First I used da.random.permutation on the masked rows of the same ID (roughly 50-300 samples) and then took the first n elements:

def permutation_example(ID, ids, rows, n):
    rows = rows[ID == ids]
    return da.random.permutation(rows)[:n]

ids = da.from_array(dataset.ids, chunks=10000)
rows = da.from_array(dataset.rows, chunks=10000)
data = permutation_example(2, ids, rows, 10).compute()

As this did not work, I had a look at the inputs/outputs of a delayed function and at the documentation, and saw that I get the input as a NumPy array anyway.
So now I am using my toolbox, which uses numpy and numba, inside a delayed function and splitting the work over several workers. To reduce overhead, the computation is not split up per ID (there are ~8000 IDs in my dataset) but per batch (whose optimal size I still have to tweak):

@dask.delayed
def get_pairs_for_id_batch(id_batch: np.ndarray, ids: np.ndarray, rows: np.ndarray, n: int, m: int):
    data = np.array([], dtype=np.int32)
    for ID in id_batch:
        id_data = toolbox.sampling_function(ID, ids, rows, n, m)
        data = np.append(data, id_data)

    return data

ids     = da.from_array(dataset.ids, chunks=10000)
rows    = da.from_array(dataset.rows, chunks=10000)
batches = [da.from_array(b) for b in np.array_split(np.unique(ids), 100)]

results = []
for id_batch in batches:
    results.append(get_pairs_for_id_batch(id_batch, ids, rows, n, m))

data = client.gather(client.compute(results))

Thanks @jannk for providing more information!

In your second snippet, it looks like you’re still passing Dask arrays to get_pairs_for_id_batch, which is a Dask delayed function. One recommended approach would be to instead use map_blocks to apply get_pairs_for_id_batch to each block in your Dask array.
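As a generic illustration of map_blocks (with a toy block function standing in for your sampling code):

```python
import dask.array as da
import numpy as np

def block_fn(block):
    # Receives one NumPy block at a time
    return block * 2

x = da.from_array(np.arange(8), chunks=4)
y = x.map_blocks(block_fn)
print(y.compute())  # each element doubled, computed block-by-block
```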

Also, depending on the type of client you’re using, you may not need client.gather. If you’re not using an asynchronous client (synchronous is the default), then you don’t need to wrap client.compute with client.gather.

Happy to help troubleshoot further, in which case more context on the bigger picture of what you’re doing would be helpful :).