Hi everyone, I am running into an issue where, after running tasks, I get a very large "Bytes stored" value. For reference, I have the following datasets:
- merged-case-crossover-environment-data.csv ~6GB
- APDC_AllRecordsCleaned_WithEnvironment_010701to220331.csv ~58GB
I am running this on a local machine:
- AMD Ryzen 9 3900X 12-Core Processor 3.79 GHz
- 32.0 GB RAM
I regularly get errors like the following:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
I have noticed this happens whenever P2P shuffling is initiated.
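For scale, here is a rough back-of-the-envelope comparison of the numbers in this post. It is only a sketch: the CSV sizes are on-disk sizes (the in-memory footprint is unknown and may well be larger), and the worker count and per-worker limit are taken from the `Client(...)` call in my code further down.

```python
# Sizes quoted in this post, in GB. These are on-disk CSV sizes,
# not in-memory sizes, so treat the comparison as a lower bound.
csv_sizes_gb = {"environment": 6, "apdc": 58}

total_ram_gb = 32        # physical RAM on the machine
n_workers = 4            # from Client(n_workers=4, ...)
memory_limit_gb = 6      # per worker, from memory_limit="6GB"
restart_fraction = 0.95  # fraction named in the nanny warning

cluster_budget_gb = n_workers * memory_limit_gb
restart_threshold_gb = memory_limit_gb * restart_fraction
total_csv_gb = sum(csv_sizes_gb.values())

print(f"Total CSV size on disk:      {total_csv_gb} GB")
print(f"Combined worker budget:      {cluster_budget_gb} GB of {total_ram_gb} GB RAM")
print(f"Per-worker restart threshold: {restart_threshold_gb:.1f} GB")
```

So the combined worker budget (24GB) is well below the size of the data on disk (64GB), and any single worker that holds more than about 5.7GB at once gets restarted.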
This is my current code:
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
from dask.distributed import Client
from dask.diagnostics import ProgressBar
client = Client(n_workers=4, threads_per_worker=1, processes=True, memory_limit="6GB")
client
# Read the large dataset with date parsing
cherel_full = dd.read_csv('APDC_AllRecordsCleaned_010701to220331.csv',
                          parse_dates=['Episode_start_date'],
                          dtype=dtype_dict,  # dtype_dict is not shown here; adjust dtype as necessary
                          assume_missing=True,
                          na_values=[' '])
# Read the merged environment data WITH control samples
env_merged = dd.read_csv('case-crossover-data/merged-case-crossover-environment-data.csv',
                         parse_dates=['Episode_start_date'],
                         dtype={'SA2_2011_CODE': 'str'})
cherel_full = cherel_full.dropna(subset=['SA2_2011_CODE', 'Episode_start_date'])
# Combine 'SA2_2011_CODE' and 'Episode_start_date' to create a unique identifier and set it as the index
cherel_full = cherel_full.set_index(cherel_full['SA2_2011_CODE'].astype(str) + '_' + cherel_full['Episode_start_date'].astype(str))
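# Strip a trailing '.0' only; an unescaped '.0' would match any character followed by 0 when treated as a regex
env_merged['SA2_2011_CODE'] = env_merged['SA2_2011_CODE'].str.replace(r'\.0$', '', regex=True)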
env_merged = env_merged.set_index(env_merged['SA2_2011_CODE'].astype(str) + '_' + env_merged['Episode_start_date'].astype(str))
ami_case_control = cherel_full[cherel_full['PD_AMI'] == 0].merge(env_merged, how = "left", left_index = True, right_index = True).compute()
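To make the intent of the key construction clearer, here is a minimal pandas-only sketch on toy data. The SA2 codes, dates, and the `temperature` column are made up for illustration and are not from my real files; `make_key` is just a hypothetical helper name for this example.

```python
import pandas as pd

# Toy stand-ins for the two datasets (all values are invented).
cases = pd.DataFrame({
    "SA2_2011_CODE": ["101011001", "101011002"],
    "Episode_start_date": pd.to_datetime(["2020-01-01", "2020-01-02"]),
    "PD_AMI": [0, 1],
})
env = pd.DataFrame({
    # Codes that were read as floats and then cast to str carry a trailing ".0".
    "SA2_2011_CODE": ["101011001.0", "101011002.0"],
    "Episode_start_date": pd.to_datetime(["2020-01-01", "2020-01-02"]),
    "temperature": [21.5, 19.0],  # hypothetical environment variable
})


def make_key(df: pd.DataFrame) -> pd.Series:
    """Build the '<SA2 code>_<date>' join key, stripping a trailing '.0'."""
    code = df["SA2_2011_CODE"].astype(str).str.replace(r"\.0$", "", regex=True)
    return code + "_" + df["Episode_start_date"].astype(str)


cases = cases.set_index(make_key(cases))
env = env.set_index(make_key(env))

# Same shape as the real merge: filter on PD_AMI, then left-join on the index.
# Only the environment column is selected to avoid duplicated column names.
merged = cases[cases["PD_AMI"] == 0].merge(
    env[["temperature"]], how="left", left_index=True, right_index=True
)
print(merged)
```

In the real code, both `set_index` calls and the merge operate on the full Dask dataframes.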
This now gives me an error I have never seen before, and I am genuinely stumped on how to proceed. I have looked for similar errors in the GitHub issues but can't find any. I am really hoping someone can help me out and point me in the right direction.
2024-03-07 22:09:44,317 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 3c1b4d2b2e9cacb9f2097cbb00ffcbe1 initialized by task ('shuffle-transfer-3c1b4d2b2e9cacb9f2097cbb00ffcbe1', 99) executed on worker tcp://127.0.0.1:63842
2024-03-07 22:14:12,895 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 3c1b4d2b2e9cacb9f2097cbb00ffcbe1 deactivated due to stimulus 'task-erred-1709810052.8706212'
2024-03-07 22:14:14,023 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 143, in get_or_create
    return self.get(spec.id, worker)
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 128, in get
    state = self.active_shuffles[id]
KeyError: '3c1b4d2b2e9cacb9f2097cbb00ffcbe1'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 169, in _raise_if_barrier_unknown
    self.scheduler.tasks[key]
KeyError: 'shuffle-barrier-3c1b4d2b2e9cacb9f2097cbb00ffcbe1'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\core.py", line 967, in _handle_comm
    result = handler(**msg)
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 148, in get_or_create
    self._raise_if_barrier_unknown(spec.id)
  File "c:\Users\bigha\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\shuffle\_scheduler_plugin.py", line 171, in _raise_if_barrier_unknown
    raise RuntimeError(
RuntimeError: Barrier task with key 'shuffle-barrier-3c1b4d2b2e9cacb9f2097cbb00ffcbe1' does not exist. This may be caused by task fusion during graph generation. Please let us know that you ran into this by leaving a comment at distributed#7816.