How to check that a dataframe is properly built?

Hello,

I am writing code that reads several databases from different files on disk and then aggregates them into a single dask.dataframe.

My code runs and returns a dask dataframe that looks fine when inspected with df.head() and df.dtypes, but it fails whenever I trigger a compute.

For example:

  • df.compute() returns a StopIteration error from pandas' formatting and printing code.
  • df.groupby('VAR1').VAR2.std().compute() returns NotImplementedError: Item assignment with <class 'pandas.core.indexes.base.Index'> not supported from the dask_expr collection class.
  • df.groupby('VAR1').agg({'VAR2': 'mean'}).compute() returns AttributeError: 'list' object has no attribute 'groupby' from dask's groupby method.

The nature of these errors makes me think that there is something structurally wrong with df, probably due to how I build it. Yet isinstance(df, dask.dataframe.DataFrame) returns True, and the few operations I can run before computing seem to return fine.

Does anyone have any hint on how to debug this?

For what it's worth, I read the databases with from_map(); when I read from only one database, the function I pass is basically just dd.read_parquet(), and even that raises the issues above.
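Roughly, the pattern looks like this (the read_one name and the paths are placeholders, not my actual code):

import dask.dataframe as dd

# placeholder sketch of the pattern described above, not the actual code
def read_one(db_path):
    # in my case this is essentially a thin wrapper around dd.read_parquet()
    return dd.read_parquet(db_path)

ddf = dd.from_map(read_one, ["/path/to/db_a", "/path/to/db_b"])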

Hi @enrico,

Could you detail the workflow that triggers the error a bit more?

So you get the error even from a single dataset? Does it also happen when using df.tail()? Could you also post the stack traces you are getting?

Hi @guillaumeeb ,

Thanks a lot for your reply. I kind of solved it (I think?) 5 minutes ago, but I still have a related question.

My process is (code at the end of the message):

  1. read a selection of variables from a parquet db
  2. add empty columns for variables not present in the db, but present in another one that I am also reading
  3. return the resulting dask.dataframe ddf
  4. from_map() maps a function that does steps 1-3 over a list of dbs and merges all the returned ddfs into one
  5. operate on the combined ddf

My error was returning ddf instead of ddf[list(self.selected_variables)], that is, I was not ensuring that all ddfs had the same column ordering.

The ddf built with from_map() fails on computes (it does not fail on ddf.tail(), ddf.head(), or ddf.describe()). The workaround I have found is to read each db into a list of ddfs with ddfs = [read_db_dask(db_name) for db_name in db_names] and then do ddf = dd.concat(ddfs).
from_map() works on computes if I implement steps 1-3 with pyarrow, i.e. I read a pyarrow table, add the empty columns to the pyarrow table, and only convert it to a pandas dataframe at the end.
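As a rough sketch of this pyarrow variant (the function name read_one_pyarrow, the paths and the column list are placeholders, not my actual code):

import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd

# placeholder sketch of the pyarrow-based variant, not the actual code
def read_one_pyarrow(db_path, selected_variables):
    table = pq.read_table(db_path)
    # add an all-null column for every variable missing from this db
    # (pa.nulls(..., type=...) could be used with a shared target schema)
    for col in selected_variables:
        if col not in table.column_names:
            table = table.append_column(col, pa.nulls(len(table)))
    # return a pandas DataFrame in a fixed column order, since from_map
    # builds the dask dataframe from per-partition pandas objects
    return table.select(selected_variables).to_pandas()

ddf = dd.from_map(
    read_one_pyarrow,
    ["/path/to/db_a", "/path/to/db_b"],
    selected_variables=["VAR1", "VAR2"],
)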

The issue I have now is that dask seems to trigger a compute when adding the empty columns (step 2 of the process), and if I run the code on the whole database I run out of memory. While users will always access a subset of the data and might not hit this issue, I would rather they face this waiting time when they perform an actual computation (a mean, a plot, anything) than when they retrieve the dataframe; I'd like the dataframe to stay lazy for as long as possible. (I would also like to unit-test the function without subsetting the db, if possible.)

Why is this happening? I expected adding a column to be a lazy operation.

Here are the main parts of the code:

# imports used by this excerpt
import pandas as pd
import pyarrow.parquet as pq
import dask.dataframe as dd


def read_db_dask(self, db_name, target_schema=None):
    """Read parquet database applying all filters

    Arguments:
    db_name  --  database name
    target_schema  --  shared schema with the other dbs

    Returns:
    ddf  --  dask dataframe containing the filtered data

    """

    # read the schema to get the list of columns
    db_schema = pq.read_schema(self.db_paths[db_name] + "/_common_metadata")

    # read in only the selected variables that are present in this database
    cols_to_read = [name for name in self.selected_variables if name in db_schema.names]

    # read db
    ddf = dd.read_parquet(
        self.db_paths[db_name],
        engine="pyarrow",
        columns=cols_to_read,
        filters=self.filters,
        index=False,
        dtype_backend="pyarrow",
    )

    # add empty columns for variables that are in self.selected_variables
    # but not in the current db
    cols_to_add = [name for name in self.selected_variables if name not in db_schema.names]
    for col in cols_to_add:
        print(f"Adding empty column {col} to db {db_name}")
        if target_schema is not None:
            field_idx = target_schema.get_field_index(col)
            pa_dtype = target_schema.types[field_idx]
            pd_dtype = self.dtype_mapping[pa_dtype]
            ddf[col] = pd.Series(
                [pd.NA] * len(ddf),
                dtype=pd_dtype,
            )
        else:
            ddf[col] = pd.Series(
                [pd.NA] * len(ddf)
            )

    # return the columns in a fixed order shared by all dbs
    return ddf[list(self.selected_variables)]

ddfs = [read_db_dask(db_name, target_schema=global_schema) for db_name in db_names]
ddf = dd.concat(ddfs)

FWIW, I also wrote an equivalent function that reads a pyarrow table, adds the empty columns to it, and converts it to a dask dataframe only when returning, and the compute is also triggered there.

len(ddf) will trigger the loading of the entire database (even if it is done chunk by chunk, there is a computation under the hood). Moreover, I'm not sure that assigning a pandas Series like this is good practice; I'm even surprised it works.

Maybe you should try assigning with a lambda expression on a Dask Series, or using map_partitions with the correct meta information.
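Something along these lines should stay lazy (a minimal sketch; the column name VAR3 and the Float64 dtype are just examples, adapt the dtype lookup to your target_schema):

import pandas as pd

# minimal sketch: add an all-NA column per partition without computing a length
def add_empty_column(pdf, col, dtype):
    # pdf is one pandas partition, so its length is known locally and nothing
    # forces a compute on the full dask dataframe
    return pdf.assign(**{col: pd.Series(pd.NA, index=pdf.index, dtype=dtype)})

ddf = ddf.map_partitions(add_empty_column, "VAR3", "Float64")

# or assign a lazily built Dask Series created with a lambda per partition
ddf["VAR3"] = ddf.map_partitions(
    lambda pdf: pd.Series(pd.NA, index=pdf.index, dtype="Float64")
)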