Good afternoon,
I have two sets of data in parquet files. The data contains a key called global_id which I want to use to join the two datasets.
The data was output from Apache Spark, with global_id hashed with a view to keeping file sizes relatively constant, so global_id is neither indexed nor ordered in the parquet files.
I can easily index it, but I can't easily order it, and I do not want a solution that involves re-writing the files. The datasets are terabytes in size, and Dask's basic merge function is blowing up memory, since it can't do much with non-indexed, unsorted data. However, I know that I have nice partition alignment.
Each dataset has 2000 partitions, and corresponding partitions contain the same global_id records. That is to say, dataset1.partition.124.parquet contains the same global_ids as dataset2.partition.124.parquet.
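For context, this is roughly how I spot-checked that claim on individual partition pairs (the file names here are illustrative of my layout, one parquet file per partition):

import pandas as pd

# Spot-check one aligned partition pair; file names are illustrative.
ids1 = set(pd.read_parquet("dataset1/partition.124.parquet", columns=["global_id"])["global_id"])
ids2 = set(pd.read_parquet("dataset2/partition.124.parquet", columns=["global_id"])["global_id"])
assert ids1 == ids2  # holds for every pair I have checked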
I therefore believe I can join on a partition-by-partition basis. I have tried the following:
import dask.dataframe as dd

ddf1 = dd.read_parquet('dataset1/*.parquet')
ddf2 = dd.read_parquet('dataset2/*.parquet')

def join_partitions(part1, part2):
    # Each call receives one aligned partition from each dataset as a pandas DataFrame.
    return part1.merge(part2, how="inner", on="global_id")

joined_ddf = dd.map_partitions(
    join_partitions,
    ddf1.partitions[:500],
    ddf2.partitions[:500],
)

print(joined_ddf.count().compute())
As you can see, if I give it the first 500 partitions, it works as I expect. However, as I increase this to around 1000 partitions, the job starts to fail with an error like:
RuntimeError: Cycle detected in Dask: ('read_parquet-fused-9d2ee5e9db5532eb0a88d360ee211a2f', 34)->_0->('read_parquet-fused-9d2ee5e9db5532eb0a88d360ee211a2f', 34)
If I change the partition selection to [500:1000] it works fine, so it doesn't seem to be the specific partitions but rather the number of partitions.
I can confirm that the partitions in the two datasets are perfectly aligned in terms of containing the same global_ids.
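For what it's worth, the fallback I've been considering, if the map_partitions route can't be made to work, is to build the per-partition joins explicitly from delayed pandas merges, roughly as sketched below (the glob pattern, and the assumption that sorting the file names lines the partitions up in the same order, are specific to my layout):

import glob

import dask
import dask.dataframe as dd
import pandas as pd

# Assumption: one parquet file per partition, and sorted file names
# put the partitions in the same order in both datasets.
files1 = sorted(glob.glob("dataset1/*.parquet"))
files2 = sorted(glob.glob("dataset2/*.parquet"))

@dask.delayed
def join_pair(f1, f2):
    # Read one aligned partition from each dataset and merge in pandas.
    left = pd.read_parquet(f1)
    right = pd.read_parquet(f2)
    return left.merge(right, how="inner", on="global_id")

parts = [join_pair(f1, f2) for f1, f2 in zip(files1, files2)]
joined_ddf = dd.from_delayed(parts)

I'd still prefer to understand why the map_partitions approach breaks down as the partition count grows, though.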
Could someone give some advice?
Thanks