Understanding Dask best practices to avoid excessive object creation and garbage collection when using Dask Bags


Recently I have been bumping into many garbage collection warnings. I think it might be due to how I’m using Dask Bags.

So I have a dataset containing hundreds of thousands of arrays of varying length. At the beginning of my program I load these into a Bag. Each item of the Bag is a dict containing one of the arrays and some metadata, which is used later on during processing.
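For reference, a minimal sketch of this setup (the `"data"` and `"id"` keys and the random array lengths are just placeholders for illustration, not my real schema):

```python
import dask.bag as db
import numpy as np

# Each item is a dict holding one variable-length array plus some metadata
records = [
    {"data": np.random.rand(np.random.randint(10, 100)), "id": i}
    for i in range(1000)
]
bag = db.from_sequence(records, npartitions=10)
```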

I then apply multiple functions to the Bag using map_partitions. These functions are of the following form:

def func(a):
  newa = a.copy()
  newa["key"] = op(newa["other_key"])
  return newa

Where op is just some function. I follow this form to avoid mutating the input (as suggested by the Dask best practices).

I apply them on the bag like so:

bag = bag.map_partitions(lambda partition: [func(a) for a in partition])

Things work as expected on smaller datasets (around 10,000 arrays, for example), but as the size grows Dask starts warning me about excessive time spent garbage collecting. I think this might be due to the form of my func operations: with all the copying, the number of objects blows up very quickly, I assume. However, I’m not sure how to tackle this differently.

Any pointers on what I could improve?


Thanks for your question! To start, it looks like you can call dask.bag.Bag.map() instead of map_partitions(), which will apply the provided function to each element of your bag. Then, you can use del per these docs to remove unneeded objects from memory as you go.

I’ve put together a small example here:

import dask.bag as db
import numpy as np

def func(a, key='value'):
    newa = a.copy()
    newa[key] = newa[key] * 2
    return newa

# Each item of the Bag is a dict containing one of the arrays and some metadata
b = db.from_sequence([
    {'value': np.arange(10), 'color': 'red'},
    {'value': np.arange(1, 2.2, 0.1), 'color': 'blue'},
])
b  # prints dask.bag<from_sequence, npartitions=2>
new_b = b.map(func)
# then delete the old bag from memory
del b
new_b = new_b.compute()


The reason I am using map_partitions instead of map is to avoid a very large task graph (see Avoid Very Large Graphs in the Best Practices — Dask documentation). So from what I understand, I don’t think using map would be a solution.

It seems the only thing I can do is optimize the code of my operations and merge some operations together to avoid copying too often.
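What I mean by merging is something like the following sketch (op1 and op2 are hypothetical stand-ins for my real processing steps): copy each dict once, then apply all operations to that one copy, instead of copying per operation.

```python
# Hypothetical ops standing in for the real processing steps
def op1(v):
    return v * 2

def op2(v):
    return v + 1

def fused(a):
    # Copy once, then apply every operation to the same copy,
    # instead of copying the dict inside each separate func
    newa = a.copy()
    newa["x"] = op1(newa["raw"])
    newa["y"] = op2(newa["x"])
    return newa

fused({"raw": 10})  # {"raw": 10, "x": 20, "y": 21}
```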

Thanks for the additional explanation! Are you able to share a minimal reproducer? It’d be great if I could reproduce the same garbage collection warning you’re seeing and then we can troubleshoot from there.

I wasn’t able to make a minimal reproducer. It would require a large amount of data to actually reproduce it.

I ended up going through my code and reducing the number of objects I create by rewriting parts, and I also focused on avoiding unnecessary copies of data. This has not entirely solved the problem, but it did improve the situation a lot.