Filtering big dataframe by index

I have a big dataset, data.parquet, indexed by id and partitioned by year and by group. So a single file would be:

data.parquet/year=2000/group=A/part.0.parquet

Now, given a list of ids, I want to filter the dataset down to those ids. Within each partition, the index id is sorted.

For orders of magnitude: I want to filter about 3 million ids, each year has around 300 million ids, and I have 30 years of data. And I want to do this 100 times. The resulting data should have id and year columns, ordered by id and year.

There are several options for how to do it (read the file and then use .loc; read the ids, set them as the index, and join them against the big dataset; or call read_parquet with a filter). What is the most appropriate way to perform such a standard task?

Hi @marcdelabarrera, welcome to the Dask Discourse forum!

I'm not sure what you can do with Parquet filters, but I think the closer you apply the filter to the read, the better. So in order, I would try:

  1. read_parquet filters,
  2. .loc directly on the Dask DataFrame, since the id is the index,
  3. map_partitions and .loc on the pandas partition, which should be equivalent (a sketch of all three follows below).
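
Roughly, those three options look like this (a minimal sketch; the path and the list of ids are placeholders, and the exact behaviour depends on your Dask/pyarrow versions):

import dask.dataframe as dd

ids = [11, 42, 314]  # hypothetical list of wanted ids

# Option 1: push the filter down into the read (row filtering done by pyarrow)
ddf1 = dd.read_parquet("data.parquet", filters=[("id", "in", ids)])

# Option 2: .loc on the Dask DataFrame, relying on id being the sorted index
# (indexing with a list of labels needs known divisions)
ddf2 = dd.read_parquet("data.parquet")
ddf2 = ddf2.loc[ids]

# Option 3: filter every pandas partition explicitly
ddf3 = dd.read_parquet("data.parquet")
ddf3 = ddf3.map_partitions(lambda part: part[part.index.isin(ids)])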

I wouldn't advise doing a join; it should not be necessary.

cc @martindurant.

If you are not filtering using the directory structure (years, group), then you will of course end up accessing every one of the files, since (as I understand) a given ID can occur in any one of the files.

Using the pyarrow backend (which is the default), you can do row-filtering on load, which ought to decrease the memory footprint of the initial load. However, because of the structure of parquet, you will read every byte of every column of interest anyway; the difference is only how quickly you throw them away.

If your list of IDs is small enough that you can do the filter and concatenate all the results into memory, then your job is done by passing the filter to read_parquet (preferable) or doing a .map_partitions(filter), and then .compute(). There is no need to set the index first; that would just cause massive shuffling of data between partitions.
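
For the in-memory case, that could look roughly like this (a sketch, reusing the hypothetical path and id list from above; year comes back as a column from the directory names):

import dask.dataframe as dd

ids = [11, 42, 314]  # hypothetical list of wanted ids

# Filter at read time; id is the index, so only the year column needs keeping
ddf = dd.read_parquet("data.parquet", filters=[("id", "in", ids)])
result = ddf[["year"]].compute().reset_index()

# Order the (much smaller) pandas result as requested
result = result.sort_values(["id", "year"])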

If your filtered datasets would be too big for memory, I would recommend doing the same as above, but instead of calling compute, write the data back to temporary parquet files.
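
For example (a sketch; the output path is a placeholder):

import dask.dataframe as dd

ids = [11, 42, 314]  # hypothetical list of wanted ids

ddf = dd.read_parquet("data.parquet", filters=[("id", "in", ids)])

# Stream the filtered partitions back to disk instead of collecting them
ddf.to_parquet("filtered_tmp.parquet")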

Thank you both. For some reason, when I try to apply a filter in dd.read_parquet, I get an error (I'm using Dask version 2022.6.1 and can't update it).

Loading the dataset and using .loc to find one particular id takes 3 minutes. However, if I try to look for more than one id using data.loc with a list of ids, as I would do in pandas, I get an error. If I pass a list of ids, I get 'Cannot index with list against unknown division.'; if I pass the ids as a tuple, I get a 'too many indices' error.

In any case, if you know a better way to structure the dataframe, I'm happy to implement it. I have a panel of ids and years. Sometimes I want all ids from a given year, other times I want a subset of ids for all years. I guess the best approach is to split the sample first by id and then by year?

This conversation helped me think harder about the dataset, and I'm redesigning it. In Dask, is there any reasonable way to handle panel data? I have not managed to find an appropriate workflow. I mean a dataset of the form

id; date; variables
1; 2000; …
1; 2001;
2; 2000;
2; 2001;

I am aware that Dask does not work well with a MultiIndex, so I guess the index should be the id.

I see a few approaches, none of them fully satisfying:

  • Partition the dataset by date. Then when I load the file I get

id; date; variables
1; 2000; …
2; 2000;
1; 2001;
2; 2001;

which is undesirable because I lose the panel dimension (following an id over time). This is what prompted this topic: I then need to find the ids across partitions and sort by id and date to recover the initial dataset. Another way to put it: if I sort the dataset by id and year but then partition it by year, that is equivalent to sorting by year and id, which is not what I want.

  • Alternatively, I can split the data into groups of ids. This way, when I load the data I get the correct sorting (id first, date next). However, if I then want all observations for a given date, I am forced to scan the entire dataset, as @martindurant says.

  • A third alternative is to duplicate the dataset and partition each copy in a different way, but this is costly in terms of storage space. (A sketch of the first two layouts follows below.)
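
In code, the two single-variable layouts could be written along these lines (a sketch; the output paths and the bucketing rule for ids are hypothetical):

import dask.dataframe as dd

ddf = dd.read_parquet("data.parquet")  # id in the index, year among the columns

# Layout 1: one directory per year (good for "all ids at a given date")
ddf.to_parquet("by_year.parquet", partition_on=["year"])

# Layout 2: one directory per block of ids (good for following ids over time)
flat = ddf.reset_index()
flat["id_block"] = flat["id"] // 1_000_000  # hypothetical bucketing rule
flat.to_parquet("by_id_block.parquet", partition_on=["id_block"])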

Is there a way to get either a set of ids and follow them over time, or all ids in a given time period, without scanning the entire dataset? At least for economists, that would be a game changer. Any advice helps!

If you want all-the-ids-at-a-date as well as all-the-dates-for-an-id, then there is no optimal partitioning strategy. You can go to either extreme as you mention, or you could perhaps partition on both variables somehow. The latter might end up with many more partitions, which also raises overhead, so there is no ideal solution :slight_smile:
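
One way to read "partitioning on both variables" is to add a coarse, derived bucket column for the ids and include it in partition_on, so that either access pattern can prune directories. A sketch under those assumptions (the bucket rule, output path and id list are all hypothetical, and 30 years x 100 buckets already means a few thousand directories):

import dask.dataframe as dd

ddf = dd.read_parquet("data.parquet").reset_index()
ddf["id_bucket"] = ddf["id"] % 100  # hypothetical coarse bucket
ddf.to_parquet("by_year_and_bucket.parquet", partition_on=["year", "id_bucket"])

# All ids at a given date: prune to a single year directory
one_year = dd.read_parquet("by_year_and_bucket.parquet",
                           filters=[("year", "==", 2000)])

# A set of ids over time: prune to the matching buckets, then row-filter on id
ids = [11, 42, 314]  # hypothetical list of wanted ids
buckets = sorted({i % 100 for i in ids})
some_ids = dd.read_parquet("by_year_and_bucket.parquet",
                           filters=[("id_bucket", "in", buckets), ("id", "in", ids)])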