How to troubleshoot / optimize `npartitions` + `partition_size` for `dask.bag`?

Hi all,

I maintain a library that uses dask.bag to parallelize processing datasets of audio files, converting them into spectrograms that are then used to train machine learning models.

I have an issue that I think is caused by running out of memory with dask.bag. Feedback from a user suggests they can fix the issue, or at least avoid a crash, by setting `npartitions` manually. I’ll give more detail below.

My questions are:

  • How can I troubleshoot this issue and confirm that the root problem is memory-related?
  • If it is memory-related, is there a way to find an optimal `npartitions` or `partition_size` programmatically, so I don’t need to ask users to set it by hand?
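For the first question, one check I can think of that doesn’t involve dask at all: measure the peak memory of processing a single file with the stdlib `tracemalloc`, then multiply by the number of worker processes (and files per task) to ballpark total pressure. This is only a sketch; `make_spect` here is a fake allocation standing in for the real spectrogram code:

```python
import tracemalloc

def make_spect(n_samples):
    # stand-in: pretend decoding + spectrogram computation
    # allocate some large buffers
    audio = [0.0] * n_samples
    windows = [audio[i:i + 512] for i in range(0, n_samples, 512)]
    return len(windows)

tracemalloc.start()
make_spect(1_000_000)
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"peak ~{peak / 1e6:.1f} MB for one file")
```

If one ~100 MB file peaked at several GB here, that would support the memory theory even though the traceback never mentions memory.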

Ok, more detail:
AFAICT I am following best practices by having dask.bag operate on a list of filenames; the function I apply with the `bag.map` method does all the work of opening each file, making the spectrogram, etc.
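In simplified form, the pattern looks like this. `audio_to_spect` is a stand-in for the real function, and I use the synchronous scheduler here only to keep the snippet self-contained; the real code calls `list(bag.map(...))`, which uses dask’s default scheduler for bags (multiprocessing):

```python
import dask.bag as db

def audio_to_spect(audio_path):
    # stand-in: the real function loads audio, computes a spectrogram,
    # saves it to an array file, and returns that file's path
    return audio_path + ".spect.npz"

audio_paths = [f"bird_{i}.wav" for i in range(8)]

bag = db.from_sequence(audio_paths)  # lazy; nothing is loaded yet

# synchronous scheduler just so this sketch runs anywhere
spect_paths = bag.map(audio_to_spect).compute(scheduler="synchronous")
```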

One user’s dataset caused the dask.bag code to crash, but they were able to avoid the crash by manually setting `npartitions=20`.

As I understand it, the default is “around 100” partitions.
(I would link to the dask.bag docs here, but new users can only put 2 links per post.)
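So the user’s workaround, as I understand it, is equivalent to the following (toy paths; `npartitions` is the keyword from `db.from_sequence`):

```python
import dask.bag as db

paths = [f"call_{i}.wav" for i in range(200)]

# default: dask aims for roughly 100 partitions
default_bag = db.from_sequence(paths)

# the user's workaround: far fewer (and therefore larger) partitions
workaround_bag = db.from_sequence(paths, npartitions=20)

print(default_bag.npartitions, workaround_bag.npartitions)
```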

My best guess is that this is a memory issue; my reasoning is that their files are much larger than what we usually run with: ~100 MB each, instead of the typical 2-10 MB.
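If memory really is the problem, the kind of heuristic I imagine for the second question is picking `npartitions` from total input size and a per-worker memory budget. To be clear, this is entirely my own guess, not anything from the dask docs, and it doesn’t obviously explain why *fewer* partitions helped this user; the budget and expansion factor below are made-up numbers:

```python
import math
import os
import tempfile

def estimate_npartitions(paths, mem_budget_bytes=1_000_000_000, expansion=10):
    """Guess how many partitions keep each partition's working set
    under a memory budget. `expansion` is an assumed blow-up factor
    from compressed audio on disk to decoded audio + spectrogram arrays."""
    total_bytes = sum(os.path.getsize(p) for p in paths)
    needed = math.ceil(total_bytes * expansion / mem_budget_bytes)
    # at least 1 partition, at most one partition per file
    return max(1, min(needed, len(paths)))

# toy demo with fake "audio" files
with tempfile.TemporaryDirectory() as d:
    paths = []
    for i in range(5):
        p = os.path.join(d, f"fake_{i}.wav")
        with open(p, "wb") as f:
            f.write(b"\x00" * 1_000_000)  # 1 MB each
        paths.append(p)
    print(estimate_npartitions(paths, mem_budget_bytes=2_000_000))
```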

But the traceback reports `concurrent.futures.process.BrokenProcessPool` – there’s nothing about memory in the error itself.

The full traceback they get is:

```
$ vak prep train_config.toml 
2022-10-14 22:59:43,745 - vak.cli.prep - INFO - Determined that purpose of config file is: train.
Will add 'csv_path' option to 'TRAIN' section.
2022-10-14 22:59:43,745 - vak.core.prep - INFO - purpose for dataset: train
2022-10-14 22:59:43,745 - vak.core.prep - INFO - will split dataset
2022-10-14 22:59:44,315 - vak.io.dataframe - INFO - making array files containing spectrograms from audio files in: data/WAV
2022-10-14 22:59:44,319 - vak.io.audio - INFO - creating array files with spectrograms
[                                        ] | 0% Completed | 12.69 sms
Traceback (most recent call last):
  File "/home/hjalmar/callclass/bin/vak", line 8, in <module>
    sys.exit(main())
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/__main__.py", line 45, in main
    cli.cli(command=args.command, config_file=args.configfile)
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/cli/cli.py", line 30, in cli
    COMMAND_FUNCTION_MAP[command](toml_path=config_file)
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/cli/prep.py", line 132, in prep
    vak_df, csv_path = core.prep(
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/core/prep.py", line 201, in prep
    vak_df = dataframe.from_files(
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/io/dataframe.py", line 134, in from_files
    spect_files = audio.to_spect(
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/io/audio.py", line 236, in to_spect
    spect_files = list(bag.map(_spect_file))
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/bag/core.py", line 1480, in __iter__
    return iter(self.compute())
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/multiprocessing.py", line 233, in get
    result = get_async(
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
```

Any feedback you can give would be much appreciated!

This is the part of the dask docs I’m looking at
https://docs.dask.org/en/stable/bag-creation.html?highlight=npartitions#db-from-sequence

I am aware of the other “best practices” pages, but the real issue is that I don’t have a good mental model of what dask is doing, which makes it hard to even start troubleshooting. Based on the “drawbacks” section of the bag docs, my guess is that I’m hitting some of the pain points of multiprocessing.
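One experiment I’m considering to test that theory: re-run the same bag under the threaded and single-threaded schedulers and see whether the crash goes away, which would at least point the finger at multiprocessing. A sketch (`make_spect` again stands in for the real function):

```python
import dask.bag as db

def make_spect(path):
    # stand-in for the real audio -> spectrogram work
    return path + ".spect.npz"

paths = [f"call_{i}.wav" for i in range(4)]
bag = db.from_sequence(paths).map(make_spect)

# bags default to the multiprocessing scheduler;
# these calls override it for a single compute()
threaded = bag.compute(scheduler="threads")
serial = bag.compute(scheduler="synchronous")
print(threaded == serial)
```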