Aggregation of many dataframes stored in parquet files

Hello,

I want to aggregate (add) a large collection of dataframes that all have the same shape and are persisted on disk as parquet files.
The whole collection can’t fit in memory on a single machine.
So a simple (but very slow) way to do the aggregation would be to run:

df = pd.read_parquet(files[0])
for file in files[1:]:
    df += pd.read_parquet(file)

To speed things up, I want to do this in a distributed way on a cluster. This seems to do the job:

import pandas as pd
import dask.bag as db
df = db.from_sequence(files).map(pd.read_parquet).sum().compute()

Do you think this is a good way to do it or do you recommend a better way?

Hi @eserie and welcome! Since you have a collection of DataFrames, you don’t need to use Dask bag at all. Instead you could do something like:

import dask.dataframe as dd

# lazily read in the parquet files, then sum
ddf = dd.read_parquet(list_of_filepaths).sum()
# compute result
df = ddf.compute()

Here are the dask.dataframe.read_parquet docs-- note that if your files are all in the same directory, you only need to pass in the directory path. Here are some relevant examples on reading/writing data.
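For example, a minimal sketch assuming all of the parquet files sit in a single ./tmp directory (the directory name is just an assumption for illustration):

import dask.dataframe as dd

# pass the directory instead of a list of file paths
ddf = dd.read_parquet("./tmp")
df = ddf.sum().compute()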

Hi @scharlottej13, thank you for your reply!

I’ve tried what you suggested, but unfortunately it does not do the same thing.

To illustrate things, I wrote this little minimal example:

from pathlib import Path
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.bag as db

# Let's create some random dataframes of shape (100, 10)
path = Path("./tmp")
path.mkdir(exist_ok=True)
for i in range(10):
    df = pd.DataFrame(np.random.normal(size=(100, 10)))
    df.columns = map(str, df.columns)
    df.to_parquet(path / f"{i}.parquet") 
files = [path / f"{i}.parquet" for i in range(10)]

# method 1 (wrong result)
df = dd.read_parquet(files).sum()
df.compute().shape == (10,)  # <=== dim0 has been summed

# method 2 (result I want)
df = db.from_sequence(files).map(pd.read_parquet).sum()
df.compute().shape == (100, 10)  # <=== dim0 is kept (it's what I want)

Furthermore, the bag approach distributes the computations well.
This is what the computation graph looks like:

df.visualize()

If you would like to do a sum per partition, which is what I think you are after, then you probably want map_partitions(sum):

# here df is the Dask DataFrame from dd.read_parquet(files)
df.map_partitions(
    lambda df: df.sum(),
    meta=df.head().sum(),
).compute()

It’s still summing along the wrong dimension.

If we scale the above example up to 20 files (so the shapes are unambiguous) and run:

from pathlib import Path
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.bag as db

# Let's create some random dataframes of shape (100, 10)
path = Path("./tmp")
path.mkdir(exist_ok=True)
for i in range(20):
    df = pd.DataFrame(np.random.normal(size=(100, 10)))
    df.columns = map(str, df.columns)
    df.to_parquet(path / f"{i}.parquet")
files = [path / f"{i}.parquet" for i in range(20)]

df = dd.read_parquet(files)
df3 = df.map_partitions(lambda df: df.sum(), meta=df.head().sum())
assert df3.compute().shape == (200,)  # <=== dim0 has been summed

we see that dim0 of the dataframes has been summed.

In some sense the data is 3-dimensional and we want to sum over the “files” dimension.

If we drop down to arrays (via .values), we can get the correct shape:

df = dd.read_parquet(files)
arr4 = df.values.compute_chunk_sizes().reshape(20, -1, 10).sum(0)
assert arr4.compute().shape == (100, 10)  # <=== this is correct.

And we have a nice distribution of the sum:

arr4.visualize()
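Another way to express this, as a sketch: make the “files” dimension explicit by stacking one lazily-read array per file with dask.array.stack and summing over the new axis (shapes assumed from the example above):

import dask
import dask.array as da
import pandas as pd

# one delayed (100, 10) array per file
lazy_arrays = [
    da.from_delayed(
        dask.delayed(lambda f: pd.read_parquet(f).to_numpy())(f),
        shape=(100, 10),
        dtype=float,
    )
    for f in files
]
stacked = da.stack(lazy_arrays)  # shape (20, 100, 10): axis 0 is the "files" dimension
summed = stacked.sum(axis=0)     # shape (100, 10)
assert summed.compute().shape == (100, 10)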

It would be nice to be able to do the same thing with the dataframe.
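As a sketch of a workaround, the summed array can be wrapped back into a dataframe with dd.from_dask_array (the column names are taken from the example above):

import dask.dataframe as dd

# wrap the summed array back into a dataframe
columns = [str(i) for i in range(10)]
ddf_sum = dd.from_dask_array(arr4, columns=columns)
assert ddf_sum.compute().shape == (100, 10)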

Hi again @eserie, sorry for misunderstanding your question initially! It sounds like you’re looking to add your dataframes together, and the way you’ve already shared using Dask bag is a perfectly good way to do this. If you also need to do some dataframe processing, then it could be simpler to use Dask dataframe, where something like this would work:

from pathlib import Path
import numpy as np
import pandas as pd
import dask.dataframe as dd

# create some random dataframes of shape (100, 10)
path = Path("./tmp")
path.mkdir(exist_ok=True)
for i in range(5):
    df = pd.DataFrame(np.random.normal(size=(100, 10)))
    df.columns = map(str, df.columns)
    df.to_parquet(path / f"{i}.parquet")
files = [path / f"{i}.parquet" for i in range(5)]

ddf_list = []
for file in files:
    ddf = dd.read_parquet(file) # will work better if partitioning is the same
    ddf_list.append(ddf)

ddf_total = ddf_list[0]
for ddf in ddf_list[1:]:
    ddf_total += ddf

ddf_total.compute().shape

Or for a tree reduction, like with Dask bag, you could do:

import toolz

# with the same list of files as above
ddf_list = []
for file in files:
    ddf = dd.read_parquet(file) # will work better if partitioning is the same
    ddf_list.append(ddf)

while len(ddf_list) > 1:
    ddf_list = [
        df1 + df2 if df2 is not None else df1
        for df1, df2 in toolz.partition(2, ddf_list, pad=None)
    ]

ddf_list[0].compute().shape

Then the task graph from ddf_list[0].visualize() looks more like that of the Dask bag sum (with split_every=2).
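For comparison, a minimal sketch of the Dask bag version with an explicit fan-in of 2 (assuming split_every is passed through to Bag.sum):

import pandas as pd
import dask.bag as db

# bag tree reduction with a fan-in of 2, for comparison with the toolz version
bag_sum = db.from_sequence(files).map(pd.read_parquet).sum(split_every=2)
bag_sum.visualize()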

Thanks @scharlottej13 !
I like your solution with toolz.partition; it gives good control over the tree reduction.
However, the tree reduction is done differently in dask.bag (the sum-part and sum-aggregate tasks seem to sum more than 2 items, and the number of items may vary depending on the level in the tree). Do you think one solution is more appropriate than the other?

Hi @eserie! I think how appropriate one solution is over another depends more on what else you’re doing -- e.g. if you need to do additional preprocessing, it could be simpler to use Dask dataframe to avoid converting back and forth between Dask bag and Dask dataframe.
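For instance, a rough sketch of folding a preprocessing step into the Dask dataframe approach (the rescaling step here is just a placeholder):

import dask.dataframe as dd

ddf_list = []
for file in files:
    ddf = dd.read_parquet(file)
    ddf = ddf * 2  # placeholder preprocessing step; replace with whatever you need
    ddf_list.append(ddf)

ddf_total = ddf_list[0]
for ddf in ddf_list[1:]:
    ddf_total += ddf

ddf_total.compute()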
