How to Save Many Large Dask Arrays in Parallel

Hello!
I have many very large NumPy arrays that I want to save to local disk quickly, using Dask's parallelism and a high compression ratio. My plan is to first convert each NumPy array to a Dask array, and then save the Dask array in Zarr format using the following code:

import numpy as np
import dask.array as da
from dask.distributed import Client
from dask import delayed
import zarr as za
from numcodecs import Blosc
client = Client(n_workers=0, threads_per_worker=16)
for i in range(500):
    a = np.random.rand(512*512, 240*200*100)
    filename = 'a'+str(i)+'.zarr'
    compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE)
    a_Zarr = da.from_array(a, chunks=(a.shape[0],int(a.shape[1]/40)))
    a_Zarr = client.scatter(a_Zarr)
    client.run(da.to_zarr,a_Zarr,filename,component='a_Zarr',compressor=compressor)

But when I run the above code, I run into the following problems:
1. When client.scatter is used, a_Zarr is converted into a future object, which loses its original Dask array attributes, so da.to_zarr can no longer be used.
2. It seems that client.run cannot speed up the

da.to_zarr(a_Zarr,filename,component='a_Zarr',compressor=compressor)

process. And it seems that client.compute also cannot be used to speed up the saving process. If I want to manually control the number of threads or the number of processes used when saving the a_Zarr Dask array, which Client function should I use?

Thanks!!!

Can anyone provide a solution to this problem? :smiley: :smiley: :smiley:

Hi @tjk,

Based on your code, I would simplify things a bit and just parallelize over each input NumPy array.
I would first create a function that takes a NumPy array as input and uses Zarr directly to save it to disk (so without using Dask Array). Then I would keep your for loop and use Dask Delayed to wrap the 500 calls of this function, and in the end call compute on all the delayed objects.

Does that make sense to you?

Hello @guillaumeeb,
I am new to Dask. Could you show me how to use Dask's compute function to compute the

da.to_zarr(a_Zarr,filename,component='a_Zarr',compressor=compressor) 

function with all of its input parameters? I have seen the documentation on the forum, but the signature of this

da.to_zarr

function is different from simple numerical add or multiply functions.
By the way, I found that if I create the Dask array in this way:

da.from_array(a, chunks=(int(a.shape[0]/2), int(a.shape[1]/20)))

and save it using da.to_zarr, it is about two times faster than creating the array using

da.from_array(a, chunks=(a.shape[0],int(a.shape[1]/40)))

I wonder if you could also help me explain this phenomenon.

Actually, what I’m suggesting above is to not use Dask Array at all. Just use NumPy and Zarr, and then wrap it all in delayed. Something like this (not tested code):

# Assumes the imports from your snippet, plus `import zarr`
def save_to_zarr(i, a):
    filename = 'a' + str(i) + '.zarr'
    compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE)
    # zarr.save_array passes the compressor through to zarr.create
    zarr.save_array(filename, a, compressor=compressor)

delayed_results = []

for i in range(500):
    a = np.random.rand(512*512, 240*200*100)
    delayed_results.append(delayed(save_to_zarr)(i, a))

results = client.compute(delayed_results)
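
If you want to block until all the writes have actually finished (a small addition on top of the snippet above, using the distributed wait helper):

from dask.distributed import wait

# results is a list of futures; wait blocks until every write task is done
wait(results)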

This way you would have an embarrassingly parallel code that launches 500 file writes in parallel (I hope your file system can handle it). If you really want to save each NumPy array sequentially, but use Dask Array for each write, that is another problem.

But maybe the solution is simply to change your code a bit:

for i in range(500):
    a = np.random.rand(512*512, 240*200*100)
    filename = 'a' + str(i) + '.zarr'
    compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE)
    a_Zarr = da.from_array(a, chunks=(a.shape[0], int(a.shape[1]/40)))
    da.to_zarr(a_Zarr, filename, component='a_Zarr', compressor=compressor)  # You don't need to use client.run or compute here

client.run is not needed: Dask will use the LocalCluster created by your Client by default to run this operation.
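
If you also want to control yourself how many threads are used for a single write (this is only a sketch, not tested), you can ask da.to_zarr for a delayed object with compute=False and choose the scheduler and worker count when computing it:

# Build the write graph without executing it
write = da.to_zarr(a_Zarr, filename, component='a_Zarr',
                   compressor=compressor, compute=False)

# Run it with the local threaded scheduler, limited to 8 threads
write.compute(scheduler='threads', num_workers=8)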

I’ve just seen that you launch 0 workers here

client = Client(n_workers=0, threads_per_worker=16)

You’ll need at least one!
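
For example (the numbers here are just an illustration, adapt them to your machine):

client = Client(n_workers=1, threads_per_worker=16)  # at least one worker process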

This is unexpected; I’ve no time to test, but this should not be the case…