Dask Bag significantly faster with `scheduler='processes'`, help me understand why?

Sorry, I’m still quite new to Dask and trying to figure out what is going on here: why is the processes scheduler faster, and why does it use less memory? I understand bags use the multiprocessing scheduler by default.

Basically, I’m processing some Common Crawl data after creating a bag with from_sequence:

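import dask
import dask.bag as db
from dask.distributed import Client

client = Client()
# dask.config.set(scheduler='processes', num_workers=16)

# Lazy pipeline: nothing runs until the bag is computed.
warc_bag = (
    db.from_sequence(warc_list)
    .map(process_them)
    .filter(filter_function)
    .map(another_pre_processor)
)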

On the larger WARC files I would often hit memory errors, and the files that didn’t hit memory errors would take up to 1 hour. Calling gc.collect and malloc_trim on the client didn’t help. However, as soon as I switched to the processes scheduler with dask.config.set(scheduler='processes'), the memory issue disappeared, and the same large WARCs now take 10-15 minutes.

I’m just trying to figure out what the big optimisation was here, and what happened.

Hi @winddude, good question!

Python has a global interpreter lock (GIL), which prevents many operations on regular Python objects from occurring concurrently within the same process. So if any of your processing functions touch the same objects, you can easily run into this lock, and all of your worker threads will have to wait around for each other. I expect this to be the reason for the performance difference you are seeing.
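To see the effect in isolation, here is a minimal sketch that uses only the standard library. `cpu_bound` and `run_with` are hypothetical names, standing in for a pure-Python processing function and a small timing helper; neither comes from the original post. On a standard CPython build with the GIL enabled, I would expect the thread pool to take about as long as running the tasks one after another, while the process pool can spread them over several cores. Exact timings will vary by machine.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def cpu_bound(n):
    """Hypothetical stand-in for a pure-Python processing function."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def run_with(executor_cls, n_tasks, n, max_workers=4):
    """Run n_tasks copies of cpu_bound(n) on the given executor type."""
    with executor_cls(max_workers=max_workers) as pool:
        return list(pool.map(cpu_bound, [n] * n_tasks))


if __name__ == "__main__":
    # Same work, two kinds of workers: threads share one GIL, processes do not.
    for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.perf_counter()
        run_with(executor_cls, n_tasks=8, n=2_000_000)
        elapsed = time.perf_counter() - start
        print(f"{executor_cls.__name__}: {elapsed:.2f}s")
```

The `if __name__ == "__main__":` guard matters for the process pool on platforms that start workers by re-importing the main module.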

Some libraries (like NumPy and pandas) have strategies for releasing the global interpreter lock, which makes them more amenable to multithreading. But Dask Bag, which usually operates on normal objects like dicts, lists, and strings, is usually best with the multiprocessing scheduler (indeed, that is the default scheduler for bag).
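As a rough illustration of how the scheduler can be chosen explicitly, here is a small sketch. The `word_count` and `build_bag` functions and the sample records are hypothetical and only mimic the shape of the pipeline in the question. The `scheduler=` keyword on `compute` and `dask.config.set(scheduler=...)` are the two mechanisms I am relying on.

```python
import dask
import dask.bag as db


def word_count(record):
    """Hypothetical per-record step operating on a plain dict."""
    return {"url": record["url"], "n_words": len(record["text"].split())}


def build_bag(records, npartitions=2):
    """Build a small lazy pipeline: map, then filter."""
    return (
        db.from_sequence(records, npartitions=npartitions)
        .map(word_count)
        .filter(lambda r: r["n_words"] > 1)
    )


if __name__ == "__main__":
    records = [
        {"url": "a", "text": "hello world"},
        {"url": "b", "text": "single"},
        {"url": "c", "text": "three short words"},
    ]
    bag = build_bag(records)

    # Choose the scheduler for a single call ...
    print(bag.compute(scheduler="processes"))

    # ... or for everything computed inside a block.
    with dask.config.set(scheduler="threads"):
        print(bag.compute())
```

As far as I know, creating a `dask.distributed.Client` registers the distributed scheduler as the default, which is a third option separate from both `'threads'` and `'processes'`, so it is worth checking which one a given `compute` call actually uses.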


Hmm, the only non-local object the map functions touch (just the second one) is the spaCy model, nlp = spacy.load('en_core_web_sm'), which is declared globally.

Wondering if the solution is to use map_partitions and call nlp.pipe, as suggested in “Spacy + Dask?” (Issue #5111, explosion/spaCy on GitHub).

Or more cores and use processes… I guess it depends whether I need to use the distributed cluster or not, but I’m also seeing better performance with smaller partitions, which would add overhead loading spacy. So tough call.
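For reference, the map_partitions idea could look roughly like the sketch below. To keep it runnable without spaCy installed, `FakeModel` is a hypothetical stand-in that only imitates the `pipe` method; in real code `get_model` would presumably return `spacy.load('en_core_web_sm')`. The helper names `get_model`, `process_partition`, and `annotate` are made up for illustration.

```python
import dask.bag as db


class FakeModel:
    """Hypothetical stand-in for a spaCy pipeline (not the real API surface)."""

    def pipe(self, texts):
        # spaCy's nlp.pipe yields one Doc per input text; here we yield token lists.
        for text in texts:
            yield text.split()


_MODEL = None  # cached model, filled in lazily by get_model()


def get_model():
    """Return a cached model so repeated calls avoid reloading it.

    Assumption: real code would call spacy.load('en_core_web_sm') here.
    """
    global _MODEL
    if _MODEL is None:
        _MODEL = FakeModel()
    return _MODEL


def process_partition(records):
    """Send a whole partition through the model in one batched pipe() call."""
    records = list(records)
    nlp = get_model()
    docs = nlp.pipe(r["text"] for r in records)
    return [{**r, "n_tokens": len(doc)} for r, doc in zip(records, docs)]


def annotate(records, npartitions=2):
    """Build a lazy bag that processes each partition as one batch."""
    bag = db.from_sequence(records, npartitions=npartitions)
    return bag.map_partitions(process_partition)
```

Calling `annotate(records).compute(scheduler='processes')` would then run one batched `pipe` call per partition. Whether the cached model actually survives between partitions on the same worker process depends on how the functions are shipped to the workers; putting them in an importable module rather than a notebook cell is the safer assumption.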

Not having much luck.

With no shared objects between the functions called by map, it’s still having memory issues and taking over 3.5x as long to run through the pipelines with the multiprocessing vs processes scheduler. And the bags are exclusively dicts of strings.

I’ve also moved the spaCy processing out of Dask, due to the overhead of loading the model and over one hundred thousand patterns for the entity ruler, and am now using its .pipe on the computed bag.