Hello everyone!
I’m currently updating a repository related to this project. One of the next things we need to do is compute the Pearson correlation coefficient across the descriptors that the `HOG` function gives us. In this case I’m using the `np.corrcoef` function on each array stored in a zarr.
Here’s a Google Drive link to an example zarr in question.
One way we could do it is:
```python
import numpy as np
import zarr

zarr_array = zarr.open("/path/to/hog_descriptors/data.zarr", mode="r")

for i in range(len(zarr_array) - 1):  # pair each array with the one after it
    current_array = zarr_array[i]
    next_array = zarr_array[i + 1]
    correlation = np.corrcoef(current_array, next_array)
    # save correlation matrix to disk, do other processing, etc...
```
But this takes about 5 minutes to complete. When we’re doing the number of subjects we’d like to get to, about 50 per day, that adds up really fast: something like 4 hours of compute! I want it to go nice and fast by parallelizing things with Dask.
The issue I’m running into is that while I feel like I could do this over each zarr chunk, I don’t know how to make sure that the coefficients between arrays in two neighboring chunks get calculated! Thankfully I’ll know the chunk sizes for each piece of data since it’s stored in a zarr array, but I’m not sure how to make sure that parallel processes know how to compare across chunk edges. I’m guessing that what I want is something like `dask.array.map_overlap`, but the combination of calculating those arrays in each chunk with `map_blocks` and then also having `map_overlap` is really tripping me up.
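Here’s roughly the shape of what I’m imagining, though I haven’t managed to get it working (an untested sketch; I’m not at all sure the `depth`/`chunks` bookkeeping is right):

```python
import numpy as np
import dask.array as da

def pairwise_corrcoef(block):
    # Each block carries one extra trailing row from its neighbor,
    # so a block of k rows yields k - 1 correlation matrices.
    out = np.empty((block.shape[0] - 1, 2, 2))
    for i in range(len(out)):
        out[i] = np.corrcoef(block[i], block[i + 1])
    return out

z = da.from_zarr("/path/to/hog_descriptors/data.zarr").rechunk({1: -1})

corrs = da.map_overlap(
    pairwise_corrcoef,
    z,
    depth={0: (0, 1)},  # share one trailing row with the next chunk
    boundary="none",    # no padding at the outer edges of the array
    trim=False,         # pairwise_corrcoef consumes the overlap itself
    new_axis=2,
    # the last chunk gets no extra row, so it yields one fewer matrix
    chunks=(z.chunks[0][:-1] + (z.chunks[0][-1] - 1,), 2, 2),
    dtype=float,
)
```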
Any advice, fellow Dask enthusiasts?
@jmdelahanty
I suppose there are several ways to go about this. Here’s one which uses slicing:
```python
import dask
import numpy as np
import dask.array as da


def chunk_corrcoef(first, second):
    # ``first[i]`` and ``second[i]`` are consecutive rows of the original array
    assert first.shape == second.shape
    n_zarrs = first.shape[0]
    # compute the first pair up front so the output shape is known
    first_out = np.corrcoef(first[0], second[0])
    out = np.empty((n_zarrs,) + first_out.shape)
    out[0] = first_out
    for i in range(1, n_zarrs):
        out[i] = np.corrcoef(first[i], second[i])
    return out


input_file = "/path/to/hog_descriptors/data.zarr"
output_file = "/path/to/corrcoef/data.zarr"

z = da.from_zarr(input_file)  # 2d array

# To prepare for ``map_blocks()``, ensure the same chunking for both input arrays
# (not necessary if using ``blockwise()`` instead)
new_chunks = {0: z.chunks[0][0], 1: -1}
firsts = z[0:-1, :].rechunk(new_chunks)
seconds = z[1:, :].rechunk(new_chunks)

delayed_results = da.map_blocks(
    chunk_corrcoef, firsts, seconds, new_axis=2, chunks=(firsts.chunks[0], 2, 2)
)

with dask.config.set(scheduler='threading'):  # compare to ``scheduler='processes'``?
    delayed_results.to_zarr(output_file, overwrite=True)
```
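A quick sanity check after writing, if you like (a minimal sketch; the index is arbitrary):

```python
import numpy as np
import zarr

raw = zarr.open(input_file, mode="r")
corr = zarr.open(output_file, mode="r")

i = 0  # arbitrary pair to spot-check
expected = np.corrcoef(raw[i], raw[i + 1])
assert np.allclose(corr[i], expected)
```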
@ParticularMiner
My word, you’re amazing. That worked right out of the box!!! It took about 10 seconds. WHAT.
There are a couple of things I’m not sure I understand, though. I’ll try to go line by line:
`assert first.shape == second.shape`: this is just to ensure that the two arrays we’re comparing have the same shape, right?
`n_zarrs` is just the length of the zarr dataset, I’m pretty sure.
`first_out` is to make sure that the shape of what we’re computing is known, so that Dask can write things out properly later.
`new_chunks` is something I’m not sure about. Is it creating a new dictionary that defines the chunk sizes for the `first` and `second` arguments in `chunk_corrcoef`? Since we’re doing these computations chunk-wise, I’m guessing that we need the chunks we’re computing upon to be the same size?
In `firsts`, isn’t doing `z[0:-1]` the same as `z[0:, :]`? I’m a little confused by that part.
Also, I was wondering why you called it `delayed_results`. Does `map_blocks` know to make the computation delayed here? I’ve never used `dask.delayed` before, and I don’t see you calling it explicitly in the code you wrote.
Lastly, could you explain the difference between `map_blocks` and `blockwise`? From looking at the docs, I’m not sure when you would want to use `blockwise` vs. `map_blocks`. Maybe `blockwise` can be used when you already know your input and output chunks specifically? I had thought that `map_blocks` also expects that, from what you taught me about how to make the `HOG` calculations work…
@jmdelahanty
I hope the following explanation helps:
`map_blocks()`
The following diagram illustrates how `map_blocks()` associates chunks of two input arrays with shapes `(9,)` and `(6,)`, which both have 3 chunks each.
Note that the input arrays do not need to have the same shape. They only need to have the same number of chunks (along corresponding axes), up to broadcasting rules (follow this link to learn about broadcasting).
Under the hood, `map_blocks()` uses `blockwise()` with the parameter `align_arrays` set to `False` in order to achieve this.
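Here’s a small runnable sketch of that pairing, using toy arrays with the shapes from the diagram:

```python
import numpy as np
import dask.array as da

x = da.arange(9, chunks=3)  # chunks: (3, 3, 3)
y = da.arange(6, chunks=2)  # chunks: (2, 2, 2)

# chunk i of ``x`` is handed to the function together with chunk i of ``y``,
# even though the two chunks have different sizes
def chunk_sums(a, b):
    return np.array([a.sum() + b.sum()])

z = da.map_blocks(chunk_sums, x, y, chunks=(1,), dtype=int)
print(z.compute())  # [ 4 17 30]
```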
`blockwise()`
The following diagram illustrates how `blockwise()` associates chunks of two arrays of shape `(9,)` by default.
By default, `blockwise()` expects corresponding axes of the input arrays (that is, axes that have been given the same label, such as `'i'` in the diagram) to have the same length. But note that, unlike with `map_blocks()`, the input arrays do not need to have the same number of blocks, since `blockwise()` will automatically chop up chunks where necessary and align them in such a way that a chunk of one input array is associated with corresponding chunks of the same size in the other input arrays.
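And a corresponding sketch of that automatic alignment, again with toy arrays:

```python
import numpy as np
import dask.array as da

x = da.arange(9, chunks=3)  # chunks: (3, 3, 3)
y = da.arange(9, chunks=4)  # chunks: (4, 4, 1)

# both axes carry the label 'i', so blockwise() rechunks the two layouts
# to the common grid (3, 1, 2, 2, 1) and pairs chunks of equal size
z = da.blockwise(np.add, 'i', x, 'i', y, 'i', dtype=int)

print(z.chunks)     # ((3, 1, 2, 2, 1),)
print(z.compute())  # same values as (x + y).compute()
```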
Haven’t looked at this too much, but for some metrics there is xskillscore (see the API Reference in the xskillscore documentation), which works with Dask arrays as xarray objects.
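Something along these lines might work (a rough, untested sketch assuming `xskillscore.pearson_r`; the dimension names are made up for illustration):

```python
import dask.array as da
import xarray as xr
import xskillscore as xs

z = da.from_zarr("/path/to/hog_descriptors/data.zarr")

# wrap consecutive frames as xarray objects; dimension names are arbitrary
a = xr.DataArray(z[:-1], dims=["frame", "feature"])
b = xr.DataArray(z[1:], dims=["frame", "feature"])

# one Pearson r per frame pair, reduced over the feature axis
# (the scalar r, i.e. the off-diagonal entry of np.corrcoef's 2x2 matrix)
r = xs.pearson_r(a, b, dim="feature")
print(r.compute())
```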
@ParticularMiner This explanation is excellent! I’d really like to make this more discoverable. Shall we add it to the docs or write a short blog around it?
Thank you, @pavithraes!
Of course, feel free to use it as you deem fit! At the moment, I’m not sure where best in the docs it could be placed. Perhaps you have an idea?