Memory leak in delayed functions when using a pandas DataFrame

Hi everyone

I am using Dask Distributed and ran into what looks like a memory leak when using a pandas DataFrame inside a Dask delayed function.

I created some sample delayed functions to illustrate the problem.

I created two delayed functions and a trim method:

  • One function reads a file and appends its rows to a list; at the end it prints the length of the list
  • The second function does the same, but uses pandas

First function

from dask import delayed

@delayed
def process_file(filename):
    # Read the file line by line and collect the rows in a plain Python list
    lines = []
    with open(filename) as f:
        for row in f:
            lines.append(row)
    print(len(lines))

Second function

import pandas as pd
from dask import delayed

@delayed
def process(filename):
    # Read the file with pandas and collect (index, Series) pairs in a list
    df = pd.read_parquet(filename)
    rows = []
    df.info()  # info() prints directly and returns None
    for row in df.iterrows():
        rows.append(row)
    print(len(rows))

Trim method

import ctypes

def trim_memory() -> int:
    # Ask glibc to return freed heap memory back to the OS
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

Calling the client

from dask.distributed import Client, as_completed

cluster = "tcp://localhost:18786"
client = Client(cluster)
a = process('path_to_file')
f = client.compute([a])

try:
    for done_work in as_completed(f, with_results=True, raise_errors=True):
        pass
except Exception as e:
    client.cancel(f)
finally:
    del f

Before running any tasks, the worker's memory usage is 125.12 MiB.

I launched the task for the first function (reading a big file without pandas) and memory usage grew to 510.70 MiB.

After that I ran the first function on a small file, and memory stayed almost the same: 511.30 MiB.
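For reference, the worker's RSS can also be read programmatically; here is a small sketch, assuming psutil (which distributed already depends on) and a helper name of my own:

import psutil

def worker_rss_mib():
    # Resident set size of the worker process, in MiB
    return psutil.Process().memory_info().rss / 2**20

# Returns a dict {worker_address: RSS in MiB}
client.run(worker_rss_mib)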

I understand this is normal Linux memory management, or what this article describes as "high water" behavior.

After that I called the trim method and memory returned to roughly its initial level: 127.67 MiB.
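For reference, the trim function can be shipped to the workers with Client.run, the same pattern shown in the Dask worker-memory documentation (a minimal sketch reusing trim_memory and client from above):

# Ship trim_memory to every worker and execute it there;
# returns {worker_address: return value of malloc_trim}
client.run(trim_memory)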

Then I ran the pandas function a couple of times and memory usage grew to 224.07 MiB.

Calling the trim method this time did not bring it back to the initial value; the worker still uses 193.59 MiB.

I tried memory profiling, but I could not find anything that points to the source of this leak.
Or maybe I am doing something wrong and not using Dask correctly?

My setup:
dask scheduler, started with these parameters:

scheduler --host localhost --port 18786 --dashboard --dashboard-address 18787 --protocol tcp

dask worker, started with:

worker --host localhost --nthreads 1 --nworkers 1 --name worker-one-sample --memory-limit 1024MiB --dashboard --dashboard-address 18788 --protocol tcp --local-directory /tmp tcp://127.0.0.1:18786

OS Ubuntu 22.04, Python 3.9.16, dask and distributed 2023.9.2
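For anyone who wants to reproduce this from Python instead of the CLI, a roughly equivalent local setup might look like this (a sketch; ports and the memory limit mirror the commands above):

from dask.distributed import LocalCluster, Client

cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    memory_limit="1024MiB",
    scheduler_port=18786,
    dashboard_address=":18787",
)
client = Client(cluster)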

Warm regards

Hi @yesdslv, welcome to the Dask Discourse forum!

So I generated a CSV file using this code:

with open('my_big_csv.csv', 'w') as csvfile:
    for i in range(10_000_000):
        csvfile.write("value1,value2,value3,4,5,6\n")

and tried to play with your example.

With that, I cannot even run the process delayed function on an 8 GiB worker: the objects returned by df.iterrows() seem to fill up the worker memory.

So the two pieces of code are not equivalent; one uses much more memory than the other.
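To make that concrete, here is a small sketch (with a made-up frame, not your parquet file) comparing the size of the DataFrame with the size of the materialised iterrows() output:

import numpy as np
import pandas as pd

df = pd.DataFrame(np.zeros((100_000, 6)))

# Each iterrows() item is an (index, Series) pair: one new Series per row
rows = list(df.iterrows())

df_mib = df.memory_usage(deep=True).sum() / 2**20
rows_mib = sum(s.memory_usage(deep=True) for _, s in rows) / 2**20
print(f"DataFrame: {df_mib:.1f} MiB, materialised rows: {rows_mib:.1f} MiB")
# The per-object Python overhead of 100_000 Series is not even counted here,
# so the actual RSS growth is larger still.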

Anyway, I'm not sure all of this is worth it. Python memory management is really complex; do you really need this level of precision?

Hi,
Thank you, glad to be here.

Please decrease the number of rows in the file:

with open('my_big_csv.csv', 'w') as csvfile:
    for i in range(80_000):
        csvfile.write("value1,value2,value3,4,5,6\n")

Yes, they are not equivalent. My point is to demonstrate a problem in the interaction between Dask and pandas: pandas DataFrame and Series references are held in the worker after the task completes.
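One rough way to check whether DataFrame/Series objects really are still alive in the worker is a sketch like this (run from the client; the function name is mine):

def count_pandas_objects():
    # Count live DataFrame/Series instances in the worker process
    import gc
    import pandas as pd

    gc.collect()
    counts = {"DataFrame": 0, "Series": 0}
    for obj in gc.get_objects():
        if isinstance(obj, pd.DataFrame):
            counts["DataFrame"] += 1
        elif isinstance(obj, pd.Series):
            counts["Series"] += 1
    return counts

# Returns {worker_address: counts} once the tasks are done
client.run(count_pandas_objects)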

I tried that, but memory usage is still gradually increasing.

My colleague has more experience working with Dask. She said it is better not to use pandas within delayed functions, so it is what it is.

When you first run something on a worker, all the necessary global state - namely, module imports - will be loaded as a one-off and not released when the tasks end.
This is a deliberate design feature of any long-running Python service - in the extremely likely event that you run something else afterwards that requires the same modules, they will already be in memory.
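As a rough illustration of the import cost alone, here is a sketch you can run in a fresh interpreter (exact numbers will vary with the environment):

import os
import psutil

def rss_mib():
    # Resident set size of the current process, in MiB
    return psutil.Process(os.getpid()).memory_info().rss / 2**20

print(f"before import: {rss_mib():.1f} MiB")
import pandas  # importing pandas alone typically costs tens of MiB
print(f"after importing pandas: {rss_mib():.1f} MiB")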

This isn’t a leak. A leak is when you run your task thousands of times, release the output after every iteration, and you see the baseline memory usage steadily increase.
However, please read: Mild memory leak in dask workers · Issue #8164 · dask/distributed · GitHub
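A rough sketch of that kind of check, reusing the process and trim_memory functions from above (the path and iteration count are placeholders):

from dask.distributed import Client

client = Client("tcp://localhost:18786")

for i in range(1_000):
    fut = client.compute(process("path_to_file"))
    fut.result()   # wait for the task to finish
    del fut        # release the future so the worker can drop the result
    if i % 100 == 0:
        # trim periodically so glibc caching does not hide the baseline trend
        print(i, client.run(trim_memory))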