Hi,
I am just getting started with dask (and zarr), wanting to do some basic computations on larger-than-memory arrays.
I must be doing something wrong, because I am getting much worse performance with dask than if I read from zarr and use numpy functions directly, or loop over the chunks myself. I'm guessing it might have to do with the fact that I'm operating on a much smaller section of the array than is allocated in total? Or maybe the chunk size is too small?
Code below, any help appreciated.
Thanks.
import zarr
import dask.array as da
import numpy as np
dz_arr=da.from_zarr(my_path)
dz_arr
        Array                    Chunk
Bytes   62.00 TiB                3.81 MiB
Shape   (31, 262143, 1048575)   (1, 50, 10000)
Count   17065966 Tasks          17065965 Chunks
Type    float64                 numpy.ndarray
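If I'm computing it right, that task count comes straight from the chunk grid of the full array:
import math
# chunk grid implied by shape (31, 262143, 1048575) and chunk shape (1, 50, 10000)
n_chunks = 31 * math.ceil(262143 / 50) * math.ceil(1048575 / 10000)
print(n_chunks)  # 17065965, i.e. one task per chunk (plus one extra task in the graph)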
z_arr=zarr.open(my_path,'r')
z_arr.info
Type               : zarr.core.Array
Data type          : float64
Shape              : (31, 262143, 1048575)
Chunk shape        : (1, 50, 10000)
Order              : C
Read-only          : True
Compressor         : None
Store type         : zarr.storage.DirectoryStore
No. bytes          : 68169395863800 (62.0T)
No. bytes stored   : 46848000256 (43.6G)
Storage ratio      : 1455.1
Chunks initialized : 11712/17065965
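So each chunk is tiny, about 3.8 MiB uncompressed if my arithmetic is right, and only 11712 of the ~17 million chunks have actually been written:
# uncompressed size of one (1, 50, 10000) float64 chunk
print(1 * 50 * 10000 * 8 / 2**20)  # ~3.81 MiB, matching the dask repr above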
%time m_z=z_arr[0,0:10,0:20000].max(axis=1)
CPU times: user 364 µs, sys: 6.48 ms, total: 6.84 ms
Wall time: 8.49 ms
%time m_dz=dz_arr[0,0:10,0:20000].max(axis=1).compute()
CPU times: user 21.7 s, sys: 3.17 s, total: 24.9 s
Wall time: 24.8 s
np.allclose(m_z,m_dz,equal_nan=True)
True
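If it helps narrow things down, this slice only touches two chunks (rows 0:10 fit inside one 50-row chunk and columns 0:20000 span two 10000-column chunks), so I'd guess most of the 25 s goes into graph handling rather than IO. I'm not sure where the time actually goes, but I suppose I could split graph construction from execution like this (just a sketch, output not shown):
%time expr = dz_arr[0, 0:10, 0:20000].max(axis=1)  # only builds the lazy expression
%time m_dz = expr.compute()                        # optimizes the graph, reads chunks, reduces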
#manual loop over chunks
chunk=z_arr.chunks[1]
%time t_z1=np.concatenate([np.nanargmax(np.abs(z_arr[8,i:i+chunk,0:20000]),axis=1) for i in range(0,40000,chunk)])
CPU times: user 4.4 s, sys: 1.3 s, total: 5.69 s
Wall time: 6.66 s
#dask
%time t_dz=np.nanargmax(np.abs(dz_arr[8,0:40000,0:20000]),axis=1).compute()
CPU times: user 38.9 s, sys: 7.9 s, total: 46.8 s
Wall time: 30 s
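For scale, the [8, 0:40000, 0:20000] selection should only span 1600 of the ~17 million chunks (if I'm counting right):
import math
# chunks touched by the [8, 0:40000, 0:20000] selection
print(math.ceil(40000 / 50) * math.ceil(20000 / 10000))  # 1600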
#manual multithreading
from joblib import Parallel, delayed
%time t_z2=np.concatenate(Parallel(10)(delayed(lambda i:np.nanargmax(np.abs(z_arr[8,i:i+chunk,0:20000]),axis=1))(i) for i in range(0,40000,chunk)))
CPU times: user 540 ms, sys: 185 ms, total: 725 ms
Wall time: 2.82 s
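For completeness, I assume the closest dask equivalent to the joblib version would be pinning the threaded scheduler to 10 workers, something like this (untested sketch, I haven't checked whether it changes anything):
# hypothetical: same reduction as above, but forcing the threaded scheduler with 10 workers
t_dz2 = np.nanargmax(np.abs(dz_arr[8, 0:40000, 0:20000]), axis=1).compute(scheduler='threads', num_workers=10)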
#maybe this is helpful?
t=np.nanargmax(np.abs(dz_arr[8,0:40000,0:20000]),axis=1)
t.dask
HighLevelGraph
HighLevelGraph with 6 layers and 17071566 keys from all layers.
Layer1: original-from-zarr
original-from-zarr-e4ea86f27725c56890661cb8e9f6e12c
layer_type MaterializedLayer
is_materialized True
number of outputs 1
Layer2: from-zarr
from-zarr-e4ea86f27725c56890661cb8e9f6e12c
layer_type Blockwise
is_materialized False
number of outputs 17065965
shape (31, 262143, 1048575)
dtype float64
chunksize (1, 50, 10000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on original-from-zarr-e4ea86f27725c56890661cb8e9f6e12c
Layer3: getitem
getitem-fb18d5776259f8dc80d7e435567b6be9
layer_type MaterializedLayer
is_materialized True
number of outputs 1600
shape (40000, 20000)
dtype float64
chunksize (50, 10000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on from-zarr-e4ea86f27725c56890661cb8e9f6e12c
Layer4: absolute
absolute-8239f515e7e0744ea17848a6804a9559
layer_type Blockwise
is_materialized False
number of outputs 1600
shape (40000, 20000)
dtype float64
chunksize (50, 10000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on getitem-fb18d5776259f8dc80d7e435567b6be9
Layer5: arg-reduce
arg-reduce-6348fbc45216c6d4c2133d048cb66f2b
layer_type MaterializedLayer
is_materialized True
number of outputs 1600
shape (40000, 2)
dtype int64
chunksize (50, 1)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on absolute-8239f515e7e0744ea17848a6804a9559
Layer6: nanarg_agg-aggregate
nanarg_agg-aggregate-1a7622f0e72b112855fcb48a9641209f
layer_type MaterializedLayer
is_materialized True
number of outputs 800
shape (40000,)
dtype int64
chunksize (50,)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on arg-reduce-6348fbc45216c6d4c2133d048cb66f2b
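If I add up the layer outputs, essentially all of the 17,071,566 keys come from the from-zarr layer, even though the computation itself only ever needs 1600 of those chunks:
# keys per layer, taken from the graph summary above
print(1 + 17065965 + 1600 + 1600 + 1600 + 800)  # 17071566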