I am using dask distributed and have run into what looks like a memory leak when using a pandas DataFrame inside a dask delayed function.
To illustrate the problem, I created two sample delayed functions and a trim method:
- The first function reads a file, appends each row to a list, and prints the length of the list at the end.
- The second function does the same, but using pandas.
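```python
import pandas as pd
from dask import delayed


@delayed
def process_file(filename):
    # Plain Python: read the file line by line into a list
    lines = []
    with open(filename) as f:
        for row in f:
            lines.append(row)
    print(len(lines))


@delayed
def process(filename):
    # Same work, but through pandas
    df = pd.read_parquet(filename)
    rows = []
    df.info()  # info() prints its summary itself and returns None
    for row in df.iterrows():
        rows.append(row)
    print(len(rows))
```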
```python
import ctypes


def trim_memory() -> int:
    # glibc only: ask malloc to release free heap memory back to the OS
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)
```
```python
from dask.distributed import Client, as_completed

cluster = "tcp://localhost:18786"
client = Client(cluster)

a = process('path_to_file')
f = client.compute([a])
try:
    # with_results=True yields (future, result) pairs as tasks finish
    for done_work in as_completed(f, with_results=True, raise_errors=True):
        pass
except Exception as e:
    client.cancel(f)
finally:
    del f
```
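For reference, one way to read each worker's memory and call the trim method on every worker is `Client.run`, which executes a function on each worker and returns a dict keyed by worker address. This is a minimal sketch, assuming Linux/glibc workers; `rss_mib`, `report` and `check_workers` are hypothetical helpers, and resident memory (RSS) is read from `/proc/self/statm` rather than from any dask API.

```python
import ctypes
import os


def trim_memory() -> int:
    # glibc only: ask malloc to return free heap pages to the OS.
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)


def rss_mib() -> float:
    # Current resident set size of this process, read from /proc (Linux only).
    # Field 2 of /proc/self/statm is the number of resident pages.
    with open("/proc/self/statm") as f:
        resident_pages = int(f.read().split()[1])
    return resident_pages * os.sysconf("SC_PAGE_SIZE") / 2**20


def report(client, label: str) -> dict:
    # client.run executes rss_mib on every worker and returns
    # a dict mapping worker address -> return value.
    usage = client.run(rss_mib)
    for address, mib in usage.items():
        print(f"{label} {address}: {mib:.2f} MiB")
    return usage


def check_workers(address: str = "tcp://localhost:18786") -> None:
    # Imported lazily so the helpers above work without a cluster.
    from dask.distributed import Client

    with Client(address) as client:
        report(client, "before trim")
        client.run(trim_memory)
        report(client, "after trim")
```

Calling `check_workers()` after each experiment prints the per-worker figures before and after trimming, so the numbers can be compared run by run.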
Before running any tasks, the worker's memory usage is 125.12 MiB.
Then I launched a task with the first function (without pandas) to read a big file, and the worker's memory usage became:
After that I ran the first function on a small file, and memory stayed almost the same.
I understand this is a consequence of how Linux manages memory, the "high water" behavior described in this article.
After that I called the trim method, and memory returned to roughly its initial level, 127.67 MiB.
Next I ran the pandas function a couple of times, and memory usage rose to 224.07 MiB.
Calling the trim method did not bring it back to the initial value; the worker still uses 193.59 MiB.
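To check whether the retained memory comes from dask or from pandas/pyarrow itself, the same workload could be run repeatedly in a plain Python process. This is a sketch under the same Linux/glibc assumptions; `measure` and `read_rows` are hypothetical names, and `read_rows` mirrors the delayed `process` function without dask.

```python
import ctypes
import os
from typing import Callable, Tuple


def rss_mib() -> float:
    # Current resident set size of this process, read from /proc (Linux only).
    with open("/proc/self/statm") as f:
        resident_pages = int(f.read().split()[1])
    return resident_pages * os.sysconf("SC_PAGE_SIZE") / 2**20


def trim_memory() -> int:
    # glibc only: ask malloc to return free heap pages to the OS.
    return ctypes.CDLL("libc.so.6").malloc_trim(0)


def measure(func: Callable, *args, repeats: int = 3) -> Tuple[float, float, float]:
    """Run func(*args) `repeats` times in this process, then trim.

    Returns RSS in MiB as (baseline, after_runs, after_trim)."""
    baseline = rss_mib()
    for _ in range(repeats):
        func(*args)  # return value discarded, like the delayed tasks
    after_runs = rss_mib()
    trim_memory()
    return baseline, after_runs, rss_mib()


def read_rows(filename: str) -> int:
    # Same work as the delayed `process` function, but without dask.
    import pandas as pd  # imported lazily; only needed for this workload

    df = pd.read_parquet(filename)
    rows = [row for row in df.iterrows()]
    return len(rows)
```

`measure(read_rows, "path_to_file")` returns the baseline, post-run and post-trim RSS. If the post-trim figure also stays well above the baseline here, the retention is probably not specific to dask.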
I tried memory profiling, but couldn't find anything that pointed to the source of this leak.
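For the profiling step, a minimal sketch using the standard-library `tracemalloc` module, which diffs two snapshots by source line; `top_allocation_growth` is a hypothetical helper. Note that `tracemalloc` only sees memory allocated through Python's allocators. Buffers that native libraries allocate through their own memory pools (for example pyarrow, which `pd.read_parquet` uses by default when it is installed) will generally not show up in this diff.

```python
import tracemalloc
from typing import Callable, List


def top_allocation_growth(func: Callable, *args, limit: int = 5) -> List[tracemalloc.StatisticDiff]:
    """Run func(*args) and return the `limit` source lines whose
    Python-level allocations changed the most while it ran."""
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        func(*args)
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()  # snapshots stay usable after tracing stops
    # compare_to groups statistics by source line and sorts them
    # by the absolute size difference, largest first.
    return after.compare_to(before, "lineno")[:limit]
```

Printing each returned `StatisticDiff` shows the file, line and size change. Passing a plain (non-delayed) version of the pandas workload as `func` would show whether Python-level objects are being kept alive between runs.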
Or am I doing something wrong and using dask incorrectly?
The dask scheduler and worker were started with the following parameters:
```shell
scheduler --host localhost --port 18786 --dashboard --dashboard-address 18787 --protocol tcp
worker --host localhost --nthreads 1 --nworkers 1 --name worker-one-sample --memory-limit 1024Mib --dashboard --dashboard-address 18788 --protocol tcp --local-directory /tmp tcp://127.0.0.1:18786
```
Environment: Ubuntu 22.04, Python 3.9.16, dask and distributed 2023.9.2.