Why do I get so much unmanaged memory?

Hi everyone,

I've been facing an issue for a few months now related to a very large amount of unmanaged memory on all my workers.

I'm using Dask Distributed to process PDAL pipelines on point clouds through a LocalCluster. I take a single point cloud as input and cut it into tiles, so that a pipeline is processed on each tile. At the beginning of the execution, each worker of my cluster gets tiles to process.

The problem is that when a worker finishes a task, there is a lot of unmanaged memory left over, about 2 GiB after each task computation. So when a worker gets more than one task, its memory reaches ~90% of the memory limit, I get the "Memory not released back to the OS" warning (I'm on Windows so I can't malloc_trim the unmanaged memory) and everything crashes. But when each worker gets only one task, everything works fine.

Before computation, all my pipelines are turned into delayed objects (process is my function that processes a PDAL pipeline):

delayedPipelines.append(dask.delayed(process)(p))

So I get a list of Delayed objects called delayed, and then I launch the computation like this:

delayed = client.compute(delayed)
dask.distributed.progress(delayed)

Am I using Dask in the wrong way? Or is this normal behavior?

My dask configuration:

from dask import config as cfg
from dask.distributed import Client, LocalCluster

cfg.set({'interface': 'lo'})
cfg.set({'distributed.scheduler.worker-ttl': None})
cfg.set({'distributed.comm.timeouts.connect': 500})
cfg.set({'distributed.scheduler.worker-saturation': 1.0})
cluster = LocalCluster(n_workers=6, threads_per_worker=1)
client = Client(cluster)

I set worker-saturation to 1.0 as recommended in this article.

Hi @ClementAlba,

It's not normal behavior to have a lot of unmanaged memory. Somehow, your process function must not be releasing objects properly. It's a little hard to tell what the problem is without seeing this function.

In your screenshot, I don’t see unmanaged memory though, is this happening afterwards?

When you launch just 1 task, memory is never freed up after it has executed?

There is an explanation of Worker memory management at the following link that could help:
https://distributed.dask.org/en/stable/worker-memory.html#using-the-dashboard-to-monitor-memory-usage
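
For reference, the thresholds described on that page can also be tuned through the configuration if needed; a minimal sketch (the values below are just the documented defaults, and note that spilling only applies to managed memory):

import dask

# Fractions of the worker memory limit (these are the documented defaults).
dask.config.set({
    'distributed.worker.memory.target': 0.6,      # start spilling managed data to disk
    'distributed.worker.memory.spill': 0.7,       # spill based on process memory
    'distributed.worker.memory.pause': 0.8,       # pause accepting new tasks
    'distributed.worker.memory.terminate': 0.95,  # restart the worker
})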

Also this blog post could help you to understand things:

@guillaumeeb Thanks for your response!

No, the unmanaged memory appears just after a task has been executed. In the screenshot above, it was old unmanaged memory. Here is another screenshot:

import os

import dask
from dask.distributed import Lock


@dask.delayed
def process(pipeline, temp_dir=None):
    """Process the pipeline and delete the associated temp file if it's not a dry run"""
    if temp_dir:
        with Lock(str(pipeline[1])):
            # Get the temp file associated with the pipeline
            temp_file = temp_dir + '/' + str(pipeline[1]) + '.pickle'
        # Execute the pipeline
        pipeline[0].execute()
        del pipeline
        try:
            # Remove the temp file
            os.remove(temp_file)
        except FileNotFoundError:
            pass
    # Don't need to get and suppress the temp file if it's a dry run
    else:
        pipeline[0].execute()
        del pipeline

I have already tried removing the pipeline object in my process function by placing del pipeline after the pipeline[0].execute() statement. It doesn't work, there is still a lot of unmanaged memory.

When I run a single task, the unmanaged memory is still there, but on the dashboard everything disappears after the code finishes running.

I've read a lot of articles about why unmanaged memory stays in distributed memory, and one of the common causes is that there are still objects referencing the task's result, so it can't be collected. I think the objects referencing the tasks on my workers are the futures created after I called client.compute(delayed). But if this is the case, how can I access the Future/task to dereference it? I've already tried to do this by creating a WorkerPlugin, but I couldn't get hold of the task that a worker had just processed.
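
If that is the case, is something like this the kind of thing I would need to do (just a sketch of what I had in mind, I'm not sure it's the right approach)?

import gc

# `delayed` is the list of Futures returned by client.compute above.
# Dropping the client-side references should let the scheduler forget
# the results, then every worker can be asked to run a garbage collection.
del delayed
client.run(gc.collect)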

Hm, in my opinion you shouldn’t have to do this.

I just caught something in your first code snippet, are you really using the syntax delayed = client.compute(delayed)? You should be calling delayed = client.compute(*delayed), right?

There are other things I don’t fully get yet. I’m not familiar with PDAL:

  • When is your data loaded? Inside the pipeline.execute() call? I guess you are either loading or creating a lot of data in this method?
  • In general, what's going on inside the pipeline.execute() method?
  • In your Dashboard screenshot, the memory bars are actually a good color: they are not grey. To me, that means the memory could be cleaned at some point. I see you configured your Workers to have only one thread, so one task at a time. Do you run into trouble in this case? When a Worker finishes the first execution and begins the second one, does it fail with a memory error?
  • Finally, did you try to run with fewer Workers with more memory each, to see what happens?

I think delayed = client.compute(delayed) is the right way (Managing Memory — Dask.distributed 2024.8.2 documentation)
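
As I read that page, passing the whole list returns one Future per Delayed object, so something like this (just my understanding of the documented pattern):

from dask.distributed import progress

# client.compute on a list of Delayed objects returns a list of Futures,
# one per delayed pipeline, so no * unpacking should be needed.
futures = client.compute(delayedPipelines)
progress(futures)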

Yes, my data is loaded when the execution of the pipeline starts, so when pipeline.execute() is called.

When the first task of a worker finishes, I get a lot of unmanaged memory that is not collected, so if a second task runs on the same worker, the unmanaged memory of the two tasks adds up and the worker fails with a memory error.
But when a worker has to execute just one task and no other after it, everything works fine.

Yes, I can get around the problem this way: with more memory on my workers I can run more than one task, but the large amount of unmanaged memory is still there. I want to be able to launch as many workers as I want and run more than one task on each without blowing up the distributed memory. But I don't know if that's possible.
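
(For context, this is roughly the kind of setup I mean by "more memory on my workers", just a sketch; the values are arbitrary:)

from dask.distributed import Client, LocalCluster

# Fewer workers, each with a larger explicit memory budget.
# The memory_limit value here is only an example.
cluster = LocalCluster(n_workers=3, threads_per_worker=1, memory_limit='8GiB')
client = Client(cluster)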

I don't understand why I have this amount of unmanaged memory, because at the end of the pipeline execution the result is a file that is written directly to a folder on disk. Normally, nothing should stay in memory.

Do you think the unmanaged memory is due to the data being loaded into my process function?

I did not see anything about it in the link you posted; however, if you look here, you'll see the syntax I recommend. And probably on the Dask examples site too.

Thanks for the clarification!

Does the memory accumulate with each task, i.e. does it keep growing after each task has been executed?

I don't understand it yet either. There should be no problem with loading data in your process function, this is kind of what Dask does when you use collections like DataFrame and Array. It is good practice! You should make sure you don't have anything that remains in memory inside this function though, maybe due to the PDAL library?

@ClementAlba May I ask what version of dask/distributed you are using?

@guillaumeeb After the execution of each task owned by a worker, the unmanaged memory doesn't keep growing. It just stays in distributed memory.

Maybe the "type" of data? All of the examples I found on the net show huge datasets, but all of the data is loaded from a CSV. However, in my process function I launch a PDAL pipeline which loads a file; I don't load data from a CSV or anything like that. Do you think that could be the reason for my problem?

Hi @ncclementi, I use distributed 2022.12.1 and dask 2022.12.1

The type should not be a problem: you can load huge images or multi-dimensional arrays through Dask Array, processing them chunk by chunk in a streaming way, with no unmanaged memory problem.

Did you try to do some memory profiling of your PDAL pipeline without using Dask?

Did you try the syntax delayed = client.compute(*delayed) in your main code?

I discovered PDAL has a "stream mode" for pipeline execution, and with this execution mode I don't have any unmanaged memory; even with huge files the execution succeeds! The problem is that this "stream mode" can't be applied to all pipelines. This short paragraph explains it well.
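
In case it helps, this is roughly how I run it (a sketch based on my code; pipeline_json stands for my pipeline definition and the chunk size is arbitrary):

import pdal

# Sketch: run a pipeline in PDAL's stream mode when possible,
# otherwise fall back to the standard execution mode.
pipeline = pdal.Pipeline(pipeline_json)  # pipeline_json: placeholder for my pipeline definition
if pipeline.streamable:
    count = pipeline.execute_streaming(chunk_size=10000)
else:
    count = pipeline.execute()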

Yep, same problem.

I also tried not using Distributed, i.e. processing my pipelines on my computer's thread pool instead of on a cluster. The execution failed.

Currently, I don't use Dask Arrays to store my delayed objects, I use a standard Python list. You mentioned that Dask Arrays can process my data in a streaming way. Do you think replacing this Python list with a Dask Array could help me handle the unmanaged memory? Or, at worst, at least be good practice for using Dask?

Okay, so this is not a valid option for all cases. But this might point to a problem when using the PDAL Python extension in "normal" mode. I see that it's just a wrapper around a C++ library; maybe there is some problem with how memory is managed between Python and C++? Something not well released on the C++ side even if the Python objects are cleaned?

Well, looking at the code again, it seems the delayed process execution isn't returning anything, and all the IO and processing is handled by PDAL, so I don't see a need for, or a correct way of, using Dask Array. There's no problem with using Delayed for this. I think you definitely should try to profile PDAL a bit and see how the memory evolves when using its Python API. Maybe just try the code with a for loop instead of using Delayed and Dask.
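
Something as simple as this could already tell you a lot (a rough sketch, assuming psutil is available; `pipelines` stands for your raw pipeline list, and the process function is called directly, without the dask.delayed decorator):

import gc
import os
import psutil

proc = psutil.Process(os.getpid())

for i, p in enumerate(pipelines):   # the same pipelines you normally wrap in dask.delayed
    process(p)                      # your function, called directly, no Dask involved
    gc.collect()
    rss_gib = proc.memory_info().rss / 2**30
    print(f"after pipeline {i}: RSS = {rss_gib:.2f} GiB")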

By looking more into PDAL, I discovered that I can open the file containing the point cloud and use it as a NumPy array in my code. I think this can finally solve my problem.

This is my code using point clouds as NumPy arrays:

import dask


@dask.delayed
def process(stages, pc):
    results = []
    for i in range(len(stages)):
        for item in pc:
            stage = stages[i].pipeline(item)
            stage.execute()
            results.append(stage.arrays[0])
            del stage
        pc = iter(results)


@dask.delayed
def read_cloud(readers):
    readers = readers.pipeline()
    iterator = readers.iterator()
    return iterator

delayed = []
it = read_cloud(readers_stage)
result = process(pipeline_stages, it)
delayed.append(result)

In the read_cloud function I read my point cloud using PDAL's streaming mode. In the iterator variable I get a Python iterator containing all the point cloud chunks (represented by NumPy arrays) generated by PDAL's streaming mode. Then in process I execute each stage of my pipeline on all the arrays. But there is a problem: adding the NumPy arrays to the results list generates growing unmanaged memory.

Do you know how I can solve/work around this problem? Should I use Dask Arrays or something like that instead of a standard list for the results variable?

I’ve got several comments or questions.

First of all, I'm not sure there is a point in processing the stages in streaming mode, since in the end you are keeping all the results in memory. It would simplify the code to just apply the stages.

Some remarks about the code:

  • I'm not sure there is a point in splitting things into two functions, especially because the first one is only returning an iterator, probably not even reading anything until this iterator is used.
  • The process function is not returning anything, is that normal?
  • If I understand correctly, in the process function, you are adding to the results list the results of every pipeline stage for every chunk. I'm not sure this is intended?

Final question: what are you doing with all the results in the end? What happens to your final point cloud?

Okay, I will try with read_cloud and process inside a single function.

No, it's not, I will change it. Normally it should return the results list.

I turn this list into a Python iterator so I can iterate over the results of the first stage when I execute the second. To execute a stage, PDAL needs the result of the previous one.

The list should contain all the results of the LAST stage, and I will concatenate all these results (represented by NumPy arrays) to write the final cloud using a PDAL stage named writers.

In the code above, it looks like it is keeping the results of all the stages.

But currently, what are you doing with these outputs? If you keep a pointer to the delayed tasks, Dask will just keep the task results in the workers' memory. Could you show the end of the code?

I changed my code a little bit, separating the stage executions from the writing stage. Maybe it will be more understandable:

import dask
import numpy as np


@dask.delayed
def execute_stages(stages):
    readers_stage = stages.pop(0)
    readers = readers_stage.pipeline()
    iterator = readers.iterator()
    arrays = [*iterator]

    for stage in stages:
        for i in range(len(arrays)):
            pipeline = stage.pipeline(arrays[i])
            pipeline.execute()
            arrays[i] = pipeline.arrays[0]

    return arrays


@dask.delayed
def write_cloud(arrays, stage):
    full_array = np.concatenate(arrays)
    stage = stage.pipeline(full_array)
    stage.execute_streaming()


results = []
writers = stages.pop()
arrays = execute_stages(stages)
result = write_cloud(arrays, writers)
results.append(result)

In execute_stages, the arrays list contains the NumPy arrays representing my cloud after all the stages have been executed. Then, to write the cloud as a file I use a PDAL writers stage (the last stage), but before that I have to concatenate all my arrays.

The main problems in this code are the arrays = [*iterator] and full_array = np.concatenate(arrays) statements, because they generate a lot of unmanaged memory with large datasets.

To solve these problems, I tried not to transform my iterator into a list, i.e. to avoid the arrays = [*iterator] statement, but with this method I can't iterate over my iterator more than once. That is a problem because if I have, for example, 3 stages, only the first one will be executed.

I ran this code on small point clouds and everything works well, because the iterator does not contain many NumPy arrays, so the transformation to a list or the concatenation of all the arrays doesn't take too much memory.

Do you know if there is a way, using Dask or Dask Arrays, to transform an iterator into a list or to concatenate many NumPy arrays?

The code you provided still does not show what you are computing using Dask. Do you have several pipelines, i.e. several stage lists to execute on different input files? Or are you trying to launch just one pipeline/stages list on a single input?

If I take the hypothesis that you are trying to parallelize processing over a single input point cloud, then ideally you should read the data in chunks, e.g. cut tiles as you said at first, and have each chunk processed independently. Maybe this is already what you're doing, but I can't see it right now.

What I'm seeing currently is a single PDAL reader that reads by chunk using an iterator. I think you could very well stream the processing using this iterator, but in the end you'll keep all the resulting arrays in a single worker's memory. Something like:

def execute_stages(stages):
    readers_stage = stages.pop(0)
    readers = readers_stage.pipeline()
    iterator = readers.iterator()
    arrays = []
    for array in iterator:
        for stage in stages:
            pipeline = stage.pipeline(array)
            pipeline.execute()
            array = pipeline.arrays[0]
        arrays.append(array)

    return arrays

The other problem is that if your writer needs a full array to be able to write, then I don't see how you can bypass the full_array = np.concatenate(arrays) statement.

In order to take full advantage of Dask, you should be able to process every chunk of data from the Reader's iterator independently, and write it to disk independently. But as Dask is distributed processing, you won't be able to achieve that with an Iterator interface.
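
To illustrate, something in this spirit, where each input file or tile goes through its own read, stages and write steps, so nothing needs to be concatenated on a single worker (a rough sketch; read_tile, apply_stage and write_tile are hypothetical placeholders for your PDAL calls):

import dask

@dask.delayed
def process_one(input_path, stages, output_path):
    # Hypothetical helpers standing in for your PDAL reader/stages/writer.
    array = read_tile(input_path)           # read one tile into a NumPy array
    for stage in stages:
        array = apply_stage(stage, array)   # run one stage on this tile only
    write_tile(array, output_path)          # write this tile's result to disk
    return output_path                      # return something small, not the array

tasks = [process_one(src, stages, dst) for src, dst in zip(inputs, outputs)]
futures = client.compute(tasks)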

Oh sorry, I have several stages to execute on different input files.

So I should not use the iterator provided by PDAL? Just read my cloud in standard mode and use the whole NumPy array instead?

Okay, so I think we should try to parallelize things at this level first. I understand you tried to use streaming mode to have a lower memory footprint, but since at some point you need the whole array in memory anyway, I think there is no point in doing this.

I think we should try this simpler thing first, yes.