I have prepared some working code as a mockup for what I am trying to accomplish, and I am looking for feedback to see what parts of dask I could be using better/more efficiently. I added 4 questions in the code comments. Thanks in advance!
import numpy as np
import dask.array as da
from dask.distributed import wait
from dask.distributed import Client, LocalCluster

# NOTE(review): the cluster is started as a module-level side effect. On
# platforms that start worker processes with the "spawn" method this may need
# to live under `if __name__ == "__main__":` -- confirm for your platform.
cluster = LocalCluster()
client = Client(cluster)

data_size = int(1e6)
chunk_size = int(1e5)


# a class to illustrate similar data structuring to what I have
# in my case all array members will have the same shapes and chunk sizes
class MyData:
    """Container for two arrays, `x` and `y`, that are processed together."""

    def __init__(self, x, y):
        self.x = x
        self.y = y


# I need to push this data through a ctypes function, and I'd like to do it with dask
data1 = MyData(
    x=da.from_array(np.arange(0, data_size), chunks=chunk_size),
    y=da.from_array(np.arange(0, data_size), chunks=chunk_size),
)

# I assume my arbitrary chunks are sub-optimal since the chunks assigned
# to particular workers may not reside on the particular
# worker, making lots of network traffic!
# Q1. is there a better way (can I select a chunk based on the current worker)?
# NOTE(review): `da.map_blocks` / `Array.to_delayed()` operate on the array's
# existing chunks and look like the closer fit -- check the dask docs.
#
# Each entry is a (first_index, last_index) pair; the end index is INCLUSIVE.
run_chunks = np.arange(0, data_size, chunk_size)
run_chunks = list(
    zip(run_chunks, run_chunks + [chunk_size - 1] * run_chunks.size)
)


# not truly a ctypes function, but here so I have a running example
def a_numpy_ctypes_func(data):
    """Stand-in for the real ctypes call: rebinds `data.x` and `data.y` in place."""
    data.x = data.x + 1
    data.y = data.y * 2


def get_the_job(data1: MyData):
    """Return a one-argument task function that closes over `data1`.

    The returned `the_job(run_chunk)` takes one (first_index, last_index)
    pair with an inclusive end, processes that range of `data1.x` and
    `data1.y`, and returns the processed range as a new `MyData`.
    """

    def the_job(run_chunk):
        # Unpack the pair; the original used the whole tuple as both slice
        # bounds. Plain ints keep the slice independent of NumPy scalar types.
        first, last = (int(i) for i in run_chunk)
        # The end index is inclusive while Python slices are half-open.
        stop = last + 1

        # Q2. is extracting to a np.array necessary here for using ctypes pointers,
        # numba etc, and if so is this the right way to extract it?
        # NOTE(review): this computes a dask collection from inside a task;
        # see the distributed docs on launching tasks from tasks.
        my_data_np = MyData(
            x=np.array(data1.x[first:stop]),
            y=np.array(data1.y[first:stop]),  # was sliced from data1.x
        )
        a_numpy_ctypes_func(my_data_np)
        return MyData(
            x=da.from_array(my_data_np.x), y=da.from_array(my_data_np.y)
        )

    return the_job


futures = client.map(get_the_job(data1), run_chunks)

# Q3. do I need to wait here, or will this automatically happen as the result is consumed later?
# NOTE(review): `Future.result()` below should itself block until the task has
# finished, which would make this wait redundant -- confirm in the docs.
wait(futures)

# finally stitch the result back together to pass to next processes
# Q4. is this safe in terms of preserving the original ordering?
# Results are collected by walking `futures` in list order.
# NOTE(review): this matches `run_chunks` order provided `client.map` returns
# futures in input order -- confirm in the docs.
# Each result is fetched once instead of once per attribute.
results = [f.result() for f in futures]
data2 = MyData(
    x=da.concatenate([r.x for r in results]),
    y=da.concatenate([r.y for r in results]),
)