Slicing a dask array with a dask dataframe in one compute

Hello there :wave:

I’ve got a large Zarr array stored in google cloud, along with a parquet file with metadata that I can load as a dask dataframe.

What I’d like is to efficiently slice out a piece of that array using some logical operations that use the dataframe–e.g. yielding a Series of boolean values of which rows I need. An example snippet would look like this:

my_dask_df = dd.from_parquet("gs://...")
my_dask_arr = da.from_zarr("gs://...")

some_data = my_dask_arr[my_dask_df["label"].isin(some_labels), :].compute()

I’d prefer to only call compute once, but this raises an error. I think this is because the dimensions of the index are not known until the logic is computed, so the scheduler it’s sure how to collect the data.

If I compute the index first then I can get the array in another compute, but this feels a little wasteful to me as I need to bring that index to my local machine and then send it back out to the workers. If I only need a tiny fraction of the data, this could a cause a memory spike (storing the full index) that would be nice to avoid.

Is there a way to do this in one computation, perhaps using delayed?

Thanks!

First off, dask Arrays just don’t support using a dask DataFrame as the indexer. But you can easily convert a DataFrame to an Array via .values, or to_dask_array.

But when you do so, the length of that Array is unknown, because the lengths of dask DataFrames are unknown. Dask needs to know not just the total length of the Array, but the length of each partition, in order to figure out which partition of the indexer to line up with which partition of my_dask_arr.

However, it seems in this case that you do know something about the length of the DataFrame, since you made it yourself. In particular, if you know the length of each of the Parquet partitions, then you could do:

idx = my_dask_df["label"].isin(some_labels)
partition_lengths = [PARTITION_LENGTH] * rows.npartitions
# or maybe partition_lengths = [100, 100, 250, 90, ...]
idx_arr = rows.to_dask_array(lengths=partition_lengths).squeeze()
some_data = my_dask_arr[idx_arr]

Something to consider is that if you don’t compute idx up front, then computing some_data will always require processing the entire Zarr array, even if only a tiny handful of chunks ended up matching your labels. If Dask doesn’t know which rows do or down’t match, it obviously has no choice but to process all of them.

So if 90% of the values in idx end up being False, it might actually be worth computing it up front, because then my_dask_arr[idx_arr] will let you skip loading 90% of the Zarr data. Whereas if idx_arr is delayed, you’ll always load all the Zarr data, even if you throw 90% of it away immediately.

Sorry, why is that? Are you saying that dask_arr[idx].compute() will download the entire array into memory and then slice it? Avoiding this is the whole motivation for this question.

My thought process was that the first task is to figure out the rows, and the Dask scheduler could immediately apply that information to the next task, which is fetching the data. Maybe the trick here is to make explicit the one-to-one matching between partitions in the dataframe and chunks in the row dimension of the array.

Sadly, this is not how Dask works. Simplified, the graph of tasks looks like:

Load parquet
      |
      v             Load zarr
get matching rows      |
       \               |
        v              v
       slice rows from array

Loading data happens before slicing. It is currently impossible, given the very basic definition of how dask works (graphs are statically defined before computation; each chunk is 1 key in the graph), to have an optimization that would rewrite that graph to something like:

Load parquet
      |
      v
get matching rows
      |
      v
decide which zarr chunks to load
      |
      v
load zarr
      |
      v
slice rows from array

This would be cool, but it would require a very different system.

At a very, very abstract level, you can’t have conditional logic in a Dask graph. It’s impossible to express something like “execute task A, then only if A is true, execute task B”. (In this case, imagine A = load_parquet_partition(54), B = load_zarr_chunk(54).) If you need to do something like this, you have to basically run A first, then figure out client-side whether you want to go ahead with B, then run B (or not).

This is exactly what you’d be doing if you did:

idx = my_dask_df["label"].isin(some_labels)
idx_arr = idx.values.squeeze().compute()
some_data = my_dask_arr[idx_arr]

By computing your indices first, you can decide up front which tasks in my_dask_arr's graph to keep, and which ones to drop. Then, the un-needed tasks will never even be sent to the scheduler.

It sounds like you’re worried that if you do:

idx = my_dask_df["label"].isin(some_labels)
idx.compute()

idx will be pretty large and take up a lot of memory, yet most of the rows will be False.

Instead, what if you do (edit: fixed):

matches = my_dask_df["label"].isin(some_labels).reset_index(drop=True)
idx = matches[matches].index
# or, using `query`:
idx = my_dask_df.reset_index().query(f"label in {some_labels!r}").index

idx.compute()

Now, instead of a full-length boolean Series, you’ll have an integer Series of just the row numbers that match. Presumably, that will be much smaller.

I see. So maybe there’s a way to this by writing some delayed functions that can be dispatched and then aggregated, but that’s more complicated.

Now, instead of a full-length boolean Series, you’ll have an integer Series of just the row numbers that match. Presumably, that will be much smaller.

Yep I realized this later and this reply was sitting in the browser all night because I forgot to send it. I was trying to use nonzero on the values but reset_index might be better.

edit: Oh, dropna doesn’t drop the False values though, so that index is still the full size. I need to use nonzero on the values.

Thanks for your help!

Just realized that too; I fixed the original post to use either [...] selection syntax or query.

Yes, at best it would be possible (probably with map_partitions) to write in a custom way where you run a task for each partition of my_dask_df["label"].isin(some_labels), and that task either uses Zarr to load the corresponding array partition, or skips Zarr and returns a NumPy array of size 0 if all the rows are False. Note that no matter what, you’d still have the same number of tasks in the graph, it’s just that internally, many of them would do nothing.

But if you did this, you’d need to represent it as a dask Array with unknown chunksizes, because some of your chunks would be full-size, and some would be empty. And many operations fail on arrays with unknown chunksizes, so it might not do you much good. At that point, it’s possible you’d be better off skippping Array entirely, and doing the whole thing in delayed.

1 Like