Hi, I wanted some help with a use case.
Let's say I have 8 nodes, and on each node I set up 32 Dask workers (so 256 Dask workers in total). I load a dataset as a Dask DataFrame across these 256 workers (with 256 partitions) and do some row-wise preprocessing (using map_partitions). Then I want to save the DataFrame's partitions as 8 files on those 8 nodes.
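For context, my current pipeline looks roughly like the sketch below (the scheduler address, input path, and the body of preprocess are just placeholders, not my real values):

import dask.dataframe as dd
from dask.distributed import Client

# connect to the existing 8-node / 256-worker cluster (placeholder address)
client = Client("tcp://scheduler-address:8786")

# load the dataset and split it into 256 partitions, one per worker (placeholder path)
ddf = dd.read_parquet("input_data/*.parquet")
ddf = ddf.repartition(npartitions=256)

def preprocess(partition):
    # row-wise transformations on the pandas DataFrame `partition` go here
    return partition

# apply the preprocessing independently to every partition and keep the result in memory
ddf = ddf.map_partitions(preprocess)
ddf = client.persist(ddf)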
So, for this, I can do a repartition operation to bring the DataFrame down to just 8 partitions. But how do I then save these partitions as separate files on those 8 nodes (each node should end up with exactly 1 partition saved)?
What I was thinking was along these lines:
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

data = {'col_1': [3, 2, 1, 0, 4, 5, 6, 7], 'col_2': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']}
df = pd.DataFrame.from_dict(data)
ddf = dd.from_pandas(df, npartitions=4)  # smaller example: 4 partitions on 4 workers (2 nodes only)

cl1 = Client(n_workers=4)
cl2 = Client(n_workers=2)

df2 = cl1.persist(ddf)
new_df = df2.repartition(npartitions=2)  # repartition to 2 partitions to save on 2 nodes
cl2.compute(new_df)  # ValueError: Inputs contain futures that were created by another client
I'm getting the error: ValueError: Inputs contain futures that were created by another client.
I need help with two things:
- How do I ensure that the saving is done on 8 nodes, with each node saving exactly 1 partition?
- Once I have those partitions, how do I actually do the saving?
For saving, I was thinking along these lines (let me know if there is a better method):
def save_part(partition, partition_info=None):
    # partition_info['number'] is the partition index that map_partitions passes in
    partition.to_pickle(str(partition_info['number']) + '.pkl')
    return None

temp = ddf.map_partitions(save_part, meta=('temp', object))
temp.compute()
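The only built-in alternative I'm aware of is something like the snippet below, which also writes one file per partition, but as far as I can tell it doesn't let me control which node each file ends up on (the output filename pattern is just an example):

# writes one CSV per partition: preprocessed-0.csv, preprocessed-1.csv, ...
ddf.repartition(npartitions=8).to_csv('preprocessed-*.csv')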