# Performing Pairwise Correlation Coefficient Calculations Across Chunks

Hello everyone!

I’m currently updating a repository related to this project. One of the next things we need to do is perform a Pearson correlation coefficient calculation across the descriptors that the `HOG` function gives us. In this case I’m using the `np.corrcoef` function upon each array stored in a zarr.
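For anyone unfamiliar with `np.corrcoef`: it returns a symmetric correlation matrix, with the Pearson coefficient between the two inputs on the off-diagonal. A tiny sketch with made-up descriptor vectors:

```python
import numpy as np

# Two hypothetical 1-D descriptor vectors (stand-ins for HOG outputs)
a = np.array([0.1, 0.5, 0.9, 0.3])
b = np.array([0.2, 0.4, 0.8, 0.4])

# np.corrcoef returns a 2x2 symmetric matrix; the off-diagonal entry
# is the Pearson correlation between a and b, and the diagonal is 1
r = np.corrcoef(a, b)
print(r.shape)  # (2, 2)
```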

One way we could do it is:

```python
import numpy as np

for i in range(len(zarr_array) - 1):
    current_array = zarr_array[i]
    next_array = zarr_array[i + 1]

    correlation = np.corrcoef(current_array, next_array)

    # save correlation matrix to disk, do other processing, etc...
```

But this takes about 5 minutes to complete. When we’re doing the number of subjects we’d like to get to, about 50 per day, that adds up to a lot of time really fast, something like 4 hours! I want it to go nice and fast by parallelizing things with `dask`.

The issue I’m running into is that while I feel like I could do this over each zarr chunk, I don’t know how to make sure that the coefficients between the two chunks get calculated!

Thankfully I’ll know the chunk sizes I’d have for each piece of data since it’s stored in a zarr array, but I’m not sure how to make sure that parallel processes know how to compare across chunk edges. I’m guessing that what I want is something like `dask.array.map_overlap`, but the combination of calculating those arrays in each chunk with `map_blocks` and then also having `map_overlap` is really tripping me up.

@jmdelahanty

```python
import dask.array as da
import numpy as np

def chunk_corrcoef(first, second):
    assert first.shape == second.shape
    n_zarrs = first.shape[0]
    first_out = np.corrcoef(first[0], second[0])
    out = np.empty((n_zarrs,) + first_out.shape)
    out[0] = first_out
    for i in range(1, n_zarrs):
        out[i] = np.corrcoef(first[i], second[i])
    return out

input_file = "/path/to/hog_descriptors/data.zarr"
output_file = "/path/to/corrcoef/data.zarr"

z = da.from_zarr(input_file)  # 2d array

# To prepare for ``map_blocks()``, ensure the same chunking for both input arrays
# (Not necessary if using ``blockwise()`` instead.)
new_chunks = {0: z.chunks[0][0], 1: -1}
firsts = z[0:-1, :].rechunk(new_chunks)
seconds = z[1:, :].rechunk(new_chunks)

delayed_results = da.map_blocks(
    chunk_corrcoef, firsts, seconds, new_axis=2, chunks=(firsts.chunks[0], 2, 2)
)
delayed_results.to_zarr(output_file, overwrite=True)
```
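As a quick sanity check on made-up data (the shapes here are purely illustrative), `chunk_corrcoef` can be exercised on plain NumPy arrays, since `map_blocks()` simply applies it to each pair of chunks:

```python
import numpy as np

def chunk_corrcoef(first, second):
    # Same helper as above: one 2x2 correlation matrix per row pair
    assert first.shape == second.shape
    n_zarrs = first.shape[0]
    first_out = np.corrcoef(first[0], second[0])
    out = np.empty((n_zarrs,) + first_out.shape)
    out[0] = first_out
    for i in range(1, n_zarrs):
        out[i] = np.corrcoef(first[i], second[i])
    return out

rng = np.random.default_rng(0)
data = rng.random((5, 8))  # 5 descriptor vectors of length 8
result = chunk_corrcoef(data[:-1], data[1:])
print(result.shape)  # (4, 2, 2): one 2x2 matrix per consecutive pair
```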

@ParticularMiner

My word, you’re amazing. That worked right out of the box!!! It took about 10 seconds. WHAT.

A couple things that I’m not sure I understand though. I think I’ll try to go line by line:

`assert first.shape == second.shape`

This is just to ensure that we have the same shapes we’re comparing for the data right?

`n_zarrs` is just the length of the zarr dataset I’m pretty sure.

`first_out` is to make sure that the shape of what we’re computing is known for dask to write out things properly later

`new_chunks` is something I’m not sure about. Is it creating a new dictionary that defines the chunk sizes for the `first` and `second` argument in the `chunk_corrcoef`? Since we’re doing these computations chunk-wise, I’m guessing that we need the chunks we’re computing upon to be the same size?

In `firsts`, isn’t doing `z[0:-1]` the same as `z[0:, :]`? I’m a little confused by that part.

Lastly, I was wondering why you called `delayed_results` that. Does `map_blocks` know to have a delayed computation here? I’ve never used `dask.delayed` before and I don’t see you calling it explicitly in the code you wrote.
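For context, `map_blocks()` (like most dask-array operations) returns a lazy dask array without any explicit `dask.delayed` call; nothing runs until `.compute()` is called or the result is written out, e.g. via `to_zarr()`. A minimal sketch with a tiny made-up array:

```python
import dask.array as da

x = da.ones((6,), chunks=3)

# No computation happens here: map_blocks only builds a task graph
lazy = da.map_blocks(lambda block: block * 2, x)
print(type(lazy))  # a dask Array, not a numpy ndarray

# Work is only done when .compute() (or to_zarr, etc.) is called
result = lazy.compute()
print(result)  # six 2.0s
```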

Finally, could you also explain the difference between `map_blocks` and `blockwise`? From looking at the docs, I’m not sure where you would want to use `blockwise` vs `map_blocks`. Maybe `blockwise` can be used when you already know your input and output chunks specifically? I had thought that `map_blocks` also expects that, from what you taught me about how to make the `HOG` calculations work…


@jmdelahanty

I hope the following explanation helps:

# `map_blocks()`

The following diagram illustrates how `map_blocks()` associates chunks of two input arrays with shapes `(9,)` and `(6,)`, each of which has 3 chunks.

Note that the input arrays do not need to have the same shape. They only need to have the same number of chunks (along corresponding axes) up to broadcasting rules (see NumPy’s documentation on broadcasting).

Under the hood, `map_blocks()` uses `blockwise()` with the parameter `align_arrays` set to `False` in order to achieve this.
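A small, hypothetical illustration of this pairing: chunk *i* of one array is matched with chunk *i* of the other, even though the chunk sizes differ. The `summarize` helper here is made up for the demo.

```python
import dask.array as da
import numpy as np

a = da.arange(9, chunks=3)  # 3 chunks of size 3
b = da.arange(6, chunks=2)  # 3 chunks of size 2

# map_blocks pairs chunks positionally: chunk i of `a` with chunk i of `b`,
# despite the differing chunk sizes; each call emits one summary value
def summarize(x, y):
    return np.array([x.sum() + y.sum()])

out = da.map_blocks(summarize, a, b, chunks=(1,)).compute()
print(out)  # one value per chunk pair, 3 values total
```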

# `blockwise()`

The following diagram illustrates how `blockwise()` associates chunks of two arrays of shape `(9,)` by default.

By default, `blockwise()` expects corresponding axes of the input arrays (that is, axes that have been given the same label, such as `'i'` in the diagram) to have the same length.

But note that, unlike `map_blocks()`, the input arrays do not need to have the same number of blocks, since `blockwise()` will automatically chop up chunks where necessary and align them in such a way that a chunk of one input array is associated with corresponding chunks of the same size in the other input arrays.
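A minimal sketch of that automatic alignment, using two arrays of the same length but different chunking (the arrays here are illustrative):

```python
import dask.array as da
import numpy as np

# Same length (9) but different chunking: blockwise aligns them itself
a = da.arange(9, chunks=3)  # chunks (3, 3, 3)
b = da.arange(9, chunks=4)  # chunks (4, 4, 1)

# 'i' labels the shared axis; blockwise chops the chunks into a common
# chunking before applying np.add block by block
c = da.blockwise(np.add, 'i', a, 'i', b, 'i', dtype=a.dtype)
result = c.compute()
print(result)  # elementwise a + b
```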


Haven’t looked at this too much, but for some metrics there is the xskillscore package (see its API Reference documentation), which uses dask arrays as xarray objects.


@ParticularMiner This explanation is excellent! I’d really like to make this more discoverable, shall we add this to the docs or write a short blog around this?


Thank you @pavithraes !

Of course, feel free to use it as you deem fit! At the moment, I’m not sure where best in the docs it could be placed. Perhaps you have an idea?
