How to merge documents of one bag into another?

I’m working on a problem where I want to merge two datasets into a single collection. These datasets come from different sources and have a sort of parent-child relationship, i.e., there is a reference key between the datasets.

The datasets are complex document structures (dicts), so I am using dask bags. Bags are useful here because I have to perform certain transformations. In one of these transformations I want to merge the child data into the parent document based on the reference key. One problem is that the parent dataset is relatively large (millions of documents, over 100GB of data), whereas the child dataset can probably be held in memory (less than 5GB). Another problem is that the reference lives in an array inside the parent document, so one parent may point to multiple child documents.

Example with fake data

parents = [
    dict(
        id=x,
        elements=[dict(child_id=x), dict(child_id=x+1)] if x%3==0 else []
    ) for x in range(10)
]

# Output
[{'id': 0, 'elements': [{'child_id': 0}, {'child_id': 1}]},
 {'id': 1, 'elements': []},
 {'id': 2, 'elements': []},
 {'id': 3, 'elements': [{'child_id': 3}, {'child_id': 4}]},
 {'id': 4, 'elements': []},
 {'id': 5, 'elements': []},
 {'id': 6, 'elements': [{'child_id': 6}, {'child_id': 7}]},
 {'id': 7, 'elements': []},
 {'id': 8, 'elements': []},
 {'id': 9, 'elements': [{'child_id': 9}, {'child_id': 10}]}]

childs = [
    dict(
        id=x,
        value=x**x
    ) for x in range(10)
]

# Output
[{'id': 0, 'value': 1},
 {'id': 1, 'value': 1},
 {'id': 2, 'value': 4},
 {'id': 3, 'value': 27},
 {'id': 4, 'value': 256},
 {'id': 5, 'value': 3125},
 {'id': 6, 'value': 46656},
 {'id': 7, 'value': 823543},
 {'id': 8, 'value': 16777216},
 {'id': 9, 'value': 387420489}]

What I’m trying to achieve is the following result:

[{'id': 0,
  'elements': [
      {'child_id': 0, 'value': {'id': 0, 'value': 1}},
      {'child_id': 1, 'value': {'id': 1, 'value': 1}}
    ]},
 {'id': 1, 'elements': []},
 {'id': 2, 'elements': []},
 {'id': 3,
  'elements': [
      {'child_id': 3, 'value': {'id': 3, 'value': 27}},
      {'child_id': 4, 'value': {'id': 4, 'value': 256}}
    ]},
 {'id': 4, 'elements': []},
 {'id': 5, 'elements': []},
 {'id': 6,
  'elements': [
      {'child_id': 6, 'value': {'id': 6, 'value': 46656}},
      {'child_id': 7, 'value': {'id': 7, 'value': 823543}}
    ]},
 {'id': 7, 'elements': []},
 {'id': 8, 'elements': []},
 {'id': 9,
  'elements': [
      {'child_id': 9, 'value': {'id': 9, 'value': 387420489}},
      {'child_id': 10}
    ]}]

Normally I would iterate through the parent dataset and look up the matching child document for each reference. However, how can I best achieve this using Dask?
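For reference, the "normal" single-machine approach could look like the sketch below: build a dict keyed on the child id, then walk every parent's elements. The helper name `merge_children` is hypothetical, and references without a matching child (here `child_id=10`) are kept unchanged, as in the desired output above.

```python
# Plain-Python reference version of the merge (no Dask involved).
parents = [
    dict(id=x, elements=[dict(child_id=x), dict(child_id=x + 1)] if x % 3 == 0 else [])
    for x in range(10)
]
childs = [dict(id=x, value=x**x) for x in range(10)]


def merge_children(parent, child_lookup):
    """Return a copy of `parent` with each referenced child document attached.

    Hypothetical helper: elements whose child_id has no match are kept as-is.
    """
    elements = []
    for element in parent["elements"]:
        child = child_lookup.get(element["child_id"])
        elements.append(element if child is None else {**element, "value": child})
    return {**parent, "elements": elements}


# O(1) lookups by child id; this is the part that must fit in memory.
child_lookup = {child["id"]: child for child in childs}
result = [merge_children(p, child_lookup) for p in parents]
```

The question is essentially how to distribute this loop while sharing `child_lookup` across workers.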

I think the possible options include:

1) Join the datasets using dataframes. This means transforming the datasets into dataframes and indexing on the relevant fields. However, with a dataset this large that is a costly operation. In addition, a parent document can have multiple child elements.

2) Create a shared lookup dictionary for the child dataset. This would be my initial guess, but I don’t think dask has any support for this.

2.1) Alternatively, I thought about creating an indexed dataframe of the child dataset. However, I don’t know whether it is worth the overhead of retrieving from it while iterating over each element (I would have to call .compute per element then, right?)

2.2) Another interesting approach I found was actors (see the Actors page of the Dask.distributed documentation). I think the benefit is that the data stays in the cluster, but it is limited to the memory available on a single worker.

3) Move the child dataset to an external key-value store (e.g., Redis). This introduces the overhead of moving the data from the workers to Redis.
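Regarding option 2: as a minimal sketch, `Bag.map` accepts non-Bag arguments (including `Delayed` objects) and broadcasts them to every call, so a lookup dict built once can be shared by all partitions. This assumes the dict fits comfortably in each worker's memory, which is questionable at around 5GB. The helper `attach_children` is hypothetical.

```python
import dask
import dask.bag as db

parents = [
    dict(id=x, elements=[dict(child_id=x), dict(child_id=x + 1)] if x % 3 == 0 else [])
    for x in range(10)
]
childs = [dict(id=x, value=x**x) for x in range(10)]


def attach_children(parent, child_lookup):
    # Attach each referenced child from the broadcast dict; references
    # without a match (child_id=10 here) are left untouched.
    return {
        **parent,
        "elements": [
            {**e, "value": child_lookup[e["child_id"]]} if e["child_id"] in child_lookup else e
            for e in parent["elements"]
        ],
    }


# Build the lookup once as a single Delayed value; every map task depends
# on that one key instead of receiving its own copy embedded in the graph.
child_lookup = dask.delayed({c["id"]: c for c in childs})

parents_bag = db.from_sequence(parents, npartitions=2)
merged = parents_bag.map(attach_children, child_lookup)
result = merged.compute()
```

Unlike `Bag.join`, this keeps the parents' nesting intact, so no de-nesting or re-nesting is needed; the trade-off is that every worker holds the full lookup.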

For now I’m going with option 3, but I’d like to hear your comments and suggestions on the best approach here :)


Hi @joell, welcome, this is an interesting question, and thanks for the nice example snippet!

This is definitely possible with dask.bag, with the caveat that the smaller bag in the join must have a single partition. This is because that single partition is sent to every partition of the larger bag, so that the join can be done locally for each of them. At 5 GB, I would think that your smaller bag is a bit too large for that to be a comfortable option.
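A small sketch of the single-partition constraint on toy data: `Bag.join` works when the other bag has exactly one partition, and raises `NotImplementedError` when it has more.

```python
import dask.bag as db

left = db.from_sequence([{"child_id": 1}, {"child_id": 2}], npartitions=2)
one_part = db.from_sequence([{"id": 1, "value": "a"}, {"id": 2, "value": "b"}], npartitions=1)
two_parts = db.from_sequence([{"id": 1, "value": "a"}, {"id": 2, "value": "b"}], npartitions=2)

# Works: the single partition of `one_part` is joined against every partition
# of `left`. Each result is a (child, row_from_left) tuple.
ok = left.join(one_part, on_self="child_id", on_other="id").compute()

# Fails at graph-construction time: multi-partition bag joins are not implemented.
try:
    left.join(two_parts, on_self="child_id", on_other="id")
    multi_partition_supported = True
except NotImplementedError:
    multi_partition_supported = False
```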

I would generally recommend that you use Dask DataFrames for this (that is to say, option 1) and set an index on the child dataframe. There will be some overhead to converting from a Bag to dataframes, but given the optimizations in pandas and the more sophisticated merges that dataframes offer, I would think the overhead is well worth it. This would also allow multi-partition merges, which dask.bag does not support for your smaller collection.

I would recommend against using Actors for this. They are a more experimental API, have fewer resources available, and you’ll probably have a better time sticking to more well-trod paths of what is, at the end of the day, a merge operation.

I took a crack at what this would look like with dask.bag. It’s a little awkward to de-nest and then re-nest the dataset, but it is doable:

import dask.bag as db
import toolz

parents = [
    dict(
        id=x,
        elements=[dict(child_id=x), dict(child_id=x+1)] if x%3==0 else []
    ) for x in range(10)
]

childs = [
    dict(
        id=x,
        value=x**x
    ) for x in range(10)
]

parents = db.from_sequence(parents, npartitions=2)
childs = db.from_sequence(childs, npartitions=1) # can only have a single partition!

# Take the nested dict and de-nest it so that there is an id, child_id for each row
def denest(partition):
    rows = []
    for row in partition:
        rows += [{ "id": row["id"], "child_id": e["child_id"]} for e in row["elements"]]
    return rows

# Take the `join` tuple and put it into a dict of the right shape.
def combine(el):
    return {**el[1], "value": el[0]["value"]}

# The join step
joined = (
    parents
    .map_partitions(denest)
    .join(childs, on_self="child_id", on_other="id")
    .map(combine)
)

# The reduction function for groups of "id", combining the elements lists together.
def reduce(el):
    return {
        "id": el[0],
        "elements": [{"child_id": e["child_id"], "value": e["value"]} for e in el[1]]
    }

# We would normally use `Bag.groupby` or `Bag.foldby` here, but in this case
# we know that the groups already live on the same partitions, and we don't
# want to trigger any logic around swapping data between partitions!
# So instead we do a partitionwise groupby.
reduced = (
    joined
    .map_partitions(lambda p: list(toolz.groupby("id", p).items()))
    .map(reduce)
)

# reduced is almost what we want! But we lost rows that had no child
# elements. This recovers them by doing a partitionwise left join and choosing
# the "reduced" version if it exists.
def coalesce(a, b):
    joined = list(toolz.join("id", a, "id", b, right_default=None))
    return [x[0] if x[1] is None else x[1] for x in joined]

final = db.map_partitions(coalesce, parents, reduced)
final.compute()

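This produces the output below. Note that the dangling reference {'child_id': 10} is dropped, because Bag.join performs an inner join: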

[{'id': 0,
  'elements': [{'child_id': 0, 'value': 1}, {'child_id': 1, 'value': 1}]},
 {'id': 3,
  'elements': [{'child_id': 3, 'value': 27}, {'child_id': 4, 'value': 256}]},
 {'id': 1, 'elements': []},
 {'id': 2, 'elements': []},
 {'id': 4, 'elements': []},
 {'id': 6,
  'elements': [{'child_id': 6, 'value': 46656},
   {'child_id': 7, 'value': 823543}]},
 {'id': 9, 'elements': [{'child_id': 9, 'value': 387420489}]},
 {'id': 5, 'elements': []},
 {'id': 7, 'elements': []},
 {'id': 8, 'elements': []}]

It’s also interesting to take a look at the task graph for this. In the join step, a single partition (the child partition) is scattered to all of the others.

Hi @joell, have you had the chance to try either a dask.Bag approach or a dask.DataFrame approach for your problem?

Hi Ian,

Thanks for your reply and the thorough example. It seems like an interesting approach, but it would only work when the smaller set fits in a single partition, right?
For this use case it might actually work, so this is a really nice example of how to proceed :)


My current task had a deadline, so I went with option 3. This option worked surprisingly well :) What I did:

  1. Load the “child dataset” into redis with the respective key
  2. Iterate through the parent dataset
  3. When we have a child relation, get the data from redis

To load the data into Redis I wrote the following (inspired by the dask-mongo package):

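import json
from time import sleep

import dask.bag as db
import redis
from dask.graph_manipulation import checkpoint

def write_redis(values: list, func_key, value_func):
    # One connection per partition, reused for every document in it.
    r = redis.Redis(host='redis-joell-master')
    for value in values:
        key = func_key(value)
        payload = json.dumps(value_func(value))

        # Retry with increasing back-off if Redis is temporarily unreachable.
        for i in range(5):
            try:
                r.set(key, payload)
            except redis.ConnectionError:
                sleep(5 + 2**i)
            else:
                break
        else:
            # All retries failed: raise instead of silently dropping the document.
            raise redis.ConnectionError(f"could not write key {key!r} to Redis")

def to_redis(bag: db.Bag, func_key, value_func):
    # Each partition is written for its side effect; checkpoint only waits
    # for all the writes to finish and does not collect any results.
    partials = bag.map_partitions(write_redis, func_key, value_func)
    collect = checkpoint(partials)
    return collect.compute()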
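Step 3 (reading the children back while iterating over the parents) is not shown above. A possible sketch, assuming one Redis `MGET` per partition rather than one `GET` per reference: redis-py's `Redis.mget` returns `None` for missing keys. The names `enrich_partition`, `make_client` and `key_for` are hypothetical; `key_for` must produce the same keys that `func_key` produced when writing.

```python
import json


def enrich_partition(partition, make_client, key_for):
    """Attach child documents fetched from a key-value store to each parent.

    Hypothetical helper. `make_client()` returns an object with an
    `mget(keys)` method (e.g. redis.Redis); `key_for(child_id)` builds the key.
    """
    parents = list(partition)
    # Collect every referenced key in this partition for a single round trip.
    keys = [key_for(e["child_id"]) for p in parents for e in p["elements"]]
    client = make_client()  # created inside the task, once per partition
    fetched = iter(client.mget(keys)) if keys else iter(())

    out = []
    for p in parents:
        elements = []
        for e in p["elements"]:
            raw = next(fetched)  # same order as `keys`
            elements.append(e if raw is None else {**e, "value": json.loads(raw)})
        out.append({**p, "elements": elements})
    return out
```

Usage would be along the lines of `parents_bag.map_partitions(enrich_partition, lambda: redis.Redis(host='redis-joell-master'), key_for)`, assuming the keys were derived from the child ids when writing.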

However, I thought some more about option 1 and tried to validate whether it works. The problem, I think, is that one parent can have multiple child elements. So I came up with the following approach (for future reference).

First, transform the datasets into lookup DataFrames:
i. Child DataFrame of <child_id>, <content>
ii. Parent DataFrame of <parent_id>, <content>
iii. Flattened Parent → Child lookup of <parent_id>, <child_id> such that <parent_id> can be duplicated when there are multiple <child_id>.

Second, we index datasets i and iii on <child_id> and join these DataFrames. Subsequently, we reindex the joined table on <parent_id> and join it with the parent dataset ii.

Finally, we can map over the partitions of the result of that last join to construct the documents in the expected form.

I think this might work. What do you think of this approach?

Sorry for the slow reply @joell, and thanks for sharing the redis-based approach. That’s a trick I haven’t seen before with Dask, and it’s kind of fun :).

Your sketch for a dataframe approach makes sense to me. It may or may not be worth it to index the flattened parent dataset on child_id, but that’s something that would take some experimentation to determine.