RuntimeError: Barrier task with key does not exist. Also: worker exceeds 95% memory budget regularly

Hi everyone, I am running into some issues where, after running tasks, I end up with a very large "Bytes stored" value. For reference, I have the following datasets:

  • merged-case-crossover-environment-data.csv ~6GB
  • APDC_AllRecordsCleaned_WithEnvironment_010701to220331.csv ~58GB

I am running this on my local machine:

  • AMD Ryzen 9 3900X 12-Core Processor 3.79 GHz
  • 32.0 GB RAM

I regularly get errors like the following:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
I have noticed this happens whenever P2P shuffling is initiated.
This is the code I am currently running:

import dask.dataframe as dd
import dask.array as da
import dask.bag as db
from dask.distributed import Client
from dask.diagnostics import ProgressBar

client = Client(n_workers=4, threads_per_worker=1, processes=True, memory_limit="6GB")
client

# Read the large dataset with date parsing
cherel_full = dd.read_csv('APDC_AllRecordsCleaned_010701to220331.csv',
                          parse_dates=['Episode_start_date'],
                          dtype=dtype_dict,        # dtype_dict is defined earlier (not shown here)
                          assume_missing=True,
                          na_values=[' '])

# Read the merged environment data WITH control samples
env_merged = dd.read_csv('case-crossover-data/merged-case-crossover-environment-data.csv', 
                             parse_dates=['Episode_start_date'],
                             dtype={'SA2_2011_CODE': 'str'})

cherel_full = cherel_full.dropna(subset=['SA2_2011_CODE', 'Episode_start_date'])
# Combine 'SA2_2011_CODE' and 'Episode_start_date' to create a unique identifier and set it as the index
cherel_full = cherel_full.set_index(cherel_full['SA2_2011_CODE'].astype(str) + '_' + cherel_full['Episode_start_date'].astype(str))

env_merged['SA2_2011_CODE'] = env_merged['SA2_2011_CODE'].str.replace('.0', '')
env_merged = env_merged.set_index(env_merged['SA2_2011_CODE'].astype(str) + '_' + env_merged['Episode_start_date'].astype(str))

ami_case_control = cherel_full[cherel_full['PD_AMI'] == 0].merge(
    env_merged, how="left", left_index=True, right_index=True
).compute()

This now gives me an error I have never seen before, and I am genuinely stumped on how to proceed. I have tried looking for similar errors in the GitHub issues but can't find any. Really hoping someone can help me out and point me in the right direction.

2024-03-07 22:09:44,317 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 3c1b4d2b2e9cacb9f2097cbb00ffcbe1 initialized by task ('shuffle-transfer-3c1b4d2b2e9cacb9f2097cbb00ffcbe1', 99) executed on worker tcp://127.0.0.1:63842
2024-03-07 22:14:12,895 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 3c1b4d2b2e9cacb9f2097cbb00ffcbe1 deactivated due to stimulus 'task-erred-1709810052.8706212'
2024-03-07 22:14:14,023 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 143, in get_or_create
    return self.get(spec.id, worker)
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 128, in get
    state = self.active_shuffles[id]
KeyError: '3c1b4d2b2e9cacb9f2097cbb00ffcbe1'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 169, in _raise_if_barrier_unknown
    self.scheduler.tasks[key]
KeyError: 'shuffle-barrier-3c1b4d2b2e9cacb9f2097cbb00ffcbe1'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\core.py", line 967, in _handle_comm
    result = handler(**msg)
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 148, in get_or_create
    self._raise_if_barrier_unknown(spec.id)
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 171, in _raise_if_barrier_unknown
    raise RuntimeError(
RuntimeError: Barrier task with key 'shuffle-barrier-3c1b4d2b2e9cacb9f2097cbb00ffcbe1' does not exist. This may be caused by task fusion during graph generation. Please let us know that you ran into this by leaving a comment at distributed#7816.

Hi @mattshu0410, welcome to the Dask community!

Your code performs complex operations like set_index and merge, which trigger shuffles between workers. On a bigger-than-memory dataset, this is always hard to get right.

I would recommend trying to make it work step by step, but first a few remarks:

  • Before the last merge, you are filtering the bigger dataset on the PD_AMI column. Could you do this earlier in the code? (See the sketch just below.)
  • Does it filter out a lot of rows? Since you call compute() at the end on the merged DataFrame, you will gather all the results into a single pandas DataFrame on your main Python process: will it fit in memory?
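
For the first point, here is a rough, untested sketch of what I mean, reusing your file name and your dtype_dict:

import dask
import dask.dataframe as dd

# Same read as in your code; dtype_dict is whatever you already defined
cherel_full = dd.read_csv('APDC_AllRecordsCleaned_010701to220331.csv',
                          parse_dates=['Episode_start_date'],
                          dtype=dtype_dict,
                          assume_missing=True,
                          na_values=[' '])

# Filter and drop rows as early as possible, so the expensive set_index/merge
# only has to shuffle the rows you actually keep
cherel_ami = cherel_full[cherel_full['PD_AMI'] == 0]
cherel_ami = cherel_ami.dropna(subset=['SA2_2011_CODE', 'Episode_start_date'])

# For the second point: a rough size check before ever calling compute() on the
# merge result -- row count and an estimate of the in-memory footprint, in one pass
n_rows, approx_bytes = dask.compute(cherel_ami.shape[0],
                                    cherel_ami.memory_usage(deep=True).sum())
print(n_rows, approx_bytes / 1e9, 'GB approximately in memory')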

To better understand where the bottleneck really is, I'd try to run each set_index call separately and write the result using to_parquet. And if your final merge result is bigger than memory, I would also stream it to disk with to_parquet instead of calling compute().
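
Continuing from the snippet above, something along these lines (again a rough sketch; the Parquet paths are just placeholders):

# Step 1: index the filtered big table and persist it to disk
cherel_ami = cherel_ami.set_index(cherel_ami['SA2_2011_CODE'].astype(str) + '_'
                                  + cherel_ami['Episode_start_date'].astype(str))
cherel_ami.to_parquet('cherel_ami_indexed/')

# Step 2: same for the environment table (keeping your SA2_2011_CODE cleanup as is)
env_merged = env_merged.set_index(env_merged['SA2_2011_CODE'].astype(str) + '_'
                                  + env_merged['Episode_start_date'].astype(str))
env_merged.to_parquet('env_indexed/')

# Step 3: read the indexed datasets back and merge, streaming the result to
# disk with to_parquet() instead of gathering everything with compute()
left = dd.read_parquet('cherel_ami_indexed/')
right = dd.read_parquet('env_indexed/')
ami_case_control = left.merge(right, how='left', left_index=True, right_index=True)
ami_case_control.to_parquet('ami_case_control/')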

As the first error message you posted says, you are probably running into memory issues while performing these complex steps.

Hi @mattshu0410,

state = self.active_shuffles[id]

KeyError: ‘3c1b4d2b2e9cacb9f2097cbb00ffcbe1’

You should never receive an error like this.
Could you please prepare a self-contained reproducer (something we can run without access to your data) and open a ticket on Issues · dask/distributed · GitHub?

We will also need to know the exact version of all packages you have installed (conda list --export if you’re using anaconda).

FYI @hendrikmakait
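
If it helps, a reproducer skeleton usually looks something like the following. The synthetic data here is only a guess standing in for your CSVs; adjust the sizes, partition counts, and columns until the error appears:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

if __name__ == '__main__':
    client = Client(n_workers=4, threads_per_worker=1, memory_limit='6GB')

    n = 1_000_000  # tune up or down until the failure shows up
    left = pd.DataFrame({
        'SA2_2011_CODE': np.random.randint(0, 2000, size=n).astype(str),
        'Episode_start_date': pd.to_datetime('2020-01-01')
            + pd.to_timedelta(np.random.randint(0, 365, size=n), unit='D'),
        'PD_AMI': np.random.randint(0, 2, size=n),
    })
    right = left.sample(frac=0.5, random_state=0).drop(columns='PD_AMI')

    dleft = dd.from_pandas(left, npartitions=100)
    dright = dd.from_pandas(right, npartitions=50)

    # Same pattern as your workload: string index built from two columns, then an index merge
    dleft = dleft.set_index(dleft['SA2_2011_CODE'] + '_' + dleft['Episode_start_date'].astype(str))
    dright = dright.set_index(dright['SA2_2011_CODE'] + '_' + dright['Episode_start_date'].astype(str))

    out = dleft[dleft['PD_AMI'] == 0].merge(dright, how='left',
                                            left_index=True, right_index=True)
    print(out.compute().shape)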

@mattshu0410:

I regularly get errors like the following:

Regularly as in “every time” or “often”?

As @guillaumeeb pointed out, your workload contains several complex operations that will likely cause your local cluster to run OOM. A reproducer would be necessary to investigate this further.

With respect to the P2P shuffle error: P2P currently emits quite a few internal details because they were necessary to catch uncommon edge cases. At this point, we should probably reduce those. The KeyError is nothing to be concerned about; it is in fact what I would have expected to see.
The RuntimeError (RuntimeError: Barrier task with key 'shuffle-barrier-3c1b4d2b2e9cacb9f2097cbb00ffcbe1' does not exist. This may be caused by task fusion during graph generation. Please let us know that you ran into this by leaving a comment at distributed#7816.) is a bit more curious. You shouldn't necessarily see this one, so if you have a reproducer ready I'll take a look at it. (I suspect it's related to your workload reliably killing your local cluster.)
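
As a side note, if you want to check whether the failure is specific to the P2P implementation or to task fusion, one option is to toggle the corresponding configuration before rerunning the workload. Treat the keys below as assumptions and double-check them against the configuration reference for the Dask version you have installed:

import dask

# Fall back to the task-based shuffle instead of P2P (assumed config key;
# verify it exists in your Dask version)
dask.config.set({'dataframe.shuffle.method': 'tasks'})

# Or disable low-level task fusion, which the RuntimeError message points at
# (again, an assumption to verify against your version's config reference)
dask.config.set({'optimization.fuse.active': False})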