Computing chunks locally before sending to workers with map_blocks

I am using map_blocks with a dask array that is backed by a large amount of local data spread over thousands of files. At the moment, when I call map_blocks on this array with a dask.distributed client, dask appears to compute each data chunk on the worker before applying the function passed to map_blocks, which means that every worker needs access to the full dataset.

This makes sense for many use cases, of course, but I am curious whether there is any way to make dask compute each chunk locally (essentially using the synchronous scheduler) before sending it to a worker. The data is too large to compute fully in memory before calling map_blocks. However, if map_blocks were to compute each chunk on the machine where it is being called from, this would, for my specific use case, be very fast compared to the actual computation time inside map_blocks. It would also let me use workers that do not have enough disk space to host a copy of the full dataset but do have enough memory to process a single chunk.

Note that I am also trying to avoid setting up network-mounted drives, as I want to be able to use geographically distributed workers.

Hi @astrofrog,

Could you add some details about your workflow? How are you reading the files in the first place? Do you have a code snippet?

Here, I assume you mean reading each chunk from the main Python script on the Client side, and then sending the blocks to the Workers for further processing? This will still mean a lot of network IO; it's generally much more efficient to read from the Workers in the first place, but I understand that is complicated in your case.
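
To make that concrete, here is a minimal Futures-only sketch of that pattern. The scheduler address, file groups, reader and processing functions are all hypothetical placeholders, not your actual code:

```python
import numpy as np
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")   # hypothetical scheduler address

# Hypothetical: the files that make up each chunk.
chunk_file_groups = [
    ["part-0000.npy", "part-0001.npy"],
    ["part-0002.npy", "part-0003.npy"],
]

def read_chunk(file_group):
    """Hypothetical local reader: load one chunk's files into a NumPy array."""
    return np.concatenate([np.load(path) for path in file_group])

def process_chunk(block):
    """Hypothetical stand-in for the expensive work done inside map_blocks."""
    return block.mean()

futures = []
for file_group in chunk_file_groups:
    block = read_chunk(file_group)           # runs locally on the Client
    remote_block = client.scatter(block)     # ship the in-memory block to a worker
    futures.append(client.submit(process_chunk, remote_block))

results = client.gather(futures)             # collect the processed chunks
```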

It should be doable with a mix of Futures and building a Dask Array with from_delayed; you'll have to scatter all the data manually from the Client side, and do that carefully to avoid memory issues. Maybe there are more suitable solutions, I'm not sure.
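
For illustration only, a rough sketch of what that could look like, assuming each chunk fits comfortably in memory; the scheduler address, chunk count, shapes and functions below are made-up placeholders:

```python
import dask
import dask.array as da
import numpy as np
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")   # hypothetical scheduler address

n_chunks = 4                    # hypothetical number of chunks
chunk_shape = (1000, 1000)      # hypothetical shape of each chunk

def read_chunk(i):
    """Hypothetical local reader: assemble chunk ``i`` from its files on the Client."""
    return np.random.random(chunk_shape)

blocks = []
for i in range(n_chunks):
    data = read_chunk(i)                    # computed locally on the Client
    future = client.scatter(data)           # move the in-memory chunk into cluster memory
    delayed_chunk = dask.delayed(future)    # wrap the Future so from_delayed accepts it
    blocks.append(da.from_delayed(delayed_chunk, shape=chunk_shape, dtype=float))

arr = da.concatenate(blocks, axis=0)

def process_chunk(block):
    """Hypothetical stand-in for the real per-chunk computation."""
    return block * 2

# The Workers only ever see in-memory chunks; they never touch the original files.
result = arr.map_blocks(process_chunk, dtype=float).compute()
```

Note that written this way, every chunk is scattered before compute is called, so all of the data ends up in cluster memory at once; in practice you'd probably want to batch the scatter/compute steps to keep memory under control.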