Why is my managed memory zero or only a few KB?

Hi

I use a k8s cluster to run my Dask tasks, and on the scheduler dashboard the managed memory always shows as zero or a few KB, even though I can see that bytes are stored. Did I set something wrong in my Dask settings? Here is my Dask configuration:


dataframe:
  convert-string: false

distributed:
  worker:
    transfer:
      message-bytes-limit: 64MB
    memory:
      recent-to-old-time: 30s
      spill: 0.6
      terminate: false
      rebalance:
        measure: process
  comm:
    retry:
      count: 10
    timeout:
      connect: 30
  p2p:
    disk: true

  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      measure: process
      policies:
        - class: distributed.active_memory_manager.ReduceReplicas
    worker-saturation: 0.7

logging:
  version: 1
  handlers:
    file:
      class: logging.handlers.RotatingFileHandler
      filename: /mnt/shared/workers.log
      level: INFO
    console:
      class: logging.StreamHandler
      level: INFO
  loggers:
    distributed.worker:
      level: INFO
      handlers:
        - file
        - console
    distributed.scheduler:
      level: INFO
      handlers:
        - file
        - console

temporary-directory: /mnt/shared/output
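For reference, the settings can be read back through dask.config to check that they are actually applied, both locally and on the workers (a sketch; the scheduler address is a placeholder):

```python
import dask
from distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder: address of your scheduler service

# What the local process sees
print(dask.config.get("distributed.worker.memory.spill"))

# What each worker actually loaded (returns {worker_address: value})
print(client.run(lambda: dask.config.get("distributed.worker.memory.spill")))
```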

Hi @Sam,

This is not necessarily a problem; see Worker Memory Management — Dask.distributed 2024.3.1 documentation or Tackling unmanaged memory with Dask.

It mainly means that this memory is not known to Dask as inputs or outputs of your tasks.

This can become a problem if this memory is never freed, which could mean you have some leak somewhere. In your case, we can also see a very unbalanced memory load across workers, but this might be due to your workload.
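If you want to watch how managed and unmanaged memory evolve during a run, the MemorySampler shown in that blog post can be useful. A minimal sketch, assuming your existing cluster (the scheduler address and the demo DataFrame are placeholders for your own setup and workload):

```python
from dask.datasets import timeseries
from distributed import Client
from distributed.diagnostics import MemorySampler

client = Client("tcp://scheduler:8786")  # assumption: address of your scheduler

# Demo DataFrame; in practice this would be your read_parquet → map_partitions → merge pipeline
df = timeseries()

ms = MemorySampler()
with ms.sample("process"):                     # default measure: total process memory
    df[["x", "y"]].sum().compute()
with ms.sample("managed", measure="managed"):  # only the memory Dask accounts for
    df[["x", "y"]].sum().compute()

ms.plot(align=True)  # overlay the two samples on a common time axis
```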

Hi @guillaumeeb,

Thanks for the info. I do have a lot of data in object dtypes, and I understand that this can make the memory-usage calculation inaccurate, but it still looks strange. Even my ID column alone, which is int64, should reach at least a few hundred MB, but none of the workers showed that.

BTW, regarding the unbalanced memory load across workers, is that related to unbalanced division sizes, or what else could cause it? How can I make my workload more balanced?

Because generally my workflow is: load data with dd.read_parquet with a block size → process it with DataFrame map_partitions, apply, and merge → save the results. Roughly, that looks like the sketch below.
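(The paths, column names, the blocksize value, and process_partition below are placeholders, not my actual code.)

```python
import dask.dataframe as dd

def process_partition(pdf):
    # placeholder for the per-partition pandas logic applied via map_partitions / apply
    pdf["doubled"] = pdf["id"] * 2
    return pdf

left = dd.read_parquet("/mnt/shared/input/left/", blocksize="64MiB")
right = dd.read_parquet("/mnt/shared/input/right/", blocksize="64MiB")

left = left.map_partitions(process_partition)
result = left.merge(right, on="id", how="inner")

result.to_parquet("/mnt/shared/output/result/")
```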

Sam

It's hard to tell without a clearer view of what you are really doing. The merge step, depending on the resulting grouping, can cause unbalanced memory usage. It also depends on how you save the results.

However, if you are really using DataFrame objects, the unmanaged memory shouldn't be so high, since a Dask DataFrame is a Dask collection.
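One way to see where the imbalance comes from is to look at the per-partition footprint just before writing (a sketch; it assumes `result` is the merged DataFrame from your pipeline):

```python
# assumption: result is the Dask DataFrame produced by the merge step

# Bytes per partition; deep=True also counts the Python objects inside object columns
sizes = result.memory_usage_per_partition(deep=True).compute()
print(sizes.describe())

# Rows per partition, for comparison
rows = result.map_partitions(len).compute()
print(rows.describe())
```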

Hi @guillaumeeb,

Thanks for the info again. I didn't do any particular grouping of the results. I read the Parquet with a fixed block size, e.g. 32MiB or 64MiB, and the Parquet files also have a fixed row group size of 64MiB. Since I used dd.read_parquet to build a Dask DataFrame, everything is in Dask collections.

Most of my operations on Dask collections are read_parquet/to_parquet, map_partitions, apply, delayed.

That is one reason I wonder why the unmanaged memory is that high.

One thing I read on a few pages is that Python objects (structured data types or strings) count as part of the unmanaged memory. However, it bothers me because it means the spill function almost doesn't work.
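A way to check how much of a partition is plain Python objects versus fixed-width buffers (a sketch; `pdf` stands for a single pandas partition computed locally, and it assumes the object columns actually contain strings):

```python
# assumption: pdf is one pandas partition, e.g. result.partitions[0].compute()
shallow = pdf.memory_usage(index=True, deep=False).sum()
deep = pdf.memory_usage(index=True, deep=True).sum()
print(f"buffers only: {shallow / 2**20:.1f} MiB, with Python objects: {deep / 2**20:.1f} MiB")

# Object-dtype strings are individual Python objects on the heap; pyarrow-backed strings
# are stored in contiguous Arrow buffers (this is what dataframe.convert-string switches on)
object_cols = pdf.select_dtypes(include="object").columns
arrow = pdf.astype({col: "string[pyarrow]" for col in object_cols})
print(f"with string[pyarrow]: {arrow.memory_usage(index=True, deep=True).sum() / 2**20:.1f} MiB")
```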

But sometimes a worker looks like it has the right amount, see the third worker from the bottom.

My speculation is that the structured data makes the partition sizes unbalanced, and I am still investigating, for example whether repartitioning by size before saving helps, as in the sketch below.
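(`result` and the output path below are placeholders for the DataFrame right before writing.)

```python
# Aim for roughly even partition sizes before writing the results
result = result.repartition(partition_size="128MiB")
result.to_parquet("/mnt/shared/output/result/")
```

Note that repartition(partition_size=...) needs to evaluate the partition sizes first, so it adds a pass over the data.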

From what you described, I don’t see a reason for this behavior, but we would need a reproducer to understand where the problem might be.