Hi,
I recently started working with large OME-Zarr files, and right now I’m trying to get an idea of how long different processing steps take.
To start, I’m testing a simple function that loads an individual chunk of the zarr file and calculates its mean, and then running that function on all the chunks using multiple threads.
My current observation is that the same operation takes around 10 times longer with Dask than with numpy+joblib. I would like to know whether that is expected, or whether there is a way to get better performance out of Dask.
Here is the code to reproduce what I’m doing:
import os
import numpy as np
import dask.array as da
from joblib import Parallel, delayed
# create random array
a_shape = (1, 1, 434, 10240, 3072)
a_type = np.float32
chunk_shape = (1, 1, 1, 1024, 1024)
a = da.random.random(a_shape, chunks=chunk_shape).astype(a_type)
# save array to disk as a zarr file
afn = "/data/a.zarr"
a.to_zarr(afn, compressor=None)
# lazily load the array with dask to access its chunk layout
az = da.from_zarr(afn)
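For scale (assuming my arithmetic is right), that is 13,020 uncompressed float32 chunks of 4 MiB each, roughly 55 GB in total:
print(az.numblocks)     # (1, 1, 434, 10, 3) -> 434 * 10 * 3 = 13,020 chunks
print(az.nbytes / 1e9)  # ~54.6 GB of float32 data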
# list of chunk ids used to build the paths to the individual chunk files
chunks_path_coords = [np.arange(len(n_chunks)) for n_chunks in az.chunks]
chunk_ids = list(zip(*[c.reshape(-1) for c in np.meshgrid(*chunks_path_coords, indexing='ij')]))
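Since the array is stored without compression, each chunk is just a flat binary file named after its dot-separated chunk indices, so a chunk id maps to a file path like this (printing the path of the first chunk as an illustration):
print(os.path.join(afn, '.'.join(str(i) for i in chunk_ids[0])))  # /data/a.zarr/0.0.0.0.0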
def chunk_mean(zarr_path, chunk_id, ch_type, ch_shape):
    """Load an uncompressed zarr array chunk with numpy and return its mean."""
    chunk_path = os.path.join(zarr_path, '.'.join(str(i) for i in chunk_id))
    chunk = np.fromfile(file=chunk_path, dtype=ch_type).reshape(ch_shape)
    return chunk.mean()
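For reference, a single call looks like this (just a sanity check; the exact value is meaningless since the data is random, though it should be close to 0.5 for uniform samples):
chunk_mean(afn, chunk_ids[0], a_type, chunk_shape)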
# run the chunk_mean function in parallel using joblib and time it
%time vals_np = Parallel(n_jobs=16)(delayed(chunk_mean)(afn, cid, a_type, chunk_shape) for cid in chunk_ids)
CPU times: user 768 ms, sys: 60.2 ms, total: 829 ms
Wall time: 1.22 s
# run the equivalent operation with Dask and time it: map_blocks computes one mean
# per chunk, and the reshape keeps a single element per block
%time vals_d = da.map_blocks(lambda x: x.mean().reshape(1,1,1,1,1), az, drop_axis=(0,1)).compute(num_workers=16)
CPU times: user 32.2 s, sys: 19.5 s, total: 51.7 s
Wall time: 15.1 s
# verify that returned values are the same
np.allclose(vals_d.reshape(-1), vals_np)
True
I’m using NVMe storage capable of 25-35 GB/s read or write.
More complicated tasks, such as processing with overlap between chunks or working with compressed chunks, are much harder to do without Dask; that is why I want to understand this performance difference.