I have two Dask DataFrames, A and B, that I want to merge: left on the "nodes" column and right on the "id" column.
The "nodes" column in A contains duplicate ids. As long as DataFrame B has more than one partition, I get incorrect results, such as:
merged =
```
      id        nodes  latitude  longitude
0  30947   8550865317       NaN        NaN
0  30947   3035378247       NaN        NaN
0  30947  11035160757       NaN        NaN
0  30947  11972686129       NaN        NaN
0  30947    117812585       NaN        NaN
..   ...          ...       ...        ...
```
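A quick way to check how widespread the missing coordinates are (a sketch, not part of my original script; `merged` is the frame shown above):

```python
# Count rows where the left join failed to bring in coordinates.
n_missing = merged["latitude"].isna().sum().compute()
print(f"{n_missing} of {len(merged)} rows have no latitude")
```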
Merge code:
```python
A = A.merge(B, how="left", left_on=["nodes"], right_on=["id"], right_index=True, suffixes=(False, False))
```
If I make DataFrame B a single partition, the result is correct.
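In other words, the only workaround I have found is to collapse the right side to one partition before merging. A minimal sketch of that workaround (same merge as above, reduced to the parts that matter for it):

```python
# Workaround sketch: repartition the right frame to a single partition.
# This gives correct results, at the cost of losing parallelism on B.
B_single = B.repartition(npartitions=1)
merged = A.merge(B_single, how="left", left_on="nodes", right_index=True)
```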
Here is my reproducible script. And yes, the right DataFrame has an index, but that doesn't actually matter. I ran a lot of experiments with the script below, and only when I set the right side to one partition do I see the correct result; otherwise latitude and longitude are always missing. I also sent you the data link; please check whether you can access it, thanks.
Versions: Dask 2024.2.1, Pandas 2.1.4, PyArrow 15.0.2, NumPy 1.22.4.
```python
import logging

import pandas as pd
import dask.config as dc
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster


def main():
    cluster = LocalCluster(n_workers=3, processes=False, silence_logs=logging.DEBUG)
    with Client(cluster) as client:
        dc.set({"dataframe.convert-string": False})
        ways = dd.read_parquet(
            "fji.osm/way", columns=["id", "nodes"], blocksize="32MiB")
        node_coordinates = dd.read_parquet(
            "fji.osm/node", columns=["latitude", "longitude"], index=["id"], blocksize="32MiB")
        # --> change the partition number to 1 and the result is correct
        node_coordinates = node_coordinates.repartition(npartitions=3)
        # Replace each way's node structs with a plain list of node ids.
        ways[["nodes"]] = ways["nodes"].apply(
            lambda w: pd.Series({"nodes": [node.get("nodeId") for node in w]}),
            meta={"nodes": "object"})
        # One row per (way id, node id).
        df = ways.map_partitions(stack, meta={"id": "int64", "nodes": "int64"})
        df = df.merge(node_coordinates, how="left", left_on=["nodes"],
                      right_on=["id"], right_index=True, shuffle_method="p2p")
        df.to_csv("joined_way.csv")
        df = df.map_partitions(unstack, meta={"coords": "object"})


def unpack_node(way):
    # (Not used in this run.)
    return pd.Series({"nodes": [node["nodeId"] for node in way["nodes"]]})


def stack(way_df):
    # Explode the "nodes" list column: one row per (way id, node id).
    new_df = way_df.set_index("id").explode("nodes").reset_index(0)
    return new_df


def concat_dict(r):
    # Zip each node id back together with its merged coordinates.
    data = []
    for _id, lon, lat in zip(r["nodes"], r["longitude"], r["latitude"]):
        data.append((_id, lon, lat))
    return pd.Series({"coords": data})


def unstack(df: pd.DataFrame):
    # Collapse the exploded rows back to one row per way.
    df = df.groupby(by="id", sort=False, group_keys=False).apply(concat_dict)
    return df


if __name__ == "__main__":
    main()
```
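In case the data link is not accessible, here is a self-contained sketch on synthetic data that exercises the same merge shape (node ids taken from the output above, coordinates made up; I have not verified whether this small example alone triggers the bug):

```python
import pandas as pd
import dask.dataframe as dd

# Left side: exploded ways; "nodes" contains duplicate node ids.
left = dd.from_pandas(
    pd.DataFrame({
        "id": [30947, 30947, 30947, 30948],
        "nodes": [8550865317, 3035378247, 8550865317, 117812585],
    }),
    npartitions=2,
)

# Right side: coordinates indexed by node id, split over several
# partitions (the coordinate values are made up for illustration).
right = dd.from_pandas(
    pd.DataFrame(
        {"latitude": [-17.8, -17.9, -18.0], "longitude": [177.4, 177.5, 177.6]},
        index=pd.Index([117812585, 3035378247, 8550865317], name="id"),
    ),
    npartitions=3,
)

merged = left.merge(right, how="left", left_on="nodes", right_index=True)
print(merged.compute())  # every row should have non-NaN coordinates
```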