Align a secondary DataFrame to use the same workers and index structure as a primary DataFrame

In order to reduce shuffling during merges, I’m trying to align a secondary DataFrame to use the same workers as a primary DataFrame.
But the solution below does not work, as ddf.__dask_keys__() returns different keys compared to client.who_has().
How can I address this issue?
A more general question: how can I reduce shuffling when merging two DataFrames where the second DataFrame has an index that is a subset of the first DataFrame’s?

import numpy as np
from dask.distributed import Client, wait


def align_dataframes_with_index(primary_df, secondary_df, index_column):
    """
    Align a secondary DataFrame to use the same workers and index structure
    as a primary DataFrame using set_index instead of repartition.

    Parameters:
    -----------
    primary_df : dask.dataframe.DataFrame
        The DataFrame whose divisions and worker assignments we want to match
    secondary_df : dask.dataframe.DataFrame
        The DataFrame we want to realign
    index_column : str
        The column to use as index for alignment

    Returns:
    --------
    dask.dataframe.DataFrame
        Realigned secondary DataFrame
    """
    client = Client.current()

    # First, persist the primary DataFrame so its partitions are pinned to
    # workers; client.persist() returns a collection backed by futures
    primary_df = client.persist(primary_df)

    # Wait for the futures to complete
    wait(primary_df)

    # Now get the actual keys that exist on the workers
    primary_keys = [key for key in primary_df.__dask_keys__()
                   if isinstance(key, tuple)]  # Filter for actual partition keys

    # Get the worker assignments for these keys
    primary_workers = client.who_has(primary_keys)

    # Verify we have worker information
    if not primary_workers:
        raise RuntimeError(
            "Failed to get worker assignments. "
            "This might indicate issues with task distribution."
        )

    # Handle divisions setup (single-partition case where divisions are unknown)
    if primary_df.divisions == (None, None):
        # Calculate min and max of the index using client.compute()
        min_idx = client.compute(primary_df.index.min()).result()
        max_idx = client.compute(primary_df.index.max()).result()

        # Create divisions
        if isinstance(min_idx, (int, np.integer)):
            divisions = [min_idx, max_idx + 1]
        else:
            divisions = [min_idx, max_idx]

        # Re-establish known divisions: move the (already sorted) index to a
        # column and set it back with explicit divisions
        primary_df = client.persist(
            primary_df.reset_index().set_index(
                primary_df.index.name,
                sorted=True,
                divisions=divisions
            )
        )
        wait(primary_df)

    # Get the final divisions
    divisions = primary_df.divisions

    # Set index on secondary DataFrame
    aligned_df = secondary_df.set_index(
        index_column,
        sorted=False,
        divisions=divisions,
        drop=False
    )

    # Create worker mapping using the verified worker assignments
    worker_mapping = {}
    for aligned_key, primary_key in zip(
        aligned_df.__dask_keys__(),
        primary_df.__dask_keys__()
    ):
        if primary_key in primary_workers:
            worker_mapping[aligned_key] = [primary_workers[primary_key][0]]

    # Persist, pinning each aligned partition to the worker that holds the
    # matching primary partition
    aligned_df = client.persist(aligned_df, workers=worker_mapping)
    wait(aligned_df)

    return aligned_df
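
For illustration, a hypothetical invocation on a local cluster with synthetic frames (made-up names; this doubles as a minimal reproducer of the setup):

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()  # local cluster

primary = dd.from_pandas(
    pd.DataFrame({"k": range(100), "a": range(100)}).set_index("k"),
    npartitions=4,
)
secondary = dd.from_pandas(
    pd.DataFrame({"k": range(0, 100, 2), "b": range(50)}),
    npartitions=2,
)

aligned = align_dataframes_with_index(primary, secondary, "k")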

Hi @slepeturin,

I’m not sure I understand your issue. Do you think you could build a simple reproducer? To know which worker has which DataFrame partitions, I think you should just use

client.who_has(primary_df)

That being said, I’m not sure about your goal here. It seems to me you are reimplementing a merge method. What you are doing is pretty much what Dask does by itself: aligning partitions and divisions before merging DataFrames. Moreover, your solution assumes the entire DataFrames fit in the cluster memory.

There are good explanations on this page on how to optimize these joins.
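
As a side note, if you need keys that match what client.who_has() reports, here is a sketch using futures_of() on a persisted collection (synthetic frame for illustration):

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, futures_of, wait

client = Client()
ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=2)

persisted = client.persist(ddf)
wait(persisted)

futures = futures_of(persisted)      # the futures backing each partition
placement = client.who_has(futures)  # maps each key to the worker(s) holding it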

Thanks Guillaume for your reply!
The goal: not just to have the same divisions for DataFrames that will be joined by index, but also to assign the same divisions to the same worker, in order to reduce shuffling between workers (assuming, of course, that memory is not the issue).
How can I force Dask to do it?
I mean setting the index as

df1 = df1.set_index('my_index', drop=False, divisions=sorted(the_divisions))
df2 = df2.set_index('my_index', drop=False, divisions=sorted(the_divisions))

and then merging:

result = df1.merge(df2, how="inner", left_index=True, right_index=True)
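
For what it’s worth, a sketch of how the_divisions could be derived once and the frames persisted before repeated merges (synthetic frames; all names hypothetical):

import pandas as pd
import dask.dataframe as dd

df1 = dd.from_pandas(pd.DataFrame({"my_index": range(100), "a": range(100)}), npartitions=4)
df2 = dd.from_pandas(pd.DataFrame({"my_index": range(0, 100, 2), "b": range(50)}), npartitions=2)

# let Dask compute divisions once on the larger frame, then reuse them
the_divisions = df1.set_index("my_index").divisions

df1 = df1.set_index("my_index", drop=False, divisions=sorted(the_divisions)).persist()
df2 = df2.set_index("my_index", drop=False, divisions=sorted(the_divisions)).persist()

result = df1.merge(df2, how="inner", left_index=True, right_index=True)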

A few related questions:

  1. If divisions get lost ((None, None)), is there any issue with joining by index or using map_partitions in this case?

  2. Setting npartitions=1 can lead to the divisions being lost as (None, None),
    even if the function below is applied.

    def check_dd_divisions_int(dd_in):
        if dd_in.divisions == (None, None):
            dd_out = dd_in.reset_index(drop=False).set_index(
                dd_in.index.name,
                sorted=False,
                divisions=[dd_in.index.min().compute(),
                           dd_in.index.max().compute() + 1],
            )
        else:
            dd_out = dd_in

        return dd_out

The divisions are lost again after persist(); how can I address this issue?
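
For illustration, a runnable sketch exercising the helper above (a synthetic single-partition frame; clear_divisions() is used only to simulate lost divisions):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"v": [1, 2, 3]}, index=pd.Index([10, 20, 30], name="k"))
ddf = dd.from_pandas(pdf, npartitions=1).clear_divisions()  # simulate lost divisions

print(ddf.divisions)                          # (None, None)
print(check_dd_divisions_int(ddf).divisions)  # (10, 31)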
Thank you,
Steve

That is what I’m not sure I understand. By doing this, you are implementing a shuffling method yourself; this is exactly what shuffling does, isn’t it?

I would say that joining by index should still work but might induce shuffling. However, if no divisions are set, map_partitions won’t work.
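
For example (a sketch with synthetic frames), combining two DataFrames partition-wise with map_partitions relies on both frames having matching known divisions so their partitions can be paired up:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"a": range(10)}, index=pd.RangeIndex(10, name="k"))
left = dd.from_pandas(pdf, npartitions=2)
right = dd.from_pandas(pdf.rename(columns={"a": "b"}), npartitions=2)

# partition-wise join; both frames share the same known divisions
out = left.map_partitions(lambda l, r: l.join(r), right)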

I’m a little lost here. Do you think you could build a reproducer to illustrate your questions?

Thanks Guillaume for your reply!
Regarding:
“but also assign the same divisions to the same worker in order to reduce shuffling between workers”
“That is what I’m not sure I understand. By doing this, you are implementing a shuffling method yourself; this is exactly what shuffling does, isn’t it?”
I missed the point that all these DataFrames are persisted.
So we shuffle once, and then the logic uses these DataFrames in multiple merges.
This way we reduce shuffling between workers during the merges.
Actually, another question: which is more efficient, assigning an index to the DataFrames (the indexes would have the same divisions) and then merging, or merging on a column and assigning the index only where map_partitions is used (as you pointed out, “if no divisions are set, map_partitions won’t work”)?
Regarding the single partition: it looks like the issue shows up only with Dask version 2024.10.0 and dask.config.set({'dataframe.query-planning': True}).

So you have either more than two DataFrames, or you are performing multiple merges? Could you describe the high-level workflow you are trying to optimize? Maybe it would just be easier to call persist after the first merge?

Merging on columns that are not the index actually performs a set_index under the hood. The index should be kept after the merge, I guess.

Thanks Guillaume for your reply!
Scenario for the merges:
Let’s say we have four DataFrames: df1, df2, df3, df4.
All four DataFrames have an index with the same divisions.
We need:
res1 = df1 left merge df3
res2 = df2 left merge df4
We apply some map_partitions on res1 and res2,
and then concatenate: res1, df3, res2, df4.
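
For illustration, a minimal sketch of that scenario (synthetic frames and a placeholder transform; all names hypothetical):

import pandas as pd
import dask.dataframe as dd

def make(col):
    pdf = pd.DataFrame({col: range(100)}, index=pd.RangeIndex(100, name="k"))
    return dd.from_pandas(pdf, npartitions=2).persist()  # identical divisions for all four

df1, df2, df3, df4 = (make(c) for c in "abcd")

res1 = df1.merge(df3, how="left", left_index=True, right_index=True)
res2 = df2.merge(df4, how="left", left_index=True, right_index=True)

# placeholder transform standing in for the real map_partitions logic
res1 = res1.map_partitions(lambda p: p.assign(total=p.sum(axis=1)))
res2 = res2.map_partitions(lambda p: p.assign(total=p.sum(axis=1)))

out = dd.concat([res1, df3, res2, df4])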

Regarding the merge on columns/index:
if the merge is on multiple columns, what index is built under the hood?
Since creating an index on multiple columns raises a not-implemented error, I would need to concatenate the columns into a string and then build the index.
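
A minimal sketch of that string-key workaround (hypothetical column names):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"a": ["x", "x", "y"], "b": [1, 2, 1], "v": [10, 20, 30]})
ddf = dd.from_pandas(pdf, npartitions=2)

# build a single string key from the two columns, then index on it
ddf["key"] = ddf["a"].astype(str) + "|" + ddf["b"].astype(str)
ddf = ddf.set_index("key")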