Best way to add observations to data set by unit

I am using a very large data set that contains a unit identifier and a time identifier (year). Some units have gaps between years, and I would like to add observations to the data set for those missing years and fill in their values (using ffill, for example).

The two ways I can think of to do this are (1) using a groupby and, for each group, filling in the missing years and values with an apply, and (2) creating a dataset with all years for all ids, dropping unneeded years, and then doing a groupby to fill the NA data. I would expect the former approach to be more efficient, since either way I have to do a groupby, but all the computation happens within the group rather than beforehand. However, I wasn't sure whether this is actually efficient, or how to do it in Dask, because of issues with duplicate indexing that don't arise when only using pandas.

Another alternative I considered, though it seems too inefficient, was to give each id its own partition so that I could use map_partitions.
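
Roughly, this is what I had in mind for that alternative (an untested sketch; I am not sure the set_index shuffle plus map_partitions is actually any cheaper, and fill_partition is just an illustrative helper, not anything from a library):

import pandas as pd
import dask.dataframe as dd

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}

df = dd.from_pandas(pd.DataFrame(data), npartitions=2)

# Plain-pandas fill applied inside each partition
def fill_partition(pdf):
    def reindex_and_fill(group):
        years = range(group['year'].min(), group['year'].max() + 1)
        return group.set_index('year').reindex(years).ffill().reset_index()
    out = pdf.groupby('id', group_keys=False).apply(reindex_and_fill)
    return out.astype({'id': 'int64', 'value': 'int64'})

# set_index shuffles the data so that all rows for an id end up in the same partition
result = (
    df.set_index('id')
      .map_partitions(lambda pdf: fill_partition(pdf.reset_index()),
                      meta={'year': 'int64', 'id': 'int64', 'value': 'int64'})
)

print(result.compute())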

It is worth noting that my data is by default sorted by year (not by id).

I was hoping that someone could recommend the optimal way to do this in Dask. I understand that this is a computationally intensive task, but I want to execute it in the most efficient way possible.

I have provided example data and a couple of my attempts below. I thought this would be the closest thing to an MWE. Please let me know if I can improve my post or provide additional helpful information.

Thank you!

I have a data set like

id  year  value
3  1998      1
3  2000      2
4  2003      7
4  2006      8

I want to fill in the missing years so that it looks like

year   id  value
1998  3    1
1999  3    1
2000  3    2
2003  4    7
2004  4    7
2005  4    7
2006  4    8

Pandas code for option 1, which creates the new observations and fills them in at the same time:

import pandas as pd

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}

df = pd.DataFrame(data)

# Function to reindex and forward fill
def reindex_and_fill(group):
    years = range(group['year'].min(), group['year'].max() + 1)
    return group.set_index('year').reindex(years).ffill().reset_index()

# Apply the function to each group
result = df.groupby('id', as_index=False, group_keys=False).apply(reindex_and_fill)

print(result)

Pandas code for option 2, which first creates the missing observations and then forward fills the missing values:

import pandas as pd
import numpy as np

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}

df = pd.DataFrame(data)

# Function to create a DataFrame with all years for each id
def create_year_range(group):
    years = np.arange(group['year'].min(), group['year'].max() + 1)
    return pd.DataFrame({'id': group['id'].iloc[0], 'year': years})

# Group by 'id' and apply the function
year_ranges = df.groupby('id').apply(create_year_range).reset_index(drop=True)

# Merge the original dataframe with the full range dataframe
merged_df = pd.merge(year_ranges, df, on=['id', 'year'], how='left')

# Sort dataframe by id and year
merged_df.sort_values(by=['id', 'year'], inplace=True)

# Forward fill missing values within each id group
filled_df = merged_df.groupby('id', group_keys=False).apply(pd.DataFrame.ffill)

print(filled_df)

Here is a Dask-compatible version of option 2:

import pandas as pd
import numpy as np
import dask.dataframe as dd

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}

df = dd.from_pandas(pd.DataFrame(data), npartitions=2)

# Function to create a DataFrame with all years for each id
def create_year_range(group):
    years = np.arange(group['year'].min(), group['year'].max() + 1)
    return pd.DataFrame({'id': group['id'].iloc[0], 'year': years})

# Group by 'id' and apply the function
year_ranges = df.groupby('id').apply(create_year_range, meta={'id': 'int', 'year': 'int'}).reset_index(drop=True)

# Merge the original dataframe with the full range dataframe
merged_df = df.merge(year_ranges, on=['id', 'year'], how='outer')

# define function to sort and fill
def sort_fill(group):
    group = group.sort_values(by='year')
    group = group.ffill()
    return group

# Forward fill missing values within each id group (no meta is given here, so Dask will infer it and emit a warning)
filled_df = merged_df.groupby('id', group_keys=False).apply(sort_fill)

print(filled_df.compute())

Hi @htj, welcome to the Dask Discourse forum!

From what I understand, yes, the first approach should be more efficient. groupby/apply is expensive when going distributed; the less you use it, the better.

What issues are you getting? I was able to make it work with your simple example, but maybe you are having issues when trying it on your real dataset?

import pandas as pd
import numpy as np
import dask.dataframe as dd

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}

df = dd.from_pandas(pd.DataFrame(data), npartitions=2)

# Function to reindex and forward fill
def reindex_and_fill(group):
    years = range(group['year'].min(), group['year'].max() + 1)
    return group.set_index('year').reindex(years).ffill().reset_index()

# Apply the function to each group
result = df.groupby('id', group_keys=False).apply(reindex_and_fill, meta={'year': 'int16', 'id': 'int16', 'value': 'int64'})

result.compute()

Thank you very much for your help @guillaumeeb !

I had been getting an issue related to non-unique indexing if I recall correctly, but I guess that was immaterial.

The more practical issue is still the compute time. I was hoping that there would be a less compute-intensive way to do this operation, perhaps taking into account that the data is ordered by year. Even after a few hours, the progress is still at 0%.

In theory, if you keep track of the last year seen for each id, you only need to read through the data once. For each observation, you check whether it is the first time you have seen that id; otherwise you check whether there is a gap in years since the last time you saw it and fill in the gap if needed.
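
Just to illustrate the idea, a plain pandas/Python version of that single pass might look like this (only a sketch of the logic on the toy data, not a Dask solution):

import pandas as pd

# Sample data, already sorted by year as in my real data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}

df = pd.DataFrame(data).sort_values('year')

last_seen = {}  # id -> (last year, last value)
rows = []
for row in df.itertuples(index=False):
    if row.id in last_seen:
        last_year, last_value = last_seen[row.id]
        # emit one carried-forward row for each missing year in the gap
        for year in range(last_year + 1, row.year):
            rows.append({'id': row.id, 'year': year, 'value': last_value})
    rows.append({'id': row.id, 'year': row.year, 'value': row.value})
    last_seen[row.id] = (row.year, row.value)

filled = pd.DataFrame(rows).sort_values(['id', 'year'])
print(filled)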

Alternatively, I wondered about an approach where a groupby is used only to get the range of years for each unit, which I believe could be done as an aggregation (and should be more efficient than an apply). From that I could work out which years need to be filled and then fill them, perhaps using a merge instead of another groupby.
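
Something like this rough sketch is what I have in mind (untested beyond the toy data; the min/max aggregation, the pandas-side grid construction, and the final map_partitions fill are just my guess at how the pieces would fit together):

import pandas as pd
import numpy as np
import dask.dataframe as dd

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}

df = dd.from_pandas(pd.DataFrame(data), npartitions=2)
# cast up front so the dtype stays stable once the left merge introduces NaNs
df['value'] = df['value'].astype('float64')

# Cheap aggregation: first and last observed year per id (small result)
bounds = df.groupby('id')['year'].agg(['min', 'max']).compute()

# Expand to the full id/year grid in plain pandas (bounds is small)
grid = pd.concat(
    pd.DataFrame({'id': i, 'year': np.arange(row['min'], row['max'] + 1)})
    for i, row in bounds.iterrows()
)
grid = dd.from_pandas(grid.reset_index(drop=True), npartitions=2)

# Left-merge observed values onto the grid, shuffle so all rows for an id share
# a partition, then sort and forward fill. The first year of each id is always
# observed, so ffill cannot bleed across ids.
merged = grid.merge(df, on=['id', 'year'], how='left')
filled = (
    merged.set_index('id')
          .map_partitions(lambda pdf: pdf.reset_index()
                                         .sort_values(['id', 'year'])
                                         .ffill())
)
print(filled.compute())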

Essentially, I don't need to gather all of the data points together as in a traditional groupby-apply operation, so I was hoping there might be a more computationally efficient method, though it is not clear to me what would be best given my level of understanding of Dask's architecture. Any suggestions would be very helpful and welcome.

Thanks for your help!

How big is your dataset? Did you try using a LocalCluster and monitoring execution with the dashboard?
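
Something like this is enough to get the dashboard (just the defaults shown here, adjust workers and memory to your machine):

from dask.distributed import Client, LocalCluster

# Start a local cluster (one worker per core by default) and print the
# dashboard address so you can watch task progress and memory usage.
cluster = LocalCluster()
client = Client(cluster)
print(client.dashboard_link)  # typically http://127.0.0.1:8787/status

# ... then build the Dask DataFrame and call .compute() as usual ...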

But implementing that in a distributed way would be quite hard! However, if your dataset is not that big, it might be faster to implement this approach and process the data serially!

Merge and groupby would be equally difficult if performed on a non-index column; I'm not sure whether that is your case or not.