Why does align_partitions() use force=True?

Hi,

When passing several Dask DataFrames to map_partitions(), it looks like the underlying align_partitions() forces a repartition of every DataFrame, even if only one of them needs it.

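import dask.datasets
import dask.dataframe as dd

dds = [dask.datasets.timeseries() for _ in range(5)]
# Only the last dataframe's partitioning will differ
dds[-1] = dds[-1].repartition(npartitions=1)

def func(*args):
    # Return the partition of the first dataframe unchanged
    return args[0]

result = dd.map_partitions(func, *dds)
result.dask  # inspect the layers of the resulting task graph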


Hi @epizut,

That is correct, you can track that down in the source code:
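As a rough model of what that code path does, here is a simplified, pure-Python sketch of the alignment step, assuming every frame has known, sorted divisions. Integers stand in for the timestamps, and `Frame`, `union_divisions`, and `align_all` are hypothetical names for illustration, not Dask's actual source. The point it shows is that the union of all divisions is computed once and then every frame gets a repartition step onto it, whether or not its divisions already match.

```python
from dataclasses import dataclass
from heapq import merge


@dataclass(frozen=True)
class Frame:
    """Hypothetical stand-in for a Dask DataFrame; only its divisions matter here."""

    name: str
    divisions: tuple


def union_divisions(*division_lists):
    """Return the sorted union of all division boundaries, without duplicates."""
    out = []
    # heapq.merge lazily merges already-sorted inputs, so duplicates end up adjacent
    for value in merge(*division_lists):
        if not out or out[-1] != value:
            out.append(value)
    return tuple(out)


def align_all(frames):
    """Plan one repartition per frame onto the common divisions, with no skipping."""
    target = union_divisions(*(f.divisions for f in frames))
    # (name, old_divisions, new_divisions) for every frame, even when old == new
    return target, [(f.name, f.divisions, target) for f in frames]


# Integers stand in for daily timestamps: four frames split per "day" ...
frames = [Frame(f"df{i}", (0, 1, 2, 3)) for i in range(4)]
# ... and one collapsed into a single partition, like repartition(npartitions=1)
frames.append(Frame("df4", (0, 3)))

target, plan = align_all(frames)
print(target)                                   # (0, 1, 2, 3)
print(len(plan))                                # 5
print(sum(old == new for _, old, new in plan))  # 4 of the 5 steps are no-ops
```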

However, even though you see a repartition-merge layer on every DataFrame, Dask is smart enough not to do useless work in the background. I’ve reduced the size of your example a bit:

import dask.datasets
import dask.dataframe as dd

dds = [dask.datasets.timeseries(end="2000-01-05") for _ in range(3)]
# Only the last dataframe's partitioning will differ
dds[-1] = dds[-1].repartition(npartitions=1)

def func(*args):
    # Return the partition of the first dataframe unchanged
    return args[0]

result = dd.map_partitions(func, *dds)

This is the task graph generated when calling result.visualize():

You can see that the repartition-split layer does not actually change anything in the data partitions.
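To see why such a layer can leave the data untouched, the sketch below computes, for each output partition, which input partitions it draws from. It is a simplified model under stated assumptions (sorted, distinct integer divisions sharing the same outer bounds), and `partition_sources` is a hypothetical helper, not a Dask function. When the old and new divisions are identical, every output partition maps to exactly one input partition, so the split and merge steps just pass data through.

```python
def partition_sources(old, new):
    """For each new partition, list the indices of the old partitions it draws from.

    Simplified model: both division tuples are sorted, distinct, and share the
    same outer bounds; partition i spans divs[i] to divs[i + 1].
    """
    sources = []
    for j in range(len(new) - 1):
        lo, hi = new[j], new[j + 1]
        # Old partition i overlaps this range if it starts before hi and ends after lo
        sources.append(
            [i for i in range(len(old) - 1) if old[i] < hi and lo < old[i + 1]]
        )
    return sources


daily = (0, 1, 2, 3)  # three partitions
single = (0, 3)       # one partition covering the same range

print(partition_sources(daily, daily))   # [[0], [1], [2]]  -> pass-through
print(partition_sources(single, daily))  # [[0], [0], [0]]  -> real split
print(partition_sources(daily, single))  # [[0, 1, 2]]      -> real merge
```

The pass-through case is the one the unmodified frames in the example fall into: no data changes, but the tasks still have to be built and carried in the graph.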

But maybe you’re right and the code could be optimized in some way.

I found this force=True repartition while investigating a very long graph packing/unpacking time before computation starts. I think avoiding every unnecessary graph node would help a lot.

Should I create a PR to remove force=True? Do you see any advantage in forcing the repartition of every DataFrame? A better visualization, maybe? Is it worth inflating the graph size?

I think I’m missing something here, because the force parameter of repartition() was created on purpose for this specific use case.

I think the force kwarg of repartition is unrelated to the align_partitions call, which is done no matter what.
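One way to see what force=True is for in alignment, separately from the "repartition every frame" behavior: per the repartition() docstring, force allows expanding the existing divisions, and without it the new lower and upper bounds must equal the old ones. The union of several frames' divisions can extend past any single frame's bounds, so alignment generally needs it (in the timeseries example all frames share the same bounds, so it happens not to matter there). The check below is a simplified paraphrase of that documented rule; `check_bounds` is a hypothetical helper, not Dask's code.

```python
def check_bounds(old, new, force=False):
    """Simplified version of the bounds rule documented for repartition(force=...).

    Without force, the new divisions must start and end where the old ones do.
    With force, they may only expand the range (start earlier and/or end later).
    """
    if force:
        ok = new[0] <= old[0] and new[-1] >= old[-1]
    else:
        ok = new[0] == old[0] and new[-1] == old[-1]
    if not ok:
        raise ValueError(f"new divisions {new} do not fit old divisions {old}")


a = (0, 5, 10)
b = (3, 8, 15)
target = tuple(sorted(set(a) | set(b)))  # common divisions for aligning a and b

check_bounds(a, target, force=True)  # accepted: the range only grows
try:
    check_bounds(a, target)  # rejected: 15 is past a's upper bound of 10
except ValueError as exc:
    print(exc)
```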

At this point, if this is causing a problem in your use case, I think you should open an issue on the Dask GitHub repository to discuss it. Something like: “Only repartition the dataframes that need it when passing several of them to map_partitions”. I’m not sure the solution is just to remove force=True; it may lie elsewhere.
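A minimal sketch of the “only repartition what needs it” idea, assuming sorted division tuples, could look like this. `Frame`, `align_only_needed`, and the `repartition` callback are hypothetical; with real Dask collections the callback would presumably wrap `df.repartition(divisions=target, force=True)`, but this illustrates the idea and is not a tested patch.

```python
from collections import namedtuple
from heapq import merge

# Hypothetical stand-in for a Dask DataFrame; only `divisions` matters here
Frame = namedtuple("Frame", ["name", "divisions"])


def union_divisions(*division_lists):
    """Return the sorted union of all division boundaries, without duplicates."""
    out = []
    for value in merge(*division_lists):
        if not out or out[-1] != value:
            out.append(value)
    return tuple(out)


def align_only_needed(frames, repartition):
    """Align frames on common divisions, repartitioning only the mismatched ones."""
    target = union_divisions(*(f.divisions for f in frames))
    # Frames whose divisions already equal the target are passed through as-is
    return [
        f if tuple(f.divisions) == target else repartition(f, target)
        for f in frames
    ]


calls = []


def fake_repartition(frame, target):
    # Record which frame was repartitioned and return it with the new divisions
    calls.append(frame.name)
    return Frame(frame.name, target)


frames = [Frame(f"df{i}", (0, 1, 2, 3)) for i in range(4)]
frames.append(Frame("df4", (0, 3)))

aligned = align_only_needed(frames, fake_repartition)
print(calls)                                              # ['df4']
print(all(f.divisions == (0, 1, 2, 3) for f in aligned))  # True
```

The equality check is cheap because Dask divisions are plain tuples, and the skipped frames already have the target divisions, so their partition count matches the others.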

And in the end I think you’re right: if the task graph can be optimized, let’s try to do it!

Thank you for your help! Let’s continue here