Using pandas json_normalize

Hello guys!
I am struggling with how to handle a DataFrame that has JSON in one of its columns.
Dask does not currently have an equivalent of pd.json_normalize, so I am using dask.delayed as an alternative.

Creating a dummy DataFrame:

import dask.dataframe as dd
import dask 
dask.config.set({"dataframe.convert-string": False})
import pandas as pd
import numpy as np
import pyarrow as pa

df = pd.DataFrame({
    'status':   ['pending', 'pending', 'pending', 'canceled', 'canceled', 'canceled', 'confirmed', 'confirmed', 'confirmed'],
    'clientId': ['A', 'B', 'C', 'A', 'D', 'C', 'A', 'B', 'C'],
    'partner':  ['A', np.nan, 'C', 'A', np.nan, 'C', 'A', np.nan, 'C'],
    'product':  ['afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard'],
    'brand':    ['brand_1', 'brand_2', 'brand_3', 'brand_1', 'brand_2', 'brand_3', 'brand_1', 'brand_3', 'brand_3'],
    'sku':      [['rice', 'beans', 'phone', 'tv', 'alexa', 'radio']] * 9,
    'detail': [
        {'nickname': 'todo', 'bestfriend': 'none', 'eat': 'meat', 'details': {'warranty': '12M', 'vaccines': 'None'}},
        {'nickname': 'todo', 'bestfriend': 'none', 'eat': 'meat', 'top features': {'unlock': 'yes'}},
        {'nickname': 'todo', 'bestfriend': 'none', 'eat': 'meat'},
        {'nickname': 'todo', 'extra': 'take', 'bestfriend': 'none', 'eat': 'meat'},
        {'nickname': 'todo', 'bestfriend': 'none', 'eat': 'meat'},
        {'nickname': 'todo', 'bestfriend': 'none', 'eat': 'meat'},
        {'nickname': 'todo', 'bestfriend': 'none', 'eat': 'meat'},
        {'nickname': 'todo', 'bestfriend': 'none', 'eat': 'meat'},
        {'nickname': 'todo', 'bestfriend': 'none', 'eat': 'meat'},
    ],
    'gmv': [100] * 9,
})
ddf = dd.from_pandas(df)
ddf = ddf.repartition(npartitions=3)

I am going to save the DataFrame as parquet, and I need to keep the workflow “out of memory” (sadly, I only have 4 GB of RAM available)

schema = pa.schema([
    pa.field('status', pa.string()),
    pa.field('clientId', pa.string()),
    pa.field('partner', pa.string()),
    pa.field('product', pa.string()),
    pa.field('brand', pa.string()),
    pa.field('sku', pa.list_(pa.string())),
    pa.field('detail', pa.struct([
        pa.field('bestfriend', pa.string()),
        pa.field('details', pa.struct([
            pa.field('vaccines', pa.string()),
            pa.field('warranty', pa.string()),
        ])),
        pa.field('eat', pa.string()),
        pa.field('extra', pa.string()),
        pa.field('nickname', pa.string()),
        pa.field('top features', pa.struct([
            pa.field('unlock', pa.string()),
        ])),
    ])),
    pa.field('gmv', pa.int64()),
])

ddf.to_parquet('Temp', schema=schema)

In the second step, I read my DataFrame back to start the normalization. Observe that index.max() of my ddf is 3 (due to the index of each partition)

ddf = dd.read_parquet('Temp/*')
print(ddf.index.max().compute())
ddf.head()

Now I get my normalized results:

normalized = dask.delayed(pd.json_normalize)(ddf['detail'])
normalized = normalized.to_parquet('Temp_norm.parquet').compute()

The json_normalize output DataFrame does not contain any of the other data from the original DataFrame, so I can’t “merge” it back. I am trying to concatenate it instead (expecting that the original row order is kept, I hope!)

ddf = dd.read_parquet('Temp/*')
normalized = dd.read_parquet('Temp_norm.parquet')
ddf = dd.concat([ddf,normalized], axis = 1)
print(ddf.index.max().compute())
ddf.compute()

but it does not work! How could I solve it?
I’ve also tried creating a “unique” column on the original DataFrame and merging on it, but that crashes due to memory limitations.

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[147], line 1
----> 1 print(ddf.index.max().compute())
      2 ddf.compute()

File ...\.venv\Lib\site-packages\dask_expr\_collection.py:480, in FrameBase.compute(self, fuse, concatenate, **kwargs)
    478 if not isinstance(out, Scalar) and concatenate:
    479     out = out.repartition(npartitions=1)
--> 480 out = out.optimize(fuse=fuse)

    [... intermediate optimizer frames elided: optimize -> optimize_until -> lower_completely -> lower_once ...]

File ...\.venv\Lib\site-packages\dask_expr\_concat.py:173, in Concat._lower(self)
    172 else:
--> 173     raise ValueError(
    174         "Unable to concatenate DataFrame with unknown "
    175         "division specifying axis=1"
    176     )

ValueError: Unable to concatenate DataFrame with unknown division specifying axis=1
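
For what it's worth, the ValueError above is raised because the index divisions are unknown after read_parquet, and dd.concat(..., axis=1) can only align rows by index when both frames have known divisions. Below is a minimal, untested sketch of one possible workaround; it assumes the index was actually written to both parquet datasets, is sorted, and that the two files share the same index values:

import dask.dataframe as dd

# calculate_divisions asks read_parquet to derive the index divisions from the
# parquet metadata, so both frames come back with a known, sorted index.
ddf = dd.read_parquet('Temp', calculate_divisions=True)
normalized = dd.read_parquet('Temp_norm.parquet', calculate_divisions=True)

# With known divisions on both sides, an axis=1 concat can align rows by index.
combined = dd.concat([ddf, normalized], axis=1)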

Hi @frbelotto,

Not sure I really understand what you are trying to achieve. Are you looking to add a new column to your original DataFrame with the output of pd.json_normalize?

If so, have you taken a look at map_partitions()?
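
For illustration, here is a rough, untested sketch of what that could look like (the function name, the meta construction from a sample, and the output path 'Temp_flat' are just assumptions for the example). Each partition is a plain pandas DataFrame, so pd.json_normalize can run on it and the flattened columns can be glued back by index inside the same function:

import pandas as pd
import dask.dataframe as dd

def normalize_partition(part: pd.DataFrame) -> pd.DataFrame:
    # Flatten the 'detail' dicts of this partition only, and reuse the
    # partition's own index so the new columns line up with the old ones.
    flat = pd.json_normalize(part['detail'].tolist())
    flat.index = part.index
    return pd.concat([part.drop(columns='detail'), flat], axis=1)

ddf = dd.read_parquet('Temp')

# map_partitions needs to know the output columns/dtypes; building meta from a
# small sample is one option (this assumes the sampled rows expose every nested
# key -- otherwise partitions may end up with different column sets and each
# partition would need to be reindexed to a fixed column list).
meta = normalize_partition(ddf.head(3)).iloc[:0]

result = ddf.map_partitions(normalize_partition, meta=meta)
result.to_parquet('Temp_flat')

This keeps the work partition-by-partition, so only one partition's worth of data has to fit in memory at a time, and there is nothing to align afterwards.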

I have a DataFrame which contains a column with JSON in it.
I just want to normalize this column and attach the result back to the original DataFrame.

When trying to just concat it (like I would do with pandas), I cannot.

I could merge or join it, but by default the pd.json_normalize output does not contain any columns other than the normalized JSON.
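
(As a small aside: in plain pandas, one common pattern is to copy the original index onto the normalized frame, which then makes an index-based join possible. A minimal sketch on the in-memory df from above:)

import pandas as pd

# json_normalize returns a fresh RangeIndex; reusing the original index
# makes an index-based join back onto the source frame possible.
flat = pd.json_normalize(df['detail'].tolist())
flat.index = df.index
joined = df.drop(columns='detail').join(flat)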

I tried to play a bit with your example, and I also saw that you commented on a GitHub issue.

I have no better solution than @Patrick here.

Got it. Thanks anyway