df[cols].drop_duplicates().compute() causes ValueError: The columns in the computed data do not match the columns in the provided metadata

Goal: To transform 400x 1GB .json files into a few thousand .parquet files, grouped on two keys. For privacy reasons, the data below is a representative example.

Most have the keys 'firstname', 'surname', 'age', 'city', 'job', 'seniority', 'university'

However, some only have 'firstname', 'surname', 'age', 'city', 'university'

That is, in the same order, but with missing keys ('job', 'seniority' in this example).

Notably, the first row in the first .json has this second style (i.e. missing keys).

I want a .parquet for each person (i.e. grouped by firstname and surname), only including the city and university (note this data is available in all rows).

My code so far:

import dask.dataframe as dd

input_path = "/files/*.json"
df = dd.read_json(input_path, lines=True, blocksize="512MB")
unique_combinations = df[['firstname', 'surname']].drop_duplicates().compute()

Then I’d iterate over unique_combinations with iterrows() to make the .parquets (a rough sketch of what I had in mind is below the error output). However, the compute() above throws an error:

ValueError: The columns in the computed data do not match the columns in the provided metadata.
Order of columns does not match.
Actual:   ['firstname', 'surname', 'age', 'city', 'job', 'seniority', 'university']
Expected: ['firstname', 'surname', 'age', 'city', 'university', 'job', 'seniority']

Note the expected aligns with the first row of the first .json, but with the missing columns appended to the end.
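
For reference, this is roughly what I was planning for that loop once compute() works (the output path here is just a placeholder):

for _, row in unique_combinations.iterrows():
    # Filter the Dask dataframe to one person, keep only the two columns I need,
    # and write a single .parquet per person
    person = df[(df['firstname'] == row['firstname']) &
                (df['surname'] == row['surname'])]
    person[['city', 'university']].compute().to_parquet(
        f"/output/{row['firstname']}_{row['surname']}.parquet"
    )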

I’ve tried removing unnecessary columns:

columns = ['firstname','surname','city','university']
df = df[columns]

I also tried setting meta for the df, both with all the columns and with only the ones I need (i.e. those above):

import pandas as pd

meta = pd.DataFrame({
    'firstname': pd.Series(dtype='object'),
    'surname': pd.Series(dtype='object'),
    'age': pd.Series(dtype='object'),
    'city': pd.Series(dtype='object'),
    'job': pd.Series(dtype='object'),
    'seniority': pd.Series(dtype='object'),
    'university': pd.Series(dtype='object')
})
df = dd.read_json(input_path, lines=True, blocksize="512MB", meta=meta)

But nothing seems to work.

Interestingly, when input_path matches only a single .json, it works, even if that file contains both types of record. It only fails when I try multiple .jsons.

Aside from preprocessing all the .jsons to contain only the keys I want, how can I do this with Dask?

Thanks

Hi @jamesdeluk, welcome to the Dask community!

I think this works with one file because Pandas manages to read it and infer the columns correctly. But with several files, the Dask version ends up with partitions that don't all have exactly the same columns, so it fails.

In this case, I think you should try to use from_map:
https://docs.dask.org/en/stable/generated/dask.dataframe.from_map.html

You should be able to define a function that uses pandas.read_json and then filters to keep only the columns you need, in the same order no matter how the original file is formatted.
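
For example, here is a minimal sketch along those lines, assuming line-delimited JSON and the column names from your example (load_person_file and the glob-based file listing are just illustrative):

import glob

import dask.dataframe as dd
import pandas as pd

COLUMNS = ['firstname', 'surname', 'city', 'university']

def load_person_file(path):
    # Read one file with pandas, then force a fixed column set and order;
    # reindex fills any missing columns with NaN instead of failing.
    pdf = pd.read_json(path, lines=True)
    return pdf.reindex(columns=COLUMNS)

files = sorted(glob.glob("/files/*.json"))
meta = pd.DataFrame({col: pd.Series(dtype='object') for col in COLUMNS})
df = dd.from_map(load_person_file, files, meta=meta)

unique_combinations = df[['firstname', 'surname']].drop_duplicates().compute()

Since every partition then has the same columns in the same order, the drop_duplicates().compute() call should no longer complain about mismatched metadata.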