Spilling to Disk

dask==2022.6.1

I have a cluster deployed in a docker swarm; 120 workers with 8GB of memory per worker. Workers are started like this:
dask-worker tcp://scheduler:8786 --name $$WORKER_NAME --nthreads 1 --memory-limit="8 GiB"

My dataset is approximately 6.2 billion rows contained in parquet files. I’m reading the parquet files, persisting and publishing the dataset:

import glob
import dask.dataframe as dd

files = glob.glob(f'{folder}/*.parquet')
ddf = dd.read_parquet(files)
ddf = client.persist(ddf)
client.publish_dataset(original=ddf)
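
For reference, a rough way to double-check that the persisted data fits is to wait for the persist to finish and then estimate the dataset's in-memory size (a sketch using the ddf and client from above; the memory_usage call triggers a small computation):

from distributed import wait

wait(ddf)  # block until every partition is actually materialized in worker memory

# rough estimate of the in-memory size of the persisted dataset
nbytes = ddf.memory_usage(deep=True).sum().compute()
print(f"dataset ~ {nbytes / 2**30:.1f} GiB vs. cluster memory = 120 x 8 GiB = 960 GiB")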

The problem I’m having is that the workers start spilling to disk when memory usage reaches about 2 GiB, and they continue to spill until memory per worker drops to 1.3-1.4 GiB. I’ve tried everything I can think of to stop the workers from spilling, to no avail. Here is the memory config of a worker:

'memory': {'recent-to-old-time': '30s',
           'rebalance': {'measure': 'managed_in_memory',
                         'sender-min': 0.3,
                         'recipient-max': 0.6,
                         'sender-recipient-gap': 0.1},
           'target': False,
           'spill': False,
           'pause': False,
           'terminate': False,
           'max-spill': False,
           'monitor-interval': '100ms'},
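
(For context: expressed as a Dask config file, those settings correspond to roughly the following distributed.yaml fragment. This is a sketch of the standard config layout, not necessarily exactly how my workers receive it:)

distributed:
  worker:
    memory:
      target: false     # no threshold for moving data into the spill buffer
      spill: false      # never spill to disk
      pause: false      # never pause the worker on high memory
      terminate: false  # never restart the worker on high memory
      max-spill: false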

Any suggestions, or am I missing something? From what I can see in the dashboard, the entire dataset should fit into memory.

@tallyboy91 Welcome! That does look off. @ncclementi do you have thoughts on what’s going on here?

@gjoseph92 Do you have thoughts on this issue?

@tallyboy91 your config does look like it should prevent spilling. How are you setting that config, though? My first guess here is that the workers aren’t actually picking up your config.

Let’s compare these:

>>> client.run(lambda: dask.config.get("distributed.worker.memory"))
>>> client.run(lambda dask_worker: {a: getattr(dask_worker.memory_manager, a) for a in ["memory_limit", "memory_target_fraction", "memory_spill_fraction", "memory_pause_fraction"]})
>>> client.run(lambda dask_worker: type(dask_worker.data))
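
If the config is really being applied, the first call should show target/spill/pause as False, the second should show False for the three fraction attributes, and the third should report a plain dict rather than a spill buffer (in 2022.6.x the spilling container should be distributed.spill.SpillBuffer, if I remember right). If you see a SpillBuffer there, the worker is still set up to spill.

If it turns out the workers aren't getting your config, one common way to pass it in a swarm deployment is through environment variables on the worker service, which Dask maps onto its config namespace. A sketch (the exact service syntax depends on your compose file):

environment:
  - DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=False
  - DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=False
  - DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE=False
  - DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE=False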