Help with dask/zarr usage -- performance issues with dask

Hi,

I am just getting started with dask (and zarr), wanting to do some basic computations on larger-than-memory arrays.

I must be doing something wrong, because I am getting much worse performance with dask than if I use zarr>numpy functions directly, or loop over chunks myself. I’m guessing maybe it has to do with the fact that I’m operating on a much smaller section of the array than is allocated in total? Or maybe the chunk size is too small?

Code below, any help appreciated.
Thanks.


import zarr
import dask.array as da
import numpy as np

dz_arr=da.from_zarr(my_path)
dz_arr

Array Chunk
Bytes 62.00 TiB 3.81 MiB
Shape (31, 262143, 1048575) (1, 50, 10000)
Count 17065966 Tasks 17065965 Chunks
Type float64 numpy.ndarray 1048575 262143 31

z_arr=zarr.open(my_path,'r')
z_arr.info
Type zarr.core.Array
Data type float64
Shape (31, 262143, 1048575)
Chunk shape (1, 50, 10000)
Order C
Read-only True
Compressor None
Store type zarr.storage.DirectoryStore
No. bytes 68169395863800 (62.0T)
No. bytes stored 46848000256 (43.6G)
Storage ratio 1455.1
Chunks initialized 11712/17065965
%time m_z=z_arr[0,0:10,0:20000].max(axis=1)

CPU times: user 364 µs, sys: 6.48 ms, total: 6.84 ms
Wall time: 8.49 ms

%time m_dz=dz_arr[0,0:10,0:20000].max(axis=1).compute()

CPU times: user 21.7 s, sys: 3.17 s, total: 24.9 s
Wall time: 24.8 s

np.allclose(m_z,m_dz,equal_nan=True)

True

#manual loop over chunks
chunk=z_arr.chunks[1]
%time t_z1=np.concatenate([np.nanargmax(np.abs(z_arr[8,i:i+chunk,0:20000]),axis=1) for i in range(0,40000,chunk)])

CPU times: user 4.4 s, sys: 1.3 s, total: 5.69 s
Wall time: 6.66 s

#dask
%time t_dz=np.nanargmax(np.abs(dz_arr[8,0:40000,0:20000]),axis=1).compute()

CPU times: user 38.9 s, sys: 7.9 s, total: 46.8 s
Wall time: 30 s

#manual multithreading
from joblib import Parallel, delayed

%time t_z2=np.concatenate(Parallel(10)(delayed(lambda i:np.nanargmax(np.abs(z_arr[8,i:i+chunk,0:20000]),axis=1))(i) for i in range(0,40000,chunk)))

CPU times: user 540 ms, sys: 185 ms, total: 725 ms
Wall time: 2.82 s

#maybe this is helpful?
t=np.nanargmax(np.abs(dz_arr[8,0:40000,0:20000]),axis=1)
t.dask

HighLevelGraph

HighLevelGraph with 6 layers and 17071566 keys from all layers.

Layer1: original-from-zarr

original-from-zarr-e4ea86f27725c56890661cb8e9f6e12c

layer_type MaterializedLayer


is_materialized True
number of outputs 1

Layer2: from-zarr

from-zarr-e4ea86f27725c56890661cb8e9f6e12c

layer_type Blockwise


is_materialized False
number of outputs 17065965
shape (31, 262143, 1048575)
dtype float64
chunksize (1, 50, 10000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on original-from-zarr-e4ea86f27725c56890661cb8e9f6e12c 1048575 262143 31

Layer3: getitem

getitem-fb18d5776259f8dc80d7e435567b6be9

layer_type MaterializedLayer


is_materialized True
number of outputs 1600
shape (40000, 20000)
dtype float64
chunksize (50, 10000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on from-zarr-e4ea86f27725c56890661cb8e9f6e12c 20000 40000

Layer4: absolute

absolute-8239f515e7e0744ea17848a6804a9559

layer_type Blockwise


is_materialized False
number of outputs 1600
shape (40000, 20000)
dtype float64
chunksize (50, 10000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on getitem-fb18d5776259f8dc80d7e435567b6be9 20000 40000

Layer5: arg-reduce

arg-reduce-6348fbc45216c6d4c2133d048cb66f2b

layer_type MaterializedLayer


is_materialized True
number of outputs 1600
shape (40000, 2)
dtype int64
chunksize (50, 1)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on absolute-8239f515e7e0744ea17848a6804a9559 2 40000

Layer6: nanarg_agg-aggregate

nanarg_agg-aggregate-1a7622f0e72b112855fcb48a9641209f

layer_type MaterializedLayer


is_materialized True
number of outputs 800
shape (40000,)
dtype int64
chunksize (50,)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on arg-reduce-6348fbc45216c6d4c2133d048cb66f2b 40000 1

Hi,

I’m no Zarr expert, but a small guess here is that you’ve really got too many chunks at the beginning, and so Dask might have a hard time going through the generated graph even if in the end it only reads a small portion of them.

It is clear that chunks are two small for any big data analysis here.

In the end, as you’re only using a really small portion of the complete array, you should look at a way to open and read only a portion of the Zarr ensemble. I’m not sure if this is possible with an option or some custom code.

You could also try to ask this question on discourse.pangeo.org.

Best,
Guillaume.

1 Like

Dask might have a hard time going through the generated graph even if in the end it only reads a small portion of them.

I agree with @guillaumeeb’s comment here. To confirm this, you can try:

%%time
m_dz=dz_arr[0,0:10,0:20000].max(axis=1).persist()

this persist time can tell us how long it tasks for Dask to generate the graph and give you back the control.

This issue could also be related to: Culling massive Blockwise graphs is very slow, not constant-time · Issue #8570 · dask/dask · GitHub

Thanks a lot for your comments @pavithraes @guillaumeeb

%time m_dz=dz_arr[0,0:10,0:20000].max(axis=1).persist()

CPU times: user 23.5 s, sys: 5.7 s, total: 29.2 s
Wall time: 45.3 s

The root cause makes sense that it generates the entire graph even though it doesn’t need to. I was just surprised that that’s the behavior

Curiously I don’t see this issue when using xarray + zarr + dask.

It is clear that chunks are two small for any big data analysis here.

I wanted to optimize for the most common use case of reading small slices (e.g. one row) across the first two dimensions, while also enabling dataset-wide analysis. I’d be thrilled to take any other suggestions of how to chunk the data.

Thanks again for the feedback.
V

Interesting! Maybe Xarray also do some optimization under the hood?

I’d say that even if you have some use case when you need only small slices, just create bigger chunks. The overhead of reading a bigger chunk is not that high. For more suggestions, you have a good blog post below:

https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes

In the end, it really depends on the analysis you want to perform, for example temporal slices vs spatial ones.

2 Likes