Unable to remove unmanaged memory

I am using Dask on Kubernetes and submitting tasks to the cluster as futures. After getting the result with future.result() I run future.release(). Before using the cluster, memory usage is around 2 GB; after running 14K tasks it goes up to 11 GB and stays there, even after the futures are released. To clear it out I have also tried to manually execute:

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)

I have also tried adding the MALLOC_TRIM_THRESHOLD_ environment variable.
I get an error from the nanny process: “Run out-of-band function ‘trim_memory’”.
Neither of these has resulted in a reduction of unmanaged memory. I would appreciate any advice.


Hi @jadeidev, welcome back!

We’ve had some discussions about unmanaged memory in the past few weeks, for example:
https://dask.discourse.group/t/memory-limits-reached-in-simple-etl-like-data-transformations

This is often a tricky problem, and it seems you have already tried some possible solutions.

Here, I’m thinking about two things:

  • Did you try to del the Future Python object once you got the result? (See the sketch after this list.)
  • It looks like your Workers still have some room in terms of memory usage after the computation, so maybe Python or the OS just doesn’t bother to reclaim the memory. What happens if you submit new computations after that: does the memory keep stacking up, or is it released at some point?
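
For the first point, a minimal sketch of what that could look like (func and params stand in for your own task):

future = client.submit(func, params)  # hypothetical task
result = future.result()              # copy the result to the client
del future                            # drop the last client-side reference
# Once no Future object points at the key, the scheduler is free to
# release the corresponding data from the workers.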

@guillaumeeb thank you for your help. I have tried the suggestions you gave.

  • Deleting the futures doesn’t seem to do the trick here (I do wonder how deleting the future on the client side affects the workers?)
  • I have also tried to reduce the cluster memory so I can experiment with memory usage; basically what happened is that the memory keeps filling and doesn’t get reclaimed. It eventually saturates the cluster memory and the workers can’t proceed. See image below.
  • Do you happen to know what the nanny error “Run out-of-band function ‘trim_memory’” means?

As stated here:

In this way, your local variables define what is active in Dask. When a future is garbage collected by your local Python session, Dask will feel free to delete that data

Do you have a small reproducer of this issue? It might come from your workflow; maybe you’re keeping something in Workers’ memory without noticing it?

Do you have a complete stacktrace of the error?

Hi,
This is from the worker’s log, not the nanny, and it’s not an error.
It’s just saying that you called client.run(trim_memory), as in the snippet you posted. Does it say the function is failing?
If calling trim_memory does not fail but also does nothing for you, and you are using the standard glibc malloc, then you don’t have a problem with memory trimming to begin with.
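
As a quick check, client.run returns a dict mapping each worker’s address to the function’s return value, and malloc_trim returns 1 if it actually handed memory back to the OS and 0 otherwise. So, reusing the trim_memory function from your snippet:

results = client.run(trim_memory)  # {worker_address: malloc_trim return value}
print(results)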

Memory growing nonstop into the tens of GiB without any managed memory being present is a strong sign of a memory leak, most likely on your side. Double-check which libraries you’re calling from Dask and try replacing them with stubs.


Thank you @crusaderky, that is very helpful. I thought that when it told me “Run out-of-band” the function was failing… I suppose that is just the way it is reported.
We are using numpy, scipy, pandas, and OpenCV to perform analytics on images. What would it mean to replace them with stubs?
Is there some way I can figure out from dask dashboard (or any other way) where memory is leaking from?

  1. in the dask graph, take a function y = f(x)
  2. note down the shape of the output given the input (assuming everything is numpy)
  3. return numpy.ones of the same shape instead (see the sketch below)
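
For example, with a hypothetical image-processing step (names and shapes are placeholders):

import numpy as np

def process_image(img: np.ndarray) -> np.ndarray:
    # real implementation calling scipy / OpenCV / pandas ...
    ...

def process_image_stub(img: np.ndarray) -> np.ndarray:
    # same output shape and dtype as the real function, but no third-party code;
    # if the leak disappears with the stub, the real implementation is the culprit
    return np.ones((256, 256), dtype=np.float32)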

Is there some way I can figure out from dask dashboard (or any other way) where memory is leaking from?

Bisection.

  1. Stop defining your dask graph halfway through.
  2. Instead, wrap things up with distributed.wait(<collection(s) so far>)
  3. If the leak disappears, move to 3/4 of the graph definition and repeat. If you can still see it, move to 1/4 instead. (A minimal sketch follows below.)
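
A minimal sketch of that bisection step, assuming a list of futures built with client.map (func and parameters are placeholders):

from distributed import wait

# Submit only the first half of the work, then block until it finishes.
halfway = client.map(func, parameters[: len(parameters) // 2])
wait(halfway)
# Watch the dashboard here: if unmanaged memory stays flat, the leak is in
# the second half of the workflow; if it still grows, bisect the first half.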

@crusaderky, thank you for the advice. My use case is very small dask graphs.
This is the way I use Dask to distribute the work we do (perhaps I should have added it to my original question, sorry for that):

# define some function that we need executed in a distributed manner
def func(params: dict) -> str:
    try:
        # do something here that uses numpy, scipy, pandas, open-cv
        ...
    except Exception:
        return "FAIL"
    else:
        return "DONE"

# define parameters to feed the function with     
parameters = [
    {"a":"b1","c":"d1"},
    {"a":"b2","c":"d2"},
    {"a":"b3","c":"d3"},
    ...
]
futures = []
# submit work to cluster
for p in parameters:
    future = client.submit(func, p)  
    futures.append(future)

# wait until everything is done
wait(futures, timeout=1000, return_when="ALL_COMPLETED")

# gather and save results
for f in futures:
    if f.status == "finished":
        result = f.result()
        # save the result somewhere
    f.release()

I can see how func could leak memory, and I believe that’s where we need to investigate, perhaps by putting some memory tracers there… any advice helps.
How would I go about deleting futures in this type of flow? Would I just delete the list?
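
On the memory-tracer idea, one self-contained way I could check whether func itself leaks Python objects is the standard-library tracemalloc, wrapped around the real work (a sketch; func and parameters are from the snippet above):

import tracemalloc

def func_traced(params: dict) -> str:
    tracemalloc.start()
    status = func(params)
    snapshot = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # print the five call sites holding the most memory after the task finished
    for stat in snapshot.statistics("lineno")[:5]:
        print(stat)
    return status

future = client.submit(func_traced, parameters[0])

Note that tracemalloc only sees Python-level allocations, so a leak inside a C extension such as OpenCV would not show up there.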

You’re keeping the output of all of your tasks in memory on the workers until all tasks are finished.
You should use gather, as_completed or add_done_callback to get rid of the outputs as soon as they are created instead of keeping the output of every future in memory until all futures are done.
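
For example, with as_completed each result is pulled (and its future released) as soon as it finishes, so the workers never hold all 14K outputs at once; a sketch based on your snippet:

from distributed import as_completed

futures = [client.submit(func, p) for p in parameters]

for future in as_completed(futures):
    if future.status == "finished":
        result = future.result()
        # save the result somewhere
    future.release()  # let the scheduler drop the worker-side data right away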

I have the strong suspicion that dask.sizeof.sizeof may return a vastly underestimated output for your futures. This will cause the memory to be reported as unmanaged instead of managed.
Read: Extend sizeof — Dask documentation
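
If some custom object is involved, that page shows how to register a better estimate with dask.sizeof; a minimal sketch with a hypothetical class:

import numpy as np
from dask.sizeof import sizeof

class MyImageBatch:
    def __init__(self, arrays):
        self.arrays = arrays  # list of numpy arrays

@sizeof.register(MyImageBatch)
def sizeof_my_image_batch(obj):
    # report the real payload size so the workers count it as managed memory
    return sum(a.nbytes for a in obj.arrays)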
