When passing several Dask DataFrames to map_partitions(), it looks like the underlying align_partitions() forces repartitioning of every DataFrame, even if only a single one needs it.
import dask.datasets
import dask.dataframe

dds = [dask.datasets.timeseries() for i in range(5)]
# Only the last DataFrame's partitioning will differ
dds[-1] = dds[-1].repartition(npartitions=1)

def func(*args):
    return args[0]

dd = dask.dataframe.map_partitions(func, *dds)
dd.dask
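For reference, listing the layer names of the resulting high-level graph should make those extra layers visible (a quick sketch; the exact layer names depend on the Dask version):

# List the layers of the high-level graph; the repartition-related layers
# should show up for every input, not only for dds[-1].
for name in dd.dask.layers:
    print(name)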
That is correct, you can track that down in the source code.
However, even though you see the repartition-merge layer on all DataFrames, Dask is smart enough not to do useless work in the background. I’ve reduced the size of your example a bit:
import dask
import dask.datasets
import dask.dataframe as dd

dds = [dask.datasets.timeseries(end="2000-01-05") for i in range(3)]
# Only the last DataFrame's partitioning will differ
dds[-1] = dds[-1].repartition(npartitions=1)

def func(*args):
    return args[0]

ddf = dd.map_partitions(func, *dds)
This is the task graph generated when calling ddf.visualize():
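If you prefer a text check over the rendered image, a rough comparison of the raw graph with the graph after Dask's own optimization pass gives an idea of how much of that work is pass-through (a sketch; task counts and how much gets fused depend on the Dask version and configuration):

# Compare the number of keys in the raw high-level graph with the graph
# produced by Dask's default optimizations for this collection.
print("raw tasks:      ", len(ddf.__dask_graph__()))
(opt,) = dask.optimize(ddf)
print("optimized tasks:", len(opt.__dask_graph__()))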
I found this repartition(force=True) while investigating a very long graph packing/unpacking time before computation starts. I think it would help a lot to avoid any unnecessary graph nodes.
Should I create a PR to remove force=True? Do you see any advantage in forcing the repartition for every DataFrame? A better visualization, maybe? Is it worth inflating the graph size?
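To give a rough idea of the inflation I mean, something like this compares the number of graph keys when all inputs are already aligned versus when a single one is not (a sketch; exact counts depend on the Dask version):

import dask.datasets
import dask.dataframe

def keep_first(*args):  # same pass-through function as above
    return args[0]

aligned = [dask.datasets.timeseries(end="2000-01-05") for i in range(3)]
misaligned = aligned[:-1] + [aligned[-1].repartition(npartitions=1)]

same = dask.dataframe.map_partitions(keep_first, *aligned)
diff = dask.dataframe.map_partitions(keep_first, *misaligned)

print("all aligned:   ", len(same.__dask_graph__()), "graph keys")
print("one misaligned:", len(diff.__dask_graph__()), "graph keys")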
I think the force kwarg of repartition is unrelated to the align_partitions call, which happens no matter what.
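For context, force=True on repartition(divisions=...) only relaxes the check on the outer division bounds; it does not decide whether the alignment itself happens. A minimal illustration (assuming a small integer-indexed frame, unrelated to the map_partitions code path):

import pandas as pd
import dask.dataframe as dd

# Two partitions over a RangeIndex; divisions are typically (0, 5, 9).
d = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
print(d.divisions)

d.repartition(divisions=[0, 3, 9])                # same outer bounds: no force needed
d.repartition(divisions=[-5, 3, 20], force=True)  # wider outer bounds: requires force=True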
At this point, if this is causing a problem in your use case, I think you should open an issue on the Dask GitHub to discuss it. Something like: “Only repartition the DataFrames that need it when passing several of them to map_partitions”. I’m not sure the solution is to just remove the force=True; the fix may belong elsewhere.
And I think in the end you’re right: if the task graph can be optimized, let’s try to do it!