Dask saving dataframe partitions as files

Hi, I wanted some help with a use case.

Let's say I have 8 nodes and on each node I set up 32 Dask workers (so 256 Dask workers in total). Now I load a dataset as a Dask dataframe across these 256 workers (with 256 partitions) and do some row-wise preprocessing (using map_partitions). Then I want to save the dataframe's partitions as 8 files on those 8 nodes.

So, for this, I can do a repartition operation to reduce the dataframe to just 8 partitions. But how do I then save these partitions as separate files on those 8 nodes (each node should end up with exactly 1 partition saved)?

What I was thinking was along these lines:

import pandas as pd
import dask.dataframe as dd
from distributed import Client

data = {'col_1': [3, 2, 1, 0, 4, 5, 6, 7], 'col_2': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']}
df = pd.DataFrame.from_dict(data)
ddf = dd.from_pandas(df, npartitions=4)  # smaller example with 4 partitions on 4 workers (2 nodes only)

cl1 = Client(n_workers=4)
cl2 = Client(n_workers=2)

df2 = cl1.persist(ddf)
new_df = df2.repartition(npartitions=2)  # repartition to 2 partitions to save on 2 nodes
cl2.compute(new_df)  # ValueError: Inputs contain futures that were created by another client

I'm getting the error: Inputs contain futures that were created by another client

I need help with two things:

  1. How do I ensure that the saving is done on 8 nodes, with each node holding exactly 1 partition?
  2. Once I have those partitions, how can I do the actual saving?

For the saving, I was thinking along these lines (let me know if there is a better method):

def save_part(partition, partition_info=None):
    # write each partition out, named by its partition number
    partition.to_pickle(str(partition_info['number']) + '.pkl')
    return None

temp = ddf.map_partitions(save_part, meta=('temp', object))
temp.compute()

@vigneshn1997 Thanks for your question.

Getting the error: Inputs contain futures that were created by another client

This is expected because you're using a different client (cl2) to compute new_df, while cl1 is the client that holds the persisted futures backing new_df.

As for saving a file on each worker manually, this is generally not recommended. Could you please share why you want to do this? You could consider publishing the dataset or saving it to cloud storage instead.
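For the cloud-storage route, a minimal sketch could look like the following (the bucket name and path are hypothetical, and writing to S3 assumes s3fs is installed and credentials are configured):

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({'col_1': range(8)}), npartitions=8)

# to_parquet writes one file per partition under the target path;
# each worker writes the partitions it already holds, so no manual
# placement of data onto specific nodes is needed
ddf.to_parquet("s3://my-bucket/output/", write_index=False)  # hypothetical bucket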

(Also, a side note: since you persist before re-partitioning, the persisted in-memory futures won't be affected by the repartition; the repartition only adds new lazy tasks on top of them.)
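To illustrate that ordering, here's a minimal single-client sketch (names are just illustrative):

import pandas as pd
import dask.dataframe as dd
from distributed import Client

client = Client()  # single local client for this illustration

ddf = dd.from_pandas(pd.DataFrame({'x': range(8)}), npartitions=4)

persisted = client.persist(ddf)                       # 4 partitions materialised in cluster memory
repartitioned = persisted.repartition(npartitions=2)  # lazy: builds on the persisted futures,
                                                      # the persisted data itself is untouched
repartitioned = client.persist(repartitioned)         # persist again to materialise the 2-partition version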

Here’s an example:

import pandas as pd
import dask.dataframe as dd

from distributed import Client, LocalCluster


cluster = LocalCluster()

# Both clients are connected to the same scheduler
client1 = Client(cluster)
client2 = Client(cluster)

data = {'col_1': [3, 2, 1, 0, 4, 5, 6, 7], 'col_2': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']}
df = pd.DataFrame.from_dict(data)
ddf = dd.from_pandas(df, npartitions=4)

new_ddf = ddf.repartition(npartitions=2)
persisted_ddf = client1.persist(new_ddf)

# Publish from client1, then retrieve and use the same data from client2
client1.publish_dataset(published_ddf=persisted_ddf)
received_ddf = client2.get_dataset("published_ddf")

client2.compute(received_ddf)  # Works
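And if the end goal is still to write the partitions out as files, you can continue from the example above and let a normal Dask write do it; a hedged sketch, where "/shared/output/" is a hypothetical path that every worker can reach (a shared filesystem or cloud storage):

# write each partition as one CSV file; the "*" in the name is replaced
# by the partition number, and each worker writes the partitions it holds
writes = received_ddf.to_csv("/shared/output/part-*.csv", index=False, compute=False)
client2.compute(writes, sync=True)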