Why do I get a lot of unmanaged memory?

I agree with you, but the problem is still the same: if the image array is too large, it can cause distributed memory issues, right?

I worked on it, and this code works well without using the streaming mode:

import dask


@dask.delayed
def execute_stages(stages):
    # The first stage is the reader: execute it on its own to load the array.
    readers_stage = stages.pop(0)
    readers = readers_stage.pipeline()
    readers.execute()
    arr = readers.arrays[0]

    # Run the array through the remaining stages, one pipeline at a time.
    for stage in stages:
        pipeline = stage.pipeline(arr)
        pipeline.execute()
        if len(pipeline.arrays[0]) > 0:
            arr = pipeline.arrays[0]

    return arr


@dask.delayed
def write_cloud(array, stage):
    # Write the processed array out in streaming mode.
    stage = stage.pipeline(array)
    stage.execute_streaming()
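
For reference, here is a minimal sketch of how these two delayed functions might be wired together and submitted; clouds_to_process, build_read_and_filter_stages and build_writer_stage are hypothetical placeholders for however you list your inputs and construct your stages:

import dask
from dask.distributed import Client

client = Client()  # connect to your cluster

tasks = []
for cloud in clouds_to_process:                   # hypothetical list of inputs
    stages = build_read_and_filter_stages(cloud)  # hypothetical helper
    writer = build_writer_stage(cloud)            # hypothetical helper
    arr = execute_stages(stages)                  # delayed: read + filter
    tasks.append(write_cloud(arr, writer))        # delayed: streaming write

dask.compute(*tasks)                              # run everything on the cluster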

The unmanaged memory completely disappears after the write_cloud function is called, and I'm now able to process more than one task per worker, great! I assume it's because nothing points to the array after stage.execute_streaming(), right?

The last problem is that when I try to load huge image arrays (i.e. when I'm processing huge clouds), I get a lot of unmanaged memory.

Ideally, Dask should be able to process your image arrays in a streaming fashion, freeing memory as soon as an array has been processed. I think the original problem you faced was that the memory was not freed after an array had been processed.

It shouldn’t cause memory issues as long as you don’t process more arrays than can fit in memory at the same time.
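
One simple way to bound this, sketched here with a LocalCluster (the worker count is only illustrative), is to give each worker a single thread, so the number of arrays held in memory at once is limited by the number of workers:

from dask.distributed import Client, LocalCluster

# One thread per worker: each worker loads and processes only one array at a
# time, so at most n_workers arrays are in memory simultaneously.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)  # illustrative sizing
client = Client(cluster)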

I edited my previous message above, and yes, the memory is now freed when one array is processed.

The unmanaged memory grows when I load the array, not when I’m processing it.

I ran some test executions and I understand what you mean. I am now able to process a lot of tasks per worker by setting the memory_limit of my workers to None. This allows me to load several image arrays at the same time without the workers being restarted.
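
For reference, here is roughly what that looks like, using a LocalCluster for illustration (the worker count below is just an example):

from dask.distributed import Client, LocalCluster

# memory_limit=None removes the per-worker memory limit, so workers are no
# longer paused or restarted when memory thresholds are reached.
cluster = LocalCluster(n_workers=2, memory_limit=None)  # example worker count
client = Client(cluster)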

I would like to know if it’s a “bad practice” or if it’s a workable solution.

Well, it is just a bit weird that the code fails with a memory limit set, yet you don’t run into memory problems outside Dask (e.g. your OS killing the process) when you set it to None.

I run into problems when a limit is set because the workers restart when the memory thresholds are reached. Without a limit, these thresholds no longer apply.

Then I think a better practice would be to try configuring these thresholds appropriately, using
https://distributed.dask.org/en/stable/worker-memory.html#thresholds-configuration
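
For example, something along these lines from Python (the values below are purely illustrative, and the configuration has to be set before the workers are created; see the linked page for what each threshold does):

import dask

# Illustrative values only -- tune them to your workload.
# The fractions are relative to the worker's memory_limit.
dask.config.set({
    "distributed.worker.memory.target": 0.80,      # start spilling to disk
    "distributed.worker.memory.spill": 0.90,       # spill more aggressively
    "distributed.worker.memory.pause": 0.95,       # stop accepting new tasks
    "distributed.worker.memory.terminate": False,  # never restart the worker
})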

This should work if you’re able to process your data with memory_limit set to None and do not face any oom_killer or equivalent memory-related errors.

That version has active memory management enabled by default, so I wonder if you are hitting a memory leak coming from somewhere else. I see @guillaumeeb already suggested profiling; that sounds like a good place to start.
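
As a starting point for that profiling, here is a small sketch that samples the resident memory of each worker process from the client (it assumes psutil is installed on the workers, and the scheduler address is a placeholder):

import psutil
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address

def worker_rss():
    # Resident set size of the current worker process, in bytes.
    return psutil.Process().memory_info().rss

# Runs on every worker; comparing this with the managed memory shown on the
# dashboard gives an idea of how much memory is unmanaged.
print(client.run(worker_rss))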