Help checking my delayed methods with Dask DataFrame

Hi, I tried to use delayed with a Dask DataFrame. I want to keep all the data on the workers rather than pulling it onto my local machine, because the data is huge (more than 30 GB, read from Parquet with dd.read_parquet). I wrote the code below; in general I want to keep the data on the workers, but I need to apply some pandas DataFrame methods, e.g. stack, groupby, etc., which probably aren't supported in Dask yet.
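For context, the data is loaded roughly like this (a minimal sketch; the scheduler address and Parquet paths are placeholders, and I'm assuming coordinates is read the same way):

from dask.distributed import Client
import dask.dataframe as dd

client = Client("tcp://scheduler-address:8786")           # placeholder address
df_1 = dd.read_parquet("/path/to/data.parquet")           # placeholder path
coordinates = dd.read_parquet("/path/to/coords.parquet")  # placeholder path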

I don't know if this is the right way to do it?

import pandas as pd
import dask.dataframe as dd
from dask import delayed
from dask.distributed import progress

def stack(df: pd.DataFrame) -> pd.DataFrame:
    ...  # do something to create new_df
    return new_df

def unstack(df: pd.DataFrame) -> pd.DataFrame:
    ...  # do something to create new_df
    return new_df

# Run the pandas-only stack() on each partition without collecting anything locally
delays = df_1.to_delayed()
dfs = [delayed(stack)(d) for d in delays]
df = dd.from_delayed(dfs)

# merge() accepts either right_on or right_index, not both, so right_index=True is dropped
df = df.merge(coordinates, how="left", left_on="dataId", right_on="id")

# Same round-trip for the pandas-only unstack()
dfs = df.to_delayed()
dfs = [delayed(unstack)(d) for d in dfs]
df = dd.from_delayed(dfs)

# suffixes=(False, False) makes the merge raise if any non-key columns overlap
df_1 = df_1.merge(df, how="inner", on="id", suffixes=(False, False))

# progress() tracks futures, so persist first; this also keeps the results on the workers
df_1 = df_1.persist()
progress(df_1)

I suppose you can use map_partitions 🙂
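Something like this sketch, assuming stack and unstack each take and return a pandas DataFrame (same names as in your snippet):

df = df_1.map_partitions(stack)
df = df.merge(coordinates, how="left", left_on="dataId", right_on="id")
df = df.map_partitions(unstack)
df_1 = df_1.merge(df, how="inner", on="id")

map_partitions runs the function on each partition wherever it lives, so nothing gets pulled back to the client.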

map_partitions is certainly simpler if it fits your needs! But the code you posted first should work too if you ever need it.
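One tip for either version: passing an explicit meta tells Dask the output schema, so it doesn't have to infer it by calling your function on an empty frame. The column names and dtypes below are just illustrative:

dfs = [delayed(stack)(d) for d in df_1.to_delayed()]
df = dd.from_delayed(dfs, meta={"id": "int64", "value": "float64"})  # illustrative schema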
