Slow down transfer between workers

Dear Dask community,

I have a situation where several workers are reading data from disk. This data should then be processed by other GPU-accelerated workers. This system works as intended for small datasets. However, for large datasets, it goes wrong. The data reading workers are usually much faster than the GPU-acc workers. This leads to many objects in memory waiting to be processed, which would not be a problem if the data reading workers would keep those objects in their own memory. Unfortunately, they immediately transfer the objects to the GPU-acc workers, who cannot keep up, start having memory issues and spill to disk (which I want to avoid).

I have tried to avoid this by tinkering with the distributed.memory config settings, for example by setting the pause value to 0.5. It seems like this did stop the transfers, but it also caused the GPU-acc workers to stop processing the data…

Any ideas on how I can tell the scheduler to not immediately transfer the data to the GPU-acc workers?


This is a known issue and it’s currently in the final stages of development in Withhold root tasks [no co assignment] by gjoseph92 · Pull Request #6614 · dask/distributed · GitHub.
We would love to have some early feedback on it; could you try out the PR’s branch and report back on the PR itself?

1 Like

That seems very promising! Thanks for pointing me to that PR. I will give it a go.

1 Like

@MaximLippeveld FYI you’ll need to set the config distributed.scheduler.worker-saturation: 1.0 to see any effect (the changes in that PR are behind a feature flag that’s off by default).

You can also client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", 1.0)) (or float('nan') to revert) when the scheduler is idle and there are no tasks in the system, if you want to see before/after behavior more easily.

1 Like