How to properly extract features from a large array on a cluster?

Hi,

I want to apply a basic feature extraction step to an array representing epochs of a signal. The epochs have shape (n_epochs, n_channels, n_times). I saved the array so that the chunk size is approximately 200MB. I have read the following docs (among others):

Now I was hoping that multiple processes would take one chunk at a time, extract my features, and get my results back. I thought I only needed to delay and compute to do so, but when testing my solution, it takes a really long time to execute so I guess I am missing something.

Here is my code:

import os
import numpy as np
from datetime import datetime
from distributed import Client
import dask
import dask.array as da

# my fake features extraction function
def my_features(x):
    """x is a flattened timeseries of shape (4096,)"""
    # we use numpy mean and std for the example
    return np.array([np.mean(x), np.std(x)])

# create fake epochs of shape (n_epochs, n_channels, n_times)
epochs = da.random.random((100000, 2, 4096), chunks=(7800, 1, 4096))

results = []
for i_epoch in range(100): # when I use the full epochs.shape[0], it takes hours...
    epoch_res = []
    for i_channel in range(epochs.shape[1]):
        epoch_res.append(dask.delayed(my_features)(epochs[i_epoch,i_channel, :]))
    results.append(epoch_res)

results = np.array(dask.compute(*results))

First I thought compute() would do the job for me. Does it?
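
For scale: the loop above creates one delayed call per (epoch, channel) pair, so the full array would mean 200,000 small tasks, plus the slicing tasks behind each of them. Here is a minimal sketch that counts them on a deliberately tiny array (the sizes are assumptions chosen so it runs quickly, not my real data):

```python
import numpy as np
import dask
import dask.array as da

def my_features(x):
    return np.array([np.mean(x), np.std(x)])

# tiny stand-in for the real epochs array (assumed sizes)
epochs = da.random.random((20, 2, 64), chunks=(10, 1, 64))

# same pattern as the loop above, flattened into one list
tasks = [
    dask.delayed(my_features)(epochs[i, c, :])
    for i in range(epochs.shape[0])
    for c in range(epochs.shape[1])
]

# one delayed call per (epoch, channel) pair
n_delayed = len(tasks)

# wrap the list in a single Delayed to merge the graphs, then count
# every task the scheduler would have to handle
n_graph_tasks = len(dict(dask.delayed(tasks).__dask_graph__()))

print(n_delayed, n_graph_tasks)
```

The graph grows with the number of epochs rather than with the number of chunks, which is probably why the full run takes so long.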

I also tried using map() and explicitly adding a client:

client = Client(
    processes=True,
    threads_per_worker=4,
    n_workers=os.cpu_count() - 1,
    memory_limit="auto",
)

But nothing seems to work… I have looked into persist(), but I don’t see how to tell Dask to work on one chunk, persist it, do the computation, and release it.
So I definitely think that I am missing something…
Any help would be much appreciated! Thanks

Ok, I think I just found exactly what I needed: map_blocks()

I struggled a bit using it (I had an IndexError, which I resolved thanks to this post), and now I have my code working fine:

import os
import numpy as np
from distributed import Client
import dask
import dask.array as da

# my fake features extraction function
def my_features(x):
    """x is a flattened timeseries of shape (4096,)"""
    # we use numpy mean and std for the example
    return np.array([np.mean(x), np.std(x)])

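def extract_features(X):
    """apply the my_features features extraction along the right dimension"""
    # each block holds a single channel: (n_epochs_in_block, 1, n_times)
    X = X[:, 0, :]
    # one (mean, std) pair per epoch, with the channel axis restored
    return np.apply_along_axis(my_features, 1, X)[:, None, :]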

# create fake epochs of shape (n_epochs, n_channels, n_times)
epochs = da.random.random((300000, 2, 4096), chunks=(7800, 1, 4096))

# optional but faster
client = Client(
    processes=True,
    threads_per_worker=4,
    n_workers=os.cpu_count() - 1,
    memory_limit="2GB",
)

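# map blocks; the last axis shrinks from n_times to 2 features,
# so the output chunks and dtype are declared explicitly
results = da.map_blocks(
    extract_features,
    epochs,
    chunks=(epochs.chunks[0], epochs.chunks[1], (2,)),
    dtype=float,
)
res = results.compute()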
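
As a sanity check, here is a minimal sketch of the same map_blocks() pattern on a much smaller array (the sizes are assumptions picked so it runs quickly). It passes chunks= explicitly because the function changes the last axis from n_times to 2, and it compares the result with dask's built-in mean and std reductions, which should give the same values for this toy feature function:

```python
import numpy as np
import dask
import dask.array as da

def my_features(x):
    return np.array([np.mean(x), np.std(x)])

def extract_features(X):
    # X is one block of shape (n_epochs_in_block, 1, n_times)
    X = X[:, 0, :]
    return np.apply_along_axis(my_features, 1, X)[:, None, :]

# small stand-in array (assumed sizes), one channel per chunk
epochs = da.random.random((50, 2, 128), chunks=(20, 1, 128))

features = da.map_blocks(
    extract_features,
    epochs,
    # epoch and channel chunking are unchanged; the last axis becomes 2
    chunks=(epochs.chunks[0], epochs.chunks[1], (2,)),
    dtype=float,
)

# reference built from dask's own reductions over the time axis
reference = da.stack([epochs.mean(axis=-1), epochs.std(axis=-1)], axis=-1)

# compute both in one call so they share the same random input blocks
features_np, reference_np = dask.compute(features, reference)

print(features.shape, np.allclose(features_np, reference_np))
```

For real feature functions that have no built-in dask equivalent, only the map_blocks() part applies; the reductions are just there to check the result.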

Hope it helps someone.
