Which code is a appropriate to run “np.apply_along_axis” in Dask?

I assume time sereis data array like followings,

It can be done in Numpy, like np.apply_along_axis(filter, 1, x).

I tried various things, however it didn’t help process time well.

Which code is a appropriate to run “np.apply_along_axis” in Dask?

I assume time sereis data array like followings,

It can be done in Numpy, like np.apply_along_axis(filter, 1, x).

I tried various things, however it didn’t help process time well.

1 Like

Hi @Auto-Ken, welcome!

Dask does support `apply_along_axis()`

, so if you have existing code that uses `np.apply_along_axis()`

, it should be possible to translate it.

Can you provide a minimal reproducible example for what you are trying to accomplish?

Did it not work, or did it not have the performance characteristics you wanted? One thing that might be helpful: if the array is not chunked so that each strip you are applying along lives on the same block, then there will be some (potentially expensive) rechunking. So if you are careful in how the data is read in/prepared, you could get significant speedups.

Hello, I appriciate your quick response.

I’m sorry for taking time, got sample data for this.

Dataset DataFolder > eighthr.data

I couldn’t understand how “da.apply_along_axis” works, so I tried different way.

And I don’t care which function is used, as long as result is correnct and faster.

```
import dask
import dask.array as da
import dask.delayed as delayed
import numpy as np
import pandas as pd
import statsmodels.api as sm
df = pd.read_csv("data/eighthr.data", header=None, index_col=0).T
v = df.replace("?", 0.0).values
v = np.tile(v, (20, 1)) # bulk up, original data is too small
v = v.astype(np.float64)
def f(x):
# can get only 1d, and output same shape array
return sm.tsa.filters.hpfilter(x, 1600)[1]
@dask.delayed
def f_delayed(y):
return sm.tsa.filters.hpfilter(y, 1600)[1]
# numpy
v_f = np.apply_along_axis(f, 0, v) # Wall time: 2.49 s
da_v = da.from_array(v, chunks=(v.shape[0], 300))
```

```
# case01:
da.apply_along_axis(f_delayed, 0, da_v).compute() # error
[ValueError: could not broadcast input array from shape (1460,300) into shape (1460,)]
# case02:
da_v = da.from_array(v, chunks=(v.shape[0], 300))
l = []
for i in range(v.shape[1]):
l.append(f_delayed(da_v[:, i]))
l02 = dask.compute(*l) # Wall time: 3.06 s
# case 03:
da_v = da.from_array(v, chunks=(v.shape[0], 1))
blocks = da_v.to_delayed().ravel()
l = [da.from_delayed(f_delayed(b), shape=(np.nan, 1), dtype=np.int64) for b in blocks]
l03 = dask.compute(*l) # Wall time: 3.17 s
```

I tied also in 1 chuks size, not delayed function, combine them…

Hi @Auto-Ken, sorry for the slow response. I would avoid using `dask.delayed`

here, since your problem can already be fully described using `dask.array`

, and adding `delayed`

just complicates things.

The `ValueError`

you are seeing seems to be because something is failing in the shape inference for `apply_along_axis`

. Briefly: your function `f()`

could return an array of a different shape than what was fed into it. You can tell it what shape that will be by passing in `shape`

, and if you don’t it will try to infer it based on a test application of the function. This inference is getting it wrong, for reasons I’m unsure about.

You can get around that problem by passing in `shape`

manually:

```
import dask
import dask.array as da
import dask.delayed as delayed
import numpy as np
import pandas as pd
import statsmodels.api as sm
df = pd.read_csv("eighthr.data", header=None, index_col=0).T
v = df.replace("?", 0.0).values
v = np.tile(v, (200, 1)) # bulk up, original data is too small
v = v.astype(np.float64)
def f(x):
# can get only 1d, and output same shape array
return sm.tsa.filters.hpfilter(x, 1600)[1]
da_v = da.from_array(v, chunks=(v.shape[0], 300))
final = da.apply_along_axis(f, 0, da_v, dtype=da_v.dtype, shape=(da_v.shape[0],))
final.compute()
```

which produces the desired result.

Now, you are also interested in speed-ups. Keep in mind that there is some overhead to using Dask, in that there is a certain amount of time that needs to be spent serializing and sending chunks around to the various workers. So if the problem is not big enough, you will not see speed-ups.

With your tiling of 20x, I saw that raw numpy was winning. But when I jumped to 200x, Dask started to win. The specific performance characteristics will depend on your data and the size of your chunks.

Hope this helps!

1 Like

Hello, I appriciate your help. It works for me well.

It took about 1 hour to calculate for a month dataset, that should be enough big for dask.

Then calculation time **has decreased by half** in this setting. Awesome.

These parameter like shape argument to apply_along_axis is severe to set for user new to dask (like me).

Because my understanding is it’s not able to use debug function like “print” in dask.

Is there good resource to understand mechanism or the way to diagnosis?

Hi @Auto-Ken, sorry for the slow reply.

Good question! I don’t think it’s trivial to diagnose this issue. The API docs for `apply_along_axis`

recommend manually setting the `shape`

since the inference can get it wrong. When I look at the implementation, it seems that the test data is just a 1x1 array, so it’s unlikely to get the shape right unless the given function always returns a single value (like a sum or average).

In your case it seems that the filter operation needed a longer array than that to work (this is pretty common for filters, since they often will involve differences between adjacent values). When I looked at it, the traceback was pretty helpful for determining the issue, since it indicated that it came from the “inference” code path:

```
~/dask/dask/dask/array/routines.py in apply_along_axis(func1d, axis, arr, dtype, shape, *args, **kwargs)
485 if shape is None or dtype is None:
486 test_data = np.ones((1,), dtype=arr.dtype)
--> 487 test_result = np.array(func1d(test_data, *args, **kwargs))
488 if shape is None:
489 shape = test_result.shape
/tmp/ipykernel_96664/4136699301.py in f(x)
1 def f(x):
2 # can get only 1d, and output same shape array
----> 3 return sm.tsa.filters.hpfilter(x, 1600)[1]
~/miniconda3/envs/dask/lib/python3.8/site-packages/statsmodels/tsa/filters/hp_filter.py in hpfilter(x, lamb)
93 offsets = np.array([0, 1, 2])
94 data = np.repeat([[1.], [-2.], [1.]], nobs, axis=1)
---> 95 K = sparse.dia_matrix((data, offsets), shape=(nobs - 2, nobs))
96
97 use_umfpack = True
~/miniconda3/envs/dask/lib/python3.8/site-packages/scipy/sparse/dia.py in __init__(self, arg1, shape, dtype, copy)
125 dtype=get_index_dtype(maxval=max(shape)),
126 copy=copy))
--> 127 self._shape = check_shape(shape)
128 else:
129 #must be dense, convert to COO first, then to DIA
~/miniconda3/envs/dask/lib/python3.8/site-packages/scipy/sparse/sputils.py in check_shape(args, current_shape)
282 raise ValueError('shape must be a 2-tuple of positive integers')
283 elif new_shape[0] < 0 or new_shape[1] < 0:
--> 284 raise ValueError("'shape' elements cannot be negative")
285
286 else:
```