Simple parallelism with numpy and ctypes functions

I have prepared some working code as a mockup for what I am trying to accomplish, and I am looking for feedback to see what parts of dask I could be using better/more efficiently. I added 4 questions in the code comments. Thanks in advance!

import numpy as np
import dask.array as da
from dask.distributed import wait
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

data_size = int(1e6)
chunk_size = int(1e5)

# a class to illustrate similar data structuring to what I have
# in my case all array members will have the same shapes and chunk sizes
class MyData:
    def __init__(self, x, y):
        self.x = x
        self.y = y


# I need to push this data through a ctypes function, and I'd like to do it with dask
data1 = MyData(
    x=da.from_array(np.arange(0, data_size), chunks=chunk_size),
    y=da.from_array(np.arange(0, data_size), chunks=chunk_size),
)

# I assume my arbitrary chunks are sub-optimal, since a chunk assigned
# to a particular worker may not already reside on that worker,
# causing lots of network traffic!
# Q1. is there a better way (can I select a chunk based on the current worker)?
run_chunks = np.arange(0, data_size, chunk_size)
# each entry is a (start, stop) pair used as slice bounds below
run_chunks = list(zip(run_chunks, run_chunks + chunk_size))

# not truly a ctypes function, but here so I have a running example
def a_numpy_ctypes_func(data):
    data.x = data.x + 1
    data.y = data.y * 2


def get_the_job(data1: MyData):
    def the_job(run_chunk):
        # Q2. is extracting to a np.array necessary here for using ctypes pointers,
        # numba etc, and if so is this the right way to extract it?
        my_data_np = MyData(
            x=np.array(data1.x[run_chunk[0] : run_chunk[1]]),
            y=np.array(data1.y[run_chunk[0] : run_chunk[1]]),
        )
        a_numpy_ctypes_func(my_data_np)
        return MyData(
            x=da.from_array(my_data_np.x), y=da.from_array(my_data_np.y)
        )

    return the_job


futures = client.map(get_the_job(data1), run_chunks)

# Q3. do I need to wait here, or will this automatically happen as the result is consumed later?
wait(futures)

# finally stitch the result back together to pass to next processes
# Q4. is this safe in terms of preserving the original ordering?
data2 = MyData(
    x=da.concatenate([f.result().x for f in futures]),
    y=da.concatenate([f.result().y for f in futures]),
)

Hi @smorken, welcome here!

I had trouble understanding your questions at first, but looking at the code made it clearer why: I don’t think you should be mixing dask.array and futures. You probably don’t need to, and it makes your code hard to follow.

If you need to apply a function on blocks of two arrays of the same shape that must stay aligned, couldn’t you just stack them from the start into a single dask.array with one more dimension?

It’s even simpler if the functions you need to apply to x and y are independent: you can then build two completely independent graphs.
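For instance, here is a minimal sketch of that idea (process_x and process_y are just placeholders for your real per-chunk ctypes calls):

import dask.array as da

x = da.arange(0, int(1e6), chunks=int(1e5))
y = da.arange(0, int(1e6), chunks=int(1e5))

# placeholders for the real per-chunk functions
def process_x(block):
    return block + 1

def process_y(block):
    return block * 2

# two independent task graphs; their chunks can be scheduled in parallel
x2 = x.map_blocks(process_x)
y2 = y.map_blocks(process_y)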

I’m not sure if this can be generalized to your real use case, but with your example, I would do something like:

data_size = int(1e6)
chunk_size = int(1e5)

# Don't use a class, just put everything into a dask array, or have a look at XArray?
# This keeps the chunks holding both x and y on the same worker, especially after the rechunking below
# Ideally, you would have the correct chunking from the start, without needing to rechunk
x = da.arange(0, data_size, chunks=chunk_size)
y = da.arange(0, data_size, chunks=chunk_size)
data1 = da.stack([x, y])
data1 = data1.rechunk((2, chunk_size))

# not truly a ctypes function, but here so I have a running example
def a_numpy_ctypes_func(data):
    data[0] = data[0] + 1
    data[1] = data[1] * 2
    return data

# Then it's a simple map_blocks
result = data1.map_blocks(a_numpy_ctypes_func, meta=np.array(()))
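Slicing the stacked result then gives you x and y back, and nothing is computed until you actually ask for values. Something like this (untested sketch):

# rows of the stacked array are x and y; slicing stays lazy
x2 = result[0]
y2 = result[1]

# computation only happens here
print(x2[:5].compute())  # [1 2 3 4 5]
print(y2[:5].compute())  # [0 2 4 6 8]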