WARNING - Unmanaged memory use is high

I am facing the warning "Unmanaged memory use is high" while running Dask on a Pod.

My application details:
A simple Job that reads log files from an S3 bucket, does some data cleaning, and then performs aggregations to gain certain insights. In this application I launch a LocalCluster on the Pod and do the log aggregation there. Please note that I am not using HelmCluster or KubeCluster to solve this problem, and that the job needs to process around 8 GB of log files in total.

My Setup Details:

╭──────────────────────┬──────────────────────────────────────────╮
│ CONFIG               │ VALUE                                    │
├──────────────────────┼──────────────────────────────────────────┤
│ RUNTIME              │ AWS EKS 1.25                             │
│ NODES                │ 2 Nodes with 8 Cores 32 GB RAM           │
│ DASK VERSION         │ 2023.3.2                                 │
│ DASK DISTRIBUTED     │ 2023.3.2                                 │
│ PYTHON               │ python 3.8.16                            │
│ RANGER_LOG_LOC       │ s3://ranger-logs/200MB_10_ranger.log.txt │
│ LC_WORKER_MEM_LIMIT  │ 1GB                                      │
│ LC_NUM_WORKERS       │ 4                                        │
│ LC_THREAD_PER_WORKER │ 1                                        │
│ INPUT_BLOCK_SIZE     │ 580000                                   │
╰──────────────────────┴──────────────────────────────────────────╯

My application:

import time, os, warnings, psutil
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
from tabulate import tabulate

warnings.filterwarnings('ignore')

os.environ['MALLOC_TRIM_THRESHOLD_'] = '65536'

# define our functions:
def extractTableName(resource, res_type):
    if res_type == "@table":
        return ".".join(resource.split("/")[:2])
    else:
        return ""


def top_k_users(dataframe_obj, asset=None):
    if asset is not None:
        ddf_to_process = dataframe_obj[dataframe_obj['asset'] == asset]
    else:
        ddf_to_process = dataframe_obj
    return ddf_to_process.reqUser.value_counts().nlargest(10).to_frame()


def allowed_and_denied_metrics(dataframe_obj, user=None, table_name=None):
    print(f'\nAllowed and Denied counts for User {user} or tableName {table_name}.')

    ddf_to_process = dataframe_obj
    if user is not None:
        ddf_to_process = ddf_to_process[ddf_to_process['reqUser'] == user]
    elif table_name is not None:
        ddf_to_process = ddf_to_process[ddf_to_process['asset'] == table_name]

    allowed_counts = ddf_to_process['allowed'].sum()
    denied_counts = ddf_to_process['denied'].sum()

    print(f"for user: {user} and or table: {table_name} allowed -> {allowed_counts}, denied -> {denied_counts}")


def get_worker_and_threads():
    cores = psutil.cpu_count()
    print("CPU count obtained: ", cores)
    cores = 4 if cores is None else cores

    num_wrks = int(os.getenv('LC_NUM_WORKERS', 4))
    if num_wrks < 1:
        num_wrks = cores

    wrk_thrds = int(os.getenv('LC_THREAD_PER_WORKER', 1))
    if wrk_thrds < 1:
        wrk_thrds = int(num_wrks*0.5)

    return num_wrks, wrk_thrds


def print_env_config():
    th = ['CONFIG', 'VALUE']
    data = []

    data.append(['RANGER_LOG_LOC', os.getenv('RANGER_LOG_LOC')])
    data.append(['DASK_SCHEDULAR_ENDPOINT', os.getenv('DASK_SCHEDULAR_ENDPOINT')])
    data.append(['LC_WORKER_MEM_LIMIT', os.getenv('LC_WORKER_MEM_LIMIT')])
    data.append(['LC_NUM_WORKERS', os.getenv('LC_NUM_WORKERS')])
    data.append(['LC_THREAD_PER_WORKER', os.getenv('LC_THREAD_PER_WORKER')] )
    data.append(['EXPORT_PERF_REPORT', os.getenv('EXPORT_PERF_REPORT')] )
    data.append(['INPUT_BLOCK_SIZE', os.getenv('INPUT_BLOCK_SIZE')] )

    print(tabulate(data, headers=th, tablefmt='rounded_outline'))


def run_profiler():
    # get ranger log dir
    print_env_config()
    log_path = os.getenv('RANGER_LOG_LOC', None)
    print("Reading log files from: ", log_path)
    if log_path is None:
        raise FileNotFoundError('No valid log directory was given. Did you forget to set `RANGER_LOG_LOC` env variable?')

    # connect to an existing scheduler (<HOST>:<PORT>) if one is given, otherwise launch a LocalCluster
    sch_endpoint = os.getenv('DASK_SCHEDULAR_ENDPOINT', None)
    if sch_endpoint is None:
        wrks, th_wrks = get_worker_and_threads()
        print(f'Launching local cluster with workers: {wrks} \t and threads per worker: {th_wrks} ')
        client = Client(n_workers=wrks, threads_per_worker=th_wrks,
                memory_limit=os.getenv('LC_WORKER_MEM_LIMIT', '512M')
            )   # local cluster mode
    else:
        print('Connecting to Dask scheduler at endpoint: ', sch_endpoint)
        client = Client(sch_endpoint)   # Kubernetes Mode

    # begin processing
    total_time = 0
    df = dd.read_json(log_path, encoding='utf-8', blocksize=int(os.getenv('INPUT_BLOCK_SIZE', 1100000)))

    print("checking dataframe:: \n", df.head(5))

    st = time.time()
    ddf = df[['access', 'action', 'evtTime', 'reqUser', 'resType', 'resource', 'result']]
    meta = ('evtTime', 'datetime64[ns]')

    ddf['evtTime'] = ddf['evtTime'].map_partitions(pd.to_datetime, meta=meta)
    ddf['eventDay'] = ddf['evtTime'].dt.date
    et = time.time()
    print('\n Pre-processing time for data type correction and reducing dataframe:', (et - st), ' s')
    total_time += (et-st)

    # adding new cols and first level agg
    st = time.time()
    ddf['asset'] = ddf.apply(lambda x: extractTableName(x.resource, x.resType), axis=1, meta=(None, 'str'))
    ddf['allowed'] = ddf['result'].apply((lambda x: "allowed" if x != 0 else None), meta=('result', 'str'))
    ddf['denied'] = ddf['result'].apply((lambda x: "denied" if x == 0 else None), meta=('result', 'str'))

    ddf_with_allowed_denied = ddf.groupby(['asset', 'reqUser', 'action', 'eventDay']).agg(
        {'allowed': 'count', 'denied': 'count'}).reset_index().compute()

    et = time.time()
    print('Pre-processing time for 1st level aggregations: ', (et - st), ' s')
    total_time += (et-st)

    print('ddf_with_allowed_denied:: \n', ddf_with_allowed_denied.head(10))

    print('starting to measure time for calculating metrics')
    st = time.time()
    ddf_with_allowed_denied.set_index('eventDay', inplace=True)

    print('TOP K users::\n' , top_k_users(ddf_with_allowed_denied))

    allowed_and_denied_metrics(ddf_with_allowed_denied, table_name='default.0-556-50643-X')
    allowed_and_denied_metrics(ddf_with_allowed_denied)

    et = time.time()
    print('Execution time:', (et - st) * 1000, 'ms')
    total_time += (et-st)

    print(f"Total time taken is : {total_time} s")
    client.wait_for_workers()
    client.close()


if __name__ == '__main__':
    time.sleep(1)
    run_profiler()

The YAML to schedule this script as a Job in Kubernetes:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: ranger-profiler-local-clst
spec:
  schedule: "*/10 * * * *"
  concurrencyPolicy: Forbid
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 1
      template:
        spec:
          containers:
          - name: ranger-profiler-local-clst
            image: public.ecr.aws/f9k4t7e8/mb-demo-repo:dask.k8v7
            imagePullPolicy: IfNotPresent
            resources:
              limits:
                memory: 4Gi
              requests:
                memory: 2Gi
            env:
            - name: RANGER_LOG_LOC
              value: s3://ranger-logs/200MB_10_ranger.log.txt
            - name: LC_WORKER_MEM_LIMIT
              value: 1GB
            - name: LC_NUM_WORKERS
              value: "4"
            - name: LC_THREAD_PER_WORKER
              value: "2"
            - name: INPUT_BLOCK_SIZE
              value: "980000"
            - name: EXPORT_PERF_REPORT
              value: "0"

          restartPolicy: Never

The image is publicly available and anyone can pull it. In my first run of the program I never specified any resource limits, and I found that the program stopped progressing and after a certain time was OOMKilled by Kubernetes. At that time I was using dynamic logic to select 4 workers with 4 threads each for my LocalCluster, so without any memory constraints it ran out of memory.

I was not sure what was happening on the server, so I ran the same image locally and watched the /status dashboard to see how the application was executing. Everything was fine locally, but I could see high unmanaged memory.

Then, checking the logs on the server, I found the same warning there as well:

2023-03-25 17:53:48,345 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 3.06 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:53:48,345 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 3.06 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:53:58,948 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 3.15 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:54:09,038 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 3.55 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:54:11,248 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:35587 (pid=27) exceeded 95% memory budget. Restarting...
2023-03-25 17:54:11,395 - distributed.nanny - WARNING - Restarting worker
2023-03-25 17:55:04,307 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.96 GiB -- Worker memory limit: 3.82 GiB
2023-03-25 17:55:04,555 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 3.07 GiB -- Worker memory limit: 3.82 GiB

After a lot of reading and a few videos, I applied the environment variable fix:

os.environ['MALLOC_TRIM_THRESHOLD_'] = '65536'
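
(As a side note, one way to confirm that the setting actually reaches the spawned worker processes is to ask each worker what it inherited. A minimal sketch, assuming the client created in the script above; note that glibc only reads this variable at process start, so it only affects processes launched after it is set.)

import os

# Sketch: every worker process reports the MALLOC_TRIM_THRESHOLD_ it inherited at startup
print(client.run(lambda: os.environ.get('MALLOC_TRIM_THRESHOLD_')))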

Then, after a few retries, I added worker memory limits. Everything was fine locally, so I tried again on EKS to see if this resolved the problem. With these new changes the unmanaged memory warning no longer appears and the logs are clear, but I am still getting OOMKilled. Even printing the first 5 records of the dataframe does not work.

After this I went back to my local setup to see if I could fine-tune anything else. I noticed in the status dashboard that even though I was no longer getting the unmanaged memory warning, the unmanaged share of memory (Dask-managed vs. unmanaged) was still quite high.

I also started getting another warning:

2023-03-28 17:00:03,345 - distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
2023-03-28 17:00:03,363 - distributed.utils_perf - WARNING - full garbage collections took 14% CPU time recently (threshold: 10%)
2023-03-28 17:00:03,804 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2023-03-28 17:00:04,268 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2023-03-28 17:00:04,365 - distributed.utils_perf - WARNING - full garbage collections took 15% CPU time recently (threshold: 10%)
2023-03-28 17:00:04,450 - distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
2023-03-28 17:00:04,955 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)

I got fed up with all of these attempts, having wasted quite a lot of time, and decided to process one file at a time. I changed the path in the env vars and scheduled the Kubernetes Job again. This single file is 215 MB and I was sure it would work perfectly. It turned out to be quite the opposite: I got the OOMKilled error again, and the Job did not even print the first 5 records of the dataframe.

I am not sure what I am doing wrong here. Any help will be much appreciated.

Hi @matrixbegins,

So if I understand correctly, your code works on your local laptop (or a server), even if it was consuming more memory than you would expect, and you were able to optimize this a bit. However, the same code does not work on a Kubernetes pod.

I think I see some inconsistency in the memory figures in your post:

  • On the server log when launching manually: Worker memory limit: 3.82 GiB
  • On your setup in Kubernetes: LC_WORKER_MEM_LIMIT │ 1GB
  • On the Pod config:
              limits:
                memory: 4Gi
              requests:
                memory: 2Gi

Does your code need workers with 4 GB, or is 1 GB per worker sufficient locally?

The OOMKilled, as you said, comes from Kubernetes. I think this just means that Dask is using all the workers' memory plus some overhead from the Scheduler/Client/Worker processes, so all of that cannot fit in 4 GiB. Maybe Kubernetes is killing the pod simply because, with your configuration, just starting the LocalCluster already requests too much memory?
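
For a rough back-of-the-envelope: 4 workers x 1 GB is already 4 GB before counting the main process (client + scheduler + the pandas results you collect with compute()/head()), which can easily add several hundred MB, so a 4 Gi pod limit leaves almost no headroom. A quick way to see which memory limit each worker actually ended up with (a sketch, run from the same client):

from dask.utils import format_bytes

# Sketch: ask the scheduler what memory limit each worker was started with
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, format_bytes(info["memory_limit"]))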

@guillaumeeb
The YAML that you see was derived after a lot of trial and error. But I see your point. I am sure I had checked this earlier, but just to be sure I am not making this obvious mistake, I tried the following settings:

LC_WORKER_MEM_LIMIT = 1GB and workers: 4 and threads per worker: 1

Pod Resource Limits:

limits:
  memory: 24Gi
requests:
  memory: 16Gi

The file I chose to process was a single 215 MB file. A single file on my MacBook takes less than 30 seconds to process with this config, and the memory utilization of the app with the current setup should not go beyond 4-5 GB. After 5 minutes of execution I checked the logs and got the following:

distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:33569 (pid=22) exceeded 95% memory budget. Restarting...
...
...
distributed.scheduler.KilledWorker: Attempted to run task read_json_chunk-a30aa7b8-e75a-494e-811d-2e4248c4f931 on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:34459. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

And all 4 workers were killed one by one.

Then I changed the worker memory to LC_WORKER_MEM_LIMIT = 2GB and ran the job again. Unfortunately the results were the same.
Thanks for highlighting the mistake. On my local machines the workers happily process 8 GB of logs with a 1 GB worker memory limit. I have relatively little experience with Kubernetes and am new to Dask, so for now I am not sure where to begin diagnosing this issue.

To check that my cluster can run jobs at all, I scheduled two test jobs: first, a BusyBox image that simply prints a counter inside a loop; second, a simple Python script that reads a file from S3 and prints its content line by line. Both of these jobs worked properly and I never hit an out-of-memory error.
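
For reference, the S3 test was essentially along these lines (a rough equivalent sketch, not the exact script; it assumes s3fs, which Dask's S3 reading uses under the hood, and the path is just a placeholder):

import s3fs

# Sketch: stream an object from S3 and print it line by line
fs = s3fs.S3FileSystem()
with fs.open("s3://my-bucket/some-log-file.txt", "rb") as f:
    for line in f:
        print(line.decode("utf-8", errors="replace").rstrip())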

So this time it's not an OOMKill triggered by Kubernetes, but Dask itself saying it does not have enough memory. That makes the problem easier to understand.

Well, this is weird: if it's the same file and the same amount of memory, you should see the same behavior. But again, in the logs from your first post, it seems your process was using a lot of memory locally. Did you apply the MALLOC_TRIM_THRESHOLD_ fix on Kubernetes too? The next thing I would check is whether you have the same versions of Python/Dask on your laptop and in your Docker image.
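
One quick way to compare them is to dump the versions that the client, scheduler, and workers actually see, and run it both on your laptop and in the pod so you can diff the output (a sketch using the standard Client API):

from pprint import pprint

# Sketch: collects Python and package versions from client, scheduler and workers;
# check=True raises an error if they are inconsistent with each other
pprint(client.get_versions(check=True))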

You can also try to rely more on threads, launching two workers with 4 GiB and 2 threads each, or even one worker with 8 GiB and 4 threads.
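
In terms of the script above, that just means changing the LocalCluster parameters (or the LC_* environment variables that feed them), for example (a sketch):

from dask.distributed import Client

# Sketch: one worker process with 4 threads sharing a single 8 GiB budget
client = Client(n_workers=1, threads_per_worker=4, memory_limit="8GiB")

# or two worker processes with 2 threads and 4 GiB each:
# client = Client(n_workers=2, threads_per_worker=2, memory_limit="4GiB")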

To answer your first question, the MALLOC_TRIM_THRESHOLD_ environment variable is set by the Python program itself, and I am running the same image locally and remotely. I will happily increase resources if I am processing a lot of data, but in this last report I am only processing a single 215 MB file. In my opinion a 2 GB worker memory limit should be good enough to handle that. Please note the Pod memory is now 16 GB.

My local laptop also has 32 GB memory.

The point I was trying to highlight is that locally, with 4 workers and a memory limit of 1 GB each, I can process 8 GB of logs. It's slow, of course. But a 200 MB file failing to be processed with 2 GB of worker memory is bonkers.

Another question that I have: if I am running an ARM-based image on an x86_64 host, will it affect Dask somehow? I am using multi-arch builds and I did not get the chance to verify which architecture build is getting pulled on AWS EKS.
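
(I suppose I can check that from inside the job itself; a minimal sketch asking the main process and each worker which architecture they are actually running on:)

import platform

# Sketch: report the CPU architecture seen by the main process and by every worker
print("main process:", platform.machine())
print(client.run(lambda: platform.machine()))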

Does this part work on your laptop?
If you're able to run things with the same image and limits on your laptop, but not on Kubernetes, then I'm at a loss for suggestions, especially given the pod memory you mention.

cc @jacobtomlinson @martindurant.

Well, I really don’t know about that…

We don't know the details of how your data are stored at rest (unless I missed it), but encoding/compression can be very effective, whereas in-memory structures like pandas store a fully expanded form for speed of operation. Furthermore, any operation you do generally requires producing temporary copies of columns, potentially many at a time. Third, Python standard strings always take more memory than you think (you might get some savings using arrow strings). And finally, garbage collection to free unused memory is a tricky thing, and it is hard to predict when memory that is no longer needed will actually get freed.

All this leads to a general rule of thumb in the documentation: for dataframe operations you should have free memory of at least a few times the in-memory size of the data.
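
To make the string point concrete, here is a rough illustration of the gap between Python-object strings and arrow-backed strings for a column of short values (a sketch with made-up data; string[pyarrow] needs pyarrow installed and a reasonably recent pandas):

import pandas as pd

# Hypothetical column resembling the 'resource' field; real numbers depend on your data
s = pd.Series(["database/table/column"] * 1_000_000)

print("object dtype:  %.1f MiB" % (s.memory_usage(deep=True) / 2**20))
print("arrow strings: %.1f MiB" % (s.astype("string[pyarrow]").memory_usage(deep=True) / 2**20))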