Hi,
I want to apply a basic feature extraction step to an array representing epochs of a signal. The epochs have shape (n_epochs, n_channels, n_times). I saved the array so that the chunk size is approximately 200 MB. I have read the following docs (among others):
Now I was hoping that multiple processes would each take one chunk at a time, extract my features, and send the results back. I thought I only needed to delay and compute to do so, but when I test my solution it takes a really long time to execute, so I guess I am missing something.
Here is my code:
```python
import os
import numpy as np
from datetime import datetime
from distributed import Client
import dask
import dask.array as da

# my fake feature extraction function
def my_features(x):
    """x is a flattened timeseries of shape (4096,)"""
    # we use numpy mean and std for the example
    return np.array([np.mean(x), np.std(x)])

# create fake epochs of shape (n_epochs, n_channels, n_times)
epochs = da.random.random((100000, 2, 4096), chunks=(7800, 1, 4096))

results = []
for i_epoch in range(100):  # when I use the full epochs.shape[0], it takes hours...
    epoch_res = []
    for i_channel in range(epochs.shape[1]):
        epoch_res.append(dask.delayed(my_features)(epochs[i_epoch, i_channel, :]))
    results.append(epoch_res)

results = np.array(dask.compute(*results))
```
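For comparison, here is a sketch of the chunk-wise version I imagine should work, using map_blocks (I am not sure this is the right tool for my case, and the sizes are scaled down here so the example runs quickly):

```python
import numpy as np
import dask.array as da

# scaled-down fake epochs so the example runs quickly
rng = np.random.default_rng(0)
data = rng.random((1000, 2, 4096))
epochs = da.from_array(data, chunks=(300, 1, 4096))

def features_block(block):
    """Compute mean and std over the time axis of one chunk.

    block has shape (n_epochs_chunk, 1, n_times); the result has
    shape (n_epochs_chunk, 1, 2): one (mean, std) pair per timeseries.
    """
    return np.stack([block.mean(axis=-1), block.std(axis=-1)], axis=-1)

# the output chunks differ from the input ones along the time axis,
# so they have to be spelled out explicitly
results = epochs.map_blocks(
    features_block,
    chunks=(epochs.chunks[0], epochs.chunks[1], (2,)),
    dtype=epochs.dtype,
)
res = results.compute()  # shape (1000, 2, 2)
```

The idea is that each chunk is handed to one task as a plain numpy block, instead of building one delayed call per (epoch, channel) pair.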
First I thought compute() would do the job for me on its own. Does it? I also tried using map() and explicitly adding a client:
```python
client = Client(
    processes=True,
    threads_per_worker=4,
    n_workers=os.cpu_count() - 1,
    memory_limit="auto",
)
```
But nothing seems to work… I have also looked in the persist() direction, but I don't see how to tell the computation to work on one chunk, persist it, run the feature extraction, and then release it.
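For what it's worth, my understanding is that persist() materializes every chunk in memory at once, rather than one chunk at a time (small array here for illustration):

```python
import dask.array as da

# small array for illustration (the real one is much larger)
epochs = da.random.random((100, 2, 64), chunks=(30, 1, 64))

# persist() computes all chunks and keeps them in (worker) memory;
# later operations reuse the in-memory chunks instead of recomputing them
epochs_mem = epochs.persist()

# the result is still a dask array with the same shape and chunking
print(type(epochs_mem), epochs_mem.shape)
```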
So I definitely think I am missing something…
Any help would be much appreciated! Thanks