Help with dask/zarr usage -- performance issues with dask

Hi,

I am just getting started with dask (and zarr), wanting to do some basic computations on larger-than-memory arrays.

I must be doing something wrong, because I am getting much worse performance with Dask than if I use zarr and numpy directly, or loop over chunks myself. I’m guessing it may have to do with the fact that I’m operating on a much smaller section of the array than is allocated in total? Or maybe the chunk size is too small?

Code below, any help appreciated.
Thanks.


import zarr
import dask.array as da
import numpy as np

dz_arr=da.from_zarr(my_path)
dz_arr

        Array                   Chunk
Bytes   62.00 TiB               3.81 MiB
Shape   (31, 262143, 1048575)   (1, 50, 10000)
Count   17065966 Tasks          17065965 Chunks
Type    float64                 numpy.ndarray

z_arr=zarr.open(my_path,'r')
z_arr.info
Type zarr.core.Array
Data type float64
Shape (31, 262143, 1048575)
Chunk shape (1, 50, 10000)
Order C
Read-only True
Compressor None
Store type zarr.storage.DirectoryStore
No. bytes 68169395863800 (62.0T)
No. bytes stored 46848000256 (43.6G)
Storage ratio 1455.1
Chunks initialized 11712/17065965
%time m_z=z_arr[0,0:10,0:20000].max(axis=1)

CPU times: user 364 µs, sys: 6.48 ms, total: 6.84 ms
Wall time: 8.49 ms

%time m_dz=dz_arr[0,0:10,0:20000].max(axis=1).compute()

CPU times: user 21.7 s, sys: 3.17 s, total: 24.9 s
Wall time: 24.8 s

np.allclose(m_z,m_dz,equal_nan=True)

True

#manual loop over chunks
chunk=z_arr.chunks[1]
%time t_z1=np.concatenate([np.nanargmax(np.abs(z_arr[8,i:i+chunk,0:20000]),axis=1) for i in range(0,40000,chunk)])

CPU times: user 4.4 s, sys: 1.3 s, total: 5.69 s
Wall time: 6.66 s

#dask
%time t_dz=np.nanargmax(np.abs(dz_arr[8,0:40000,0:20000]),axis=1).compute()

CPU times: user 38.9 s, sys: 7.9 s, total: 46.8 s
Wall time: 30 s

#manual multithreading
from joblib import Parallel, delayed

%time t_z2=np.concatenate(Parallel(10)(delayed(lambda i:np.nanargmax(np.abs(z_arr[8,i:i+chunk,0:20000]),axis=1))(i) for i in range(0,40000,chunk)))

CPU times: user 540 ms, sys: 185 ms, total: 725 ms
Wall time: 2.82 s

#maybe this is helpful?
t=np.nanargmax(np.abs(dz_arr[8,0:40000,0:20000]),axis=1)
t.dask

HighLevelGraph with 6 layers and 17071566 keys from all layers.

Layer1: original-from-zarr
original-from-zarr-e4ea86f27725c56890661cb8e9f6e12c
layer_type        MaterializedLayer
is_materialized   True
number of outputs 1

Layer2: from-zarr
from-zarr-e4ea86f27725c56890661cb8e9f6e12c
layer_type        Blockwise
is_materialized   False
number of outputs 17065965
shape             (31, 262143, 1048575)
dtype             float64
chunksize         (1, 50, 10000)
type              dask.array.core.Array
chunk_type        numpy.ndarray
depends on        original-from-zarr-e4ea86f27725c56890661cb8e9f6e12c

Layer3: getitem
getitem-fb18d5776259f8dc80d7e435567b6be9
layer_type        MaterializedLayer
is_materialized   True
number of outputs 1600
shape             (40000, 20000)
dtype             float64
chunksize         (50, 10000)
type              dask.array.core.Array
chunk_type        numpy.ndarray
depends on        from-zarr-e4ea86f27725c56890661cb8e9f6e12c

Layer4: absolute
absolute-8239f515e7e0744ea17848a6804a9559
layer_type        Blockwise
is_materialized   False
number of outputs 1600
shape             (40000, 20000)
dtype             float64
chunksize         (50, 10000)
type              dask.array.core.Array
chunk_type        numpy.ndarray
depends on        getitem-fb18d5776259f8dc80d7e435567b6be9

Layer5: arg-reduce
arg-reduce-6348fbc45216c6d4c2133d048cb66f2b
layer_type        MaterializedLayer
is_materialized   True
number of outputs 1600
shape             (40000, 2)
dtype             int64
chunksize         (50, 1)
type              dask.array.core.Array
chunk_type        numpy.ndarray
depends on        absolute-8239f515e7e0744ea17848a6804a9559

Layer6: nanarg_agg-aggregate
nanarg_agg-aggregate-1a7622f0e72b112855fcb48a9641209f
layer_type        MaterializedLayer
is_materialized   True
number of outputs 800
shape             (40000,)
dtype             int64
chunksize         (50,)
type              dask.array.core.Array
chunk_type        numpy.ndarray
depends on        arg-reduce-6348fbc45216c6d4c2133d048cb66f2b

Hi,

I’m no Zarr expert, but a guess here is that you simply have too many chunks to start with, so Dask may have a hard time walking through the generated graph even though in the end it only reads a small portion of them.

It’s clear that the chunks are too small for any big data analysis here.

In the end, since you’re only using a really small portion of the complete array, you should look for a way to open and read only that portion of the Zarr store, as sketched below. I’m not sure whether this is possible with an option or needs some custom code.
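A rough sketch of that idea (untested; my_path as in your post): read the small region with Zarr into NumPy first, and only wrap that already-small result for Dask if you still need it, so you never build a graph with one task per chunk of the full 62 TiB array.

# Sketch only: read just the needed region with Zarr, then wrap it for Dask.
import zarr
import dask.array as da

z = zarr.open(my_path, mode='r')                 # same store as in your post
region = z[0, 0:10, 0:20000]                     # plain NumPy array, ~1.6 MB
d_region = da.from_array(region, chunks='auto')  # only a handful of tasks now
m = d_region.max(axis=1).compute()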

You could also try to ask this question on discourse.pangeo.org.

Best,
Guillaume.


Dask may have a hard time walking through the generated graph even though in the end it only reads a small portion of them.

I agree with @guillaumeeb’s comment here. To confirm this, you can try:

%%time
m_dz=dz_arr[0,0:10,0:20000].max(axis=1).persist()

This persist time tells us how long it takes Dask to generate the graph and give you back control.
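As an additional rough check (a sketch; dz_arr as in the original post), you can count how many keys Dask has to walk through before any computation starts:

# Sketch: count the tasks in the low-level graph of the sliced computation.
g = dz_arr[0, 0:10, 0:20000].max(axis=1).__dask_graph__()
print(len(g))   # number of keys across all layers; dominated by the ~17 million from-zarr tasks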

This issue could also be related to: Culling massive Blockwise graphs is very slow, not constant-time · Issue #8570 · dask/dask · GitHub

Thanks a lot for your comments @pavithraes @guillaumeeb

%time m_dz=dz_arr[0,0:10,0:20000].max(axis=1).persist()

CPU times: user 23.5 s, sys: 5.7 s, total: 29.2 s
Wall time: 45.3 s

The root cause makes sense: the entire graph is generated even though it doesn’t need to be. I was just surprised that this is the behavior.

Curiously I don’t see this issue when using xarray + zarr + dask.
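For context, the xarray route is something along these lines (a sketch only; the variable name "data" and the dimension names are made up for illustration, assuming the data sits as a variable inside a Zarr group at my_path):

# Hypothetical sketch of the xarray + zarr + dask route; names are assumptions.
import xarray as xr

ds = xr.open_zarr(my_path)                      # variables come back as dask arrays
m_x = ds['data'].isel(band=0, y=slice(0, 10), x=slice(0, 20000)).max(dim='x')
m_x.compute()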

It’s clear that the chunks are too small for any big data analysis here.

I wanted to optimize for the most common use case of reading small slices (e.g. one row) across the first two dimensions, while also enabling dataset-wide analysis. I’d be thrilled to hear any suggestions on how to chunk the data.

Thanks again for the feedback.
V

Interesting! Maybe Xarray also does some optimization under the hood?

I’d say that even if you have use cases where you only need small slices, just create bigger chunks; the overhead of reading a bigger chunk is not that high. For more suggestions, there is a good blog post below:

https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes

In the end, it really depends on the analysis you want to perform, for example temporal slices vs spatial ones.
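As a very rough sketch of the mechanics of “just create bigger chunks” (the chunk shape and output path below are purely illustrative, not a recommendation for this exact dataset, and rechunking an array that already has ~17 million chunks is itself expensive):

# Illustrative sketch: rewrite the store with larger chunks (~80 MB of float64 each).
import dask.array as da

arr = da.from_zarr(my_path)
arr = arr.rechunk((1, 1000, 10000))    # from (1, 50, 10000) to something bigger
arr.to_zarr('rechunked.zarr')          # hypothetical output path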


Hi all,

I find that even when the chunk size is not small, the performance is still not good. Here is an example:

I have a big Zarr dataset:

!ls emperical_coherence.zarr
0.0.0.0  0.1.0.0  1.0.0.0  1.1.0.0  2.0.0.0  2.1.0.0

It has 6 chunks.

I want to read it in parallel:

coh = da.from_zarr('emperical_coherence.zarr')
coh

        Array                   Chunk
Bytes   9.87 GiB                2.15 GiB
Shape   (2500, 1834, 17, 17)    (1000, 1000, 17, 17)
Count   2 Graph Layers          6 Chunks
Type    complex64               numpy.ndarray

%%time
cpu_coh_result = coh.compute()

CPU times: user 12.3 s, sys: 18 s, total: 30.3 s
Wall time: 57.3 s

When I read it directly without Dask:

%%time
cpu_coh_result = zarr.load('emperical_coherence.zarr')

It is faster:
CPU times: user 2.52 s, sys: 8.07 s, total: 10.6 s
Wall time: 10.1 s

There are only 6 chunks, so generating the task graph should not be the bottleneck:

%%time 
coh.persist()

CPU times: user 117 ms, sys: 113 ms, total: 229 ms
Wall time: 223 ms

The time taken with Dask is about 6 times that of direct reading, as if the whole dataset were read 6 times over. I am definitely not reading the data smartly. Does anybody know how to speed it up?

Hi @kanglcn,

I’m not sure that using Dask to read a Zarr file into a NumPy array at the end is the most efficient way of loading a file. I understand you want to read each chunk concurrently, but maybe Zarr is already doing that for you. Still, 6 times slower is weird.

Before trying to speed things up, I would try to understand what happens with Dask by using a LocalCluster and looking at the Dashboard. Dask may be spending some time serializing NumPy arrays from the workers/processes back to the main thread or Client.

I tried to monitor the CPU usage with htop but it turns out only one core is used for both approaches. I have no idea why.

Reading a file is IO bound and uses very little CPU, so it’s no real surprise that you’re not seeing much CPU usage. What we should ensure is that several IO operations occur in parallel, either with threads or processes.

Again, I would recommend trying a LocalCluster and watching what is happening on the Dashboard.
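Something like this minimal sketch (worker and thread counts are arbitrary; the dashboard link is printed so you can open it in a browser):

# Sketch: start a LocalCluster and watch the dashboard while the load runs.
import dask.array as da
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)
print(client.dashboard_link)                    # open this URL in a browser

coh = da.from_zarr('emperical_coherence.zarr')
result = coh.compute()                          # watch the Task Stream / Workers tabs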

Thanks for your suggestion! You are right: Dask spends much more time transferring the data back to the main thread. If I replace compute() with persist(), the time cost is much shorter.

Just be careful with persist, in case you don’t know: it’s a non-blocking operation. persist will give control back to your Python process immediately, but there will still be work going on in the background.
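If you do want to time persist properly, here is a sketch of one way to block until the work has actually finished (using coh from your snippet and assuming a distributed Client is already running):

# Sketch: wait for a persisted collection to be fully in worker memory.
from dask.distributed import wait, futures_of

persisted = coh.persist()
wait(futures_of(persisted))    # returns only once all chunks are loaded on the workers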

A good way to test data-loading speed on a distributed cluster is to use an operation whose computational cost is negligible compared to the IO, like taking the mean of your array; see the sketch below.
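For example (sketch; coh as above):

# Sketch: time a cheap reduction so the run is dominated by reading, not by transfer.
%time coh.mean().compute()     # only a single scalar comes back to the client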