I recently started working with large OME-Zarr files. I’m currently focusing on getting an idea of how long different operations take.
To start, I’m testing a simple function that loads an individual chunk from the zarr file and calculates its mean. Then I run the function on all the chunks using multiple threads.
My current observation is that the same operation takes around 10× longer with Dask than with numpy + joblib. I want to know if that looks normal, or if there is a way to get better performance out of Dask.
Here is the code to reproduce what I’m doing:
```python
import os

import numpy as np
import dask.array as da
from joblib import Parallel, delayed

# create random array
a_shape = (1, 1, 434, 10240, 3072)
a_type = np.float32
chunk_shape = (1, 1, 1, 1024, 1024)
a = da.random.random(a_shape, chunks=chunk_shape).astype(a_type)

# save array to disk as a zarr file
afn = "/data/a.zarr"
a.to_zarr(afn, compressor=None)

# lazy load array using dask array to access some metadata
az = da.from_zarr(afn)

# list with chunk ids that helps us build the paths to individual chunks
chunks_path_coords = [np.arange(len(n_chunks)) for n_chunks in az.chunks]
chunk_ids = list(zip(*[c.reshape(-1) for c in np.meshgrid(*chunks_path_coords)]))

def chunk_mean(zarr_path, chunk_id, ch_type, ch_shape):
    """Load an uncompressed zarr array chunk using numpy and return its mean."""
    chunk_path = os.path.join(zarr_path, ".".join(str(i) for i in chunk_id))
    chunk = np.fromfile(file=chunk_path, dtype=ch_type).reshape(ch_shape)
    return chunk.mean()
```
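To make the id/path construction concrete, here is the same meshgrid trick on a toy chunk layout (the chunk counts below are made up for illustration, not from the real dataset):

```python
import numpy as np

# toy chunk-size tuples per dimension, as az.chunks would give for a small
# array: 1, 1, 2, 2 and 1 chunks along the five dimensions
chunks = ((1,), (1,), (2, 2), (3, 3), (4,))
coords = [np.arange(len(n)) for n in chunks]
# np.meshgrid's default "xy" indexing swaps the first two axes, which is
# harmless here because those dimensions hold a single chunk each
chunk_ids = list(zip(*[c.reshape(-1) for c in np.meshgrid(*coords)]))

# each id maps to a file name like "0.0.1.1.0" inside the zarr store
paths = [".".join(str(i) for i in cid) for cid in chunk_ids]
```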
```python
# run the chunk-mean function in parallel using joblib and time it
%time vals_np = Parallel(n_jobs=16)(delayed(chunk_mean)(afn, cid, a_type, chunk_shape) for cid in chunk_ids)
```

```
CPU times: user 768 ms, sys: 60.2 ms, total: 829 ms
Wall time: 1.22 s
```
```python
# run the equivalent operation using Dask array and time it
%time vals_d = da.map_blocks(lambda x: x.mean().reshape(1, 1, 1, 1, 1), az, drop_axis=(0, 1)).compute(num_workers=16)
```

```
CPU times: user 32.2 s, sys: 19.5 s, total: 51.7 s
Wall time: 15.1 s
```
```python
# verify that the returned values are the same
np.allclose(vals_d.reshape(-1), vals_np)
```
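To see where the Dask run spends its time, the single-machine profiler in `dask.diagnostics` records per-task start/end timestamps. A minimal sketch on a toy in-memory array (the shapes and worker count here are made up, not the real dataset):

```python
import numpy as np
import dask.array as da
from dask.diagnostics import Profiler

# toy array: 8 chunks, one mean per chunk (illustrative shapes only)
x = da.random.random((8, 256, 256), chunks=(1, 256, 256))
means = x.map_blocks(lambda b: b.mean(keepdims=True), chunks=(1, 1, 1))

with Profiler() as prof:
    vals = means.compute(num_workers=4)

# prof.results holds one record per executed task, with start/end times,
# so unusually slow tasks (e.g. I/O-bound chunk loads) stand out
durations = [r.end_time - r.start_time for r in prof.results]
```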
I’m using NVMe storage capable of 25–35 GB/s reads or writes.
More complicated tasks, such as computing with overlap between chunks or working on compressed chunks, are much harder to implement without Dask. That is why I want to understand this performance difference.
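On the overlap point: for reference, the Dask pattern I have in mind is `map_overlap`, which adds a halo of neighbouring values around each chunk before applying the function and trims it afterwards. A small self-contained sketch with a hand-rolled 3×3 mean filter (toy shapes; none of this comes from the code above):

```python
import numpy as np
import dask.array as da

def smooth(block):
    # 3x3 neighbourhood mean via shifted sums; edge padding keeps the
    # output the same shape as the input
    p = np.pad(block, 1, mode="edge")
    s = (p[:-2, :-2] + p[:-2, 1:-1] + p[:-2, 2:]
         + p[1:-1, :-2] + p[1:-1, 1:-1] + p[1:-1, 2:]
         + p[2:, :-2] + p[2:, 1:-1] + p[2:, 2:])
    return s / 9.0

x = da.random.random((64, 64), chunks=(16, 16))

# depth=1: each block sees one extra row/column from its neighbours;
# boundary="nearest" matches the edge padding used inside smooth()
y = x.map_overlap(smooth, depth=1, boundary="nearest").compute()

# the chunked result matches applying the filter to the whole array at once
reference = smooth(x.compute())
```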