Difference in loading performance between dask array and numpy/joblib

Hi,

I recently started working with large OME-Zarr files, and I'm currently trying to get an idea of how long different processing steps take.

To start, I'm testing a simple function that loads an individual chunk of the Zarr file and computes its mean, and then I run that function on all the chunks using multiple workers.

My current observation is that the same operation takes around 10 times longer with Dask than with numpy+joblib. I'd like to know whether that looks normal, or whether there is a way to get better performance out of Dask.

Here is the code to reproduce what I’m doing:

import os
import numpy as np
import dask.array as da
from joblib import Parallel, delayed
# create random array
a_shape = (1, 1, 434, 10240, 3072)
a_type = np.float32
chunk_shape = (1, 1, 1, 1024, 1024)
a = da.random.random(a_shape, chunks=chunk_shape).astype(a_type)
# save array to disk as an uncompressed zarr file
afn = "/data/a.zarr"
a.to_zarr(afn, compressor=None)
# lazy load array using dask array to access some metadata
az = da.from_zarr(afn)
# list with chunk ids that helps us build the paths to individual chunks
chunks_path_coords = [np.arange(len(n_chunks)) for n_chunks in az.chunks]
chunk_ids = list(zip(*[c.reshape(-1) for c in np.meshgrid(*chunks_path_coords)]))
def chunk_mean(zarr_path, chunk_id, ch_type, ch_shape):
    """ Load an uncompressed zarr array chunk directly with numpy and return its
        mean.
    """
    # uncompressed zarr chunks are stored as flat files named by their dot-separated indices
    chunk_path = os.path.join(zarr_path, '.'.join(str(i) for i in chunk_id))
    chunk = np.fromfile(file=chunk_path, dtype=ch_type).reshape(ch_shape)
    return chunk.mean()
# run chunk mean function in parallel using joblib and time it
%time vals_np = Parallel(n_jobs=16)(delayed(chunk_mean)(afn, cid, a_type, chunk_shape) for cid in chunk_ids)

CPU times: user 768 ms, sys: 60.2 ms, total: 829 ms
Wall time: 1.22 s

# run equivalent operation using Dask array and time it
%time vals_d = da.map_blocks(lambda x: x.mean().reshape(1,1,1,1,1), az, drop_axis=(0,1)).compute(num_workers=16)

CPU times: user 32.2 s, sys: 19.5 s, total: 51.7 s
Wall time: 15.1 s

# verify that returned values are the same
np.allclose(vals_d.reshape(-1), vals_np)

True

I'm using NVMe storage capable of 25-35 GB/s read or write.

More complicated tasks, like computations that need overlap between chunks or working with compressed chunks, are much harder to do without Dask. That is why I want to understand this performance difference.
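To give an idea of what I mean, an overlapping computation is almost trivial to express with Dask, for example something like this (an untested sketch, assuming scipy is available; the filter, depth and output path are just illustrative):

# untested sketch: a neighbourhood filter that needs data from adjacent chunks
from scipy.ndimage import gaussian_filter

az = da.from_zarr(afn)
smoothed = da.map_overlap(
    lambda x: gaussian_filter(x, sigma=(0, 0, 0, 2, 2)),  # illustrative filter
    az,
    depth=(0, 0, 0, 8, 8),      # each block sees 8 extra pixels from its neighbours
    boundary="reflect",
)
smoothed.to_zarr("/data/a_smoothed.zarr", overwrite=True)

Doing the same with numpy+joblib would mean reading and stitching neighbouring chunk files by hand.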

Hi @JorgeGtz, welcome to Dask community!

No, that’s not normal!

Thanks a lot for your reproducible example, that really helps!
I ran it on my laptop and didn't get the same results, but Dask was still about 2 times slower.

I was able to even things out by removing the drop_axis kwarg; could you try that?

In any case, I'm not really happy with this code: it feels awkward to reshape the mean, and it should be possible to end up with a 1-dimensional Array with Dask…
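Something along these lines might do it (untested sketch):

# untested sketch: keep each block's mean as a (1, 1, 1, 1, 1) block, then flatten lazily
block_means = da.map_blocks(
    lambda x: x.mean(keepdims=True),   # per-block mean, shape (1, 1, 1, 1, 1)
    az,
    chunks=(1, 1, 1, 1, 1),
    dtype=az.dtype,
)
vals_d = block_means.reshape(-1).compute(num_workers=16)   # 1-D, one value per chunk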

Thank you @guillaumeeb for the quick reply!
Removing drop_axis cuts the time in half for me, but it is still considerably slower than numpy+joblib:

%time vals_d2 = da.map_blocks(lambda x: x.mean().reshape(1,1,1,1,1), az).compute(num_workers=16)

CPU times: user 19.4 s, sys: 15.9 s, total: 35.3 s
Wall time: 8.89 s

If I don't use reshape I get an IndexError: tuple index out of range.

And the bottleneck does not seem to be the computation of the mean: I tested with a function that, instead of computing the mean, returns the value of a single pixel:

%time vals_d3 = da.map_blocks(lambda x: x[0,0,0,0,0].reshape(1,1,1,1,1), az).compute(num_workers=16)

CPU times: user 13.4 s, sys: 15.7 s, total: 29.1 s
Wall time: 7.99 s

def chunk_pixel0(zarr_path, chunk_id, ch_type, ch_shape):
    """ Load an uncompressed zarr array chunk and return the value of the pixel
        closest to the origin.
    """
    chunk_path = os.path.join(zarr_path, '.'.join(str(i) for i in chunk_id))
    chunk = np.fromfile(file=chunk_path, dtype=ch_type).reshape(ch_shape)
    return chunk[0, 0, 0, 0, 0]

%time vals_np3 = Parallel(n_jobs=16)(delayed(chunk_pixel0)(afn, cid, a_type, chunk_shape) for cid in chunk_ids)

CPU times: user 948 ms, sys: 87.8 ms, total: 1.04 s
Wall time: 1.84 s

np.allclose(vals_d3.reshape(-1), vals_np3)

True

How many cores did you use on your end?
We are using a dual AMD EPYC 7H12, by the way.

Yeah, I tried to change this part but didn't find the correct syntax.

I'm not surprised by this conclusion, but it's nice to confirm it!

I'm on my laptop, with 8 cores and a single NVMe SSD, so not quite the same performance as your machine.

I'm not really familiar with joblib: does your code use threads or processes? Could you try to use threads with Dask, using .compute(num_threads=16)?

Another thing would be to try the Distributed Scheduler and have a look at the Dashboard to better understand the performance.
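Something like this, just as a sketch (worker counts are illustrative):

# sketch: run the same computation on a LocalCluster and watch the Dashboard
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=16, threads_per_worker=1)
client = Client(cluster)
print(client.dashboard_link)   # open this URL in a browser while the computation runs

vals_d = da.map_blocks(
    lambda x: x.mean().reshape(1, 1, 1, 1, 1), az
).compute()   # once a Client exists, compute() uses the distributed scheduler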

I believe joblib uses processes by default (its default loky backend is process-based):

%time vals_np_pr = Parallel(n_jobs=16, prefer="processes")(delayed(chunk_pixel0)(afn, cid, a_type, chunk_shape) for cid in chunk_ids)

CPU times: user 943 ms, sys: 52.6 ms, total: 996 ms
Wall time: 1.19 s

%time vals_np_th = Parallel(n_jobs=16, prefer="threads")(delayed(chunk_pixel0)(afn, cid, a_type, chunk_shape) for cid in chunk_ids)

CPU times: user 2 s, sys: 12.5 s, total: 14.5 s
Wall time: 2.33 s

Using processes in Dask seems to improve things a little, but it is still slower than joblib:

%time vals_d_pr = da.map_blocks(lambda x: x[0,0,0,0,0].reshape(1,1,1,1,1), az).compute(num_workers=16, scheduler="processes")

CPU times: user 3.73 s, sys: 601 ms, total: 4.34 s
Wall time: 5.84 s

And this is specifying threads in Dask:

%time vals_d_nt = da.map_blocks(lambda x: x[0,0,0,0,0].reshape(1,1,1,1,1), az).compute(num_threads=16)

CPU times: user 16.4 s, sys: 19.5 s, total: 35.9 s
Wall time: 10.5 s

%time vals_d_th = da.map_blocks(lambda x: x[0,0,0,0,0].reshape(1,1,1,1,1), az).compute(num_workers=16, scheduler="threads")

CPU times: user 12.5 s, sys: 15 s, total: 27.5 s
Wall time: 7.33 s

I have to admit I'm at a loss here: I can imagine Dask having more overhead than joblib, but on your system it's a bit too much!

I tried the Distributed Scheduler by creating a Client/LocalCluster, and watched how it went by looking at the Dashboard:

from distributed import Client
client = Client(n_workers=16)

It took longer with this scheduler. I thought that was because the tasks were too short, i.e. the chunk size too small, so I increased the chunk size to 32 MiB (chunk_shape = (1,1,2,4096,1024)). That helped the distributed scheduler, but the multiprocessing scheduler and joblib test cases got slower. Still, I get almost the same timings between the two on my laptop…

What happens if you increase the overall Zarr file size, if you can do that on your system?

I increased the size of the file to ~200 GiB and the chunk size to 32 MiB:

print(a)

dask.array<astype, shape=(1, 1, 434, 20480, 6144), dtype=float32, chunksize=(1, 1, 2, 4096, 1024), chunktype=numpy.ndarray>
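For completeness, I regenerated the array the same way as before, just with a bigger shape and chunks, roughly like this (a sketch, not the exact code I ran):

# sketch: regenerate the test array with ~200 GiB total size and ~32 MiB chunks
a_shape = (1, 1, 434, 20480, 6144)      # 4x the original number of pixels per plane
chunk_shape = (1, 1, 2, 4096, 1024)     # 2*4096*1024 float32 values ≈ 32 MiB
a = da.random.random(a_shape, chunks=chunk_shape).astype(a_type)
a.to_zarr(afn, compressor=None, overwrite=True)
az = da.from_zarr(afn)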

This is the performance of joblib processes:

%time vals_np_pr = Parallel(n_jobs=16, prefer="processes")(delayed(chunk_pixel0)(afn, cid, a_type, chunk_shape) for cid in chunk_ids)

CPU times: user 1.35 s, sys: 292 ms, total: 1.64 s
Wall time: 5.54 s

And this is Dask using processes:

%time vals_d_pr = da.map_blocks(lambda x: x[0,0,0,0,0].reshape(1,1,1,1,1), az).compute(num_workers=16, scheduler="processes")

CPU times: user 2.62 s, sys: 315 ms, total: 2.94 s
Wall time: 12.4 s

That is a little closer.