I am using a very large data set that contains a unit identifier and a time identifier (year). Some units have gaps between years, and I would like to add the missing observations to the data set and fill them in (with a forward fill, for example).
The two ways I can think of to do this are (1) a groupby where, for each group, I fill in the missing years and values using an apply, and (2) creating a data set with all years for all ids, dropping the unneeded years, and then doing a groupby to fill the NA data. I would expect the former to be more efficient, since either way I have to do a groupby, but all the computation happens within the group rather than before it; however, I wasn't sure whether that is actually efficient, or how to do it in Dask, because of duplicate-indexing issues that don't arise when only using pandas.
Another alternative I considered, which seems too inefficient, is to give each id its own partition so that I could use map_partitions.
It is worth noting that my data is by default sorted by year (not by id).
I was hoping that someone could recommend the optimal way to do this in Dask. I understand that this is a computationally intensive task, but I want to execute it in the most efficient way.
I have provided example data and a couple of my attempts below. I thought this would be the closest thing to an MWE. Please let me know if I can improve my post or provide additional helpful information.
Thank you!
I have a data set like
id year value
3 1998 1
3 2000 2
4 2003 7
4 2006 8
And want to fill in the missing years such that it looks like
year id value
1998 3 1
1999 3 1
2000 3 2
2003 4 7
2004 4 7
2005 4 7
2006 4 8
Pandas code for option 1, which creates the new observations and fills them in one step:
import pandas as pd

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}
df = pd.DataFrame(data)

# Function to reindex and forward fill
def reindex_and_fill(group):
    years = range(group['year'].min(), group['year'].max() + 1)
    return group.set_index('year').reindex(years).ffill().reset_index()

# Apply the function to each group
result = df.groupby('id', as_index=False, group_keys=False).apply(reindex_and_fill)
print(result)
Pandas code for option 2, which first creates the missing observations and then fills in the missing values:
import pandas as pd
import numpy as np

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}
df = pd.DataFrame(data)

# Function to create a DataFrame with all years for each id
def create_year_range(group):
    years = np.arange(group['year'].min(), group['year'].max() + 1)
    return pd.DataFrame({'id': group['id'].iloc[0], 'year': years})

# Group by 'id' and apply the function
year_ranges = df.groupby('id').apply(create_year_range).reset_index(drop=True)

# Merge the original dataframe with the full-range dataframe
merged_df = pd.merge(year_ranges, df, on=['id', 'year'], how='left')

# Sort dataframe by id and year
merged_df.sort_values(by=['id', 'year'], inplace=True)

# Forward fill missing values within each id group
filled_df = merged_df.groupby('id', group_keys=False).apply(pd.DataFrame.ffill)
print(filled_df)
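As a quick sanity check, the two pandas options do agree on the toy data. This is the comparison I ran (self-contained, same sample data; note option 1 upcasts 'id' and 'value' to float because reindex inserts NaN rows, so I cast both results to float before comparing):

```python
import pandas as pd
import numpy as np

# Same sample data as in the examples above
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}
df = pd.DataFrame(data)

# Option 1: reindex each group onto its full year range and forward fill
def reindex_and_fill(group):
    years = range(group['year'].min(), group['year'].max() + 1)
    return group.set_index('year').reindex(years).ffill().reset_index()

opt1 = df.groupby('id', as_index=False, group_keys=False).apply(reindex_and_fill)

# Option 2: build the full per-id year grid, merge, then forward fill
def create_year_range(group):
    years = np.arange(group['year'].min(), group['year'].max() + 1)
    return pd.DataFrame({'id': group['id'].iloc[0], 'year': years})

year_ranges = df.groupby('id').apply(create_year_range).reset_index(drop=True)
opt2 = pd.merge(year_ranges, df, on=['id', 'year'], how='left')
opt2 = opt2.sort_values(by=['id', 'year'])
opt2 = opt2.groupby('id', group_keys=False).apply(pd.DataFrame.ffill)

# Align column order and dtypes before comparing
a = opt1[['id', 'year', 'value']].reset_index(drop=True).astype('float64')
b = opt2[['id', 'year', 'value']].reset_index(drop=True).astype('float64')
pd.testing.assert_frame_equal(a, b)
```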
Here is a Dask-compatible version of option 2:
import pandas as pd
import numpy as np
import dask.dataframe as dd

# Sample data
data = {
    'id': [3, 3, 4, 4],
    'year': [1998, 2000, 2003, 2006],
    'value': [1, 2, 7, 8]
}
df = dd.from_pandas(pd.DataFrame(data), npartitions=2)

# Function to create a DataFrame with all years for each id
def create_year_range(group):
    years = np.arange(group['year'].min(), group['year'].max() + 1)
    return pd.DataFrame({'id': group['id'].iloc[0], 'year': years})

# Group by 'id' and apply the function
year_ranges = df.groupby('id').apply(create_year_range, meta={'id': 'int', 'year': 'int'}).reset_index(drop=True)

# Merge the original dataframe with the full-range dataframe
merged_df = df.merge(year_ranges, on=['id', 'year'], how='outer')

# Define function to sort and fill
def sort_fill(group):
    group = group.sort_values(by='year')
    group = group.ffill()
    return group

# Forward fill missing values within each id group
filled_df = merged_df.groupby('id', group_keys=False).apply(sort_fill)
print(filled_df.compute())