Parallelize or map chunks of arrays with different sizes, shapes and numbers of blocks

I’m using Dask Array to work with an image and some other data arrays with 2 or 3 dimensions, whose blocks are not aligned with those of the image.
For every pixel/cell of the image array, I have to compute a function using every value of the data arrays.

In terms of Dask arrays, I need a way to divide the load per chunk of the data arrays but also per chunk of the image, ideally without falling back to a for-loop over every chunk of the image.

More specifically, I have to use the image indices:

import dask.array as da
import numpy as np

# Image dimensions and coordinate grids over the image
n, m = (4096, 2048)
x = np.linspace(0, 1, n)
y = np.linspace(0, 1, m)
xv, yv = da.meshgrid(x, y)

The data arrays look like this:

n0, n1, n2 = (56000, 8, 4)
chunks = {0: 10000}  # chunk only along the first axis; the other axes stay as single chunks

# 2d arrays
a = da.random.random((n0, n1), chunks=chunks).astype(np.float32)
b = da.random.random((n0, n1), chunks=chunks).astype(np.float32)

# 3d arrays    
c = da.random.random((n0, n1, n2), chunks=chunks).astype(np.float32)
# complex array
dr = da.random.random((n0, n1, n2), chunks=chunks).astype(np.float32)    
di = 1.0j * da.random.random((n0, n1, n2), chunks=chunks).astype(np.float32)    
d = dr + di
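
With chunks={0: 10000}, only the first axis is split into blocks and the trailing axes remain single chunks. A quick way to confirm this with the arrays just defined:

print(a.chunksize)  # (10000, 8)
print(c.chunksize)  # (10000, 8, 4)
print(d.chunksize)  # (10000, 8, 4)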

Without using map_blocks/blockwise, the fully broadcasted version would look like the following example.

axv = a[:, :, None, None] * xv[None, None, :, :]
byv = b[:, :, None, None] * yv[None, None, :, :]

u = axv + byv
v = c * d
vr = v.real[:, :, :, None, None] * da.cos(2 * np.pi * u)[:, :, None, :, :]
vi = v.imag[:, :, :, None, None] * da.sin(2 * np.pi * u)[:, :, None, :, :]

w = vr - vi

# Reduce data to image dim

res = da.sum(w, axis=[0, 1, 2])

I’m not actually trying to do that: broadcasting produces intermediate arrays that are far too large.
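
To put a rough number on it, here is a back-of-the-envelope estimate using the shapes above (the exact dtype of the intermediates depends on upcasting, so this is a lower bound):

# Fully broadcast intermediate w has shape (n0, n1, n2, n, m)
n_elements = 56000 * 8 * 4 * 4096 * 2048
print(n_elements * 4 / 2**40)  # ~55 TiB even at float32, far beyond memory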

What I have tried so far

I’ve been testing the program with 256×256 images.

n, m = (256, 256)

With this size, the following strategy has been working without problems.

1.- Broadcast and multiply the data arrays with the index arrays, so the first dimensions come from the data array.
2.- Use blockwise to map the calculation onto every block.

from numba import float32, guvectorize

def block_fun(u, c, d):
    # c and d arrive as (one-element) lists of blocks because their
    # 'k' index is contracted away by blockwise
    vr = c[0] * d[0].real
    vi = c[0] * d[0].imag
    res = numba_fun(u, vr, vi)
    return res

@guvectorize(
    [(float32[:, :], float32[:], float32[:], float32[:, :])],
    '(n,m),(i),(i)->(n,m)',
    nopython=True,
)
def numba_fun(u, vr, vi, res):
    res[:] = 0  # the output buffer from guvectorize is not zero-initialized
    u_pi = 2 * np.pi * u
    cos_u = np.cos(u_pi)
    sin_u = np.sin(u_pi)
    for i in range(vr.shape[0]):
        res += vr[i] * cos_u - vi[i] * sin_u
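
# A tiny standalone check of how the guvectorized kernel broadcasts
# (hypothetical small shapes, just for illustration): u has loop dims
# (i, j) and core dims (n, m); vr/vi have core dim (i) from the signature.
u_small = np.random.random((3, 5, 16, 16)).astype(np.float32)
vr_small = np.random.random((3, 5, 4)).astype(np.float32)
vi_small = np.random.random((3, 5, 4)).astype(np.float32)
print(numba_fun(u_small, vr_small, vi_small).shape)  # (3, 5, 16, 16)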


n, m = (256, 256)
x = np.linspace(0, 1, n)  
y = np.linspace(0, 1, m)  
xv, yv = da.meshgrid(x, y) 

axv = a[:, :, None, None] * xv[None, None, :, :]
byv = b[:, :, None, None] * yv[None, None, :, :]

u = axv + byv

data = da.blockwise(
	block_fun,
	'ijnm',
	u,
	'ijnm',
	c,
	'ijk',
	d,
	'ijk',
	dtype=np.float32
)

res = da.sum(data, axis=[0, 1])
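
At this point res is still a lazy dask array; calling compute (or persisting it on a cluster) is what actually runs the blockwise tasks and the reduction:

result = res.compute()  # numpy array of shape (256, 256)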

This works because each block of the broadcast array is small enough to fit in memory. Increasing the image size, however, makes this version fail due to memory limits.

What I need

The expected result is to get the modified image without memory problems.
One option is to adapt what already works for smaller images, but as I mentioned before, the idea is to avoid a for-loop over the image/image-chunks, and I don’t know how to do it without the loop.
Any suggestion or modification of the existing code is greatly appreciated.

Hi @anemia,

First of all, welcome to the Dask community!

Okay, so I’ve just gone through your post, which is already really detailed and contains a lot of information; this seems to be a tricky problem. I fear it goes a bit beyond my technical knowledge, and I will need more time to really understand what you’re trying to achieve and what the limitations are.

I’ve got some simple questions though:

  • Is this an embarrassingly parallel problem, or, as I suspect, is it more complicated than that?
  • In terms of memory usage, where exactly is the limitation: the data arrays? The image? During the computation?
  • What is preventing you from using map_blocks?
  • Could you show what the for loop would look like?

Sorry, these might be silly questions, but this is a bit hard for me to grasp all at once.

Hi @guillaumeeb, thank you for your interest.
I’ll try to answer each point as clearly as possible.

  • This problem is parallelizable. Although the input has many arrays, the output is the size of the image, so for each image pixel in the input we get exactly one pixel in the result. If we isolate a pixel, the result is the sum of applying the function (with sines and cosines) over every index of the “data” arrays. Thought of as parallel computing, if each task applied the function to one index of the arrays, there would have to be a final communication of the results of every task to perform the addition.
  • This problem is quite memory-intensive (or compute-intensive, if you trade memory for time). Currently, the largest increase in memory usage comes from broadcasting the data arrays, which grow by 2 dimensions with the size of the image. So we could say that the limitation is the size of the image during the computation. Since the broadcast version works for small images, I want to know if it is possible to process and parallelize per chunk of the image. I think a non-broadcast version might be possible, but would it be just as fast?
  • About map_blocks: I don’t use it because with blockwise the dimensions of the result and of the inputs are visually clearer/more explicit.
  • Continuing from what was shown in What I have tried so far, the for-loop would look like this:
import itertools

# Function that yields a list of slices covering each chunk of the array
def chunks_slices(size, chunksize):
    n_chunks = tuple((s + c - 1) // c for s, c in zip(size, chunksize))
    for idx_chunk in itertools.product(*(range(n) for n in n_chunks)):
        yield [
            slice(i * c, min((i + 1) * c, s))
            for i, c, s in zip(idx_chunk, chunksize, size)
        ]

# Array to store the result
res = da.zeros_like(xv)

# Loop over each block of the image
for inds in chunks_slices(xv.shape, xv.chunksize):
    xv_chunk = xv[inds[0], inds[1]]
    yv_chunk = yv[inds[0], inds[1]]
    
    # Continues with what was already working
    axv = a[:, :, None, None] * xv_chunk[None, None, :, :] 
    byv = b[:, :, None, None] * yv_chunk[None, None, :, :] 

    u = (axv + byv).astype(np.float32)

    data = da.blockwise(
        block_fun, 
        'ijnm', 
        u, 
        'ijnm', 
        c, 
        'ijk', 
        d, 
        'ijk', 
        dtype=np.float32
    )

    res_chunk = da.sum(data, axis=[0, 1])
    res[inds[0], inds[1]] = res_chunk

In this example, block_fun, numba_fun, and the arrays are the same as before.
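For reference, here is a tiny demo of what chunks_slices yields (hypothetical toy sizes, not the real image dimensions):

for s in chunks_slices((5, 4), (2, 3)):
    print(s)
# [slice(0, 2, None), slice(0, 3, None)]
# [slice(0, 2, None), slice(3, 4, None)]
# [slice(2, 4, None), slice(0, 3, None)]
# ... and so on, one list of slices per chunk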
We could also iterate over the blocks of xv/yv, but I think we would have to store the res_chunk arrays and concatenate them, or calculate the index of each chunk, which would lead to a result similar to the one in the example.
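A rough sketch of that alternative (reusing block_fun and the arrays defined above; just an illustration, not something I have benchmarked): iterate over the blocks of xv/yv and reassemble the per-block results with da.block.

rows = []
for bi in range(xv.numblocks[0]):
    row = []
    for bj in range(xv.numblocks[1]):
        xv_chunk = xv.blocks[bi, bj]
        yv_chunk = yv.blocks[bi, bj]
        u = (a[:, :, None, None] * xv_chunk[None, None, :, :]
             + b[:, :, None, None] * yv_chunk[None, None, :, :]).astype(np.float32)
        data = da.blockwise(block_fun, 'ijnm', u, 'ijnm', c, 'ijk', d, 'ijk',
                            dtype=np.float32)
        row.append(da.sum(data, axis=[0, 1]))
    rows.append(row)
res = da.block(rows)  # same shape as xv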

I hope this clarifies the problem a bit. As I said in the post, any modification/suggestion is welcome; I am willing to scrap part of what has already been done if there is a better or more efficient solution.

Hi @anemia,

Just to say thank you for your detailed answer, and sorry I haven’t had time to dig into this complicated question yet… I haven’t forgotten you, but it will take some time to test and understand this.


I am struggling a little with the details of this, but have you tried using much smaller input chunks (both for the meshgrid and for the actual payload arrays)? With the provided (4096, 2048) grid you end up with intermediate chunks of almost 5 TiB, for example.

You can calculate the byte requirement for your chunks easily yourself: 10000 * 8 * 2048 * 4096 * 64/8. You’ll notice that the numbers here are the chosen chunk size and the grid (you can also chunk the grid, of course), so you have flexibility here. Also notice that you’ll end up with float64 again (I believe your grid is 64-bit, which is upcasting everything else).
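
For instance, both the upcasting and the back-of-the-envelope figure can be checked directly (a sketch using the arrays from the original post):

axv = a[:, :, None, None] * xv[None, None, :, :]
print(axv.dtype)  # float64: the 64-bit grid upcasts the float32 data
print(10000 * 8 * 2048 * 4096 * 8 / 2**40)  # ~4.9 TiB for a single intermediate chunk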

I didn’t go through your custom code, but FWIW dask uses blockwise internally as well to describe these multiplications; at some point the intermediate chunks just blow up out of proportion.

If you significantly reduce the chunk size you’ll get manageable chunks and dask has a chance to work on the problem. Of course, this comes at the cost of having more tasks and possibly more network communication. However, considering that most things here are blockwise, I would not expect the additional network traffic to be a cost driver.
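
A minimal sketch of what that could look like, assuming the setup from the original post (the specific chunk sizes here are illustrative, not tuned):

# Keep the grid in 32-bit to avoid upcasting, and chunk it as well
x = np.linspace(0, 1, 4096, dtype=np.float32)
y = np.linspace(0, 1, 2048, dtype=np.float32)
xv, yv = da.meshgrid(x, y)
xv = xv.rechunk((256, 256))
yv = yv.rechunk((256, 256))

# Much smaller chunks for the data arrays too
a = a.rechunk({0: 500})
b = b.rechunk({0: 500})
c = c.rechunk({0: 500})
d = d.rechunk({0: 500})

# Now an intermediate chunk of a[:, :, None, None] * xv[None, None, :, :]
# is 500 * 8 * 256 * 256 * 4 bytes ≈ 1 GiB instead of ~5 TiB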