Unable to distribute memory to workers effectively with Dask on Modin

I was experimenting with Modin to handle large dataframes with minimal code changes.

When I tried Dask as the engine for Modin (I had no issues controlling data spills with Ray), I observed that no spills to disk were happening. (Note: I want the spills to happen.)
I have been looking at the documentation here: Python API — Dask documentation

My system has 240 GB of RAM and a 3 TB disk. If I want spills to happen once 70% of my RAM is in use, what should I be doing?

The config I’ve shared below sets memory_limit per worker, and Dask multiplies that by the number of workers (cpu_count), resulting in a massive total (e.g. 6 TB+ with 28 CPUs) in the Dask dashboard, so it would never spill. I believe I am missing something but can’t figure out what; I did not encounter this issue with Ray as Modin’s engine.
If there are any best practices to follow as well, kindly let me know - thanks!

from dask.distributed import Client, LocalCluster
import dask

cpu_count = 28
dask.config.set({'distributed.worker.memory.target': 0.6,
                 'distributed.worker.memory.spill': 0.7,
                 'distributed.worker.memory.terminate': 0.95})
cluster = LocalCluster(
    n_workers=cpu_count,
    memory_limit='240GiB',
    threads_per_worker=1,
    dashboard_address=9986,
    local_directory=modin_workspace,  # spill directory, defined elsewhere
)
client = Client(cluster)

tl;dr: My system has 240 GB of RAM and a 3 TB disk. If I want spills to happen once 70% of my RAM is in use (while keeping some disk space aside), how should I configure Dask’s cluster?

Hi @sagardevaraju, welcome to Dask Discourse!

When using n_workers=cpu_count, you are starting a LocalCluster with 28 workers. And as you wrote it, memory_limit='240GiB' applies to each of those workers, not to the cluster as a whole, which is why the dashboard shows 28 × 240 GiB.

As Dask uses one process per worker, you cannot set a global memory limit; each process is responsible for its own. So you should do something like memory_limit='8.5GiB', or compute it from your cpu_count.
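
For example, something along these lines (just a sketch; the 240 GiB total and 28 CPUs come from your post, and the spill fractions are the ones you already set):

import dask
from dask.distributed import Client, LocalCluster

cpu_count = 28
total_ram_gib = 240  # total machine RAM

# Spill fractions are relative to each worker's memory_limit, not to the machine.
dask.config.set({'distributed.worker.memory.target': 0.6,
                 'distributed.worker.memory.spill': 0.7,
                 'distributed.worker.memory.terminate': 0.95})

# Divide the RAM across workers so the limits sum to 240 GiB
# (240 / 28 ≈ 8.57 GiB per worker) instead of 28 × 240 GiB.
per_worker_limit = f"{total_ram_gib / cpu_count:.2f}GiB"

cluster = LocalCluster(
    n_workers=cpu_count,
    threads_per_worker=1,
    memory_limit=per_worker_limit,
)
client = Client(cluster)

Each worker will then start spilling to its local_directory (which you can keep pointing at your 3 TB disk) once it crosses 70% of its own ~8.5 GiB share.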

Depending on your workflow, you might also use threads instead of processes, so that memory is shared between all threads of a single worker.
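
A minimal sketch of that, assuming a purely threaded worker performs well enough for your Modin workload (something you would need to verify):

from dask.distributed import Client, LocalCluster

# One worker process with many threads: all threads share the same memory pool,
# so a single 240 GiB limit (spilling at 70% of it) covers the whole machine.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=28,
    memory_limit='240GiB',
)
client = Client(cluster)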