I’m working on a problem where I want to merge two datasets into a single collection. These datasets come from different sources and have a sort of parent-child relationship, i.e., there is a reference key between them.
The datasets are complex document structures (dicts), so I am using Dask bags. Bags are useful as I have to perform certain transformations. In one of these transformations I want to merge the child data into the parent document based on the reference key. One of the problems is that the parent dataset is relatively large (millions of documents, over 100GB of data), while the child dataset can probably be held in memory (less than 5GB). Another problem is that the reference lives in an array inside the parent document, so one parent may point to multiple child documents.
Example with fake data
parents = [
    dict(
        id=x,
        elements=[dict(child_id=x), dict(child_id=x + 1)] if x % 3 == 0 else []
    )
    for x in range(10)
]
# Output
[{'id': 0, 'elements': [{'child_id': 0}, {'child_id': 1}]},
{'id': 1, 'elements': []},
{'id': 2, 'elements': []},
{'id': 3, 'elements': [{'child_id': 3}, {'child_id': 4}]},
{'id': 4, 'elements': []},
{'id': 5, 'elements': []},
{'id': 6, 'elements': [{'child_id': 6}, {'child_id': 7}]},
{'id': 7, 'elements': []},
{'id': 8, 'elements': []},
{'id': 9, 'elements': [{'child_id': 9}, {'child_id': 10}]}]
childs = [
    dict(
        id=x,
        value=x**x
    )
    for x in range(10)
]
# Output
[{'id': 0, 'value': 1},
{'id': 1, 'value': 1},
{'id': 2, 'value': 4},
{'id': 3, 'value': 27},
{'id': 4, 'value': 256},
{'id': 5, 'value': 3125},
{'id': 6, 'value': 46656},
{'id': 7, 'value': 823543},
{'id': 8, 'value': 16777216},
{'id': 9, 'value': 387420489}]
What I’m trying to achieve is the following result set:
[{'id': 0,
'elements': [
{'child_id': 0, 'value': {'id': 0, 'value': 1}},
{'child_id': 1, 'value': {'id': 1, 'value': 1}}
]},
{'id': 1, 'elements': []},
{'id': 2, 'elements': []},
{'id': 3,
'elements': [
{'child_id': 3, 'value': {'id': 3, 'value': 27}},
{'child_id': 4, 'value': {'id': 4, 'value': 256}}
]},
{'id': 4, 'elements': []},
{'id': 5, 'elements': []},
{'id': 6,
'elements': [
{'child_id': 6, 'value': {'id': 6, 'value': 46656}},
{'child_id': 7, 'value': {'id': 7, 'value': 823543}}
]},
{'id': 7, 'elements': []},
{'id': 8, 'elements': []},
{'id': 9,
'elements': [
{'child_id': 9, 'value': {'id': 9, 'value': 387420489}},
{'child_id': 10}
]}]
Normally I would iterate through the parent dataset and look up the matching child documents. However, how can I best achieve this using Dask?
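For reference, this is a minimal plain-Python sketch of that baseline on the fake data above: build an in-memory lookup dict keyed on the child id, then enrich each parent's elements. It reproduces the result set shown above.

# Plain-Python baseline: lookup dict keyed on child id
child_lookup = {child['id']: child for child in childs}

def merge_parent(parent, lookup):
    # Attach the full child document under 'value' when the reference resolves,
    # otherwise leave the element untouched (e.g., child_id 10 above)
    elements = [
        dict(element, value=lookup[element['child_id']])
        if element['child_id'] in lookup else element
        for element in parent['elements']
    ]
    return dict(parent, elements=elements)

result = [merge_parent(parent, child_lookup) for parent in parents]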
I think the possible options include:
1) Join the datasets using dataframes. This means transforming the datasets into dataframes and indexing on the relevant fields. However, with a dataset this large that is a costly operation. In addition, each parent can reference multiple child elements, so it isn’t a simple one-to-one join.
2) Create a shared lookup dictionary for the child dataset. This would be my initial guess, but I don’t think Dask has built-in support for this (see the first sketch after this list).
2.1) Alternatively, I thought about creating an indexed dataframe of the child dataset. However, I don’t know whether the overhead of retrieving rows while iterating over each element is worth it (I would have to call .compute per element then, right?).
2.2) Another interesting approach I found was actors (Actors — Dask.distributed documentation). I think the benefit is that the data stays in the cluster, but it is limited to the available memory of a single worker.
3) Move the child dataset to an external key-value store (e.g., Redis). This introduces the overhead of moving the data between the workers and Redis (see the second sketch after this list).
For now I’m going with option 3, but I’d like to hear your comments / suggestions on the best approach here.
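For what it’s worth, here is a rough sketch of how I picture option 2: since the child dataset fits in memory, build a plain dict from it, broadcast it to the workers with Client.scatter, and map over the parent bag. I’m assuming the scattered future can be passed as an extra argument to Bag.map_partitions when running on a distributed Client; if that doesn’t hold, passing the plain dict directly should also work, at the cost of shipping it inside the task graph.

import dask.bag as db
from dask.distributed import Client

client = Client()  # assumes a distributed cluster is available

# The child dataset fits in memory, so build the lookup dict locally
child_lookup = {child['id']: child for child in childs}

# Broadcast the lookup once to every worker instead of embedding it in each task
[lookup_future] = client.scatter([child_lookup], broadcast=True)

def merge_partition(partition, lookup):
    merged = []
    for parent in partition:
        elements = [
            dict(element, value=lookup[element['child_id']])
            if element['child_id'] in lookup else element
            for element in parent['elements']
        ]
        merged.append(dict(parent, elements=elements))
    return merged

parent_bag = db.from_sequence(parents, npartitions=4)  # a real loader in practice
result = parent_bag.map_partitions(merge_partition, lookup_future).compute()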
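And this is the direction I have in mind for option 3, with Redis as the key-value store. The connection settings, key naming, and JSON encoding are just illustrative. Each partition opens one connection and fetches the children it needs with MGET, so the per-document overhead stays small.

import json
import redis
import dask.bag as db

# Hypothetical connection settings
REDIS_HOST, REDIS_PORT = "localhost", 6379

def load_children(redis_client, children):
    # Store each child document as JSON under a key derived from its id
    pipe = redis_client.pipeline()
    for child in children:
        pipe.set(f"child:{child['id']}", json.dumps(child))
    pipe.execute()

def merge_partition(partition):
    # One connection per partition, not per document
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    merged = []
    for parent in partition:
        ids = [element['child_id'] for element in parent['elements']]
        raw = r.mget([f"child:{i}" for i in ids]) if ids else []
        elements = [
            dict(element, value=json.loads(doc)) if doc is not None else element
            for element, doc in zip(parent['elements'], raw)
        ]
        merged.append(dict(parent, elements=elements))
    return merged

load_children(redis.Redis(host=REDIS_HOST, port=REDIS_PORT), childs)
parent_bag = db.from_sequence(parents, npartitions=4)
result = parent_bag.map_partitions(merge_partition).compute()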