I have prepared some working code as a mockup of what I am trying to accomplish, and I am looking for feedback on which parts of dask I could be using better or more efficiently. I have added four questions as comments in the code. Thanks in advance!
import numpy as np
import dask.array as da
from dask.distributed import wait
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
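# handy while experimenting: the dashboard shows where tasks and data actually
# land on the workers (relevant to Q1 below)
print(client.dashboard_link)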
data_size = int(1e6)
chunk_size = int(1e5)
# a class to illustrate data structuring similar to what I have;
# in my case all array members will have the same shapes and chunk sizes
class MyData:
    def __init__(self, x, y):
        self.x = x
        self.y = y
# I need to push this data through a ctypes function, and I'd like to do it with dask
data1 = MyData(
    x=da.from_array(np.arange(0, data_size), chunks=chunk_size),
    y=da.from_array(np.arange(0, data_size), chunks=chunk_size),
)
# I assume my hand-rolled chunk ranges are sub-optimal, since the chunk a task
# needs may not reside on the worker that task is scheduled on, causing lots of
# network traffic!
# Q1. is there a better way (can I select a chunk based on the current worker)?
chunk_starts = np.arange(0, data_size, chunk_size)
# half-open [start, stop) ranges so the slices below cover every element exactly once
run_chunks = list(zip(chunk_starts, chunk_starts + chunk_size))
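# Aside related to Q1: the chunk layout is also available on the array itself
# (data1.x.chunks), which I suspect is the more robust source for these offsets
# if the chunking ever becomes irregular. Just noting my attempt at deriving the
# same ranges from it; this should match run_chunks above.
offsets = np.cumsum((0,) + data1.x.chunks[0])
run_chunks_from_layout = list(zip(offsets[:-1], offsets[1:]))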
# not truly a ctypes function, but here so I have a running example
def a_numpy_ctypes_func(data):
    data.x = data.x + 1
    data.y = data.y * 2
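# My current understanding for Q2 (please correct me if this is off): the block
# does need to be a plain, contiguous numpy array before its buffer can be handed
# to C. The helper below is a sketch of the kind of wrapper I have in mind; the
# float64/double* signature is just a placeholder for this mockup, not my real
# library's API.
import ctypes

def as_double_ptr(arr):
    # ctypes needs contiguous memory; keep a reference to the copy alive while
    # the pointer is in use, otherwise the buffer can be garbage collected
    buf = np.ascontiguousarray(arr, dtype=np.float64)
    return buf.ctypes.data_as(ctypes.POINTER(ctypes.c_double)), buf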
def get_the_job(data1: MyData):
    def the_job(run_chunk):
        # Q2. is extracting to a np.array necessary here for using ctypes pointers,
        # numba etc., and if so is this the right way to extract it?
        my_data_np = MyData(
            x=np.array(data1.x[run_chunk[0] : run_chunk[1]]),
            y=np.array(data1.y[run_chunk[0] : run_chunk[1]]),
        )
        a_numpy_ctypes_func(my_data_np)
        return MyData(
            x=da.from_array(my_data_np.x), y=da.from_array(my_data_np.y)
        )
    return the_job
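# quick local sanity check of the closure before shipping it to the cluster
# (just to convince myself the shapes survive the round trip; not part of the pipeline)
_check = get_the_job(data1)(run_chunks[0])
assert _check.x.shape == (chunk_size,)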
futures = client.map(get_the_job(data1), run_chunks)
# Q3. do I need to wait here, or will this automatically happen as the result is consumed later?
wait(futures)
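# Sketch for Q3/Q4 (my reading of the docs, happy to be corrected): client.gather
# blocks until the futures finish and returns results in the same order as the
# inputs given to client.map, which might make both the explicit wait() and the
# ordering worry below moot. Keeping both versions in the mockup for comparison.
results = client.gather(futures)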
# finally stitch the result back together to pass to next processes
# Q4. is this safe in terms of preserving the original ordering?
data2 = MyData(
    x=da.concatenate([f.result().x for f in futures]),
    y=da.concatenate([f.result().y for f in futures]),
)
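# For comparison (Q1/Q2): a variant I have been toying with, where dask picks the
# blocks itself via map_blocks so I never index chunks by hand. I couldn't see how
# to keep the MyData pairing in a single pass, so x and y are mapped separately
# here; the two lambdas are just stand-ins for the per-block ctypes call.
data3 = MyData(
    x=da.map_blocks(lambda block: block + 1, data1.x, dtype=data1.x.dtype),
    y=da.map_blocks(lambda block: block * 2, data1.y, dtype=data1.y.dtype),
)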