Facing "WARNING - Unmanaged memory use is high" while running Dask on a Kubernetes Pod.
My application details:
A simple Job that reads log files from an S3 bucket, does some data cleaning, and then performs aggregations to gain certain insights. The application launches a LocalCluster on the Pod to do this log aggregation. Please note that I am not using HelmCluster or KubeCluster for this. Also note that the log files to be processed by this job total around 8 GB.
My Setup Details:
╭──────────────────────┬──────────────────────────────────────────╮
│ CONFIG               │ VALUE                                    │
├──────────────────────┼──────────────────────────────────────────┤
│ RUNTIME              │ AWS EKS 1.25                             │
│ NODES                │ 2 Nodes with 8 Cores 32 GB RAM           │
│ DASK VERSION         │ 2023.3.2                                 │
│ DASK DISTRIBUTED     │ 2023.3.2                                 │
│ PYTHON               │ python 3.8.16                            │
│ RANGER_LOG_LOC       │ s3://ranger-logs/200MB_10_ranger.log.txt │
│ LC_WORKER_MEM_LIMIT  │ 1GB                                      │
│ LC_NUM_WORKERS       │ 4                                        │
│ LC_THREAD_PER_WORKER │ 1                                        │
│ INPUT_BLOCK_SIZE     │ 580000                                   │
╰──────────────────────┴──────────────────────────────────────────╯
My application:
import time, os, warnings, psutil
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
from tabulate import tabulate
warnings.filterwarnings('ignore')
os.environ['MALLOC_TRIM_THRESHOLD_'] = '65536'
# define our functions:
def extractTableName(resource, res_type):
    # Build "<db>.<table>" from the resource path for table resources
    if res_type == "@table":
        return ".".join(resource.split("/")[:2])
    else:
        return ""

def top_k_users(dataframe_obj, asset=None):
    if asset is not None:
        ddf_to_process = dataframe_obj[dataframe_obj['asset'] == asset]
    else:
        ddf_to_process = dataframe_obj
    return ddf_to_process.reqUser.value_counts().nlargest(10).to_frame()

def allowed_and_denied_metrics(dataframe_obj, user=None, table_name=None):
    print(f'\nAllowed and Denied counts for User {user} or tableName {table_name}.')
    ddf_to_process = dataframe_obj
    if user is not None:
        ddf_to_process = ddf_to_process[ddf_to_process['reqUser'] == user]
    elif table_name is not None:
        ddf_to_process = ddf_to_process[ddf_to_process['asset'] == table_name]
    allowed_counts = ddf_to_process['allowed'].sum()
    denied_counts = ddf_to_process['denied'].sum()
    print(f"for user: {user} and/or table: {table_name} allowed -> {allowed_counts}, denied -> {denied_counts}")

def get_worker_and_threads():
    cores = psutil.cpu_count()
    print("CPU count obtained: ", cores)
    cores = 4 if cores is None else cores
    num_wrks = int(os.getenv('LC_NUM_WORKERS', 4))
    if num_wrks < 1:
        num_wrks = cores
    wrk_thrds = int(os.getenv('LC_THREAD_PER_WORKER', 1))
    if wrk_thrds < 1:
        wrk_thrds = int(num_wrks * 0.5)
    return num_wrks, wrk_thrds

def print_env_config():
    th = ['CONFIG', 'VALUE']
    data = []
    data.append(['RANGER_LOG_LOC', os.getenv('RANGER_LOG_LOC')])
    data.append(['DASK_SCHEDULAR_ENDPOINT', os.getenv('DASK_SCHEDULAR_ENDPOINT')])
    data.append(['LC_WORKER_MEM_LIMIT', os.getenv('LC_WORKER_MEM_LIMIT')])
    data.append(['LC_NUM_WORKERS', os.getenv('LC_NUM_WORKERS')])
    data.append(['LC_THREAD_PER_WORKER', os.getenv('LC_THREAD_PER_WORKER')])
    data.append(['EXPORT_PERF_REPORT', os.getenv('EXPORT_PERF_REPORT')])
    data.append(['INPUT_BLOCK_SIZE', os.getenv('INPUT_BLOCK_SIZE')])
    print(tabulate(data, headers=th, tablefmt='rounded_outline'))

def run_profiler():
    # get ranger log dir
    print_env_config()
    log_path = os.getenv('RANGER_LOG_LOC', None)
    print("Reading log files from: ", log_path)
    if log_path is None:
        raise FileNotFoundError('No valid log directory was given. Did you forget to set `RANGER_LOG_LOC` env variable?')
    # init cluster: a remote scheduler endpoint must be a <HOST>:<PORT> URL
    sch_endpoint = os.getenv('DASK_SCHEDULAR_ENDPOINT', None)
    if sch_endpoint is None:
        wrks, th_wrks = get_worker_and_threads()
        print(f'Launching local cluster with workers: {wrks} \t and threads per worker: {th_wrks} ')
        client = Client(n_workers=wrks, threads_per_worker=th_wrks,
                        memory_limit=os.getenv('LC_WORKER_MEM_LIMIT', '512M'))  # local cluster mode
    else:
        print('Connecting to Dask scheduler at endpoint: ', sch_endpoint)
        client = Client(sch_endpoint)  # Kubernetes mode
    # begin processing
    total_time = 0
    df = dd.read_json(log_path, encoding='utf-8', blocksize=int(os.getenv('INPUT_BLOCK_SIZE', 1100000)))
    print("checking dataframe:: \n", df.head(5))
    st = time.time()
    ddf = df[['access', 'action', 'evtTime', 'reqUser', 'resType', 'resource', 'result']]
    meta = ('evtTime', 'datetime64[ns]')
    ddf['evtTime'] = ddf['evtTime'].map_partitions(pd.to_datetime, meta=meta)
    ddf['eventDay'] = ddf['evtTime'].dt.date
    et = time.time()
    print('\n Pre-processing time for data type correction and reducing dataframe:', (et - st), ' s')
    total_time += (et - st)
    # adding new cols and first level agg
    st = time.time()
    ddf['asset'] = ddf.apply(lambda x: extractTableName(x.resource, x.resType), axis=1, meta=(None, 'str'))
    ddf['allowed'] = ddf['result'].apply((lambda x: "allowed" if x != 0 else None), meta=('result', 'str'))
    ddf['denied'] = ddf['result'].apply((lambda x: "denied" if x == 0 else None), meta=('result', 'str'))
    ddf_with_allowed_denied = ddf.groupby(['asset', 'reqUser', 'action', 'eventDay']).agg(
        {'allowed': 'count', 'denied': 'count'}).reset_index().compute()
    et = time.time()
    print('Pre-processing time for 1st level aggregations: ', (et - st), ' s')
    total_time += (et - st)
    print('ddf_with_allowed_denied:: \n', ddf_with_allowed_denied.head(10))
    print('starting to measure time for calculating metrics')
    st = time.time()
    ddf_with_allowed_denied.set_index('eventDay', inplace=True)
    print('TOP K users::\n', top_k_users(ddf_with_allowed_denied))
    allowed_and_denied_metrics(ddf_with_allowed_denied, table_name='default.0-556-50643-X')
    allowed_and_denied_metrics(ddf_with_allowed_denied)
    et = time.time()
    print('Execution time:', (et - st) * 1000, 'ms')
    total_time += (et - st)
    print(f"Total time taken is : {total_time} s")
    client.wait_for_workers()
    client.close()

if __name__ == '__main__':
    time.sleep(1)
    run_profiler()
The YAML that schedules this script as a CronJob in Kubernetes:
apiVersion: batch/v1
kind: CronJob
metadata:
  name: ranger-profiler-local-clst
spec:
  schedule: "*/10 * * * *"
  concurrencyPolicy: Forbid
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 1
      template:
        spec:
          containers:
            - name: ranger-profiler-local-clst
              image: public.ecr.aws/f9k4t7e8/mb-demo-repo:dask.k8v7
              imagePullPolicy: IfNotPresent
              resources:
                limits:
                  memory: 4Gi
                requests:
                  memory: 2Gi
              env:
                - name: RANGER_LOG_LOC
                  value: s3://ranger-logs/200MB_10_ranger.log.txt
                - name: LC_WORKER_MEM_LIMIT
                  value: 1GB
                - name: LC_NUM_WORKERS
                  value: "4"
                - name: LC_THREAD_PER_WORKER
                  value: "2"
                - name: INPUT_BLOCK_SIZE
                  value: "980000"
                - name: EXPORT_PERF_REPORT
                  value: "0"
          restartPolicy: Never
The image is public, so anyone can pull it. In my first run of the program I did not specify any resource limits; after some time it stopped progressing and was OOMKilled by Kubernetes. At that time a dynamic sizing logic was selecting 4 workers with 4 threads each for the LocalCluster, so without any memory constraints it ran out of memory.
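For context on the sizing (using the limits I later set in the Job spec above), here is a rough sanity check I now run; the helper variables and the hard-coded 4GiB are mine, not part of the job:

import os
from dask.utils import parse_bytes, format_bytes

# Back-of-the-envelope: total LocalCluster worker budget vs. the Pod memory limit.
n_workers = int(os.getenv('LC_NUM_WORKERS', 4))
per_worker = parse_bytes(os.getenv('LC_WORKER_MEM_LIMIT', '1GB'))
pod_limit = parse_bytes('4GiB')  # resources.limits.memory: 4Gi in the Job spec

worker_budget = n_workers * per_worker
print(f"workers: {n_workers} x {format_bytes(per_worker)} = {format_bytes(worker_budget)}")
print(f"pod limit: {format_bytes(pod_limit)}, headroom left for scheduler/client: "
      f"{format_bytes(pod_limit - worker_budget)}")

With 4 workers at 1 GB each, the workers alone can legitimately use about 4 GB, which is close to the whole 4Gi Pod limit and leaves only roughly 300 MB for the scheduler, the client process and the S3/JSON read buffers.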
I was not sure what was happening on the server, so I ran the same image locally and watched the /status dashboard to see how the application executes. Locally everything completed fine, but I could see high unmanaged memory.
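Since the dashboard is not reachable inside the Pod, I also collect roughly the same numbers programmatically (a small sketch using client.run and psutil, which the job already imports; the helper name is mine):

import psutil

def worker_rss_mb():
    # RSS of the current worker process in MB - roughly what the dashboard
    # reports as managed + unmanaged memory for that worker.
    return psutil.Process().memory_info().rss / 1e6

# Runs on every worker and returns {worker_address: rss_mb}
print(client.run(worker_rss_mb))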
Then, checking the logs on the server, I found the same warning there as well:
2023-03-25 17:53:48,345 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.06 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:53:48,345 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 3.06 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:53:58,948 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 3.15 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:54:09,038 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 3.55 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:54:11,248 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:35587 (pid=27) exceeded 95% memory budget. Restarting...
2023-03-25 17:54:11,395 - distributed.nanny - WARNING - Restarting worker
2023-03-25 17:55:04,307 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.96 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:55:04,555 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.07 GiB -- Worker memory limit: 3.82 GiB
After a lot of reading and a few videos, I applied the environment-variable fix:
os.environ['MALLOC_TRIM_THRESHOLD_'] = '65536'
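The worker-memory docs page linked in the warning also describes manually trimming glibc memory, and I have been experimenting with that as a diagnostic too (Linux/glibc only):

import ctypes

def trim_memory() -> int:
    # Ask glibc to return freed arena memory to the OS (Linux only).
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# Run on every worker after the heavy steps
client.run(trim_memory)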
I also added the worker memory limits after a few retries. Everything was fine locally, so I tried again on EKS to see if this resolves the problem. With these new changes the unmanaged-memory warning no longer appears and the logs are clean, but the Pod still gets OOMKilled. Even printing the first 5 records of the dataframe does not work.
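Because the behaviour differs between local runs and EKS, I also started verifying that the variable actually reaches the worker processes spawned by the nanny (quick check below; moving the variable into the container env in the Pod spec is another option I have not fully verified):

import os

# Should print '65536' for every worker address if the setting propagated
print(client.run(lambda: os.environ.get('MALLOC_TRIM_THRESHOLD_')))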
After this I went back to local runs to see if I could fine-tune anything else. In the status dashboard I noticed that, even without the unmanaged-memory warning, the unmanaged share of memory was still quite high compared to the Dask-managed share, and I started getting another warning:
2023-03-28 17:00:03,345 - distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
2023-03-28 17:00:03,363 - distributed.utils_perf - WARNING - full garbage collections took 14% CPU time recently (threshold: 10%)
2023-03-28 17:00:03,804 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2023-03-28 17:00:04,268 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2023-03-28 17:00:04,365 - distributed.utils_perf - WARNING - full garbage collections took 15% CPU time recently (threshold: 10%)
2023-03-28 17:00:04,450 - distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
2023-03-28 17:00:04,955 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
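To check whether this GC pressure actually corresponds to collectable garbage, I sometimes trigger a manual collection on the workers and watch whether memory drops (purely a diagnostic, not a fix):

import gc

# Force a full collection on every worker; returns the number of
# unreachable objects found per worker address.
print(client.run(gc.collect))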
Fed up with all of these attempts and the time I had spent, I decided to process one file at a time. I changed the path in the env vars and scheduled the Kubernetes Job again. This single file is 215 MB, so I was sure it would work perfectly. It turned out to be quite the opposite: I got OOMKilled again, and the Job did not even print the first 5 records of the dataframe.
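For this single 215 MB file I also tried to see how the blocksize splits it and how large each partition becomes after parsing, since JSON expands a lot in memory (diagnostic sketch; df is the dd.read_json frame from the script, and because it computes the data I only run it locally):

# Partition count and per-partition size after parsing
print("npartitions:", df.npartitions)
print("rows per partition:", df.map_partitions(len).compute())
print("bytes per partition:",
      df.map_partitions(lambda pdf: pdf.memory_usage(deep=True).sum()).compute())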
I am not sure what I am doing wrong here. Any help will be much appreciated.