Thank you for your reply. Let me give a summary of my task which might help answer some questions.
I work with medical data, specifically mental health notes (I can’t share examples of the exact data or show output that might contain sensitive information). Each mental health note belongs to a note type. However, roughly 40% of the notes don’t have a type (or are categorized as missing). My task is to build a classifier on the labeled notes and use it to classify the notes that don’t have a type.
I’m working with 3 years’ worth of data (2020, 2021, 2022). The data was pulled from a database using Spark and saved as parquet files. Specifically, the parent folder contains a folder for each year, and each year folder contains a folder for each month. Within each of these month folders are multiple parquet files written out by Spark, so the folder structure is: data_dir/year/month/*. The total size of the entire dataset is 1.3TB. Each record contains a patient ID, document ID, entry date, note type, and the note text itself.
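For reference, this is roughly how I peek at a single month before loading everything (a minimal sketch; data_dir is a placeholder path and the column names other than ReportText are guesses, not the real schema):

import dask.dataframe as dd

data_dir = "/path/to/data_dir"            # placeholder: parent folder laid out as data_dir/year/month/*

# read just one year/month folder to inspect the schema and dtypes
sample_df = dd.read_parquet(f"{data_dir}/2020/01/")
print(sample_df.columns)                  # e.g. PatientID, DocumentID, EntryDate, NoteType, ReportText
print(sample_df.dtypes)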
The boxes are provisioned for us and from my point of view it appears as a single machine. Details of the box:
CPU Cores: 64
Memory: 1TB
GPUs: 2x V100S with 32GB each
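To put those numbers in context, here is roughly how I think the box maps onto a local Dask cluster (just a sketch; the worker/thread split and per-worker memory limit are my own guesses, not anything prescribed):

from dask.distributed import Client

# 64 cores and 1TB RAM -> 32 workers with 2 threads each and ~30GB of memory apiece
client = Client(n_workers=32, threads_per_worker=2, memory_limit="30GB")
print(client.dashboard_link)              # the dashboard is handy for watching memory use and spilling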
Clearly, if the data is already 1.3TB in compressed parquet format, it can’t fit into memory on this machine, hence I turned to Dask. The goal is to load all the data, do some EDA and preprocessing, write the processed data to disk, and then move on to modeling.
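In other words, the intended pipeline keeps everything lazy until the final write, something like this (a rough sketch; preprocess and processed_dir are hypothetical placeholders):

import dask.dataframe as dd

notes_df = dd.read_parquet(data_dir)             # lazy, out-of-core read of all years/months
processed_df = preprocess(notes_df)              # hypothetical lazy EDA/cleaning transformations
processed_df.to_parquet(processed_dir,           # hypothetical output path
                        write_index=False)       # written partition by partition, never all in memory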
This is the code I have so far (very minimal):
import dask
import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=32)            # local cluster on the single box

notes_df = dd.read_parquet(data_dir)     # lazily discovers every parquet file under data_dir/year/month/
print(notes_df.npartitions)
Without setting a custom temporary directory, this code runs in 30.4 seconds and reads in all the parquet files for a total of 51075 partitions. However, when I set a custom temporary directory with dask.config.set(temporary_directory=project_dir), it just hangs. Specifically, the line client = Client(n_workers=32) just hangs. I think you are right about this being a distributed file system, and that’s why it is painfully slow (or even hanging).
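For completeness, this is the order I set things in when it hangs (a sketch of what I tried; project_dir is a placeholder pointing at the project storage, which sits on the distributed file system):

import dask
from dask.distributed import Client

# point worker scratch/spill space away from the small /tmp
dask.config.set({"temporary_directory": project_dir})

client = Client(n_workers=32)            # this is the call that hangs once the config above is set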
Now, with the default temporary directory, I execute the following code, which eventually causes spillage:
import numpy as np
import pandas as pd

def replace_empty(text, replace=np.nan):
    # keep NaNs as-is; replace empty or whitespace-only strings with NaN
    if pd.isnull(text):
        return text
    elif text.isspace() or text == '':
        return replace
    return text

notes_df['ReportText'] = notes_df.map_partitions(
    lambda df: df['ReportText'].apply(replace_empty), meta=('Text', 'object')).compute()
This is the line that eventually causes the spillage. It says there is no space in ‘/tmp’ (which is only about 4.3GB). After about 5 minutes of running, I get an error:
DATE - distributed.spill - ERROR - Spill to disk failed; keeping data in memory
OSError: [Errno 28] No space left on device
It still continues to run after the error shows up. Sorry, I can’t post the entire traceback; the machines are hosted in a remote enclave where there is no copy and paste functionality (for good reason).
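For clarity on what I suspect is going on: the final .compute() gathers the whole cleaned column back into the client process, and I assume that is what fills memory and triggers the spilling. A lazy variant I have not tried yet would look roughly like this (just a sketch; processed_dir is a placeholder):

# same cleaning, but kept lazy: no .compute(), so nothing is gathered on the client
notes_df['ReportText'] = notes_df.map_partitions(
    lambda df: df['ReportText'].apply(replace_empty),
    meta=('ReportText', 'object'))

# the work only runs when the result is written (or persisted), partition by partition
notes_df.to_parquet(processed_dir, write_index=False)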
Thanks.