In order to reduce shuffling during a merge, I’m trying to align a secondary DataFrame to use the same workers as a primary DataFrame.
But the solution below does not work, as ddf.__dask_keys__() returns different keys compared to client.who_has().
How can I address this issue?
A more general question: how can I reduce shuffling when merging two DataFrames where the second DataFrame’s index is a subset of the first DataFrame’s index?
from distributed import Client, wait
import numpy as np

def align_dataframes_with_index(primary_df, secondary_df, index_column):
    """
    Align a secondary DataFrame to use the same workers and index structure
    as a primary DataFrame, using set_index instead of repartition.

    Parameters
    ----------
    primary_df : dask.dataframe.DataFrame
        The DataFrame whose divisions and worker assignments we want to match
    secondary_df : dask.dataframe.DataFrame
        The DataFrame we want to realign
    index_column : str
        The column to use as index for alignment

    Returns
    -------
    dask.dataframe.DataFrame
        Realigned secondary DataFrame
    """
    client = Client.current()
    # First, persist and track the primary DataFrame.
    # client.persist() returns a collection backed by futures.
    primary_df = client.persist(primary_df)
    # Wait for the futures to complete
    wait(primary_df)
    # Now get the actual keys that exist on the workers
    primary_keys = [key for key in primary_df.__dask_keys__()
                    if isinstance(key, tuple)]  # Filter for actual partition keys
    # Get the worker assignments for these keys
    primary_workers = client.who_has(primary_keys)
    # Verify we have worker information
    if not primary_workers:
        raise RuntimeError(
            "Failed to get worker assignments. "
            "This might indicate issues with task distribution."
        )
    # Handle divisions setup
    if primary_df.divisions == (None, None):
        # Calculate min and max of the index using client.compute()
        min_idx = client.compute(primary_df.index.min()).result()
        max_idx = client.compute(primary_df.index.max()).result()
        # Create divisions
        if isinstance(min_idx, (int, np.integer)):
            divisions = [min_idx, max_idx + 1]
        else:
            divisions = [min_idx, max_idx]
        # Set index with proper divisions
        primary_df = client.persist(
            primary_df.set_index(
                primary_df.index,
                sorted=True,
                divisions=divisions
            )
        )
        wait(primary_df)
    # Get the final divisions
    divisions = primary_df.divisions
    # Set index on secondary DataFrame
    aligned_df = secondary_df.set_index(
        index_column,
        sorted=False,
        divisions=divisions,
        drop=False
    )
    # Create worker mapping using the verified worker assignments
    worker_mapping = {}
    for aligned_key, primary_key in zip(
        aligned_df.__dask_keys__(),
        primary_df.__dask_keys__()
    ):
        if primary_key in primary_workers:
            worker_mapping[aligned_key] = [primary_workers[primary_key][0]]
    # Use client.persist() instead of just persist()
    aligned_df = client.persist(aligned_df, workers=worker_mapping)
    wait(aligned_df)
    return aligned_df
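The key mismatch I’m hitting can be seen without a cluster: `__dask_keys__()` yields `(name, index)` tuple keys, while in many Dask versions the scheduler stringifies keys, so `Client.who_has()` returns a dict keyed by the *string* form of each key and a direct tuple lookup misses. A minimal sketch (the task name and worker address below are made up):

```python
# A partition key as produced by __dask_keys__() is a (name, index) tuple:
key = ("repartition-merge-abc123", 0)  # hypothetical task name

# A who_has() response is keyed by the stringified key, e.g.
# {"('repartition-merge-abc123', 0)": ("tcp://127.0.0.1:38765",)}
who_has = {str(key): ("tcp://127.0.0.1:38765",)}  # simulated response

assert key not in who_has   # tuple lookup fails
assert str(key) in who_has  # stringified lookup succeeds
```

If this is the cause, converting keys with `str()` before the lookup (or building the mapping from `who_has`’s own keys) would be the workaround, though the exact key representation may depend on the Dask/distributed version.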
Hi @slepeturin,
I’m not sure I understand your issue. Do you think you could build a simple reproducer? In order to know which worker has which DataFrame partitions, I think you should just use
client.who_has(primary_df)
That being said, I’m not sure of your goal here. It seems to me you are reimplementing a merge method. What you are doing is pretty much what Dask does by itself: aligning partitions and divisions before merging DataFrames. Moreover, your solution assumes the entire DataFrames fit in the cluster memory.
There are good explanations on this page on how to optimize these joins.
Thanks Guillaume for your reply!
The goal: not just to have the same divisions for DataFrames that can be joined by index, but also to assign the same divisions to the same worker, in order to reduce shuffling between workers (assuming, of course, that memory is not an issue).
How can I force Dask to do it?
I mean setting the index as
df1 = df1.set_index('my_index', drop=False, divisions=sorted(the_divisions))
df2 = df2.set_index('my_index', drop=False, divisions=sorted(the_divisions))
and then merge
result = df1.merge(
    df2, how="inner", left_index=True, right_index=True)
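When both frames share identical sorted divisions, the row-to-partition mapping is the same on each side, so partition i of df1 only ever needs to join partition i of df2. A stdlib sketch of that mapping (the division values are made up; Dask treats the last division as inclusive):

```python
import bisect

def partition_of(value, divisions):
    """Return the index of the partition holding `value`.

    Partition i covers [divisions[i], divisions[i+1]); the last
    partition also includes the final division value.
    """
    i = bisect.bisect_right(divisions, value) - 1
    return min(i, len(divisions) - 2)

divisions = (0, 10, 20, 30)            # hypothetical shared divisions
assert partition_of(5, divisions) == 0
assert partition_of(10, divisions) == 1
assert partition_of(30, divisions) == 2  # last division is inclusive
```

This is why identical divisions on both sides should let the merge proceed partition-by-partition without a shuffle step.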
A few related questions:
- If divisions are lost ([None, None]), are there any issues with joining by index or using map_partitions in this case?
- Setting npartitions=1 can lead to divisions being lost as (None, None), even if the function below is applied:

def check_dd_divisions_int(dd_in):
    if dd_in.divisions == (None, None):
        dd_out = dd_in.reset_index(drop=False).set_index(
            dd_in.index.name,
            sorted=False,
            divisions=[dd_in.index.min().compute(),
                       dd_in.index.max().compute() + 1],
        )
    else:
        dd_out = dd_in
    return dd_out

The divisions are lost again after persist. How can I address this issue?
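As a side note on the `+ 1` in the snippet above: Dask’s last division is inclusive, so for an integer index a plain `(min, max)` pair should already be a valid divisions tuple. This is an assumption worth checking against your Dask version; a minimal stdlib sketch of the intended construction:

```python
def single_partition_divisions(index_values):
    # Divisions for a one-partition frame are just (min, max).
    # Assumption: Dask treats the final division as inclusive, so no
    # "+ 1" is strictly needed for an integer index.
    return (min(index_values), max(index_values))

assert single_partition_divisions([3, 7, 5]) == (3, 7)
```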
Thank you,
Steve
That is what I’m not sure I understand. Doing this, you are implementing a shuffling method; this is exactly what shuffling does, isn’t it?
I would say that joining by index should still work but might induce shuffling. However, if no divisions are set, map_partitions won’t work.
I’m a little lost here. Do you think you could build a reproducer to illustrate your questions?
Thanks Guillaume for your reply !
Regarding:
“but also assign the same divisions to the same worker in order to reduce shuffling between workers”
“That is what I’m not sure I understand. Doing this, you are implementing a shuffling method; this is exactly what shuffling does, isn’t it?”
I missed the point that all these DataFrames are persisted.
So we shuffle once, and then the logic uses these DataFrames in merges multiple times.
This way we reduce shuffling between workers during the merges.
Actually, another question: which is more efficient: assigning an index to the DataFrames (so the indexes have the same divisions) and then merging, or merging on a column and assigning the index only when map_partitions is used (as you pointed out: “however if no divisions are set, map_partitions won’t work”)?
Regarding the one partition: it looks like the issue appears only for Dask version '2024.10.0' with dask.config.set({'dataframe.query-planning': True}).
So you have either more than two DataFrames, or you are performing multiple merges? Could you describe the high-level workflow you are trying to optimize? Maybe it would be easier to just call persist after the first merge?
A merge on columns that are not the index actually performs a set_index under the hood. The index should be kept after the merge, I guess.
Thanks Guillaume for your reply !
Scenario for merge:
Let’s say we have four DataFrames: df1, df2, df3, df4.
All four DataFrames have indexes with the same divisions.
We would need:
res1 = df1 left merge df3
res2 = df2 left merge df4
We apply some map_partitions on res1 and res2,
and then concatenate: res1, df3, res2, df4.
Regarding the merge on columns/index:
if it is multiple columns, what index is built under the hood?
As creating an index on multiple columns raises a not-implemented error, I would need to concatenate the columns into a string and then build the index.
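A minimal stdlib sketch of that composite-key workaround (the helper name and `sep` default are made up; the separator must be a string that cannot occur inside the column values, otherwise two different column combinations could collide):

```python
def composite_key(row, cols, sep="||"):
    """Join several column values into one string key (hypothetical helper).

    `row` is a mapping of column name to value; `cols` lists the columns
    to combine, in a fixed order so keys are comparable across frames.
    """
    return sep.join(str(row[c]) for c in cols)

row = {"a": 1, "b": "x"}
print(composite_key(row, ["a", "b"]))  # -> 1||x
```

With Dask, the same idea could be applied column-wise (e.g. building the key column via string concatenation before set_index), so both frames end up indexed by the identical composite string.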