Partition-wise joins (perfectly aligned partitions) using map_partitions

Good afternoon,

I have two sets of data in Parquet files. The data contains a key called global_id, which I want to use to join the two datasets.

The data was output from Apache Spark, with global_id hashed to keep file sizes relatively constant, so global_id is neither indexed nor ordered in the Parquet files.

I can easily index it, but I can't easily order it, and I do not want a solution that involves rewriting the files. The dataset is terabytes in size, and Dask's basic merge function (roughly sketched below) blows up memory, since it can't do much with non-indexed, unsorted data. However, I know that I have nice partition alignment.
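To be concrete, the naive join I first tried was something like this (using the same ddf1/ddf2 as in the snippet further down), which with no index and unknown divisions forces a full shuffle of both datasets:

joined_ddf = ddf1.merge(ddf2, how="inner", on="global_id")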

Each dataset has 2000 partitions, and corresponding partitions contain the same set of global_id values. That is to say, dataset1.partition.124.parquet contains the same global_ids as dataset2.partition.124.parquet.

I therefore believe I can join on a partition-by-partition basis. I have tried the following:

import dask.dataframe as dd

ddf1 = dd.read_parquet('dataset1/*.parquet')
ddf2 = dd.read_parquet('dataset2/*.parquet')

def join_partitions(part1, part2):
    # Each partition arrives as a pandas DataFrame, so this is a plain pandas merge.
    return part1.merge(part2, how="inner", on="global_id")

joined_ddf = dd.map_partitions(
    join_partitions,
    ddf1.partitions[:500],
    ddf2.partitions[:500],
)

print(joined_ddf.count().compute())

As you can see, if I give it the first 500 partitions, it works as I expect. However, as I increase this number to around 1000 partitions, the job starts failing with an error like:

RuntimeError: Cycle detected in Dask: ('read_parquet-fused-9d2ee5e9db5532eb0a88d360ee211a2f', 34)->_0->('read_parquet-fused-9d2ee5e9db5532eb0a88d360ee211a2f', 34)

If I change the partition selection to [500:1000], it works fine, so it doesn't seem to be the specific partitions, but rather the number of partitions.

I can confirm that the partitions in the two datasets are perfectly aligned in terms of containing the same global_ids.
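For reference, I spot-checked the alignment with something like the following (a rough sketch; it computes one partition pair at a time, so it's slow but memory-safe):

# Spot-check: corresponding partitions should contain the same global_ids.
for i in range(0, 2000, 200):
    ids1 = set(ddf1.partitions[i]['global_id'].compute())
    ids2 = set(ddf2.partitions[i]['global_id'].compute())
    assert ids1 == ids2, f"partition {i} is not aligned"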

Could someone give some advice?

Thanks

Hi @es-code-bar, welcome to Dask community!

Could you give us the complete stack trace you see? As you said, if you have the same number of partitions and they are aligned, this should be embarrassingly parallel.

Since everything is already aligned on your side, did you try the align_dataframes=False kwarg of map_partitions?
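For reference, that would look something like this (untested sketch, building on your snippet):

joined_ddf = dd.map_partitions(
    join_partitions,
    ddf1,
    ddf2,
    align_dataframes=False,  # skip Dask's alignment step; trust the 1:1 partition match
)

With align_dataframes=False, Dask applies the function to the partitions pairwise as-is instead of trying to align the two collections first, which should be safe given that your partitions already correspond one-to-one.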