Using Dask with a Numba-parallelized JIT function

I have an array and an already-parallelized Numba JIT function. I want to feed that array into the function and save the result to disk. The array is too large to fit into memory, so I want to use Dask as a task scheduler to automate the process. Here is a minimal example:

import numba
from numba import jit, prange
import math
import numpy as np
import zarr

from dask.distributed import Client, LocalCluster
from dask import array as da

# the function
@jit(nopython=True,nogil=True,parallel=True)
def sin(x):
    out = np.empty_like(x)
    for i in prange(x.shape[0]):
        out[i] = math.sin(x[i])
    return out

# the input data
x = np.arange(100000, dtype=np.float64)  # float dtype so empty_like(x) holds the sin output
x_zarr = zarr.open('x.zarr', 'w', shape=x.shape, chunks=(2000,), dtype=x.dtype)
x_zarr[:] = x  # actually write the data to disk

cluster = LocalCluster()
client = Client(cluster)

# the whole processing
x_zarr = zarr.open('x.zarr','r')
da_x = da.from_array(x_zarr,chunks=(2000,))
da_sinx = da_x.map_blocks(sin)
da_sinx.to_zarr('sinx.zarr',overwrite=True)

I want to make the whole pipeline as fast as possible while keeping memory consumption low. The best scheme I can imagine uses two workers: one reads and writes the data while the other does the computation, and they work in parallel, i.e., while one worker is processing the current chunk, the other saves the previously processed chunk and reads the next one. This makes full use of the CPUs, since the Numba function is already parallelized, and hides the time spent reading and writing data. Memory consumption stays low because at most two chunks are in flight at any time.
Is this the best way to process the data? If so, how can I make Dask schedule the tasks as I proposed? Is cluster = LocalCluster(n_workers=2, processes=False) the right way?

Thanks!

Hi @kanglcn,

I don’t think you can achieve this precise workflow using only Dask. But you can probably come close by using one worker with 2 threads:
cluster = LocalCluster(n_workers=1, threads_per_worker=2)

In practice, each thread will probably loop through its own read/compute/write sequence, but this should be pretty close to what you want.
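A minimal end-to-end sketch of that setup (using np.sin as a stand-in for the Numba kernel, and an in-memory array instead of zarr, just to keep it self-contained):

import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster

# one worker with two threads: while one thread computes a chunk,
# the other can be reading/writing, since GIL-releasing kernels
# (nogil Numba, most NumPy ufuncs) allow real thread parallelism
cluster = LocalCluster(n_workers=1, threads_per_worker=2, processes=False)
client = Client(cluster)

x = np.arange(100_000, dtype=np.float64)
da_x = da.from_array(x, chunks=(2_000,))
# np.sin stands in for the compiled Numba kernel here
result = da_x.map_blocks(np.sin).compute()

client.close()
cluster.close()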

Hi @guillaumeeb , thanks for your reply. I tried your suggestion, and with LocalCluster(n_workers=1, threads_per_worker=2) the overall memory consumption is low and the processing is faster than with LocalCluster(n_workers=1, threads_per_worker=1). Looks perfect to me. But I have a question: with LocalCluster(n_workers=1, threads_per_worker=2), will the threads compete for CPU resources and introduce too much context switching?

Depending on your code, they might, yes. Either your threads are bound by the GIL somewhere, so only one Numba computation happens at a time, or they are not, and you might have two computations running in parallel. I recommend watching the dashboard to verify this.

Ultimately, you could use resources or something like a semaphore to prevent two compute tasks from executing concurrently.
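For example, a rough sketch with dask.distributed.Semaphore, with max_leases=1 so at most one chunk is being computed at any time (the semaphore name and the guarded_sin wrapper are just illustrative):

import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster, Semaphore

def guarded_sin(block, sem):
    # acquire a lease before computing; with max_leases=1 only one
    # block is computed at a time, while I/O tasks can still overlap
    with sem:
        return np.sin(block)

cluster = LocalCluster(n_workers=1, threads_per_worker=2, processes=False)
client = Client(cluster)
sem = Semaphore(max_leases=1, name="compute")

x = da.arange(10_000, chunks=2_000, dtype="f8")
# map_blocks forwards extra arguments to the function after the block
result = x.map_blocks(guarded_sin, sem, dtype="f8").compute()

client.close()
cluster.close()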

will the threads compete for CPU resources and introduce too much context switching?

@kanglcn that entirely depends on the nogil parameter you pass to Numba (implicitly or explicitly).
https://numba.pydata.org/numba-doc/latest/user/jit.html#nogil
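As a small illustration of what nogil=True buys you: with the GIL released, two Python threads can run the compiled kernel at the same time (a toy sketch; sin_kernel is just an example function):

import math
import threading
import numpy as np
from numba import jit

@jit(nopython=True, nogil=True)
def sin_kernel(x, out):
    # compiled loop; with nogil=True the GIL is released while it runs
    for i in range(x.shape[0]):
        out[i] = math.sin(x[i])

x = np.linspace(0.0, 10.0, 100_000)
out = np.empty_like(x)
half = x.shape[0] // 2

# slices are views into out, so each thread fills its own half;
# neither thread holds the GIL, so they can truly run in parallel
t1 = threading.Thread(target=sin_kernel, args=(x[:half], out[:half]))
t2 = threading.Thread(target=sin_kernel, args=(x[half:], out[half:]))
t1.start(); t2.start()
t1.join(); t2.join()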
