Hello!
I have many very large numpy arrays that I want to save to local disk quickly, using dask's parallelism and with a high compression ratio. My plan is to first convert each numpy array to a dask array, and then save the dask array in zarr format using the following code:
import numpy as np
import dask.array as da
from dask.distributed import Client
from numcodecs import Blosc

# one worker with 16 threads (n_workers=0 would start no workers at all)
client = Client(n_workers=1, threads_per_worker=16)
compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE)

for i in range(500):
    a = np.random.rand(512*512, 240*200*100)  # stand-in for one of my large arrays
    filename = f'a{i}.zarr'
    a_Zarr = da.from_array(a, chunks=(a.shape[0], a.shape[1] // 40))
    a_Zarr = client.scatter(a_Zarr)  # problem 1: this returns a Future
    client.run(da.to_zarr, a_Zarr, filename,
               component='a_Zarr', compressor=compressor)  # problem 2: no speedup
But when I run the above code, I run into the following problems:
1. When client.scatter is used, a_Zarr is converted into a Future object, which loses the original dask array attributes, so da.to_zarr can no longer be used on it.
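A minimal reproduction of problem 1 (the shape and the small.zarr path here are just placeholders so the example fits in memory):

import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client(n_workers=1, threads_per_worker=4)
arr = da.from_array(np.random.rand(1024, 1024), chunks=(1024, 256))
fut = client.scatter(arr)
print(type(fut))               # distributed.client.Future
da.to_zarr(fut, 'small.zarr')  # raises: the Future has none of the dask
                               # array attributes (.chunks etc.) to_zarr needs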
2. It seems that client.run cannot speed up the
da.to_zarr(a_Zarr, filename, component='a_Zarr', compressor=compressor)
step, and client.compute does not seem to speed up the saving process either. If I want to manually control the number of threads or processes used while saving the a_Zarr dask array, which client function should I use?
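For reference, this is the kind of manual control I am imagining (just a sketch of what I would guess; I do not know whether compute=False together with client.compute, or the num_workers argument of the local schedulers, is the right tool here -- either option alone would do the write, both are shown only for comparison):

import numpy as np
import dask
import dask.array as da
from dask.distributed import Client
from numcodecs import Blosc

client = Client(n_workers=1, threads_per_worker=16)
compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE)
a_Zarr = da.from_array(np.random.rand(4096, 4096), chunks=(4096, 512))

# build the write lazily instead of executing it right away
write_task = da.to_zarr(a_Zarr, 'a0.zarr', component='a_Zarr',
                        compressor=compressor, compute=False, overwrite=True)

# option A: hand the task graph to the distributed client
client.compute(write_task).result()

# option B: bypass the client and pick the scheduler and thread count by hand
dask.compute(write_task, scheduler='threads', num_workers=8)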
Thanks!!!