Please explain sorting

Hi,

I’m trying to understand how sorting works in Dask. ChatGPT is giving me contradictory answers and what I’m observing doesn’t make sense to me. I have a large dataframe (obviously) that I need sorted by EntryDate. Now EntryDate is NOT unique, so I can’t really set it as an index (as far as I know). So I’m just calling the sort_values method directly.

Here is the code:

import dask.dataframe as dd

notes_df = dd.read_parquet(file_path)
notes_df = notes_df.repartition(npartitions=30)
notes_df = notes_df.drop('VisitSID', axis=1)          # drop returns a new dataframe, so reassign
notes_df['EntryDate'] = dd.to_datetime(notes_df['EntryDate'])
notes_df['ReportText'] = notes_df['ReportText'].apply(clean_ws)  # clean_ws: my whitespace-cleaning helper
notes_df = notes_df.sort_values(by='EntryDate')
notes_df.to_parquet(file_path, schema=schema)

Now, up until the sort_values call, everything is executed lazily, i.e., there is no activity in the Dask dashboard. However, when sort_values is called, computation is triggered and Dask seems to run through all the previous computations along with the sort_values. According to ChatGPT, this is because sort_values requires the entire dataset and cannot be executed lazily (which makes sense). Is that true?

My intuition says that after sort_values, the dataframe is in a state where all the operations, including the sort, have been performed. So when I call to_parquet, it just needs to write the results to disk. However, when I call to_parquet, Dask seems to trigger all the previous operations again (including sort_values, I think). Now ChatGPT says I need to call persist so that it doesn’t need to run the operations again. Is that correct? If it is, why is that? All the operations are already done, so why can’t Dask just write the parquet file to disk?
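For reference, this is what I understand the persist suggestion to look like; I haven’t verified it, I’m assuming a distributed Client so the results can stay in worker memory, and sorted_path is just a placeholder for a separate output location:

from dask.distributed import Client
import dask.dataframe as dd

client = Client()  # assumes a local or already-running cluster

notes_df = dd.read_parquet(file_path)
notes_df = notes_df.sort_values(by='EntryDate')

# persist() kicks off the computation and returns a dataframe backed by the
# in-memory results, so to_parquet() would reuse them instead of re-running
# the whole graph (at least, that is my understanding).
notes_df = notes_df.persist()
notes_df.to_parquet(sorted_path, schema=schema)  # sorted_path: placeholder output location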

Is there an easier/better way to accomplish what I’m trying to do?

Thanks.

I think it would be reasonable to set EntryDate as the index column for this, and non-uniqueness is not a problem. In order to plan a sort, dask needs to know the range of values in the column of interest, which means having to load and process the previous elements of the compute up to that point. This just establishes reasonable start-stop bounds for each partition. From the set_index doc:

Under normal operation this function does an initial pass over the index column to compute approximate quantiles to serve as future divisions. It then passes over the data a second time, splitting up each input partition into several pieces and sharing those pieces to all of the output partitions now in sorted order.

sort_values does the same thing, and then when you write your output, dask applies those bounds and sorts within each partition. Since this is a shuffle operation - each output partition contains elements from each input partition - it is costly to perform.
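As a rough sketch of what the two passes look like (column name taken from your code; exact eagerness can vary between Dask versions):

import dask.dataframe as dd

notes_df = dd.read_parquet(file_path)
notes_df['EntryDate'] = dd.to_datetime(notes_df['EntryDate'])

# Pass 1: approximate quantiles of EntryDate pick the partition boundaries
# (the "divisions"). Pass 2: the shuffle moves each row into the output
# partition whose bounds contain its EntryDate value.
notes_df = notes_df.set_index('EntryDate')

print(notes_df.known_divisions)  # typically True once the divisions have been computed
print(notes_df.divisions)        # the start/stop bound of each partition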

Thank you for your reply. Are you suggesting I just make EntryDate the index and the dataframe gets sorted automatically?

For example,

notes_df = dd.read_parquet(file_path)
notes_df = notes_df.repartition(npartitions=30)
notes_df = notes_df.drop('VisitSID', axis=1)
notes_df['EntryDate'] = dd.to_datetime(notes_df['EntryDate'])
notes_df['ReportText'] = notes_df['ReportText'].apply(clean_ws)
notes_df = notes_df.set_index('EntryDate')
notes_df.to_parquet(file_path, schema=schema)

would ensure the rows are sorted according to EntryDate, or do I need to do anything extra?

I believe it would end up about the same either way - sort_values uses the same logic.
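Roughly, the two spellings compare like this (just a sketch; whether divisions end up tracked on the sort_values result can depend on the Dask version):

# Both go through the same quantile pass and shuffle under the hood.
by_values = notes_df.sort_values(by='EntryDate')  # rows ordered, original index kept
by_index = notes_df.set_index('EntryDate')        # rows ordered, EntryDate becomes the index

# set_index records the partition boundaries on its result, which later makes
# .loc lookups and index-aligned joins on EntryDate much cheaper.
print(by_index.known_divisions)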

Would this solve the problem of double compute? Specifically, let’s say I use set_index to sort the dataframe according to EntryDate. Then I want to write the sorted dataframe to disk and I call notes_df.to_parquet(...). Would this trigger all the computation again? I have not checked this yet. I have started set_index; it’ll take another hour to complete.

Referring again to the quoted text from before: dask requires two passes, one to get the bounds, and another to do the actual sorting.
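Putting it together, something like the sketch below should only pay the quantile pass up front and run the rest of the pipeline once, when the parquet output is written (the output path here is a placeholder, clean_ws is your function from earlier, and exact eagerness can vary between Dask versions):

import dask.dataframe as dd

notes_df = dd.read_parquet(file_path)
notes_df = notes_df.drop('VisitSID', axis=1)
notes_df['EntryDate'] = dd.to_datetime(notes_df['EntryDate'])
notes_df['ReportText'] = notes_df['ReportText'].apply(clean_ws)

# The quantile pass for the divisions happens here; the shuffle itself stays lazy.
notes_df = notes_df.set_index('EntryDate')

# The single write triggers the shuffle plus everything upstream, exactly once.
notes_df.to_parquet(sorted_path, schema=schema)  # sorted_path: placeholder output location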