How can I reindex a Dask DataFrame with a timeseries index

Hi everyone, I am currently dealing with timeseries data whose unit is the microsecond (us). An example is shown below:

                            size
2021-09-01 00:00:00.000001     0
2021-09-01 00:00:00.000004     1
2021-09-01 00:00:00.000007     2
2021-09-01 00:00:00.000010     3
2021-09-01 00:00:00.000013     4

What I need to do is compute the sum of the size column over every moving window of 2 microseconds (1 and 2 are in a group, 3 and 4 are in a group, etc.). The start time will be the timestamp of the first row. As you can see, there are some missing timestamps, and the size value is regarded as 0 for those missing timestamps. I can currently use the Pandas reindex function together with date_range to insert rows for those missing timestamps, and after that I can use a rolling window to compute what I want easily (a sketch follows the expected output below). However, since there is no reindex function in Dask DataFrame, I have no idea how to do this in Dask. Can someone show me some ways to implement this feature?

                            size
2021-09-01 00:00:00.000001     0
2021-09-01 00:00:00.000002     0
2021-09-01 00:00:00.000003     0
2021-09-01 00:00:00.000004     1
2021-09-01 00:00:00.000005     0
2021-09-01 00:00:00.000006     0
2021-09-01 00:00:00.000007     2
2021-09-01 00:00:00.000008     0
2021-09-01 00:00:00.000009     0
2021-09-01 00:00:00.000010     3
2021-09-01 00:00:00.000011     0
2021-09-01 00:00:00.000012     0
2021-09-01 00:00:00.000013     4
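
A simplified sketch of what I currently do with Pandas (using the small example data above):

import pandas as pd

# The sparse example data from above.
index = pd.to_datetime([
    "2021-09-01 00:00:00.000001",
    "2021-09-01 00:00:00.000004",
    "2021-09-01 00:00:00.000007",
    "2021-09-01 00:00:00.000010",
    "2021-09-01 00:00:00.000013",
])
df = pd.DataFrame({"size": [0, 1, 2, 3, 4]}, index=index)

# Insert the missing microseconds, treating them as size 0.
full_index = pd.date_range(df.index[0], df.index[-1], freq="us")
df = df.reindex(full_index, fill_value=0)

# Sum over a moving 2-microsecond window.
result = df["size"].rolling("2us").sum()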

The reason I need to use Dask for this is that my original dataset is huge.

Hi @vava24680,

I’m afraid there is no easy solution to do this with Dask currently.

However, maybe some workaround can be found? In what format is your original dataset? Couldn't you use Pandas DataFrame calls on already ordered parts of the dataset (you could even distribute those with Dask), and once that is done use a Dask DataFrame to process the whole time series?
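
For instance, something along these lines (just a sketch; the function body and the part paths are placeholders for your own logic):

import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def read_and_process_part(path):
    # Placeholder: read one already ordered part of the dataset and apply
    # your existing Pandas logic (groupby, reindex, rolling, ...) to it.
    pdf = pd.read_csv(path)
    return pdf  # replace with your Pandas processing

part_paths = ["part-000.csv", "part-001.csv"]  # illustrative paths
parts = [read_and_process_part(p) for p in part_paths]

# Build a Dask DataFrame from the processed parts and continue from there.
ddf = dd.from_delayed(parts)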

Dask supports resampling timeseries data.
I think your case can be expressed by something like

df.resample("2us").sum()

Example (using ms since it's easier to compute; us is very granular but possible):

In [1]: import pandas as pd

In [2]: import dask.dataframe as dd

In [3]: index = pd.date_range('1/1/2000', periods=9, freq='T')
   ...: series = pd.Series(range(9), index=index)

In [4]: ddf = dd.from_pandas(series, npartitions=2).to_frame()

In [5]: ddf
Out[5]:
Dask DataFrame Structure:
                         0
npartitions=2
2000-01-01 00:00:00  int64
2000-01-01 00:05:00    ...
2000-01-01 00:08:00    ...
Dask Name: to_frame, 2 graph layers

In [6]: ddf.resample("2ms").sum().compute()
Out[6]:
                         0
2000-01-01 00:00:00.000  0
2000-01-01 00:00:00.002  0
2000-01-01 00:00:00.004  0
2000-01-01 00:00:00.006  0
2000-01-01 00:00:00.008  0
...                     ..
2000-01-01 00:07:59.992  0
2000-01-01 00:07:59.994  0
2000-01-01 00:07:59.996  0
2000-01-01 00:07:59.998  0
2000-01-01 00:08:00.000  8

[240001 rows x 1 columns]

Hi @guillaumeeb, thank you for your reply, and sorry for the delayed response.

My original dataset is a CSV file, and its format is analogous to the following one:

   size  timestamp
0     1          4
1     2          4
2     3          6
3     4          7

It has one timestamp column, represented as an integer with microsecond (us) unit, and the other columns are all scalar types. The timestamp column is guaranteed to be increasing (not strictly), so there may be some repeated timestamps, as in the example above.

Currently I need to use a groupby to aggregate the rows with the same timestamp, and since the original dataset is huge, I need to do it in chunks.

My solution now (a rough sketch follows the steps below):

  1. Split the data into chunks.
  2. Group by the timestamp column to aggregate rows with the same timestamp.
  3. Use Pandas' reindex feature to fill in the missing timestamps.
  4. Concatenate the results into a temp file (CSV or Parquet). This step also needs to handle timestamps that appear on the boundary between two chunks, since the first step cannot guarantee that each chunk has unique timestamps.
  5. Use that temp file for further processing.
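
In code, the current procedure looks roughly like this (simplified; paths, chunk size and the boundary handling are only illustrative):

import pandas as pd

src = "original.csv"    # illustrative path
tmp = "temp.csv"        # illustrative temp file
chunk_size = 1_000_000  # illustrative chunk size

first = True
for chunk in pd.read_csv(src, chunksize=chunk_size):
    # 2. aggregate rows that share the same integer microsecond timestamp
    agg = chunk.groupby("timestamp").sum()
    # 3. convert to a DatetimeIndex and insert the missing microseconds as 0
    agg.index = pd.to_datetime(agg.index, unit="us")
    full = pd.date_range(agg.index[0], agg.index[-1], freq="us")
    agg = agg.reindex(full, fill_value=0)
    # 4. append to the temp file; merging timestamps that fall on the
    #    boundary between two chunks is omitted here
    agg.to_csv(tmp, mode="a", header=first)
    first = False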

Do you have any suggestions on this procedure?

Thanks in advance~

Hi @fjetter, thank you for your response, and sorry for the delayed reply.

I think your method is suitable for computing a fixed window (every 2 us/ms), not a moving window.
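
For example, with Pandas on a small regular-grid series, these two are not the same:

import pandas as pd

# Tiny illustrative series on a regular microsecond grid.
idx = pd.date_range("2021-09-01", periods=6, freq="us")
df = pd.DataFrame({"size": [0, 1, 2, 3, 4, 5]}, index=idx)

# Fixed (tumbling) 2 us buckets, which is what resample computes.
print(df.resample("2us").sum())

# Moving 2 us window ending at each row, which is what I need.
print(df.rolling("2us").sum())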

But still, thank you for your reply!

Hi @vava24680,

Could you produce a minimal reproducible example of your algorithm? I guess that to do that, you'll only need some code for generating fake data. Then you could just give us the code you are using with Pandas and what you are trying to do with Dask. It would be a lot easier to help then.