Reduction over futures

Hello experts,

What is the best way to run a reduction operation (e.g. a sum) over a list of futures as they complete? My situation is as follows:

  • I have a large dataset, split across many files
  • I have a function which:
    - Takes as input a file name
    - Does all the I/O
    - Processes the file into histograms, which are encoded as some large numpy arrays
  • I use client.map(process_fun, filenames) to scale this processing onto a Slurm cluster (set up via dask-jobqueue)
  • I would then like to gather all of these arrays and sum them together to obtain my final results
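For reference, a minimal sketch of the setup described above. Everything here is a hypothetical stand-in: `process_fun` fakes the histograms with deterministic random data instead of doing real I/O, and a local in-process `Client` stands in for the actual `dask_jobqueue.SLURMCluster`:

```python
import numpy as np
from dask.distributed import Client

def process_fun(filename):
    # Stand-in for the real per-file processing: it would normally read the
    # file and fill histograms; here we fake the result with deterministic
    # random data keyed on the filename.
    rng = np.random.default_rng(sum(map(ord, filename)))
    return rng.random(1000)

# Local in-process client, standing in for the real SLURMCluster.
client = Client(processes=False)
filenames = [f"file_{i}.dat" for i in range(10)]

# One future per file, each resolving to a large numpy array.
futures = client.map(process_fun, filenames)
```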

I would like for the summing operation to be:

  • Parallel across the cluster, ideally in some way that intelligently handles the case when multiple workers are on the same physical machine
  • Run as futures complete, because otherwise the intermediate results will fill up all of the available memory
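For concreteness, here is the kind of streaming fold I have in mind, written naively with `as_completed` (again with a hypothetical `process_fun` and a local client as stand-ins). Note this version sums on the client, one result at a time, so it satisfies the memory requirement but not the "parallel across the cluster" one:

```python
import numpy as np
from dask.distributed import Client, as_completed

def process_fun(filename):
    # Hypothetical stand-in: fake histogram keyed on the filename.
    rng = np.random.default_rng(sum(map(ord, filename)))
    return rng.random(1000)

client = Client(processes=False)  # stand-in for the real SLURMCluster
futures = client.map(process_fun, [f"file_{i}.dat" for i in range(8)])

# Fold each result into a running total as soon as its future finishes,
# so only one intermediate array is held on the client at a time.
total = None
for future in as_completed(futures):
    hist = future.result()
    total = hist if total is None else total + hist
    future.release()  # allow the scheduler to free the worker's copy
```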

My feeling is that I can't be the first person to try something like this (it has a very map/reduce feel to it), so I am hoping there is a standard solution that I have somehow overlooked in the Dask documentation, or otherwise a typical approach that I should be taking.

Thanks in advance for your advice and assistance.

Hi @ssrothman, welcome to Dask community!

I think you are looking for dask.array.from_delayed, which lets you build a Dask Array from Delayed objects or Futures and then perform ordinary array operations on it.
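A minimal sketch of that approach, assuming each per-file result has a known shape and dtype (the `process_fun`, shapes, and local client below are made-up stand-ins for your setup). Wrapping each future as a one-chunk dask array and summing the stack lets Dask schedule the reduction as a tree across the workers, rather than pulling every array back to the client:

```python
import numpy as np
import dask.array as da
from dask.distributed import Client

def process_fun(filename):
    # Hypothetical stand-in: fake histogram keyed on the filename.
    rng = np.random.default_rng(sum(map(ord, filename)))
    return rng.random(1000)

client = Client(processes=False)  # stand-in for the real SLURMCluster
futures = client.map(process_fun, [f"file_{i}.dat" for i in range(8)])

# Wrap each future as a single-chunk dask array (shape and dtype must be
# declared up front), stack them into a (nfiles, nbins) array, and reduce.
arrays = [da.from_delayed(f, shape=(1000,), dtype=np.float64) for f in futures]
total = da.stack(arrays).sum(axis=0).compute()
```

The intermediate partial sums live on the workers, and the reduction tree means memory use stays bounded even for many files.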

If this does not work for you, please provide a small reproducer so we can better understand your need!