Worker blocking on memory limit despite a streaming-friendly, linear pipeline

Hi,

I’m trying to simulate a simple data flow using Dask DataFrame and the distributed engine. Although the processing is quite linear and should allow streaming-like execution with limited memory usage, everything blocks as soon as a memory limit is configured.

Thinking it was more of a DataFrame issue, I posted a question here: Memory limits reached in simple ETL-like data transformations - #5 by guillaumeeb, and received a thorough, well-investigated answer from @guillaumeeb (many thanks!). Example code can be found there.
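
For context, here is a minimal sketch of the kind of setup I’m talking about; the paths, column names, and sizes are illustrative placeholders, not the exact code from the linked topic:

```python
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Threads-only cluster with a per-worker memory limit,
# similar to the setup described above (sizes are illustrative).
cluster = LocalCluster(processes=False, n_workers=1,
                       threads_per_worker=4, memory_limit="2GB")
client = Client(cluster)

# A linear, streaming-friendly pipeline: read -> transform -> write.
ddf = dd.read_parquet("input/*.parquet")           # hypothetical input path
ddf = ddf.assign(total=ddf["price"] * ddf["qty"])  # hypothetical columns
ddf.to_parquet("output/")                          # triggers the computation
```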

A couple of unanswered questions remain regarding memory management in the Worker, which boil down to “How does the distributed Client handle the memory limit?”:

  1. It shouldn’t lead to a deadlock, because the processing is quite linear: a memory allocation can wait (without blocking any prior operation) until some other block is freed.
  2. The current behaviour could be a symptom of some global lock (common to allocation and freeing?!) that could result in such a deadlock.
  3. I’ve tried the Dask distributed facilities with other libraries (like PyArrow and Polars) via dask.delayed (see the sketch after this list), and it does limit memory consumption, but again it ends in a deadlock.
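
For reference, this is roughly what I mean by the dask.delayed variant; the file paths, column name, and transformation are illustrative placeholders, not my actual code:

```python
import dask
import pyarrow.parquet as pq
import pyarrow.compute as pc

@dask.delayed
def load(path):
    # Read one chunk with PyArrow instead of dask.dataframe.
    return pq.read_table(path)

@dask.delayed
def transform(table):
    # Some per-chunk, memory-bounded transformation (hypothetical column).
    return table.filter(pc.greater(table["qty"], 0))

@dask.delayed
def save(table, path):
    pq.write_table(table, path)
    return path

paths = [f"input/part-{i}.parquet" for i in range(10)]  # hypothetical inputs
tasks = [save(transform(load(p)), f"output/part-{i}.parquet")
         for i, p in enumerate(paths)]
dask.compute(*tasks)
```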

Any idea or explanation of the situation is welcome. Thank you!

After checking the code, I think I’ve discovered a possible reason for this behaviour.

When a worker reaches the memory limit (or the distributed.worker.memory.target fraction of it) and spilling is disabled (as in my experiment), it gives the garbage collector a chance to free something, and if that is not enough, it pauses the worker, i.e. the whole process [1].
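
To make the thresholds concrete, here is how they map to the Dask configuration keys; the values shown are the documented defaults (except spilling, which is disabled as in my experiment), not something I measured:

```python
import dask

# Worker memory thresholds involved in the behaviour described above.
# Set a fraction to False to disable that mechanism entirely.
# Must be set before the cluster is created to take effect.
dask.config.set({
    "distributed.worker.memory.target": 0.6,      # start freeing / spilling data
    "distributed.worker.memory.spill": False,     # spilling disabled, as in my experiment
    "distributed.worker.memory.pause": 0.8,       # pause the worker's thread pool
    "distributed.worker.memory.terminate": 0.95,  # Nanny kills the worker (processes=True)
})
```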

I was running this with threads only (processes=False at cluster initialisation), so pausing the whole process makes it impossible for anything to finish, hence the deadlock and stall.

The only change I’ve witnessed after enabling processes is that the worker gets restarted (so it reaches the terminate threshold). Perhaps in this configuration (processes=True) the Nanny is still operational while the worker is paused, so it manages to kill the actual worker [2].
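
For comparison, this is the processes=True variant I tried; the sizes are again illustrative:

```python
from dask.distributed import Client, LocalCluster

# With processes=True each worker runs in its own process under a Nanny,
# which keeps monitoring memory even while the worker itself is paused
# and restarts it once the terminate fraction is exceeded.
cluster = LocalCluster(processes=True, n_workers=2,
                       threads_per_worker=2, memory_limit="2GB")
client = Client(cluster)
```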

So my follow-up question is: are these conclusions, [1] and [2], correct?

Hi @theJonan,

Sorry, I think we misunderstood each other; we could have continued this conversation in the previous topic, but no matter.

I think your conclusions are correct, but does that solve the issue you are facing? I don’t think killing the workers is a good solution. In your code, I believe Dask should be able to write some results to the output file and free memory so other operations can continue.

Also, it would be interesting to investigate whether the tips from

or
https://distributed.dask.org/en/latest/worker-memory.html

can help you.