Hi everyone
I am using Dask distributed and have run into what looks like a memory leak when using a pandas DataFrame inside a dask delayed function.
To describe the problem, I created two sample delayed functions and a trim method:
- The first function reads a file, appends its rows to a list, and at the end prints the length of the list.
- The second function does the same, but using pandas.
First function:
from dask import delayed

@delayed
def process_file(filename):
    lines = []
    with open(filename) as f:
        for row in f:
            lines.append(row)
    print(len(lines))
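Second function:
import pandas as pd
from dask import delayed

@delayed
def process(filename):
    df = pd.read_parquet(filename)
    rows = []
    df.info()  # info() prints its summary itself and returns None
    for row in df.iterrows():  # each row is an (index, Series) tuple
        rows.append(row)
    print(len(rows))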
Trim method:
import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)
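The post does not show how the trim method was invoked on the worker. A minimal sketch, assuming it is run through `Client.run`, which executes a function on every worker and returns a dict keyed by worker address. This is a slightly more defensive variant of `trim_memory` (the platform, `find_library` and `getattr` guards are additions so it becomes a no-op where glibc's `malloc_trim` is unavailable), and `trim_all_workers` is a hypothetical helper name.

```python
import ctypes
import ctypes.util
import sys


def trim_memory() -> int:
    """Ask glibc to return free heap memory to the OS.

    Returns 1 if memory was released, 0 otherwise (also 0 on platforms
    without glibc's malloc_trim, where this function does nothing).
    """
    if not sys.platform.startswith("linux"):
        return 0
    try:
        libc = ctypes.CDLL(ctypes.util.find_library("c") or "libc.so.6")
    except OSError:
        return 0
    malloc_trim = getattr(libc, "malloc_trim", None)
    if malloc_trim is None:  # e.g. musl libc has no malloc_trim
        return 0
    return malloc_trim(0)


def trim_all_workers(client) -> dict:
    """Hypothetical helper: run trim_memory on every worker of a Client.

    Client.run returns {worker_address: return_value}.
    """
    return client.run(trim_memory)
```

With a connected client, `trim_all_workers(client)` (or directly `client.run(trim_memory)`) returns one entry per worker.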
Calling the client:
from dask.distributed import Client, as_completed

cluster = "tcp://localhost:18786"
client = Client(cluster)

a = process('path_to_file')
f = client.compute([a])  # list of futures
try:
    for done_work in as_completed(f, with_results=True, raise_errors=True):
        pass  # done_work is a (future, result) tuple
except Exception:
    client.cancel(f)
finally:
    del f
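To take readings like the ones below from a script instead of the dashboard, one option (an assumption; the post does not say how memory was measured) is to read each worker's resident set size through `Client.run`. `rss_mib` and `worker_rss_mib` are hypothetical names, and reading `/proc/self/statm` is Linux-specific, which matches the Ubuntu setup in this post.

```python
import os


def rss_mib() -> float:
    """Resident set size of the current process in MiB (Linux only).

    The second field of /proc/self/statm is the number of resident pages.
    """
    with open("/proc/self/statm") as f:
        resident_pages = int(f.read().split()[1])
    return resident_pages * os.sysconf("SC_PAGE_SIZE") / 2**20


def worker_rss_mib(client) -> dict:
    """Hypothetical helper: {worker_address: RSS in MiB} for every worker."""
    return {addr: round(mib, 2) for addr, mib in client.run(rss_mib).items()}
```

Calling `worker_rss_mib(client)` before and after the compute step gives one reading per worker; the values may differ somewhat from what the dashboard shows.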
Before running any tasks, the worker's memory was 125.12 MiB.
I launched a task with the first function (no pandas) to read a big file, and memory rose to 510.70 MiB.
After that, I ran the first function on a small file and memory stayed almost the same: 511.30 MiB.
I understand this is due to memory management in Linux, or, as this article calls it, "high water" behavior.
Then I called the trim method and memory returned to its initial level: 127.67 MiB.
Next, I ran the pandas function a couple of times and memory usage rose to 224.07 MiB.
Calling the trim method did not bring it back to the initial value; the worker still uses 193.59 MiB.
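To tell a growing leak apart from memory that is retained but reused, one option is to repeat the pandas task several times, trim after each run, and look at the trend. The sketch below does not depend on Dask: `measure_growth` and `growth_per_run` are hypothetical helpers that take the task, the memory reading, and the trim step as callables. As a rough heuristic (not a proof), readings that keep rising by a similar amount per run suggest a leak, while readings that level off are more consistent with allocator or library caches.

```python
from typing import Callable, List


def measure_growth(
    run_task: Callable[[], object],
    read_mib: Callable[[], float],
    trim: Callable[[], object],
    repeats: int = 5,
) -> List[float]:
    """Run a task repeatedly, trimming after each run, and record memory.

    Returns one reading (MiB) per run, taken after the trim step.
    """
    readings = []
    for _ in range(repeats):
        run_task()
        trim()
        readings.append(read_mib())
    return readings


def growth_per_run(readings: List[float]) -> float:
    """Average change between the first and last reading, per run (MiB)."""
    if len(readings) < 2:
        return 0.0
    return (readings[-1] - readings[0]) / (len(readings) - 1)
```

With the setup in this post, `run_task` could be `lambda: client.compute(process('path_to_file')).result()`, `trim` could be `lambda: client.run(trim_memory)`, and `read_mib` a function that extracts the single worker's RSS from a `client.run` call.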
I tried memory profiling, but I couldn't find anything that points to the source of this leak.
Or maybe I am doing something wrong and not using Dask correctly?
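One way to separate a Python-level leak from memory the allocator simply keeps is to compare `tracemalloc` snapshots around repeated runs. This is a sketch; `top_growth` is a hypothetical helper. Note that `tracemalloc` only sees allocations made through Python's memory APIs: memory that pyarrow (the engine `pd.read_parquet` picks first when it is installed) allocates through its own memory pool is not traced, so an empty result here does not rule out growth outside Python.

```python
import gc
import tracemalloc
from typing import Callable, List


def top_growth(
    task: Callable[[], object], repeats: int = 3, limit: int = 5
) -> List[tracemalloc.StatisticDiff]:
    """Run `task` several times and report the source lines whose traced memory grew.

    One warm-up run happens before tracing starts, so one-time caches and
    imports are not counted as growth.
    """
    task()  # warm-up run, not traced
    gc.collect()
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        for _ in range(repeats):
            task()
        gc.collect()
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # compare_to sorts by the absolute size difference, largest first
    return after.compare_to(before, "lineno")[:limit]
```

It can be run locally, outside Dask, on the body of `process` without the `@delayed` decorator, e.g. `for stat in top_growth(lambda: body('path_to_file')): print(stat)`, where `body` is that undecorated function.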
My setup:
Dask scheduler, started with:
dask scheduler --host localhost --port 18786 --dashboard --dashboard-address 18787 --protocol tcp
Dask worker, started with:
dask worker --host localhost --nthreads 1 --nworkers 1 --name worker-one-sample --memory-limit 1024Mib --dashboard --dashboard-address 18788 --protocol tcp --local-directory /tmp tcp://127.0.0.1:18786
OS: Ubuntu 22.04; Python 3.9.16; dask and distributed 2023.9.2
Warm regards