Index name lost after groupby() and apply(), and a column goes missing when writing to Parquet

I’m using Dask to group a data set with a custom function and then save the result to a Parquet file. I’ve encountered two issues:

  1. After the apply operation, the index still has the correct values, but it no longer carries my label, detail_id.
  2. When I save the dataframe to Parquet, the last column is dropped, unless I add an extra column that becomes the one that is dropped instead.

Desired result

First, using Pandas, this is the salient code that I’m running:

panda_df = panda_df.sort_values(by="detail_id")
panda_df = panda_df.set_index("detail_id", drop=True)
group = panda_df.groupby(by="detail_id")
answer = group.apply(determine_top_label)

When I run this code, the dataframe holds the desired answer:

           function            category
detail_id
100        completions         stimulation
101        completions         stimulation
102        general operations  land
103        corporate           taxes
104        general operations  land
105        drilling            proppant

After I write it to a Parquet file and read it back, everything is as expected.

Dask

The equivalent code in Dask is:

dask_df = dask_df.sort_values(by="detail_id")
dask_df = dask_df.set_index("detail_id", drop=True, sorted=True)
dask_group = dask_df.groupby(by="detail_id")
answer_df = dask_group.apply(determine_top_label, meta=[("function", "string"),
                                                        ("category", "string")])

When I examine the dataframe before saving, it looks good except that the name of the index is no longer detail_id. Therefore, when I subsequently save it to file, I cannot specify detail_id in the schema, and when I read the file back, the index values are gone (they now start at 0).

data_schema = [("function", "string"),
               ("category", "string")
              ]

# Even though I'm writing out the index, the index values are not present
# when I read the data back in.
to_parquet(answer_df,
           path="/app/Functions/missing_index_values/",
           write_index=True,
           write_metadata_file=True,
           schema=pa.schema(data_schema))
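
A lighter-weight fix might be to restore the index name on each partition before writing. I have not verified this end to end, so it is only a sketch:

# sketch: put the label back per partition with pandas' rename_axis
answer_df = answer_df.map_partitions(lambda pdf: pdf.rename_axis("detail_id"))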

The workaround I actually use is heavier: after invoking groupby() and apply(), I reset the index, rename the column, and then set the index again.

# workaround
answer_df = answer_df.reset_index()
answer_df = answer_df.rename(columns={'index':'detail_id'})
answer_df = answer_df.sort_values(by="detail_id")
answer_df = answer_df.set_index("detail_id", sorted=True, drop=True)
answer_df = answer_df.persist()

data_schema = [("detail_id", "int64"),
               ("function", "string"),
               ("category", "string"),
              ]

# Now this works
to_parquet(answer_df,
           path="/app/Functions/workaround/",
           write_index=True,
           write_metadata_file=True,
           schema=pa.schema(data_schema))

Now my index has the proper name and the correct values, but the category column is missing. My way around this is to add an extra column, pad, which then becomes the column that gets dropped instead of category (code below).
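
To narrow down whether category is missing from the files themselves or only on read-back, the footer of one part file can be inspected directly (the part file name here assumes Dask's default naming):

import pyarrow.parquet as pq

file_schema = pq.read_schema("/app/Functions/workaround/part.0.parquet")
print(file_schema.names)  # is "category" in the written schema?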

answer_df["pad"] = 0

data_schema = [("detail_id", "int64"),
               ("function", "string"),
               ("category", "string"),
               ("pad", "int64")
              ]

# Now this works
to_parquet(answer_df,
           path="/app/Functions/workaround_with_pad/",
           write_index=True,
           write_metadata_file=True,
           schema=pa.schema(data_schema))

Any suggestions as to what I’m doing incorrectly with groupby() and apply() that loses the index name, and why I need to add an extra column before all of the data I want persisted actually gets persisted?

Full notebook code is below. I’ve been able to reproduce this using a local Dask client. I’m using dask and dask-distributed 2022.9.0 and pandas 1.4.4. I’ve also bumped my versions to the latest libraries (2022.11.1 and 1.5.1) and the same behaviour occurs.

Thank you in advance,

Mark

Notebook

#!/usr/bin/env python
# coding: utf-8

# Bug: groupby() followed by apply() does not preserve the name of the index.

# In[1]:


from dask import dataframe as ddf
from dask.dataframe import to_parquet, from_pandas
from dask.distributed import Client
import pandas as pd
import pyarrow as pa


# In[2]:


data = [# detail_id, function, category, total
        [100, "completions", "stimulation", 100],
        [101, "completions", "stimulation", 50],
        [100, "corporate", "taxes", 5.24],
        [102, "general operations", "land", 75],
        [103, "corporate", "taxes", 5.24],       
        [104, "completions", "stimulation", 100],
        [104, "general operations", "land", 500],
        [102, "general operations", "land", 150],
        [105, "drilling", "proppant", 300],
        [104, "drilling", "labour", 50]]

## Expected results
# detail_id  function            category
# 100        completions         stimulation
# 101        completions         stimulation
# 102        general operations  land
# 103        corporate           taxes
# 104        general operations  land
# 105        drilling            proppant

panda_df = pd.DataFrame(data, columns=["detail_id", "function", "category", "total"])


# In[3]:


from collections import defaultdict

TAX_TUPLE = ('corporate', 'taxes')

def determine_top_label(row_df):
    """Pick the (function, category) pair with the largest absolute total,
    preferring any non-tax pair over TAX_TUPLE."""
    func_cat_map = defaultdict(float)

    # Sum absolute totals per (function, category) pair.
    for _, row in row_df.iterrows():
        func_cat_map[(row["function"], row["category"])] += abs(row["total"])

    if len(func_cat_map) == 1:
        # Only one pair present; use it even if it is the tax pair.
        func_cat = func_cat_map.popitem()[0]
    else:
        # Otherwise pick the non-tax pair with the largest total.
        func_cat, amount = None, 0.0
        for fc, a in func_cat_map.items():
            if fc != TAX_TUPLE and a > amount:
                func_cat, amount = fc, a

    return pd.Series(data=func_cat, index=["function", "category"])


# ## Pandas Group by and Apply

# In[4]:


panda_df = panda_df.sort_values(by="detail_id")
panda_df = panda_df.set_index("detail_id", drop=True)
group = panda_df.groupby(by="detail_id")
answer = group.apply(determine_top_label)


# In[5]:


answer


# In[6]:


answer.index  # Here, the index is still called "detail_id"


# In[7]:


answer.to_parquet(path="/app/Functions/answer_pandas.parquet", index=True)


# In[8]:


# After writing out the file to parquet and reading it back in, the index has the desired label of "detail_id"
# Dataframe is correct
verify_df = pd.read_parquet("/app/Functions/answer_pandas.parquet")
verify_df


# ## Dask Group By and Apply

# In[9]:


c = Client()


# In[10]:


panda_df = pd.DataFrame(data, columns=["detail_id", "function", "category", "total"])
dask_df = from_pandas(data=panda_df, npartitions=1)


# In[11]:


dask_df = dask_df.sort_values(by="detail_id")
dask_df = dask_df.set_index("detail_id", drop=True, sorted=True)
dask_group = dask_df.groupby(by="detail_id")
answer_df = dask_group.apply(determine_top_label, meta=[("function", "string"),
                                                        ("category", "string")])


# In[12]:


answer_df.compute()


# In[13]:


answer_df.index  # At this point the index has a different name.


# In[14]:


data_schema = [("function", "string"),
               ("category", "string")
              ]

# Even though I'm writing out the index, the index values are not present
# when I read the data back in.
to_parquet(answer_df,
           path="/app/Functions/missing_index_values/",
           write_index=True,
           write_metadata_file=True,
           schema=pa.schema(data_schema))
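
# Diagnostic sketch (part file name assumed from Dask's default naming):
# inspect the footer to see whether the index was written, and under what
# name pandas/Dask recorded it.
import pyarrow.parquet as pq

file_schema = pq.read_schema("/app/Functions/missing_index_values/part.0.parquet")
print(file_schema.names)
print(file_schema.pandas_metadata)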


# In[15]:


verify_dask_df = ddf.read_parquet("/app/Functions/missing_index_values/")
verify_dask_df.compute()


# ## Workaround

# In[16]:


# To get the index to be called "detail_id", I have to reset the index, rename the column to what I want it to be
# then set the column back to be the index.

panda_df = pd.DataFrame(data, columns=["detail_id", "function", "category", "total"])
dask_df = from_pandas(data=panda_df, npartitions=1)
dask_df = dask_df.sort_values(by="detail_id")
dask_df = dask_df.set_index("detail_id", drop=True, sorted=True)
dask_group = dask_df.groupby(by="detail_id")
answer_df = dask_group.apply(determine_top_label, meta=[("function", "string"),
                                                        ("category", "string")])


# In[17]:


# workaround
answer_df = answer_df.reset_index()
answer_df = answer_df.rename(columns={'index':'detail_id'})
answer_df = answer_df.sort_values(by="detail_id")
answer_df = answer_df.set_index("detail_id", sorted=True, drop=True)
answer_df = answer_df.persist()


# In[18]:


answer_df.dtypes


# In[19]:


answer_df.compute()  # Everything looks good here


# In[20]:


answer_df.index


# In[21]:


data_schema = [("detail_id", "int64"),
               ("function", "string"),
               ("category", "string"),
              ]

# Now this works
to_parquet(answer_df,
           path="/app/Functions/workaround/",
           write_index=True,
           write_metadata_file=True,
           schema=pa.schema(data_schema))


# In[22]:


# At this point the index is correct, but the category column is missing.

verify_dask_df = ddf.read_parquet("/app/Functions/workaround/")
verify_dask_df.compute()  # Where did the category column go?


# In[23]:


answer_df["pad"] = 0

data_schema = [("detail_id", "int64"),
               ("function", "string"),
               ("category", "string"),
               ("pad", "int64")
              ]

# Now this works
to_parquet(answer_df,
           path="/app/Functions/workaround_with_pad/",
           write_index=True,
           write_metadata_file=True,
           schema=pa.schema(data_schema))


# In[24]:


# All is well again after resetting the index and padding the dataframe.
verify_dask_df = ddf.read_parquet("/app/Functions/workaround_with_pad/")
verify_dask_df.compute()