Speeding up (indexed) column operations?

Thanks @chartl.

That’s all very clear now.

Sorry, this is my first time using the sparse library. Earlier I had created a small (2 by 2) sparse matrix for the operation, and the error had not occurred, presumably because the matrix was too small.

You could try the following code snippet. The idea is to compare each column chunk of X against the corresponding chunk of the column means a inside da.blockwise, dot the resulting indicator into Y.T one block at a time, and then sum the per-block partials with da.reduction. Feel free to ask for clarification if you need it. A very similar example can be found at this link.

import numpy as np
import dask.array as da
import sparse

from functools import reduce


def _dot_column(y, x, a):
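    # For each column of the sparse block x, densify the boolean
    # indicator (x[:, col] < a[col]) and take its dot product with y.T,
    # then stack the per-column results into one partial block.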
    return np.vstack([
        y.T.dot((x[:, i] < a[i]).todense())
        for i in range(x.shape[1])
    ]).T

    
def _sum(x, axis=None, keepdims=None):
    # Aggregate step for da.reduction: x arrives as a sequence of
    # per-block partial results, which are summed elementwise.
    return reduce(np.add, x)


# Small random test problem: a dense Dask array converted block-wise to
# sparse COO.
X_dense = da.from_array(np.random.binomial(5, 0.1, size=(100, 10)))
X = X_dense.map_blocks(sparse.COO)
a = X.mean(axis=0)
Y = X.copy()

batch_sz = 2   # tunable parallelism parameter

# blockwise requires the chunks of a to line up with the column chunks
# of X along the shared index.
X = X.rechunk(chunks=(X.chunks[0], batch_sz))
a = a.rechunk(chunks=batch_sz)

# Compute, block by block, the partial products Y_block.T @ (X_block < a_block);
# adjust_chunks={'i': 1} marks the row axis as reduced within each block
# so the partials can be summed over the row chunks afterwards.
counts = da.blockwise(
    _dot_column, 'ikj',
    Y, 'ik',
    X, 'ij',
    a, 'j',
    adjust_chunks={'i': 1},
    dtype=Y.dtype,
    meta=np.array([]),
)
# Sum the per-block partials along the row-chunk axis. The chunk
# function is the identity; _sum does the actual elementwise addition.
counts = da.reduction(
    counts,
    lambda x, axis, keepdims: x,   # per-chunk step: leave blocks as-is
    _sum,                          # aggregate: elementwise sum of blocks
    axis=0,
    concatenate=False,
    dtype=Y.dtype,
    meta=sparse.COO,
)
counts.compute()
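
As a quick sanity check (my own sketch, not part of the snippet above; it assumes the data is small enough to densify), the result should agree with the plain dense computation X.T @ (X < a):

# Hypothetical verification, assuming X fits in memory as a dense array.
X_np = X_dense.compute()
mask = (X_np < X_np.mean(axis=0)).astype(X_np.dtype)
expected = X_np.T @ mask

result = counts.compute()
if hasattr(result, "todense"):   # the result may come back sparse or dense
    result = result.todense()
assert np.allclose(result, expected)

batch_sz is the knob to experiment with: it sets how many columns each _dot_column call processes, trading per-task overhead against parallelism.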