Dataframe merge with a partitioned dataframe

Hi

I have two Dask dataframes, A and B, that I want to left-merge with A’s “nodes” column on B’s “id” column.
The nodes column in A has duplicate IDs. As long as dataframe B has more than one partition I get incorrect results, such as:

merged =
          id        nodes  latitude  longitude
0      30947   8550865317       NaN        NaN
0      30947   3035378247       NaN        NaN
0      30947  11035160757       NaN        NaN
0      30947  11972686129       NaN        NaN
0      30947    117812585       NaN        NaN
...      ...          ...       ...        ...

The merge code:

A = A.merge(B, how="left", left_on=["nodes"], right_on=["id"], right_index=True, suffixes=(False, False))

If I make dataframe B a single partition, the result is correct.
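
For reference, the only way I get correct output right now is collapsing B to a single partition before the merge (a minimal sketch, with A and B standing in for my real dataframes, and B indexed on “id”):

# Workaround: force the right-hand dataframe down to one partition.
# This defeats the point of partitioning, but the merged coordinates come out correct.
B_single = B.repartition(npartitions=1)

merged = A.merge(
    B_single,
    how="left",
    left_on="nodes",
    right_index=True,  # B is indexed on "id"
)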

Any idea how to deal with this issue?

Sam

Hi,

Could you provide a sample of the A and B DataFrames? And ideally a reproducer with partial data?

Are they indexed? You use both right_on=["id"] and right_index=True; is that intentional?
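
Usually you only need one of the two. For example (just a sketch, since I don’t know your exact schema):

# if B is indexed on "id", merge against its index
merged = A.merge(B, how="left", left_on="nodes", right_index=True)

# or, if "id" is a regular column in B
merged = A.merge(B, how="left", left_on="nodes", right_on="id")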

Hi @guillaumeeb,

Here is my reproducible script, and yes, the right dataframe has an index, but that doesn’t actually matter. I ran a lot of experiments with the script below, and only when I set the right dataframe to 1 partition do I get the correct result; otherwise latitude and longitude are always missing. I also sent you the data link, please see if you can access it, thanks.

My Dask version is 2024.2.1, Pandas 2.1.4, PyArrow 15.0.2, Numpy 1.22.4


import logging

import pandas as pd
import dask.dataframe as dd
from dask import config as dc
from dask.distributed import Client, LocalCluster


def main():
    cluster = LocalCluster(n_workers=3, processes=False, silence_logs=logging.DEBUG)
    with Client(cluster) as client:
        dc.set({"dataframe.convert-string": False})
        # Ways: one row per way, "nodes" holds the list of node references
        ways = dd.read_parquet(
            "fji.osm/way", columns=["id", "nodes"], blocksize="32MiB")
        # Node coordinates, indexed by node id
        node_coordinates = dd.read_parquet(
            "fji.osm/node", columns=["latitude", "longitude"], index=["id"], blocksize="32MiB")
        node_coordinates = node_coordinates.repartition(npartitions=3)  # --> change partition number to 1, and the result is correct
        # Extract the node ids from the nested node structs
        ways[["nodes"]] = ways["nodes"].apply(lambda w: pd.Series({"nodes": [node.get("nodeId") for node in w]}), meta={"nodes": "object"})
        # Explode to one row per (way id, node id) pair
        df = ways.map_partitions(stack, meta={"id": "int64", "nodes": "int64"})
        # Left-join coordinates onto each node id
        df = df.merge(node_coordinates, how="left", left_on=["nodes"], right_on=["id"], right_index=True, shuffle_method="p2p")
        df.to_csv("joined_way.csv")
        df = df.map_partitions(unstack, meta={"coords": "object"})


def unpack_node(way):
    # Same extraction as the lambda in main(), kept here for reference
    return pd.Series({"nodes": [node["nodeId"] for node in way["nodes"]]})


def stack(way_df):
    # One row per (way id, node id) pair
    new_df = way_df.set_index("id").explode("nodes").reset_index(0)
    return new_df


def concat_dict(r):
    # Collect (node id, longitude, latitude) tuples for one way
    data = []
    for _id, lon, lat in zip(r["nodes"], r["longitude"], r["latitude"]):
        data.append((_id, lon, lat))
    return pd.Series({"coords": data})


def unstack(df: pd.DataFrame):
    # Re-aggregate the exploded rows back to one row per way
    df = df.groupby(by="id", sort=False, group_keys=False).apply(concat_dict)
    return df


if __name__ == "__main__":
    main()
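
In case the parquet files aren’t accessible, the same merge pattern can be shown with made-up data (just a sketch, not my real data; node ids and coordinates here are invented):

import pandas as pd
import dask.dataframe as dd

# "ways"-style frame: duplicate node ids in the join column
left_pdf = pd.DataFrame({
    "id": [30947, 30947, 30948],
    "nodes": [8550865317, 3035378247, 8550865317],
})
# "node coordinates"-style frame: indexed by node id
right_pdf = pd.DataFrame({
    "id": [8550865317, 3035378247],
    "latitude": [-17.7, -17.8],
    "longitude": [178.0, 178.1],
}).set_index("id")

left = dd.from_pandas(left_pdf, npartitions=2)
right = dd.from_pandas(right_pdf, npartitions=2)  # more than one partition on the right

out = left.merge(right, how="left", left_on="nodes", right_index=True)
print(out.compute())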

Hi @guillaumeeb,

Can you reproduce this issue?

Sam