Shuffle P2P unstable with adaptive k8s operator?

Hello!

We have a Dask workload that includes a large groupby operation. We are trying to make it scalable using the Dask Kubernetes operator, the idea being that big requests automatically trigger more workers, which then scale back down automatically afterwards.
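For context, here is a minimal sketch of the kind of setup we use (the cluster name, image, sizes and the example groupby are illustrative placeholders, not our actual workload):

```python
# Minimal sketch, not our production code: names, image and data are placeholders.
import dask
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="groupby-demo",                 # hypothetical cluster name
    image="ghcr.io/dask/dask:2024.2.0",
    n_workers=1,
)
cluster.adapt(minimum=1, maximum=6)      # operator autoscales between 1 and 6 workers

client = Client(cluster)

# A shuffle-heavy groupby, similar in spirit to our workload
df = dask.datasets.timeseries(start="2024-01-01", end="2024-02-01")
result = df.groupby("name").agg({"x": "mean", "y": "std"}, split_out=8).compute()
```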

We observe large, unpredictable runtimes. There appears to be some kind of oscillation happening when the operator decides to scale down workers while the P2P shuffle is in use. To track this we are using a Grafana dashboard that scrapes the metrics exposed by the scheduler and workers.

Here is the behavior with the oscillation:

As a comparison, here we ran the same workload, but with the workers fixed to 4 instead of 1–6 autoscaled:

Comparing these two, the non-adaptive version takes about 3 minutes while the adaptive one needs 10 minutes. Unfortunately, it is also hard to reproduce because it depends on the moment at which the operator scales down the workers. In the screenshot we see about 2 cycles in which the number of tasks increases after workers are scaled down, but in other cases we have seen up to 8 cycles before the workload completed successfully.

As a comparison, we also tried to run the workload with the tasks shuffle method, which appears to be more stable:
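For reference, a sketch of how we switch the shuffle method (the config key is what we use; the per-operation `shuffle_method=` keyword is my assumption for recent dask versions):

```python
import dask

# Force the task-based shuffle globally instead of p2p
dask.config.set({"dataframe.shuffle.method": "tasks"})

# ... or per operation (keyword name is an assumption for recent dask versions):
# result = df.groupby("name").agg({"x": "mean"}, split_out=8, shuffle_method="tasks")
```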

We are currently using:
dask==2024.2.0
dask-kubernetes==2024.1.0
python==3.11

Additionally, we have attempted to patch the operator to wait longer before scaling down, but we think the problem would still occur whenever too many tasks are “lost” through downscaling.

We are wondering whether someone else has experienced similar behavior.
Is this a “normal” use case that Dask together with its k8s operator aims to support, or is it more common to fine-tune the parameters for a given workload?
If you have experienced something similar, did you find workarounds or mitigation strategies?

Really, any tips or pointers on what to look into would be very much appreciated :slight_smile:

Hi @sil-lnagel,

I think Dask should definitely support this use case, even if cluster adaptivity is always hard to get right. As you say, this seems related to P2P shuffling; I’m wondering if this is because the Scheduler tracks less task information in that mode.

cc @jacobtomlinson @hendrikmakait

Hi @sil-lnagel,

Thanks for your report. P2P and adaptive scaling unfortunately don’t always play well together. If adaptive scaling decides to retire a worker that is currently participating in a P2P shuffle, the entire shuffle gets restarted.

I agree that Dask should support this, but unfortunately it’s not there yet.


Thanks @hendrikmakait for your quick reply! That’s unfortunate, but I somehow suspected something like that might be happening.

We were thinking about how to mitigate this issue, and one idea would be to prevent the operator from retiring workers that participate in a P2P shuffle. A very conservative version of this would be to prevent retiring workers that are currently running any tasks.

The idea would be to poll the worker’s /metrics endpoint prior to retiring it. Besides the downside that this keeps alive any worker that is doing anything at all, does that sound like an approach worth trying, or like a waste of time?
I was thinking of querying these:

dask_worker_tasks{state="memory"} 10.0
dask_worker_tasks{state="released"} 10.0
dask_worker_tasks{state="ready"} 18.0
dask_worker_tasks{state="executing"} 1.0

Maybe retire only workers that have 0 tasks executing, ready, or in memory (rough sketch below).
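A rough sketch of the check I have in mind, assuming the worker exposes its Prometheus endpoint at a known address (the URL, the helper name and the “idle” criterion are all my assumptions, and the parsing is deliberately naive):

```python
import re
import urllib.request

# States that, if non-zero, should keep the worker alive (our assumption)
BUSY_STATES = {"executing", "ready", "memory"}


def worker_is_idle(metrics_url: str) -> bool:
    """Return True if the worker reports no tasks executing, ready or in memory.

    metrics_url is the worker's Prometheus endpoint, e.g.
    "http://<worker-host>:<dashboard-port>/metrics" (host and port depend
    on how the worker dashboard is configured).
    """
    with urllib.request.urlopen(metrics_url, timeout=5) as resp:
        text = resp.read().decode()

    # Parse lines like: dask_worker_tasks{state="memory"} 10.0
    pattern = re.compile(r'dask_worker_tasks\{state="(\w+)"\}\s+([0-9.]+)')
    counts = {state: float(value) for state, value in pattern.findall(text)}
    return all(counts.get(state, 0.0) == 0.0 for state in BUSY_STATES)


# The patched operator would then only retire workers where
# worker_is_idle(...) returns True, and skip (or postpone) the others.
```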

fyi @jacobtomlinson

I managed to create a “minimal example”: GitHub - sil-lnagel/dask-shuffle-p2p
