How can I reindex a Dask DataFrame with a timeseries index

Hi everyone, I am currently working with timeseries data whose unit is the microsecond (us). An example is shown below:

                            size
2021-09-01 00:00:00.000001     0
2021-09-01 00:00:00.000004     1
2021-09-01 00:00:00.000007     2
2021-09-01 00:00:00.000010     3
2021-09-01 00:00:00.000013     4

What I need to do is compute the sum of the size column over every 2-microsecond window (rows 1 and 2 are in a group, rows 3 and 4 are in a group, etc.). The start time is the timestamp of the first row. As you can see, some timestamps are missing, and the size value for those missing timestamps should be treated as 0. In Pandas I can use the reindex function together with date_range to insert rows for the missing timestamps, and then use a rolling window to compute what I want easily (a sketch of that approach follows the table below). However, since there is no reindex function in Dask DataFrame, I have no idea how to do this in Dask. Can someone suggest a way to implement this?

                            size
2021-09-01 00:00:00.000001     0
2021-09-01 00:00:00.000002     0
2021-09-01 00:00:00.000003     0
2021-09-01 00:00:00.000004     1
2021-09-01 00:00:00.000005     0
2021-09-01 00:00:00.000006     0
2021-09-01 00:00:00.000007     2
2021-09-01 00:00:00.000008     0
2021-09-01 00:00:00.000009     0
2021-09-01 00:00:00.000010     3
2021-09-01 00:00:00.000011     0
2021-09-01 00:00:00.000012     0
2021-09-01 00:00:00.000013     4
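
For context, here is a minimal sketch of the Pandas approach I described above. The data is from my example; the rolling step is just one way to get the per-2-microsecond sums:

import pandas as pd

# Original sparse data (from the example above).
idx = pd.to_datetime([
    "2021-09-01 00:00:00.000001",
    "2021-09-01 00:00:00.000004",
    "2021-09-01 00:00:00.000007",
    "2021-09-01 00:00:00.000010",
    "2021-09-01 00:00:00.000013",
])
df = pd.DataFrame({"size": [0, 1, 2, 3, 4]}, index=idx)

# Insert the missing microsecond timestamps with size = 0.
full_index = pd.date_range(start=df.index[0], end=df.index[-1], freq="us")
df = df.reindex(full_index, fill_value=0)

# Sum over each 2-microsecond window starting at the first row; every
# second timestamp closes a window. (The last, partial window at
# ...000013 is not covered by this slice.)
result = df.rolling("2us").sum().iloc[1::2]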

The reason I need to do this in Dask is that my original dataset is too large to process with Pandas alone.

Hi @vava24680,

I’m afraid there is no easy way to do this with Dask currently.

However, maybe a workaround can be found? In what format is your original dataset? Couldn’t you use Pandas DataFrame calls on already-ordered parts of the dataset (you could even distribute those computations with Dask), and once that is done, use a Dask DataFrame to process the whole time series?
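
A rough sketch of that idea, assuming each file holds a contiguous, sorted slice of the timeline (the file paths and the per-chunk function here are hypothetical):

import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def process_chunk(path):
    # Pure-Pandas work on one ordered slice of the timeline.
    pdf = pd.read_parquet(path)
    full = pd.date_range(pdf.index[0], pdf.index[-1], freq="us")
    return pdf.reindex(full, fill_value=0)

paths = ["part-0.parquet", "part-1.parquet"]  # hypothetical file list
parts = [process_chunk(p) for p in paths]

# Stitch the per-chunk results back into one Dask DataFrame
# for any further distributed processing.
ddf = dd.from_delayed(parts)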

Dask supports resampling timeseries data.
I think your case can be expressed with something like:

df.resample("2us").sum()

Example (using ms since it’s easier to follow; us is very granular but works the same way):

In [1]: import pandas as pd

In [2]: import dask.dataframe as dd

In [3]: index = pd.date_range('1/1/2000', periods=9, freq='T')
   ...: series = pd.Series(range(9), index=index)

In [4]: ddf = dd.from_pandas(series, npartitions=2).to_frame()

In [5]: ddf
Out[5]:
Dask DataFrame Structure:
                         0
npartitions=2
2000-01-01 00:00:00  int64
2000-01-01 00:05:00    ...
2000-01-01 00:08:00    ...
Dask Name: to_frame, 2 graph layers

In [6]: ddf.resample("2ms").sum().compute()
Out[6]:
                         0
2000-01-01 00:00:00.000  0
2000-01-01 00:00:00.002  0
2000-01-01 00:00:00.004  0
2000-01-01 00:00:00.006  0
2000-01-01 00:00:00.008  0
...                     ..
2000-01-01 00:07:59.992  0
2000-01-01 00:07:59.994  0
2000-01-01 00:07:59.996  0
2000-01-01 00:07:59.998  0
2000-01-01 00:08:00.000  8

[240001 rows x 1 columns]
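
And a sketch of the same pattern applied to your microsecond data; missing timestamps simply contribute 0 to each bin, so no explicit reindex is needed. One caveat: by default resample aligns bins to the start of the day rather than to your first timestamp, so if the windows must start exactly at the first row you may need to shift the bin origin (pandas’ resample accepts origin="start"; I’m not sure whether Dask forwards that argument):

import pandas as pd
import dask.dataframe as dd

idx = pd.to_datetime([
    "2021-09-01 00:00:00.000001",
    "2021-09-01 00:00:00.000004",
    "2021-09-01 00:00:00.000007",
    "2021-09-01 00:00:00.000010",
    "2021-09-01 00:00:00.000013",
])
pdf = pd.DataFrame({"size": [0, 1, 2, 3, 4]}, index=idx)
ddf = dd.from_pandas(pdf, npartitions=1)

# 2-microsecond tumbling windows over the sparse index.
print(ddf.resample("2us").sum().compute())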