Performing Pairwise Correlation Coefficient Calculations Across Chunks (and map_blocks vs blockwise)

Hello everyone!

I’m currently updating a repository related to this project. One of the next things we need to do is perform a Pearson correlation coefficient calculation across the descriptors that the HOG function gives us. In this case I’m using the np.corrcoef function on each pair of consecutive arrays stored in a zarr.
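(In case it helps, here's roughly what that computation looks like on two made-up toy descriptor vectors, not the real data; np.corrcoef on a pair of 1-D arrays gives back a 2x2 correlation matrix:)

import numpy as np

# toy stand-ins for two consecutive HOG descriptor arrays (not real data)
current_array = np.array([0.1, 0.5, 0.3, 0.9])
next_array = np.array([0.2, 0.4, 0.4, 0.8])

correlation = np.corrcoef(current_array, next_array)
print(correlation.shape)  # (2, 2)
print(correlation)        # 1s on the diagonal, the Pearson r off-diagonal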

Here’s a Google Drive link to an example zarr in question.

One way we could do it is:

for i in range(len(zarr_array) - 1):

    current_array = zarr_array[i]
    next_array = zarr_array[i + 1]

    # correlation matrix between consecutive HOG descriptor arrays
    correlation = np.corrcoef(current_array, next_array)

    # save correlation matrix to disk, do other processing, etc...

But this takes about 5 minutes to complete. When we’re doing the number of subjects we’d like to get to, about 50 per day, that adds up to a lot of time really fast! Something like 4 hours! I want it to go nice and fast by parallelizing things with dask.

The issue I’m running into is that while I feel like I could do this over each zarr chunk, I don’t know how to make sure that the coefficients between arrays in two adjacent chunks get calculated!

Thankfully I’ll know the chunk sizes I’d have for each piece of data since it’s stored in a zarr array, but I’m not sure how to make sure that parallel processes know how to compare across chunk edges. I’m guessing that what I want is something like dask.array.map_overlap, but the combination of calculating those arrays in each chunk with map_blocks and then also having map_overlap is really tripping me up.

Any advice fellow Dask enthusiasts?

@jmdelahanty

I suppose there are several ways to go about this. Here’s one which uses slicing:

import dask
import numpy as np
import dask.array as da


def chunk_corrcoef(first, second):
    # ``first`` and ``second`` are corresponding chunks of the two shifted views
    assert first.shape == second.shape
    n_zarrs = first.shape[0]
    # compute one correlation matrix up front to learn the output shape (2, 2)
    first_out = np.corrcoef(first[0], second[0])
    out = np.empty((n_zarrs,) + first_out.shape)
    out[0] = first_out
    # correlate each remaining row of ``first`` with its counterpart in ``second``
    for i in range(1, n_zarrs):
        out[i] = np.corrcoef(first[i], second[i])
    return out


input_file = "/path/to/hog_descriptors/data.zarr"
output_file = "/path/to/corrcoef/data.zarr"

z = da.from_zarr(input_file)  # 2d array

# To prepare for ``map_blocks()``, ensure the same chunking for both input arrays
# (Not necessary if using ``blockwise()`` instead.)
new_chunks = {0: z.chunks[0][0], 1: -1}
firsts = z[0:-1, :].rechunk(new_chunks)
seconds = z[1:, :].rechunk(new_chunks)

# each pair of rows yields a 2x2 correlation matrix, hence the two extra output axes
delayed_results = da.map_blocks(
    chunk_corrcoef, firsts, seconds, new_axis=2, chunks=(firsts.chunks[0], 2, 2)
)
with dask.config.set(scheduler='threading'):  # compare to ``scheduler='processes'``?
    delayed_results.to_zarr(output_file, overwrite=True)

@ParticularMiner

My word, you’re amazing. That worked right out of the box!!! It took about 10 seconds. WHAT.

A couple things that I’m not sure I understand though. I think I’ll try to go line by line:

assert first.shape == second.shape

This is just to ensure that we have the same shapes we’re comparing for the data right?

n_zarrs is just the length of the zarr dataset I’m pretty sure.

first_out is to make sure that the shape of what we’re computing is known, so that dask can write things out properly later?

new_chunks is something I’m not sure about. Is it creating a new dictionary that defines the chunk sizes for the first and second argument in the chunk_corrcoef? Since we’re doing these computations chunk-wise, I’m guessing that we need the chunks we’re computing upon to be the same size?
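Trying to answer part of my own question, I played with a toy array, and I think the dict just maps each axis number to a new chunk size, with -1 meaning one chunk spanning that whole axis (this is just me experimenting, not the real data):

import dask.array as da

z = da.ones((10, 8), chunks=(2, 4))
print(z.chunks)              # ((2, 2, 2, 2, 2), (4, 4))

# axis 0 -> chunks of 5, axis 1 -> a single chunk spanning the whole axis
z2 = z.rechunk({0: 5, 1: -1})
print(z2.chunks)             # ((5, 5), (8,))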

In firsts, isn’t doing z[0:-1] the same as z[0:, :]? I’m a little confused by that part.
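For my own sanity I tried a quick check with plain NumPy (a toy 4-row array, not our data), and they do seem to differ by one row:

import numpy as np

z = np.arange(8).reshape(4, 2)

print(z[0:-1, :].shape)  # (3, 2) -- drops the last row
print(z[0:, :].shape)    # (4, 2) -- keeps every row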

I was also wondering why you called it delayed_results. Does map_blocks know to have a delayed computation here? I’ve never used dask.delayed before and I don’t see you calling it explicitly in the code you wrote.
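From playing with a toy array (so this is just my rough understanding, not anything from the real pipeline), it looks like map_blocks hands back a lazy dask array, and nothing actually runs until you call compute() or write it out:

import numpy as np
import dask.array as da

x = da.ones((6,), chunks=3)
y = da.map_blocks(np.square, x)   # builds a task graph; nothing is computed yet

print(type(y))       # <class 'dask.array.core.Array'>  (still lazy)
print(y.compute())   # [1. 1. 1. 1. 1. 1.]  (work happens here)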

Lastly, could you also explain the difference between map_blocks and blockwise? From looking at the docs, I’m not sure when you would want to use blockwise vs map_blocks. Maybe blockwise can be used when you already know your input and output chunks specifically? I had thought that map_blocks also expects that, from what you taught me about how to make the HOG calculations work…


@jmdelahanty

I hope the following explanation helps:

map_blocks()

The following diagram illustrates how map_blocks() associates the chunks of two input arrays with shapes (9,) and (6,), each of which has 3 chunks.

Note that the input arrays do not need to have the same shape. They only need to have the same number of chunks (along corresponding axes), up to broadcasting rules (see NumPy's documentation on broadcasting).

Under the hood, map_blocks() uses blockwise() with the parameter align_arrays set to False in order to achieve this.

[diagram: map_blocks() pairing the i-th chunk of a 3-chunk (9,) array with the i-th chunk of a 3-chunk (6,) array]
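In code, a minimal sketch of that pairing might look like this (toy arrays made up to match the diagram, not the HOG data):

import numpy as np
import dask.array as da

a = da.arange(9, chunks=3)   # chunks: (3, 3, 3)
b = da.arange(6, chunks=2)   # chunks: (2, 2, 2) -- different shape, same number of chunks

def pair_shapes(x, y):
    # x is the i-th chunk of ``a``, y is the i-th chunk of ``b``
    return np.array([x.shape[0], y.shape[0]])

paired = da.map_blocks(pair_shapes, a, b, chunks=(2,))
print(paired.compute())   # [3 2 3 2 3 2] -- each (3,)-chunk was paired with a (2,)-chunk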

blockwise()

The following diagram illustrates how blockwise() associates chunks of two arrays of shape (9,) by default.

By default, blockwise() expects corresponding axes of the input arrays (that is, axes that have been given the same label, such as 'i' in the diagram) to have the same length.

But note that, unlike map_blocks(), the input arrays do not need to have the same number of blocks, since blockwise() will automatically chop up chunks where necessary and align them so that a chunk of one input array is associated with corresponding chunks of the same size in the other input arrays.

[diagram: blockwise() aligning the chunks of two (9,) arrays along the shared index 'i']
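And a comparable minimal sketch for blockwise(), again with made-up toy arrays: two arrays of the same shape but different chunking get automatically re-aligned before the per-chunk function is applied:

import operator
import dask.array as da

a = da.arange(9, chunks=3)   # chunks: (3, 3, 3)
b = da.arange(9, chunks=4)   # chunks: (4, 4, 1) -- same shape, different chunking

# both inputs carry the index 'i', so blockwise() unifies their chunking first
c = da.blockwise(operator.add, 'i', a, 'i', b, 'i', dtype=a.dtype)

print(c.chunks)     # ((3, 1, 2, 2, 1),) -- the automatically "chopped up" common chunking
print(c.compute())  # [ 0  2  4  6  8 10 12 14 16]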


I haven’t looked at this too much, but for some metrics there is the xskillscore package (see its API Reference documentation), which works with dask arrays wrapped as xarray objects.


@ParticularMiner This explanation is excellent! I’d really like to make this more discoverable. Shall we add it to the docs or write a short blog post around it?


Thank you @pavithraes !

Of course, feel free to use it as you deem fit! At the moment, I’m not sure where best in the docs it could be placed. Perhaps you have an idea?
