Simple parallelism with numpy and ctypes functions

I have prepared some working code as a mockup for what I am trying to accomplish, and I am looking for feedback to see what parts of dask I could be using better/more efficiently. I added 4 questions in the code comments. Thanks in advance!

import numpy as np
import dask.array as da
from dask.distributed import wait
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

data_size = int(1e6)
chunk_size = int(1e5)

# a class to illustrate similar data structuring to what I have
# in my case all array members will have the same shapes and chunk sizes
class MyData:
    def __init__(self, x, y):
        self.x = x
        self.y = y


# I need to push this data through a ctypes function, and I'd like to do it with dask
data1 = MyData(
    x=da.from_array(np.arange(0, data_size), chunks=chunk_size),
    y=da.from_array(np.arange(0, data_size), chunks=chunk_size),
)

# I assume my arbitrary chunks are sub-optimal since the chunks assigned
# to particular workers may not reside on the particular
# worker, making lots of network traffic!
# Q1. is there a better way (can I select a chunk based on the current worker)?
run_chunks = np.arange(0, data_size, chunk_size)
# slice ends are exclusive, so (start, start + chunk_size) covers a full chunk
run_chunks = list(zip(run_chunks, run_chunks + chunk_size))

# not truly a ctypes function, but here so I have a running example
def a_numpy_ctypes_func(data):
    data.x = data.x + 1
    data.y = data.y * 2


def get_the_job(data1: MyData):
    def the_job(run_chunk):
        # Q2. is extracting to a np.array necessary here for using ctypes pointers,
        # numba etc, and if so is this the right way to extract it?
        my_data_np = MyData(
            x=np.array(data1.x[run_chunk[0] : run_chunk[1]]),
            y=np.array(data1.y[run_chunk[0] : run_chunk[1]]),
        )
        a_numpy_ctypes_func(my_data_np)
        return MyData(
            x=da.from_array(my_data_np.x), y=da.from_array(my_data_np.y)
        )

    return the_job


futures = client.map(get_the_job(data1), run_chunks)

# Q3. do I need to wait here, or will this automatically happen as the result is consumed later?
wait(futures)

# finally stitch the result back together to pass to next processes
# Q4. is this safe in terms of preserving the original ordering?
data2 = MyData(
    x=da.concatenate([f.result().x for f in futures]),
    y=da.concatenate([f.result().y for f in futures]),
)

Hi @smorken, welcome here!

I had trouble understanding your questions at first, but looking at the code made things clearer. I don’t think you should be mixing dask.array and Futures: you probably don’t need to, and it makes your code hard to follow.

If you need to apply a function on blocks of two arrays of similar shape that must stay aligned, couldn’t you just stack them from the start into a single dask.array with one more dimension?

It’s even simpler if the functions you need to apply to x and y are independent: you can just build independent graphs.
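For illustration, a minimal sketch of that second option (the arange arrays and the +1 / *2 operations are just placeholders taken from your example):

import dask
import dask.array as da

data_size = int(1e6)
chunk_size = int(1e5)

x = da.arange(0, data_size, chunks=chunk_size)
y = da.arange(0, data_size, chunks=chunk_size)

# two independent graphs, one per array
x2 = x.map_blocks(lambda block: block + 1, dtype=x.dtype)
y2 = y.map_blocks(lambda block: block * 2, dtype=y.dtype)

# nothing is executed until the results are requested
x2_result, y2_result = dask.compute(x2, y2)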

I’m not sure if this can be generalized to your real use case, but with your example, I would do something like:

import numpy as np
import dask.array as da

data_size = int(1e6)
chunk_size = int(1e5)

# Don't use a class, just put everything into a dask array, or have a look at XArray?
# This keeps the chunks of both x and y on the same workers, especially with the rechunking.
# Ideally, you would have the correct chunking from the start, without needing to rechunk.
x = da.arange(0, data_size, chunks=chunk_size)
y = da.arange(0, data_size, chunks=chunk_size)
data1 = da.stack([x, y])
data1 = data1.rechunk((2, chunk_size))

# not truly a ctypes function, but here so I have a running example
def a_numpy_ctypes_func(data):
    data[0] = data[0] + 1
    data[1] = data[1] * 2
    return data

# Then it's a simple map_blocks
result = data1.map_blocks(a_numpy_ctypes_func, meta=np.array(()))
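
If it helps, a possible way to pull the result back out (just a sketch, assuming the stacked (2, data_size) layout above):

# materialize only when needed; row 0 is still x and row 1 is still y
out = result.compute()
x2, y2 = out[0], out[1]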

Thank you for taking the time to read the code and respond. I think I am starting to understand.

The feature I think I was searching for was a way in the API to efficiently create and enforce a common chunking and iteration structure for multiple dask arrays of the same dimensions.

Stacking them into one big array is certainly a solution for this, but unless I am mistaken that approach does not support arrays with different dtypes, which is part of what I am after.

Since my arrays are currently all one-dimensional but have different types, putting them into one or more dask dataframes is probably the right solution. My assumption is that there will be some cost to aligning the partitions of multiple dataframes, plus some overhead simply from using dataframes instead of plain arrays. I don’t need “all” of the dataframe features; I’m only after the alignment of multiple arrays, and that’s why I was trying to figure out how to do this with dask arrays.
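For the record, a rough sketch of what I have in mind (the column names and the +1 / *2 operations are placeholders, and I haven’t measured the overhead):

import numpy as np
import pandas as pd
import dask.dataframe as dd

data_size = int(1e6)
chunk_size = int(1e5)

# two 1-D "arrays" with different dtypes, kept aligned as columns of one dataframe
pdf = pd.DataFrame({
    "x": np.arange(data_size, dtype=np.int64),
    "y": np.arange(data_size, dtype=np.float32),
})
ddf = dd.from_pandas(pdf, npartitions=data_size // chunk_size)

def a_partition_func(part: pd.DataFrame) -> pd.DataFrame:
    # each partition arrives as a plain pandas DataFrame,
    # so .to_numpy() gives contiguous numpy arrays to hand to ctypes/numba
    part = part.copy()
    part["x"] = part["x"].to_numpy() + 1
    part["y"] = part["y"].to_numpy() * 2
    return part

result = ddf.map_partitions(a_partition_func, meta=pdf.iloc[:0])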

Thanks for mentioning XArray, it looks like a good solution for this problem as well, and I’ll have to read up a bit more about this!
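
In case it is useful to anyone else, here is roughly what I imagine the xarray version would look like (untested sketch; the dimension name "i" and the output dtype are just placeholders):

import numpy as np
import xarray as xr
import dask.array as da

data_size = int(1e6)
chunk_size = int(1e5)

# two dask-backed variables of different dtypes sharing one dimension and chunking
ds = xr.Dataset({
    "x": ("i", da.arange(0, data_size, chunks=chunk_size)),
    "y": ("i", da.arange(0, data_size, chunks=chunk_size, dtype=np.float32)),
})

def a_numpy_func(x_block, y_block):
    # receives plain numpy blocks when the graph is computed
    return x_block + 1 + y_block * 2

result = xr.apply_ufunc(
    a_numpy_func, ds["x"], ds["y"],
    dask="parallelized",
    output_dtypes=[np.float64],
)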