I want to aggregate (add) a large collection of dataframes that all have the same shape and persist on disk as parquet files.
The dataframes can’t all fit in memory on a single machine.
So a simple (but very slow) way to do the aggregation would be to run:
df = pd.read_parquet(files[0])
for file in files[1:]:
    df += pd.read_parquet(file)
To speed things up, I want to do this in a distributed way on a cluster. This seems to do the job:
import pandas as pd
import dask.bag as db
df = db.from_sequence(files).map(pd.read_parquet).sum().compute()
Do you think this is a good way to do it or do you recommend a better way?
Hi @eserie and welcome! Since you have a collection of DataFrames, you don’t need to use Dask bag at all. Instead you could do something like:
import dask.dataframe as dd
# lazily read in the parquet files, then sum
ddf = dd.read_parquet(list_of_filepaths).sum()
# compute result
df = ddf.compute()
Here are the dask.dataframe.read_parquet docs; note that if your files are all in the same directory, you only need to pass in the directory path. Here are some relevant examples on reading/writing data.
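For example, here is a minimal sketch of that, assuming the files all sit under a ./tmp directory (as in your setup):
import dask.dataframe as dd
# assuming all the parquet files live under ./tmp
ddf = dd.read_parquet("./tmp").sum()
df = ddf.compute()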
I’ve tried what you suggest, but unfortunately it does not do the same thing.
To illustrate things, I wrote this little minimal example:
from pathlib import Path
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.bag as db
# Let's create some random dataframes of shape (100, 10)
path = Path("./tmp")
path.mkdir(exist_ok=True)
for i in range(10):
    df = pd.DataFrame(np.random.normal(size=(100, 10)))
    df.columns = map(str, df.columns)
    df.to_parquet(path / f"{i}.parquet")
files = [path / f"{i}.parquet" for i in range(10)]
# method 1 (wrong result)
df = dd.read_parquet(files).sum()
df.compute().shape == (10,) # <=== dim0 has been summed
# method 2 (result I want)
df = db.from_sequence(files).map(pd.read_parquet).sum()
df.compute().shape == (100, 10) # <=== dim0 is kept (it's what I want)
Furthermore, the bag approach distributes the computations well.
This is what the computation graph looks like:
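(The graph can be reproduced with something like the sketch below, assuming graphviz is available; the output filename is arbitrary.)
# render the task graph of the bag-based sum (same files list as above)
df = db.from_sequence(files).map(pd.read_parquet).sum()
df.visualize(filename="bag_sum_graph.png")  # arbitrary filename, requires graphviz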
To clarify, if we scale the above example to 20 files and run:
from pathlib import Path
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.bag as db
# Let's create some random dataframes of shape (100, 10)
path = Path("./tmp")
path.mkdir(exist_ok=True)
for i in range(20):
    df = pd.DataFrame(np.random.normal(size=(100, 10)))
    df.columns = map(str, df.columns)
    df.to_parquet(path / f"{i}.parquet")
files = [path / f"{i}.parquet" for i in range(20)]
df = dd.read_parquet(files)
df3 = df.map_partitions(lambda df: df.sum(), meta=df.head().sum())
assert df3.compute().shape == (200,) # <=== dim0 has been summed
we see that dim0 of the dataframes has been summed away.
In some sense the data is 3-dimensional and we want to sum over the “files” dimension.
If we go to numpy, we can get the correct shape:
df = dd.read_parquet(files)
arr4 = df.values.compute_chunk_sizes().reshape(20, -1, 10).sum(0)
assert arr4.compute().shape == (100, 10) # <=== this is correct.
Hi again @eserie, sorry for misunderstanding your question initially! It sounds like you’re looking to add your dataframes together, and the approach you’ve already shared using Dask bag is a perfectly good way to do this. If you also need to do some dataframe processing, then it could be simpler to use Dask dataframe, where something like this would work:
from pathlib import Path
import numpy as np
import pandas as pd
import dask.dataframe as dd
# create some random dataframes of shape (100, 10)
path = Path("./tmp")
path.mkdir(exist_ok=True)
for i in range(5):
    df = pd.DataFrame(np.random.normal(size=(100, 10)))
    df.columns = map(str, df.columns)
    df.to_parquet(path / f"{i}.parquet")
files = [path / f"{i}.parquet" for i in range(5)]
ddf_list = []
for file in files:
    ddf = dd.read_parquet(file)  # will work better if partitioning is the same
    ddf_list.append(ddf)
ddf_total = ddf_list[0]
for ddf in ddf_list[1:]:
    ddf_total += ddf
ddf_total.compute().shape
Or for a tree reduction, like with Dask bag, you could do:
import toolz
# with the same list of files as above
ddf_list = []
for file in files:
    ddf = dd.read_parquet(file)  # will work better if partitioning is the same
    ddf_list.append(ddf)
while len(ddf_list) > 1:
    ddf_list = [df1 + df2 if df2 is not None else df1 for df1, df2 in toolz.partition(2, ddf_list, pad=None)]
ddf_list[0].compute().shape
Then the task graph from ddf_list[0].visualize() looks more like that of the Dask bag sum (with split_every=2).
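For comparison, a minimal sketch of the bag version with the same fan-in (split_every is an optional argument of Bag.sum; same files list as above):
import pandas as pd
import dask.bag as db
# split_every=2 makes each reduction step combine pairs of partial sums
bag_sum = db.from_sequence(files).map(pd.read_parquet).sum(split_every=2)
bag_sum.visualize()  # the task graph now resembles the toolz.partition reduction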
Thanks @scharlottej13 !
I like your solution with toolz.partition; it gives good control over the tree reduction.
However, the tree reduction is done differently in dask.bag (the sum-part and sum-aggregate tasks seem to sum more than 2 items, and the number of items may vary depending on the level in the tree). Do you think one solution is more appropriate than the other?
Hi @eserie! I think how appropriate one solution is over another depends more on what else you’re doing; e.g. if you need to do additional preprocessing, it could be simpler to use Dask dataframe to avoid converting back and forth between Dask bag and Dask dataframe.
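For instance, one possible sketch of combining the two approaches (assuming the aggregated result fits in memory on one machine, which it does in the example above):
import pandas as pd
import dask.bag as db
import dask.dataframe as dd
# sum the files with Dask bag, then move to Dask dataframe for further processing
summed = db.from_sequence(files).map(pd.read_parquet).sum().compute()  # pandas DataFrame
ddf = dd.from_pandas(summed, npartitions=1)  # assumes the summed frame fits in memory
# ...additional dask.dataframe operations on ddf...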