I’m using dask distributed on an LSF cluster to downsample a 10TB 3D dataset and save the results to disk. To prevent worker memory from exploding I’m batching my tasks and iterating over batches. This is all working fine, except that a significant fraction of my batches end up hanging on the finalize tasks for ~400s. All the heavy lifting (loading data, downsampling, saving to disk) is done, but these finalize tasks tie up a single worker who has low CPU and memory use for nearly 5 minutes, thereby blocking my next batch.
Since I’m running these tasks for their side effects, I don’t actually need the values returned from the distributed scheduler, so is there some way I can avoid scheduling these finalize tasks in the first place? And why might these tasks be hanging?
Do you have a representative snippet you could share that reproduces this issue? Ideally one that doesn’t rely on 10TB of data, but does show the finalize task .
I wonder if you ran into this issue. If you are using map_partitions somewhere with a dataframe in a keyword argument that can cause undesired all-to-one computations something like what you are describing. But it’s difficult to say more without a more concrete example, so that’s a bit of a shot in the dark.
Unfortunately I don’t have a small reproducer. In broad strokes, I’m loading data from a zarr array, downsampling it via repeated calls to this function, then saving the results with this function . Both of these routines construct highlevel graphs directly… perhaps I’m doing something broken there?
@d-v-b it looks like your write_blocks function is returning 0 for every chunk. (I’m also curious why da.store doesn’t work for you here, and you’ve essentially implemented a different version of it, but that should be a separate discussion.)
I think it’s likely that all the time is being spent transferring the many inputs to finalize (a bunch of 0s) to one worker, and finalize itself runs quickly once those inputs are transferred.
Since they’re all 0s, you’d think they’d be quick to transfer. However, workers have rather complicated logic limiting how many other workers and how much data they will try to transfer at once. So it’s possible that the the one worker running the finalize task (which probably has thousands of input keys) is only slowly dripping out requests for those keys from other workers and being more conservative than it needs to.
If you click on the one blue bar in the “Tasks Processing” chart on the lower-left, and then click on the finalize task that it’s running, I’d be curious to see a screenshot of that. I want to see how many bytes dask thinks the inputs are, and how many inputs there are.
The whole idea of finalize is a little odd. We have a chunked dataset, but when you compute it, rather than sending each chunk back separately, we first transfer all chunks to one worker, concatenate them into one large array/dataframe, then send that single large object back to the client. This add a lot more latency and required more memory than if the results were streamed back. It would be great if you wanted to discuss this case on [Discussion] Streaming results to Client as they become available · Issue #4754 · dask/distributed · GitHub.
I assume you’re doing compute() on your final array. Since you don’t actually need the results (just the side effect), you could do distributed.wait(arr.persist()) instead of compute(). This would run all the tasks, but not try to send their results back to the client, and therefore skip the finalize step entirely.
Thanks for the input @gjoseph92, I love the idea of using persist() here instead of compute(), and if that’s not a game-changer I will look into tweaking the config. Hopefully I can get time soon to iterate on a smaller reproducer for this issue as well.
As for why I re-implemented da.store, in my re-implementation I express the storage of an array of chunks as an operation that returns another array. Thus I can compute slices of this array to only store subregions of the source, which is a hack to get around performance limitations of the dask scheduler. Because da.store returns an opaque delayed object, slicing it isn’t possible. And if there’s every anything useful returned from the storage action, e.g. a status code, keeping that within the NDArray API would be convenient as well.
Do you know the number of chunks you’re writing to disk? That’s probably the number of finalize tasks. And I assume your dashboard screenshot is representative—30-40 workers?
I’m typically using 200-400 workers, and each finalize task corresponds to a set of chunks being written to different zarr arrays (i’m generating multiresolution pyramids here, so the input is single chunk of source data, and the output is 4-8 chunks of downsampled data, which get written to disk, and this happens for tens of thousands of inputs)
Okay. I’m just trying to figure out how to make a test case to reproduce this; I want to see if it’s an issue with the data-fetching algorithm on the worker causing it to be unnecessarily conservative, or getting stuck in a retry or something. If you can estimate the number of inputs each finalize would have, that would help a lot.