Let's say I have 4 Dask workers. I want to load data as a Dask DataFrame and then shuffle and shard it into 4 partitions, with each worker storing roughly the same amount of data (after shuffling and sharding there should be a single Dask DataFrame with 4 partitions). How can I do this efficiently?
Currently, I am using:
partitioned_dfs = df.random_split([0.25, 0.25, 0.25, 0.25], shuffle=True)
new_df = dd.concat(partitioned_dfs)
But in this case, the 4 partitions end up with unequal sizes, and the shuffling does not happen across the whole dataset (the documentation only mentions shuffling within a partition). I also have to do the additional step of concatenating the split DataFrames to get back a single DataFrame with 4 partitions. Can I do the shuffling and sharding together to achieve the outcome described above?
@vigneshn1997 Thanks for your question! I’d be curious to learn more about your use case here because shuffling is an expensive operation in a parallel+distributed setting. It’s usually recommended to minimize/avoid shuffling your entire DataFrame whenever possible.
That said, you can use DataFrame.repartition() to update the partitions. Note that the npartitions parameter won't create equal-sized partitions; Dask combines neighbouring partitions in the most efficient (minimal-shuffling) manner.
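For reference, here's a minimal sketch of what that could look like (the toy 20-row DataFrame is just for illustration):

import pandas as pd
import dask.dataframe as dd

# Toy DataFrame standing in for your real data
df = pd.DataFrame({'x': range(20)})
ddf = dd.from_pandas(df, npartitions=10)

# Reduce to 4 partitions; Dask merges neighbouring partitions,
# so the pieces are not guaranteed to be exactly equal in size
ddf4 = ddf.repartition(npartitions=4)
print(ddf4.npartitions)  # 4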
And, for the shuffle operation, perhaps DataFrame.sample() can help?
If you have a specific example where you really need to do this, we can discuss a work-around in Dask.
My use case is that I have a large dataset stored in some form of storage (the data can be multi-modal) to be used for deep learning model training. I have to load the dataset, then shuffle and shard it (shuffling the complete dataset is necessary since the DL algorithm benefits from a random order of data points). Next, I need to pre-process the data column-wise (using the map_partitions function): say a column has image paths, I will pre-process each image path into a tensor encoding for that image (this has to be done for every value in the column). After that, I have to save the data in partitions (so if there are 4 workers, there will be 4 partitions and 4 processed data files saved as TFRecords/PyTorch tensors).
Then this processed data will be loaded by the 4 workers (each worker having its own local DataFrame) and deep learning model training will be done.
How can sample help with overall data shuffling?
Thanks for sharing your workflow!
How can sample help with overall data shuffling?
You can set frac=1, which returns all the rows in a shuffled order and avoids the extra concatenation step:
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': [1, 2] * 10, 'y': range(20)})
ddf = dd.from_pandas(df, npartitions=5)

# sample the full DataFrame (frac=1) to get the rows back shuffled
ddf = ddf.sample(frac=1)
ddf.compute()
Thank you for your reply, I will try this out. For sharding, I want to keep track of which worker has which shard (so let's say I have 4 workers and 4 partitions, can it be ensured that each worker has one shard that it works with?), and is there any way to get some pointer/future to each shard?
@vigneshn1997 This again isn’t a typical use case for Dask DataFrame, and I’d be curious to understand the reasoning behind it.
For sharding, I want to keep track of which worker has which shard (so let's say I have 4 workers and 4 partitions, can it be ensured that each worker has one shard that it works with?)
I think this happens by default? I tried this on a LocalCluster, and the scheduler distributed the work equally across the workers. Still, as a workaround, you can limit each worker to a single thread, which ensures each worker only works on one partition at a time:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
and is there any way to get some pointer/future to each shard?
I think you're looking for ddf.partitions here:
import dask

partitions = [p for p in ddf.partitions]
dask.compute(partitions)
As an aside, you may also consider looking into Dask Delayed and Futures, the low-level Dask APIs that can give you finer control over your computation.
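For example, here's a rough sketch of how you could persist the DataFrame and then see which worker holds each piece using Futures (this assumes a distributed Client like the one above):

from dask.distributed import Client, futures_of

client = Client(n_workers=4, threads_per_worker=1)

# Persist the (shuffled/repartitioned) DataFrame on the cluster,
# which gives you one Future per partition
ddf = ddf.persist()
futures = futures_of(ddf)

# Map each partition's Future to the worker(s) currently holding it
print(client.who_has(futures))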
Right, so for my use case, after sharding and shuffling I wanted to do a data-parallel operation (described in Dividing data among workers and downloading data local to a worker), i.e. I want to preprocess every row of the dataset.
I read more of the Dask documentation, and I was thinking I can use the map_partitions function to let Dask handle the rows instead of dividing them up manually.
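Something along these lines is what I have in mind (encode_image and the column names here are just placeholders for my actual preprocessing):

import pandas as pd
import dask.dataframe as dd

def encode_image(path):
    # placeholder for my real image-path -> tensor encoding
    return path

def preprocess(partition):
    # called once per partition; the row-wise work happens inside
    partition = partition.copy()
    partition['image'] = partition['image_path'].apply(encode_image)
    return partition

# toy frame standing in for my real dataset
ddf = dd.from_pandas(
    pd.DataFrame({'image_path': ['a.png', 'b.png', 'c.png', 'd.png']}),
    npartitions=4)

processed = ddf.map_partitions(preprocess)
processed.compute()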