Dask’s df.map_overlap() function passes the partitions in non-chronological order. This is extremely inefficient for computations on large datasets with a lot of overlap. Of course, the work is parallelized anyway, so the order of operations is not guaranteed, but this seems like a design flaw that contributes to a higher-than-necessary memory load. Does anyone have an idea of how this could be solved?
Here is an MVE that prints the start and end dates of each partition. You can see that the ordering seems to be random: we start in the middle of the month and end at the beginning of the month. For a dataset spanning several years this causes unnecessary memory load (here, with hourly partitions and a 7-day lookback, each task needs up to the 168 preceding partitions in memory).
Can I order the partitions chronologically, or modify the scheduler to schedule the tasks in chronological order? (The only workaround I have sketched so far follows the example below.)
import pandas as pd
import numpy as np
import dask.dataframe as dd

# One month of minute-resolution data with a random 'quantity' column
date_range = pd.date_range(start='2023-06-01', end='2023-07-01', freq='min')
df = pd.DataFrame({
    'timestamp': date_range,
    'quantity': np.random.rand(len(date_range))
})

# Round timestamps up to the hour, index on them, and repartition hourly
df = dd.from_pandas(df, npartitions=5)
df['timestamp'] = df['timestamp'].dt.ceil('1h')
df = df.set_index('timestamp', sorted=True)
df = df.repartition(freq='1h')

def custom_func(ser):
    # Print the time span each task actually sees, then broadcast the mean
    print(f'from: {ser.index.min()}, to: {ser.index.max()}')
    return pd.Series(np.mean(ser), index=ser.index)

# Rolling 7-day lookback per partition via map_overlap
results = df['quantity'].map_overlap(
    custom_func,
    before=pd.Timedelta(days=7),
    after=0,
    meta=pd.Series(dtype='float64'),
    align_dataframes=False,
    enforce_metadata=False,
).drop_duplicates()
results.compute()
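For completeness, the only workaround I have sketched so far is to give up map_overlap and walk the partitions sequentially in index order, rebuilding each lookback window by hand via .loc on the full series. This is only a minimal sketch, assuming a fully sequential compute is acceptable; it is not equivalent to map_overlap (boundary timestamps are handled crudely) and it loses all parallelism, which is exactly what I would like to avoid:

quantity = df['quantity']
lookback = pd.Timedelta(days=7)

chunks = []
# divisions are known after set_index/repartition, so the partitions can be
# visited in chronological order, one window in memory at a time
for i in range(quantity.npartitions):
    start = quantity.divisions[i]
    end = quantity.divisions[i + 1]
    # pull the lookback window plus the current partition into memory
    window = quantity.loc[start - lookback:end].compute()
    out = custom_func(window)
    # keep only the rows belonging to the current partition; the inclusive
    # end means boundary timestamps can appear twice across partitions
    chunks.append(out.loc[start:end])

results_seq = pd.concat(chunks).drop_duplicates()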