I have an array and an already parallelized numba jit function. I want to feed that array into the function and save the result to disk. The array is too large to fit into memory, so I want to use dask as a task scheduler to automate the process. Here is a minimal example:
from numba import jit, prange
import math
import numpy as np
import zarr
from dask.distributed import Client, LocalCluster
from dask import array as da
# the function
@jit(nopython=True, nogil=True, parallel=True)
def sin(x):
    out = np.empty_like(x)
    for i in prange(x.shape[0]):
        out[i] = math.sin(x[i])
    return out
# the input data
x = np.arange(100000, dtype=np.float64)  # float, so empty_like gives a float output buffer
x_zarr = zarr.open('x.zarr', 'w', shape=x.shape, chunks=(2000,), dtype=x.dtype)
x_zarr[:] = x  # zarr.open only creates the array; the data still has to be written
# start a local dask cluster
cluster = LocalCluster()
client = Client(cluster)
# the whole processing
x_zarr = zarr.open('x.zarr', 'r')
da_x = da.from_array(x_zarr, chunks=(2000,))
da_sinx = da_x.map_blocks(sin)
da_sinx.to_zarr('sinx.zarr', overwrite=True)
I want to make the whole process as fast as possible while keeping memory consumption low. The best approach I can imagine is to have two workers: one reads and writes the data while the other does the computation, and they work in parallel, i.e., while one worker processes the current chunk, the other writes the previously processed chunk to disk and reads the next one. This way I make full use of the CPUs, since the numba function is already parallelized, and the time spent reading and writing data is hidden behind the computation. Memory consumption stays low because at most two chunks are in flight at any time.
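To make the schedule I have in mind concrete, here is a pure-Python sketch of the double buffering I am after; read_chunk, compute and write_chunk are hypothetical stand-ins for the zarr reads, the numba function and the zarr writes, not part of my actual code:

from concurrent.futures import ThreadPoolExecutor

def pipeline(read_chunk, compute, write_chunk, n_chunks):
    # one I/O thread; the main thread runs the numba-parallel compute
    with ThreadPoolExecutor(max_workers=1) as io:
        pending_write = None
        next_read = io.submit(read_chunk, 0)  # prefetch the first chunk
        for i in range(n_chunks):
            chunk = next_read.result()  # wait until chunk i is in memory
            if i + 1 < n_chunks:
                next_read = io.submit(read_chunk, i + 1)  # prefetch chunk i+1
            result = compute(chunk)  # numba uses all cores here
            if pending_write is not None:
                pending_write.result()  # make sure chunk i-1 is on disk
            pending_write = io.submit(write_chunk, i, result)
        if pending_write is not None:
            pending_write.result()  # flush the last chunk

This is only to illustrate the overlap I want; ideally dask would do this scheduling for me.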
Is this the best way to process the data? If so, how can I make dask schedule the tasks as I proposed? Is cluster = LocalCluster(n_workers=2, processes=False)
the right way to set it up?
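For completeness, the full setup I am considering looks like this; threads_per_worker=1 is just my guess at keeping dask's threads from competing with numba's own thread pool:

from dask.distributed import Client, LocalCluster

# two single-threaded workers in one process: my hope is that one can do
# I/O while the other runs the numba function, which parallelizes itself
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)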
Thanks!