Hi @guillaumeeb ,
thank you so much for your quick response.
Here is a standalone example (you must set your own CSV path):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pandas as pd
import dask.dataframe as ddf
import psutil
from dask.distributed import LocalCluster, Client
from time import sleep
#%%
def unmanaged_mem(client, unit=2**20, decimal=2, before=None):
    """Aggregate worker memory statistics across the cluster.

    Parameters
    ----------
    client : distributed.Client
        Client connected to the cluster whose workers are measured.
    unit : int, optional
        Divisor applied to byte counts before reporting
        (default ``2**20``, i.e. figures are in MiB).
    decimal : int, optional
        Number of decimals kept when rounding the reported figures.
        (Bug fix: this parameter was previously ignored — rounding was
        hard-coded to 2 decimals.)
    before : dict, optional
        A previous result of this function; when given, the returned
        figures are deltas relative to it.

    Returns
    -------
    dict
        Keys ``'unmanaged'``, ``'managed'``, ``'total'`` expressed in
        ``unit``s, plus ``'ratio (%)'`` (unmanaged as a percentage of
        managed) when the managed figure is non-zero.
    """
    # Per-worker measurement: process RSS, bytes tracked by the worker's
    # data store ("managed"), and the configured worker memory limit.
    wk_mem_info = client.run(lambda dask_worker: {
        'rss': psutil.Process().memory_info().rss,
        'managed': dask_worker.data.memory.total_weight,
        'limit': dask_worker.memory_manager.memory_limit
    })
    # Sum each metric over all workers ('limit' is collected for
    # completeness but not reported below).
    mem_info = {key: sum(info[key] for info in wk_mem_info.values())
                for key in ('rss', 'managed', 'limit')}
    # "Unmanaged" = everything in RSS that the worker does not account for.
    out = {'unmanaged': round((mem_info['rss'] - mem_info['managed']) / unit, decimal),
           'managed': round(mem_info['managed'] / unit, decimal),
           'total': round(mem_info['rss'] / unit, decimal)}
    if before is not None:
        # Report deltas relative to the earlier snapshot.
        out = {k: round(out[k] - before[k], decimal)
               for k in ('unmanaged', 'managed', 'total')}
    if out['managed']:
        out['ratio (%)'] = round(out['unmanaged'] / out['managed'] * 100, 1)
    return out
#%%
if __name__ == '__main__':
    # Step 1: generate four identical CSV files so the read/persist step
    # below has realistic (~130 MiB in-memory each) input.
    print('Create CSV files')
    # NOTE(review): machine-specific path — adjust before running.
    path = '/home/fcs/MODELS/ALM_TOOL/TABLES/ECON/'
    files = [f'ESG_CENT_YIELD_test_{i}.csv' for i in range(4)]
    # Base frame: one curve of values sampled at 24 terms.
    df = pd.DataFrame({'term': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 30, 40, 50],
                       0: [1, 0.965951326, 0.931902653, 0.897853979, 0.863805305, 0.829756631, 0.795707958, 0.761659284, 0.72761061, 0.703427336, 0.679244063, 0.655060789, 0.630877515, 0.606694241, 0.582510967, 0.558327694, 0.53414442, 0.509961146, 0.485777872, 0.483359545, 0.480941218, 0.456757944, 0.43257467, 0.408391396]
                       })
    rows = df.shape[0]
    # Replicate the same curve into 50 additional columns (1..50).
    for i in range(1,51):
        df[i] = df[0]
    # Stack 6 copies, tagged by a SENSI index 0..5.
    df = pd.concat([df] * 6, ignore_index=True)
    df['SENSI'] = df.index // rows
    # Stack N copies, tagged by a SIMULATION index 1..N.
    N = 2000
    df = pd.concat([df] * N, ignore_index=True)
    df['SIMULATION'] = df.index // (rows*6) + 1
    # Constant descriptor columns shared by every row.
    for col, desc in {'ECONOMY': 'EUR', 'CLASS': 'ZCB', 'MEASURE': 'PRICE', 'CLASS_MEASURE': 'ZCB_PRICE'
                      }.items():
        df[col] = desc
    for file in files:
        print(f'..{file}')
        df.to_csv(path+file, index=False)

    # Step 2: read each file back with dask, persist it on the cluster, and
    # report how worker memory (managed vs unmanaged) changes per read.
    print('Read CSV files')
    cluster = LocalCluster(processes=True, n_workers=1,threads_per_worker=10)
    with cluster, Client(cluster) as client:
        df_ = {}  # persisted dask dataframes, keyed by file name
        start = unmanaged_mem(client)  # baseline before any read
        print('start :', start)
        for file in files:
            df_read = ddf.read_csv(path+file, keep_default_na=False, thousands=' ')
            bf = unmanaged_mem(client)  # snapshot just before persist
            df_[file] = client.persist(df_read)
            sleep(5)  # let workers finish loading before measuring
            print(f'delta for {file} :', unmanaged_mem(client, before=bf))
        # Cross-check: managed memory as reported by each persisted dataframe.
        print('check managed :', {k:round(sum(v.memory_usage(deep=True))/2**20,2) for k,v in df_.items()})
        print('total :', round(sum( sum(v.memory_usage(deep=True)) for k,v in df_.items())/2**20,2))
        sleep(5)
        print('global delta :', unmanaged_mem(client, before=start))
And here is my output:
Create CSV files
..ESG_CENT_YIELD_test_0.csv
..ESG_CENT_YIELD_test_1.csv
..ESG_CENT_YIELD_test_2.csv
..ESG_CENT_YIELD_test_3.csv
Read CSV files
start : {'unmanaged': 175.09, 'managed': 0.0, 'total': 175.09}
delta for ESG_CENT_YIELD_test_0.csv : {'unmanaged': 26.15, 'managed': 132.95, 'total': 159.1, 'ratio (%)': 19.7}
delta for ESG_CENT_YIELD_test_1.csv : {'unmanaged': 21.74, 'managed': 132.96, 'total': 154.69, 'ratio (%)': 16.4}
delta for ESG_CENT_YIELD_test_2.csv : {'unmanaged': 26.46, 'managed': 132.95, 'total': 159.42, 'ratio (%)': 19.9}
delta for ESG_CENT_YIELD_test_3.csv : {'unmanaged': 17.66, 'managed': 132.96, 'total': 150.6, 'ratio (%)': 13.3}
check managed : {'ESG_CENT_YIELD_test_0.csv': 132.93, 'ESG_CENT_YIELD_test_1.csv': 132.93, 'ESG_CENT_YIELD_test_2.csv': 132.93, 'ESG_CENT_YIELD_test_3.csv': 132.93}
total : 531.74
global delta : {'unmanaged': 94.25, 'managed': 531.82, 'total': 626.07, 'ratio (%)': 17.7}
As you can see, unmanaged memory grows every time I read data.
I don't know how to deal with it.
Thanks again,
François