Read_csv Unmanaged Memory

Hello,

when I load data from .csv files, I end up with problematic unmanaged memory that I can't get rid of:

df_read = ddf.read_csv(file, keep_default_na=False, thousands=' ', dtype=dtypes, parse_dates=dt_cols, dayfirst=True)
log.unmanaged_mem()

Before persist (MiB):

unmanaged / total / managed : (194.13, 194.13, 0.0)

Persist:

df2 = df_read.persist()
log.unmanaged_mem()

unmanaged / total / managed : (225.13, 268.21, 43.07)

I try to clean up, but it has no effect:

del df_read
client.run(_trim_memory)
log.unmanaged_mem()

Out [26]: (225.8, 268.88, 43.07)

with:

import ctypes
import gc

def _trim_memory() -> int:
    gc.collect()
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

How can I deal with the unmanaged memory?

I work on Linux with LocalCluster / 1 CPU worker

Thanks for your insights!

Hi @frenco,

Thanks for this post and the details. Some questions about it: what is this unmanaged_mem() function? Could you produce a full reproducer with fake data?

You say problematic: in what way? Does it make your code fail at some point?

You’ve probably read the documentation about unmanaged memory since you are trying to free it. As you can see there, many things can cause unmanaged memory to grow. It is hard to be sure whether it is a memory leak, and I have to admit I doubt it is one when using the basic DataFrame API…
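For instance, besides calling malloc_trim manually as you do, the documentation also mentions lowering the glibc trim threshold before the workers start. A minimal sketch of that idea (the 64 KiB value is arbitrary, and the exact configuration key may depend on your distributed version):

import dask
from dask.distributed import LocalCluster, Client

# Ask glibc to return freed memory to the OS more eagerly; this has to be in
# place before the worker processes are spawned.
dask.config.set({"distributed.nanny.pre-spawn-environ.MALLOC_TRIM_THRESHOLD_": 65536})

cluster = LocalCluster(processes=True, n_workers=1)
client = Client(cluster)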

Does your unmanaged memory grow every time you read a DataFrame? Or does it only occur the first time you read CSV files?

Hi @guillaumeeb,

thank you so much for your quick reply.

Here is a standalone example (you must define your own CSV path):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
import dask.dataframe as ddf
import psutil
from dask.distributed import LocalCluster, Client
from time import sleep

#%%
def unmanaged_mem(client, unit=2**20, decimal=2, before=None):
    # Measure memory on each worker
    wk_mem_info = client.run(lambda dask_worker: {
        'rss': psutil.Process().memory_info().rss,
        'managed': dask_worker.data.memory.total_weight,
        'limit': dask_worker.memory_manager.memory_limit
        })

    # Aggregate over all workers
    mem_info = {'rss': sum(info['rss'] for info in wk_mem_info.values()),
                'managed': sum(info['managed'] for info in wk_mem_info.values()),
                'limit': sum(info['limit'] for info in wk_mem_info.values())
                }
    out = {'unmanaged': round((mem_info["rss"] - mem_info["managed"])/unit, decimal),
           'managed': round(mem_info["managed"]/unit, decimal),
           'total': round(mem_info["rss"]/unit, decimal)
           }

    # Optionally report the delta against a previous measurement
    if before:
        out = {k: round(out[k] - before[k], decimal)
               for k in ['unmanaged', 'managed', 'total']
               }

    if out['managed']:
        out['ratio (%)'] = round(out['unmanaged']/out['managed']*100, 1)
    return out
    
#%%
if __name__ == '__main__':
    
    print('Create CSV files')
    path = '/home/fcs/MODELS/ALM_TOOL/TABLES/ECON/'
    files = [f'ESG_CENT_YIELD_test_{i}.csv' for i in range(4)]
    df = pd.DataFrame({'term': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 30, 40, 50],
                       0: [1, 0.965951326, 0.931902653, 0.897853979, 0.863805305, 0.829756631, 0.795707958, 0.761659284, 0.72761061, 0.703427336, 0.679244063, 0.655060789, 0.630877515, 0.606694241, 0.582510967, 0.558327694, 0.53414442, 0.509961146, 0.485777872, 0.483359545, 0.480941218, 0.456757944, 0.43257467, 0.408391396]
                       })
    rows = df.shape[0]
    for i in range(1,51):
        df[i] = df[0]

    df = pd.concat([df] * 6, ignore_index=True)
    df['SENSI'] = df.index // rows 
    N = 2000
    df = pd.concat([df] * N, ignore_index=True)
    df['SIMULATION'] = df.index // (rows*6) + 1

    for col, desc in {'ECONOMY': 'EUR', 'CLASS': 'ZCB', 'MEASURE': 'PRICE', 'CLASS_MEASURE': 'ZCB_PRICE'
                      }.items():
        df[col] = desc
        
    for file in files:      
        print(f'..{file}')
        df.to_csv(path+file, index=False)
    
    print('Read CSV files')
    cluster = LocalCluster(processes=True, n_workers=1, threads_per_worker=10)
    with cluster, Client(cluster) as client:
        df_ = {}
        start = unmanaged_mem(client)
        print('start :', start)
        for file in files:
            df_read = ddf.read_csv(path+file, keep_default_na=False, thousands=' ')
            bf = unmanaged_mem(client)
            df_[file] = client.persist(df_read)
            sleep(5)
            print(f'delta for {file} :', unmanaged_mem(client, before=bf))
            
        print('check managed :', {k:round(sum(v.memory_usage(deep=True))/2**20,2) for k,v in df_.items()})
        print('total :', round(sum( sum(v.memory_usage(deep=True)) for k,v in df_.items())/2**20,2))
        sleep(5)
        print('global delta :', unmanaged_mem(client, before=start))
    

And here is my output:

Create CSV files
..ESG_CENT_YIELD_test_0.csv
..ESG_CENT_YIELD_test_1.csv
..ESG_CENT_YIELD_test_2.csv
..ESG_CENT_YIELD_test_3.csv
Read CSV files
start : {'unmanaged': 175.09, 'managed': 0.0, 'total': 175.09}
delta for ESG_CENT_YIELD_test_0.csv : {'unmanaged': 26.15, 'managed': 132.95, 'total': 159.1, 'ratio (%)': 19.7}
delta for ESG_CENT_YIELD_test_1.csv : {'unmanaged': 21.74, 'managed': 132.96, 'total': 154.69, 'ratio (%)': 16.4}
delta for ESG_CENT_YIELD_test_2.csv : {'unmanaged': 26.46, 'managed': 132.95, 'total': 159.42, 'ratio (%)': 19.9}
delta for ESG_CENT_YIELD_test_3.csv : {'unmanaged': 17.66, 'managed': 132.96, 'total': 150.6, 'ratio (%)': 13.3}
check managed : {'ESG_CENT_YIELD_test_0.csv': 132.93, 'ESG_CENT_YIELD_test_1.csv': 132.93, 'ESG_CENT_YIELD_test_2.csv': 132.93, 'ESG_CENT_YIELD_test_3.csv': 132.93}
total : 531.74
global delta : {'unmanaged': 94.25, 'managed': 531.82, 'total': 626.07, 'ratio (%)': 17.7}

As you can see, unmanaged memory grows every time I read data.
I don’t know how to deal with it.

Thanks again,

François

So I did several tests with your code. It's hard to draw firm conclusions…

First, if you just use compute calls, without persisting anything, you don't see this memory growth. So I guess a persisted DataFrame kept in worker memory generates some other structures that are kept around.
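For example, something along these lines (reusing the names from your reproducer) pulls the result into the client process instead of keeping it on the worker, and the unmanaged memory did not grow for me in that case:

# Materialize the DataFrame on the client side; nothing stays persisted
# in worker memory once the call returns.
result = df_read.compute()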

Second, if you delete the objects, unmanaged memory stays the same, but if you re-read them, it doesn't grow further and some is even freed sometimes.
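Roughly, the check I did on top of your reproducer looked like this (names taken from your script):

# Drop one persisted collection, then read and persist the same file again
del df_[files[0]]
df_read = ddf.read_csv(path + files[0], keep_default_na=False, thousands=' ')
df_[files[0]] = client.persist(df_read)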

I also tried increasing the CSV sizes and number of files, with no real effect on the unmanaged size: it grows by about the same amount when using persist.

I think it is really hard to know for sure where it comes from, especially with a garbage-collected language such as Python, and I'm under the impression that this unmanaged memory can be freed at some point once processing is done.

So I’ll ask again:

Is this really a big deal for you, and how?

Hello Guillaume,

thanks again for your testing.

Indeed, it’s not simple to understand the behaviour.

Here is my underlying problem:

I run a time-dependent projection 40 times, expecting the run time to stay stable.

At each step, I read data and run calculations.

Here is the tracking of my unmanaged and managed memory, and the corresponding running time.

Managed memory is stable: about 700 MiB.
Unmanaged memory keeps growing step after step until, I guess, it starts spilling memory to disk (see the sketch just below for how I understand the spill thresholds work).
Calculation time becomes chaotic from that point on.
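For reference, here is how I understand the spilling thresholds can be tuned; this is only a sketch based on the distributed.worker.memory configuration keys, and the fractions below are (I believe) the defaults:

import dask

# Fractions of the worker memory limit at which Dask reacts; the values below
# are my understanding of the defaults, to be adjusted as needed.
dask.config.set({
    "distributed.worker.memory.target": 0.6,  # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.7,   # spill based on process (rss) memory
    "distributed.worker.memory.pause": 0.8,   # stop accepting new tasks
})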

Here is another simulation using trim_memory:

Even if the behaviour is not exactly the same, the unmanaged memory is still growing. And even if the calculation time is less erratic, it also grows over the simulation terms.
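Concretely, "using trim_memory" means something along these lines at each projection step (steps and run_projection_step are just placeholders for my actual projection code):

# Placeholder loop: at the end of each projection step, ask glibc on every
# worker to hand freed memory back to the OS.
for step in steps:                 # steps: placeholder for my projection horizon
    run_projection_step(step)      # placeholder: read data + calculations
    client.run(_trim_memory)       # same _trim_memory as defined above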

That’s why I pointed to the read_csv unmanaged memory issue, which seems to me easier to analyse.

But you’re right, there is a ceiling on the read_csv unmanaged memory. It would only explain 500 MiB in my case…

And the unmanaged memory is indeed linked to the managed memory (278 MiB), as shown below:

… I have to continue my analysis…