Memory limits reached in simple ETL-like data transformations

First, thanks for the complete reproducer: I was able to run your code and reproduce the behavior you describe!

Some observations or remarks:

  • When I start the LocalCluster and open the Dashboard, I can see that the Worker is already consuming 256 MiB out of the 763 MiB available (a minimal setup sketch follows after this list).
  • Running the first part of your code makes the occupied memory go up to 612 MiB, and that memory stays occupied after the computation as unmanaged memory; I’m not sure why yet (see the memory-trimming sketch after this list).
  • But re-running the code does trigger memory cleaning and works fine.
  • I’m able to make the code run without stalling with an 800 MB Worker memory limit by adding some other settings to the configuration (otherwise it stalled with single_file=True):
import dask
dask.config.set({'distributed.worker.memory.pause': 0.95})
dask.config.set({'distributed.worker.memory.target': 0.9})
  • Memory consumption then looks like this (see the Dashboard memory screenshot):
  • But with single_file=True it somehow has to load all the data into memory before writing; no streaming is possible in this case.
  • With single_file=False, Dask streams the writing, but it somehow keeps the partitions in memory (I’m not sure why), so it still stalls if I lower the memory limit. This shouldn’t be related to your workflow, as the generated graph does look embarrassingly parallel (see the to_csv sketch after this list).
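
For reference, here is a minimal sketch of the setup I used; the memory limit and cluster sizing match the numbers above, but everything else is a placeholder rather than your exact reproducer:

```python
# Minimal sketch of the test setup: a single-worker LocalCluster with a
# small memory limit, so the Dashboard shows the baseline memory usage.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    memory_limit="800MB",  # the limit that avoided stalling in my tests
)
client = Client(cluster)
print(client.dashboard_link)  # open this URL to watch worker memory
```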
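
On the unmanaged memory that stays occupied: one way to check whether it is just heap memory the allocator has not returned to the OS is to ask the workers to garbage-collect and call malloc_trim (glibc/Linux only). This is only a diagnostic sketch, not something your workflow requires:

```python
# Sketch: ask each worker to run gc and release free heap pages back to
# the OS (glibc/Linux only), to see how much "unmanaged" memory remains.
import ctypes
import gc

def trim_memory() -> int:
    gc.collect()
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)  # `client` from the setup sketch above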
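
To illustrate the two write modes, here is a sketch using dask.datasets.timeseries() as a stand-in for your data; the column filter is just an arbitrary embarrassingly parallel transform:

```python
# Sketch of the two to_csv write modes on a generated dataset.
import dask.datasets

df = dask.datasets.timeseries()  # dummy data, one partition per day
df = df[df.x > 0]                # arbitrary embarrassingly parallel transform

# single_file=False: one CSV file per partition, written independently
df.to_csv("demo-out-*.csv", single_file=False)

# single_file=True: everything goes into one file; this is the case where
# I had to raise the memory thresholds above to avoid stalling
df.to_csv("demo-out.csv", single_file=True)
```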

> I think it is more on the Scheduler and Worker part. Tasks are just paused when reaching a certain percentage of the memory limit.

I agree with you, but that is not what happens here: the memory taken by previously read blocks stays occupied for some reason, so the paused tasks never get a chance to resume.

See my screen copy of the Dashboard above: no more blocks to write, no memory left, tasks are paused, and it is impossible to read more data.
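
For completeness, the same thing can be checked without the Dashboard; this sketch relies on the dask_worker keyword that client.run injects into the callable, and on psutil, which ships with distributed:

```python
# Sketch: confirm from the client that the workers paused themselves and
# how much resident memory they hold, without opening the Dashboard.
import psutil

def worker_report(dask_worker):
    return {
        "status": str(dask_worker.status),  # e.g. "Status.paused"
        "rss_mib": psutil.Process().memory_info().rss / 2**20,
    }

client.run(worker_report)  # `client` from the setup sketch above
```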

If you have further questions on these results, then yes, please open a new Topic.