Hi friends, I want to make my groupby-apply more efficient by making sure that values from the same group only sit in the same partition. However, when I test the idea with the program below, there is still a shuffle in the graph. I understand that Dask has no way of knowing I'm preparing the data this way, but is there a way to tell it? (I cannot set the groupby columns as the index, because a MultiIndex is not allowed.)
import datetime

import dask
import pandas as pd
from dask.dataframe import from_pandas
from distributed import Client, LocalCluster


def tap(df):
    print(df)
    return df


def main():
    with dask.config.set({'distributed.scheduler.allowed-failures': 0,
                          'distributed.logging.distributed': 'DEBUG'}):
        with LocalCluster(n_workers=1, threads_per_worker=1,
                          memory_limit="200MiB") as cluster, \
                Client(cluster) as client:
            df = pd.DataFrame(dict(a=list('xxyyzz'),
                                   c=[datetime.datetime(2010, 1, 1),
                                      datetime.datetime(2010, 1, 1),
                                      datetime.datetime(2010, 1, 1),
                                      datetime.datetime(2010, 2, 1),
                                      datetime.datetime(2010, 2, 1),
                                      datetime.datetime(2010, 2, 1)],
                                   d=[1, 2, 3, 4, 5, 6],
                                   ))
            print("------pandas------")
            print(df)
            ddf = from_pandas(df, npartitions=3)
            print("------partitions------")
            ddf.map_partitions(tap, meta=ddf).compute(scheduler=client)
            print("------group by------")
            group = ddf.groupby(['a', 'c']).apply(tap, meta=ddf)
            group.visualize("chart.svg")
            group.compute(scheduler=client)


if __name__ == "__main__":
    main()
Output:
------partitions------
   a          c  d
0  x 2010-01-01  1
1  x 2010-01-01  2
   a          c  d
2  y 2010-01-01  3
3  y 2010-02-01  4
   a          c  d
4  z 2010-02-01  5
5  z 2010-02-01  6
------group by------
   a          c  d
2  y 2010-01-01  3
3  y 2010-02-01  4
   a          c  d
0  x 2010-01-01  1
1  x 2010-01-01  2
   a          c  d
4  z 2010-02-01  5
5  z 2010-02-01  6
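
For what it's worth, I can emulate the grouping I want without a shuffle by running a plain pandas groupby inside each partition via map_partitions — but that relies entirely on my guarantee that no group spans two partitions, and I'd rather tell the real groupby about it. A rough sketch (per_partition_groups is just an illustrative helper name; tap and ddf are from the program above):

def per_partition_groups(part):
    # plain pandas groupby within a single partition; this is only correct
    # because the data was prepared so no group crosses partition boundaries
    return part.groupby(['a', 'c'], group_keys=False).apply(tap)

grouped = ddf.map_partitions(per_partition_groups, meta=ddf)
grouped.compute(scheduler=client)

This avoids the shuffle because map_partitions works partition-by-partition, but it bypasses Dask's groupby machinery entirely, which is why I'm asking whether there is a supported way to declare the partitioning instead.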