Read Parquet with Varying Schemas

I’m writing a service that needs to read in a large number (hundreds or thousands) of Parquet files, many of which have varying schemas. What I want is for the output to look like a .concat(), where all files are squished into a single dataframe and rows missing values for a column are padded with NaNs. Is there a good way to do this in parallel where I don’t have to actually use .concat()?

Hi @aidanlewilewi, welcome to the Dask community,

I’m not sure I understand your problem: concat should be able to do this in parallel. What do you want to do with the single dataframe at the end?

As read_parquet is lazy (it only reads the file’s metadata to infer the schema, not the data itself), you should be able to create a Dask DataFrame object from each of your Parquet files, and then use concat to squeeze them lazily into a single Dask DataFrame.

Consider the following example:

import dask.dataframe as dd
import pandas as pd

# Two frames with different schemas: df1 has columns (a, b), df2 has (a, c)
df1 = pd.DataFrame(dict(a=list('aabbcc'), b=list(range(6))))
df2 = pd.DataFrame(dict(a=list('ddeeff'), c=list(range(6, 12))))

# concat aligns the columns and pads the missing ones with NaN
df_c = dd.concat([df1, df2])

In the end, df_c is a lazy Dask DataFrame structure, with partitions not necessarily loaded in memory (or at least they wouldn’t be, had I not built the inputs with Pandas in the first place).
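
For instance, a fully lazy version of the same idea might look like the sketch below, assuming the two mismatched frames above have been written to two hypothetical files, part1.parquet and part2.parquet:

import dask.dataframe as dd
import pandas as pd

# Write the two mismatched frames to (hypothetical) Parquet files first
pd.DataFrame(dict(a=list('aabbcc'), b=list(range(6)))).to_parquet("part1.parquet")
pd.DataFrame(dict(a=list('ddeeff'), c=list(range(6, 12)))).to_parquet("part2.parquet")

# Each read_parquet call only reads metadata, so these stay lazy
lazy_parts = [dd.read_parquet(p) for p in ["part1.parquet", "part2.parquet"]]

# join="outer" (the default) keeps all columns and pads missing ones with NaN
df_c = dd.concat(lazy_parts, join="outer")

# Nothing is loaded from disk until you compute df_c or write it out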

Hey @guillaumeeb, thanks for the quick response. I guess my question pertains more to the read_parquet step over a large number of files. In your example, you create the two dataframes sequentially, which is fine for a small number of files, but it will be slow when reading a whole bunch of files.

My hope is to parallelize the actual reading process. Say we have N files and M workers - could I split the read_parquet step so each worker is responsible for N/M files, then use dd.concat() to merge those into a single lazy DataFrame?

My use case is this:

Read a bunch of files, add a new column based on two other columns, then write the big ol’ dataset to a new Parquet file. Something like:

import dask.dataframe as dd

all_datasets = []

# One lazy Dask DataFrame per file (only metadata is read at this point)
for file in paths:
    all_datasets.append(dd.read_parquet(file))
combined_dataset = dd.concat(all_datasets)

# add the new column derived from two existing ones
combined_dataset.do_my_thing()

combined_dataset.to_parquet("output_path")  # placeholder output path
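
(For concreteness, the do_my_thing step would be something along these lines, with made-up column names:)

# hypothetical column names: derive a new column from two existing ones
combined_dataset = combined_dataset.assign(
    new_col=combined_dataset["col_a"] + combined_dataset["col_b"]
)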

Does this all make sense? I’m hoping my ideas of what’s going on under the hood actually make some sense.

Follow-up: maybe dask.delayed is the tool for this? I changed my code to this:

import dask
import dask.dataframe as dd


class Dataset:

    def __init__(self, dataset: dd.DataFrame = None):
        self.dataset = dataset

    def load(self, paths: list):
        all_datasets = []

        # one delayed read per file, so the reads can run in parallel
        for file in paths:
            all_datasets.append(self._read_single_file(file))
        # compute() runs the delayed reads; the result is still a lazy Dask DataFrame
        self.dataset = dask.delayed(dd.concat)(all_datasets).compute()

    @dask.delayed
    def _read_single_file(self, file):
        # read_parquet itself is lazy: it only reads the file's metadata
        return dd.read_parquet(file)
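
Used roughly like this (just a sketch, where paths stands for my list of Parquet file paths):

ds = Dataset()
ds.load(paths)

# ds.dataset is now a single lazy Dask DataFrame covering all the files
ds.dataset.to_parquet("combined_output")  # placeholder output directory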

This seemingly improves performance so far.

I imagine this can be a bit slow, did you experience this? dd.read_parquet only reads metadata and not the actual dataset, so it should be reasonably fast, but I reckon that over a thousand files it can take too much time.

Yep, that is a correct approach, but it must be handled with care, because you are using delayed calls to build lazy Dask collections. I think your code is correct, and in the end it stores a lazy Dask DataFrame in the self.dataset variable.

Another approach that should work would be something like:

all_delayed_datasets = []

# one delayed read_parquet call per file: these run in parallel on compute
for file in paths:
    all_delayed_datasets.append(dask.delayed(dd.read_parquet)(file))

# this only executes the metadata reads; each result is still a lazy Dask DataFrame
all_datasets = dask.compute(*all_delayed_datasets)

# dask.compute returns a tuple, while concat expects a list
combined_dataset = dd.concat(list(all_datasets))
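
As a quick sanity check (a sketch), nothing has actually been loaded at this point; combined_dataset only materializes when you compute it or write it out:

# still lazy: the compute above only ran the read_parquet calls (metadata reads)
print(type(combined_dataset))        # a dask.dataframe DataFrame, not a pandas one
print(combined_dataset.npartitions)  # partitions across all the input files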