map_overlap() doesn't pass partitions in chronological order

Dask’s df.map_overlap() function passes the partitions in non-chronological order. This is very inefficient for computations on large datasets with a lot of overlap. Of course, we are running in parallel anyway, so the order of operations is not guaranteed, but this seems like a design flaw that contributes to a higher-than-necessary memory load. Does anyone have an idea of how this could be solved?

Here is an MVE that prints the start and end dates of each partition. You can see that the ordering seems to be random: we start in the middle of the month and end at the beginning of the month. For a dataset spanning several years, this causes unnecessary memory load.

Can I order the partitions chronologically, or modify the scheduler to schedule the tasks in chronological order?

import pandas as pd
import dask.dataframe as dd
import numpy as np

date_range = pd.date_range(start='2023-06-01', end='2023-07-01', freq='T')
df = pd.DataFrame({
    'timestamp': date_range,
    'quantity': np.random.rand(len(date_range))
})

df = dd.from_pandas(df, npartitions=5)
df['timestamp'] = df['timestamp'].dt.ceil('1h')
df = df.set_index('timestamp', sorted=True)

df = df.repartition(freq='1h')

def custom_func(ser):
    print(f'from: {ser.index.min()}, to: {ser.index.max()}')
    return pd.Series(np.mean(ser), index=ser.index)

results = df['quantity'].map_overlap(custom_func, before=pd.Timedelta(days=7), after=0, meta=pd.Series(dtype='float64'), align_dataframes=False, enforce_metadata=False).drop_duplicates()
 
results.compute()

Hi @FredT, welcome to the Dask community!

Well, this is a really complex problem, as map_overlap involves some repartitioning to generate each overlapped partition. In a distributed context, it would be really hard to ensure that as many as possible of the input partitions contributing to the same overlapped partition are read on the same machine. I don’t think the Dask scheduler can handle this kind of data locality as of now.

Anyway, as explained in this documentation page, Dask tries its best to order the computation correctly.
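
If you want to inspect those priorities yourself, something along these lines should work. Note that dask.order.order is an internal function, so treat this as a rough sketch that may change between versions:

from dask.order import order

# Rough sketch: print the static priorities dask.order assigns to the tasks
# of the `results` series from the example above (lower = scheduled earlier).
dsk = dict(results.__dask_graph__())
priorities = order(dsk)
for key, priority in sorted(priorities.items(), key=lambda kv: kv[1])[:10]:
    print(priority, key)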

Taking your example and simplifying it a bit, I end up with a very satisfying task ordering:

import pandas as pd
import dask.dataframe as dd
import numpy as np
import dask

date_range = pd.date_range(start='2023-06-30', end='2023-07-01', freq='T')
df = pd.DataFrame({
    'timestamp': date_range,
    'quantity': np.random.rand(len(date_range))
})

df = dd.from_pandas(df, npartitions=5)
df['timestamp'] = df['timestamp'].dt.ceil('1h')
df = df.set_index('timestamp', sorted=True)

df = df.repartition(freq='1h')

def custom_func(ser):
    print(f'from: {ser.index.min()}, to: {ser.index.max()}')
    return pd.Series(np.mean(ser), index=ser.index)

results = df['quantity'].map_overlap(custom_func, before=pd.Timedelta(hours=6), after=0, meta=pd.Series(dtype='float64'), align_dataframes=False, enforce_metadata=False).drop_duplicates()

dask.visualize(results,
               optimize_graph=True, color="order",
               cmap="autumn", node_attr={"penwidth": "4"})

output:

You can see the order is pretty well respecting left to right and vertical priority. This is also what I observe in the log.

However, in your case, with a 7-day overlap, the problem is on another scale! Maybe you could try using rolling instead of map_overlap? It might be better optimized for your use case, but no guarantee.
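
Just to illustrate what that could look like on the example above (a time-based window on the sorted index), here is a sketch; it only computes a plain mean and is not a drop-in replacement for your custom_func:

# Sketch only: a 7-day time-based rolling mean, letting Dask handle the overlap.
rolled = df['quantity'].rolling('7D').mean()
rolled.compute()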


Hi @guillaumeeb. Thanks for the reply! The problem with rolling is that it is not possible to use a stride (in the example above, it is 1 hour). So the stride becomes a single row, and in my case there are billions of rows, which is pretty inefficient. Do you know if there is any way to change how the scheduler feeds data into map_overlap? I’ve also experimented with creating tasks in a for loop, but the result is often much slower and less memory-efficient than map_overlap.
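
For context, the kind of per-window loop I mean is roughly the following (a simplified sketch, not my exact code; the hourly window grid is just for illustration):

import dask
import pandas as pd

# Simplified sketch: one lazy task per hourly window, each with its own
# 7-day lookback, all computed together at the end.
window_ends = pd.date_range('2023-06-08', '2023-07-01', freq='1h')
tasks = [
    df['quantity'].loc[end - pd.Timedelta(days=7):end].mean()
    for end in window_ends
]
window_means = dask.compute(*tasks)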

Well, I’m not sure…

You can try to have a look at the optimization functions from Optimization — Dask documentation or Advanced graph manipulation — Dask documentation.

The other thing would be to try playing with the chunk size, but I'm not sure it would really change the problem.
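
For instance, you could repartition to coarser chunks before calling map_overlap; the '6h' value below is only an illustration, not a recommendation:

# Illustration only: coarser partitions mean fewer, larger overlapped chunks.
df = df.repartition(freq='6h')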

I believe the df = df.repartition(freq='1h') is confusing dask.order (i.e. this is a bug), since it generates many asymmetrical splits. dask.order has a hard time making the right decision, and some partitions are held in memory longer than necessary, causing your high memory footprint.

Even @guillaumeeb’s example shows artifacts of this when you inspect it closely, e.g. the branches at the bottom indicate that we have to hold the original partition in memory for quite a while, since some of the splits have very low priorities while others have rather high ones.

I think the culprit is repartition, not map_overlap.

You said that rolling is not an option since you cannot select strides, and it was too inefficient for your case. How would you express this computation using pandas only? I wonder if there is a simpler way to express what you are trying to achieve.
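
For instance, if the intent is "a mean over a 7-day window, evaluated on an hourly grid", plain pandas could express that as resample followed by rolling. I'm guessing at the actual computation here, so treat this only as an illustration:

import pandas as pd
import numpy as np

# Illustration only, guessing the intent: 7-day window, 1-hour stride.
idx = pd.date_range('2023-06-01', '2023-07-01', freq='T')
pdf = pd.DataFrame({'quantity': np.random.rand(len(idx))}, index=idx)

hourly = pdf['quantity'].resample('1h').mean()   # 1-hour stride
weekly = hourly.rolling('7D').mean()             # 7-day window at each hourly point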