Dask Memory Leak Workaround


When using the Dask DataFrame `where` clause I get a “distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS” warning. This repeats until the system runs out of memory and swap. Is there a workaround for this, or am I doing something wrong? The file I’m reading can be found at Box. You have to read it in with Pandas and save it as a Parquet file for Dask to read it.

from pathlib import Path

from dask import dataframe as dd
from dask.distributed import Client

file_path = Path('../cannabis_data_science/wa/nov_2021')

client = Client(n_workers=2, threads_per_worker=2, memory_limit='15GB')

sale_items_df = dd.read_parquet(path=file_path / 'SaleItems_1.parquet', blocksize='100MB').reset_index()

# this causes the warning
x = sale_items_df.description.where(sale_items_df.description.isna(), sale_items_df.name).compute()

@ufosoftware Welcome!

I think `read_parquet` might be causing the memory blow-up – Dask DataFrame operates lazily, which means `read_parquet` is executed only when you call `.compute()`. I’d suggest appending `.persist()` so the read is executed up front and its result is held in distributed memory, which helps confirm whether `read_parquet` itself is the problem:

sale_items_df = dd.read_parquet(path=file_path / 'SaleItems_1.parquet', blocksize='100MB').reset_index().persist()

If this does raise a memory error, you can check out some of the optimization techniques described here: Dask Dataframe and Parquet — Dask documentation, such as using the pyarrow engine, choosing appropriate partition sizes, and setting `ignore_metadata_file=True`.

This blog post might also be helpful: Tackling unmanaged memory with Dask | Coiled
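One technique from that post is manually trimming worker memory with glibc’s `malloc_trim` — a sketch, assuming your workers run on Linux with glibc:

```python
import ctypes

def trim_memory() -> int:
    """Ask glibc to return freed heap memory to the OS (Linux-only)."""
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# Run it on every worker after the heavy step, e.g.:
# client.run(trim_memory)
```

This doesn’t fix a true leak, but it can release unmanaged memory that glibc’s allocator is holding onto instead of returning to the OS.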

Thank you for your response. I tried all the suggestions; they delay the memory error and the crash, but both still occur.