How to do Range Joins with Dask?

I’m using a Dask DataFrame join, but I need to perform a range join.

For examples of range joins, see, e.g., Spark range joins:

SELECT *
FROM points, ranges
WHERE points.symbol = ranges.symbol
  AND points.p >= ranges.start
  AND points.p < ranges.end;

Dask pseudocode:

points.merge(
    ranges,
    how="inner",
    on=["symbol"],
    # wished-for argument, not part of Dask's actual merge API:
    # conditions=["points.p >= ranges.start", "points.p < ranges.end"],
).compute()

I know these kinds of joins can be expensive, but is there any way to support them (before the compute() call)?

In practice, the ranges table is small, and the points table is a large online Parquet table.

Hi @th0ger and welcome! Since the ranges table is small, I think a merge-then-filter approach would be fairly efficient, as the computation will be embarrassingly parallel. You’d replace the dd.from_pandas calls with dd.read_parquet:

import pandas as pd
import dask.dataframe as dd

points = dd.from_pandas(
    pd.DataFrame(dict(symbol=range(10), p=range(5, 105, 10))),
    npartitions=2
)

# if your smaller table can fit easily in memory, consider ensuring
# it is a single partition with the repartition method
ranges = dd.from_pandas(
    pd.DataFrame(
        dict(symbol=range(5), start=[0, 15, 30, 46, 74],
             end=[10, 40, 50, 54, 84])
    ), npartitions=1
)

# equality join on symbol first, then filter on the range conditions
merged = points.merge(ranges, how='inner', on='symbol')
filtered = merged.loc[
    (merged['p'] >= merged['start']) & (merged['p'] < merged['end'])
]

filtered.compute()

Here’s the result of merged.visualize(), showing the embarrassingly parallel computation:

(task graph image attachment)
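
And here’s a minimal sketch of the Parquet variant, in case it helps; the file paths are placeholders, and I’m assuming your columns are named symbol, p, start, and end as in the toy example:

import dask.dataframe as dd

# hypothetical paths; point these at your actual Parquet data
points = dd.read_parquet("points.parquet")
ranges = dd.read_parquet("ranges.parquet")

# the small ranges table easily fits in memory, so collapse it to a
# single partition; the join against it then stays embarrassingly parallel
ranges = ranges.repartition(npartitions=1)

# equality join on symbol first, then filter on the range conditions
merged = points.merge(ranges, how="inner", on="symbol")
filtered = merged.loc[
    (merged["p"] >= merged["start"]) & (merged["p"] < merged["end"])
]

result = filtered.compute()

The repartition call just makes explicit what the toy example’s npartitions=1 already does for the small table.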
