Appropriate way of "np.apply_along_axis" in Dask

Which code is a appropriate to run “np.apply_along_axis” in Dask?
I assume time sereis data array like followings,

It can be done in Numpy, like np.apply_along_axis(filter, 1, x).

I tried various things, however it didn’t help process time well.

1 Like

Hi @Auto-Ken, welcome!

Dask does support apply_along_axis(), so if you have existing code that uses np.apply_along_axis(), it should be possible to translate it.

Can you provide a minimal reproducible example for what you are trying to accomplish?

Did it not work, or did it not have the performance characteristics you wanted? One thing that might be helpful: if the array is not chunked so that each strip you are applying along lives on the same block, then there will be some (potentially expensive) rechunking. So if you are careful in how the data is read in/prepared, you could get significant speedups.

Hello, I appriciate your quick response.
I’m sorry for taking time, got sample data for this.

Dataset DataFolder > eighthr.data

I couldn’t understand how “da.apply_along_axis” works, so I tried different way.
And I don’t care which function is used, as long as result is correnct and faster.

import dask
import dask.array as da
import dask.delayed as delayed

import numpy as np
import pandas as pd
import statsmodels.api as sm

df = pd.read_csv("data/eighthr.data", header=None, index_col=0).T
v = df.replace("?", 0.0).values
v = np.tile(v, (20, 1))  # bulk up, original data is too small
v = v.astype(np.float64)

def f(x): 
    # can get only 1d, and output same shape array
    return sm.tsa.filters.hpfilter(x, 1600)[1]  

@dask.delayed
def f_delayed(y):
    return sm.tsa.filters.hpfilter(y, 1600)[1]

# numpy
v_f = np.apply_along_axis(f, 0, v)   # Wall time: 2.49 s

da_v = da.from_array(v, chunks=(v.shape[0], 300))

image

# case01:
da.apply_along_axis(f_delayed, 0, da_v).compute()  # error
[ValueError: could not broadcast input array from shape (1460,300) into shape (1460,)]

# case02:
da_v = da.from_array(v, chunks=(v.shape[0], 300))
l = []
for i in range(v.shape[1]):
    l.append(f_delayed(da_v[:, i]))

l02 = dask.compute(*l)  # Wall time: 3.06 s

# case 03:
da_v = da.from_array(v, chunks=(v.shape[0], 1))
blocks = da_v.to_delayed().ravel()
l = [da.from_delayed(f_delayed(b), shape=(np.nan, 1), dtype=np.int64) for b in blocks]
l03 = dask.compute(*l)  # Wall time: 3.17 s

I tied also in 1 chuks size, not delayed function, combine them…

Hi @Auto-Ken, sorry for the slow response. I would avoid using dask.delayed here, since your problem can already be fully described using dask.array, and adding delayed just complicates things.

The ValueError you are seeing seems to be because something is failing in the shape inference for apply_along_axis. Briefly: your function f() could return an array of a different shape than what was fed into it. You can tell it what shape that will be by passing in shape, and if you don’t it will try to infer it based on a test application of the function. This inference is getting it wrong, for reasons I’m unsure about.

You can get around that problem by passing in shape manually:

import dask
import dask.array as da
import dask.delayed as delayed

import numpy as np
import pandas as pd
import statsmodels.api as sm

df = pd.read_csv("eighthr.data", header=None, index_col=0).T
v = df.replace("?", 0.0).values
v = np.tile(v, (200, 1))  # bulk up, original data is too small
v = v.astype(np.float64)

def f(x): 
    # can get only 1d, and output same shape array
    return sm.tsa.filters.hpfilter(x, 1600)[1]  


da_v = da.from_array(v, chunks=(v.shape[0], 300))
final = da.apply_along_axis(f, 0, da_v, dtype=da_v.dtype, shape=(da_v.shape[0],))
final.compute()

which produces the desired result.

Now, you are also interested in speed-ups. Keep in mind that there is some overhead to using Dask, in that there is a certain amount of time that needs to be spent serializing and sending chunks around to the various workers. So if the problem is not big enough, you will not see speed-ups.

With your tiling of 20x, I saw that raw numpy was winning. But when I jumped to 200x, Dask started to win. The specific performance characteristics will depend on your data and the size of your chunks.

Hope this helps!

1 Like

Hi @Auto-Ken , were you able to get your use-case working?

Hello, I appriciate your help. It works for me well.
It took about 1 hour to calculate for a month dataset, that should be enough big for dask.
Then calculation time has decreased by half in this setting. Awesome.

These parameter like shape argument to apply_along_axis is severe to set for user new to dask (like me).
Because my understanding is it’s not able to use debug function like “print” in dask.
Is there good resource to understand mechanism or the way to diagnosis?

Hi @Auto-Ken, sorry for the slow reply.

Good question! I don’t think it’s trivial to diagnose this issue. The API docs for apply_along_axis recommend manually setting the shape since the inference can get it wrong. When I look at the implementation, it seems that the test data is just a 1x1 array, so it’s unlikely to get the shape right unless the given function always returns a single value (like a sum or average).

In your case it seems that the filter operation needed a longer array than that to work (this is pretty common for filters, since they often will involve differences between adjacent values). When I looked at it, the traceback was pretty helpful for determining the issue, since it indicated that it came from the “inference” code path:

~/dask/dask/dask/array/routines.py in apply_along_axis(func1d, axis, arr, dtype, shape, *args, **kwargs)
    485     if shape is None or dtype is None:
    486         test_data = np.ones((1,), dtype=arr.dtype)
--> 487         test_result = np.array(func1d(test_data, *args, **kwargs))
    488         if shape is None:
    489             shape = test_result.shape

/tmp/ipykernel_96664/4136699301.py in f(x)
      1 def f(x):
      2     # can get only 1d, and output same shape array
----> 3     return sm.tsa.filters.hpfilter(x, 1600)[1]

~/miniconda3/envs/dask/lib/python3.8/site-packages/statsmodels/tsa/filters/hp_filter.py in hpfilter(x, lamb)
     93     offsets = np.array([0, 1, 2])
     94     data = np.repeat([[1.], [-2.], [1.]], nobs, axis=1)
---> 95     K = sparse.dia_matrix((data, offsets), shape=(nobs - 2, nobs))
     96 
     97     use_umfpack = True

~/miniconda3/envs/dask/lib/python3.8/site-packages/scipy/sparse/dia.py in __init__(self, arg1, shape, dtype, copy)
    125                                                           dtype=get_index_dtype(maxval=max(shape)),
    126                                                           copy=copy))
--> 127                     self._shape = check_shape(shape)
    128         else:
    129             #must be dense, convert to COO first, then to DIA

~/miniconda3/envs/dask/lib/python3.8/site-packages/scipy/sparse/sputils.py in check_shape(args, current_shape)
    282             raise ValueError('shape must be a 2-tuple of positive integers')
    283         elif new_shape[0] < 0 or new_shape[1] < 0:
--> 284             raise ValueError("'shape' elements cannot be negative")
    285 
    286     else: