How can I solve "Metadata mismatch found in `from_delayed`" when using to_parquet?

Hello All,

I have a Dask DataFrame with 33 partitions.
I am trying to drop all columns except a few from a known list, and save the result to a Parquet file.

Here is the code I am trying:

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

TARGETED_COLS = ['col1', 'col2', 'col3']

cluster = LocalCluster(
    n_workers=4, 
    threads_per_worker=1, 
    timeout="60s"
)
client = Client(cluster)

file_path = "data/my_large_json_lines_file"

ddf = dd.read_json(file_path, lines=True, blocksize="128 MiB")

def process(df):
    return df[TARGETED_COLS] 

ddf2 = ddf.map_partitions(process).to_parquet("test.parquet")

and I get the following error:

ValueError: Metadata mismatch found in `from_delayed`.

Partition type: `pandas.core.frame.DataFrame`
+--------------------------+---------+----------+
| Column                   | Found   | Expected |
+--------------------------+---------+----------+
| 'demo_col1'              | float64 | -        |
| 'demo_col2'              | object  | -        |
| 'demo_col3'              | object  | -        |
+--------------------------+---------+----------+

Can someone please advise what I can do to fix this?

Note: I do not know the initial columns in advance; only the targeted columns are known.

Thank you!


Hi @Sorin,

I’ve got a first question: why are you using map_partitions to keep the columns you want? Can’t you use the same column-selection syntax as Pandas with Dask?
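If plain column selection works for your case, the whole pipeline could be as short as this (just a quick sketch on my side, not tested against your file, reusing the paths and names from your snippet):

import dask.dataframe as dd

TARGETED_COLS = ['col1', 'col2', 'col3']

# Same read as in your snippet, then plain Pandas-style column selection
# instead of map_partitions.
ddf = dd.read_json("data/my_large_json_lines_file", lines=True, blocksize="128 MiB")
ddf[TARGETED_COLS].to_parquet("test.parquet")

If that still raises the same error, the problem probably lies in how the JSON is read rather than in the column selection, which brings me to the next questions.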

What do you get when you just output ddf or ddf2? What metadata do they show?

Would it be possible to have the complete stack trace of the error you get?

Best,

Thanks for the input, @guillaumeeb

I almost forgot to post an update, as it has been several days.
I ended up using this:

import os
import json

import dask.bag as db
from dask.distributed import Client
from pandas.api.types import pandas_dtype as dtype

client = Client(n_workers=os.cpu_count())

TARGET_COLS = ['col1', 'col2', 'col3', 'col4']

# Read the JSON Lines file as a bag of dicts and keep only the target keys.
bag = db.read_text('filepath', blocksize="128MiB").map(json.loads)
bag = bag.map(lambda x: {key: value for key, value in x.items() if key in TARGET_COLS})

# Explicit output schema, so Dask does not have to infer the dtypes.
DTYPES = {
    'col1': dtype('O'),
    'col2': dtype('float64'),
    'col3': dtype('float64'),
    'col4': dtype('float64')
}

df = bag.to_dataframe(meta=DTYPES)
df.to_parquet('parquet_filepath', engine="pyarrow")
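For what it's worth, I think the explicit meta dict is what makes this work: Dask builds the DataFrame with exactly these columns and dtypes instead of inferring the schema from a sample of the data. Continuing from the snippet above, a quick optional sanity check on the resulting DataFrame:

# Optional sanity check: the schema is known lazily, and head() only
# computes the first partition.
print(df.dtypes)
print(df.head())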

I will mark this as resolved.
