Client hangs on startup after setting temporary directory in dask config

Hello,

I’m pretty new to dask. I’m working with an insanely large dataset (lots of text). I need to manipulate the text, and when I launch the process, it gets to the point where I get a warning that /tmp is full and dask cannot spill over. After searching around, I found out that I can set the dask worker space to a custom directory using dask.config.set(temporary_directory='/path/to/custom/dir'). My scratch space is much bigger than /tmp, so I created a folder there and set the temporary directory to that. However, now the client is not starting and just hangs. Specifically, client = Client(n_workers=32) just hangs. A new folder called dask-worker-space is created in my custom location. How do I rectify this?

Thanks.

Hi @sudarshan85, welcome to this forum!

I’m not able to reproduce your problem. Using the following call on my local laptop:

import dask
from dask.distributed import Client
dask.config.set(temporary_directory='/home/guillaume/dask_tmp')
client = Client(n_workers=32)
future = client.submit(lambda x: x+1, 10)
future.result()

works just fine.

This is what you should expect: that folder is used by the workers to spill to disk.

Reading this, I imagine you’re on some kind of HPC cluster? On those, shared scratch space might not be the best place for worker spilling: it is often a shared file system, which can be inefficient for lots of small IOs. In your case, it might also have trouble with file locking, and that could be the problem.
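If your node has some truly local scratch space (a local disk, not the shared file system), you could try pointing the workers there directly. A minimal sketch, assuming /local/scratch exists and is node-local (the path is just a placeholder):

from dask.distributed import Client

# local_directory is forwarded to the underlying LocalCluster and sets
# where each worker writes its spill files
client = Client(n_workers=32, local_directory='/local/scratch/dask-worker-space')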

Anyway, I think the best way to go would be to address the problem you’re stating in the first place:

Spilling is bad in general; you want to avoid it. Depending on your workflow, Dask should be able to stream over your data to avoid loading it all at once; a sketch of that pattern is below. Could you share an example of your code so that we can see if we can optimize it?
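For reference, the streaming pattern usually looks something like this (a minimal sketch; clean_partition and the paths are placeholders, not your actual code):

import dask.dataframe as dd

def clean_partition(df):
    # plain pandas logic, applied to one partition at a time
    return df

ddf = dd.read_parquet('/path/to/data')      # lazy: nothing is loaded yet
ddf = ddf.map_partitions(clean_partition)   # still lazy
ddf.to_parquet('/path/to/output')           # streams partition by partition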

Thank you for your reply. Let me give a summary of my task which might help answer some questions.

I work with medical data, specifically mental health notes (I can’t share examples of the exact data or show output that might contain sensitive information). Each mental health note has a note type it belongs to. However, over 40% of the notes don’t have a type (or are categorized as missing). My task is to build a classifier on the labeled notes and use it to classify the notes that don’t have a type.

I’m working with 3 years’ worth of data (2020, 2021, 2022). The data was extracted from a database using Spark and is saved as parquet files. Specifically, the parent folder contains a folder for each year, and each year folder contains a folder for each month. Within each of these folders are multiple parquet files that were spit out by Spark. Thus the folder structure is: data_dir/year/month/*. The total size of the entire dataset is 1.3TB. The data contains patient id, document id, entry date, note type, and the note itself.

The boxes are provisioned for us, and from my point of view it appears as a single machine. Details of the box:

CPU Cores: 64
Memory: 1TB
GPUs: 2x V100S with 32GB each

Clearly, if the data is already 1.3TB in compressed parquet format, it can’t fit into memory on this machine (it will be even larger uncompressed), hence I turned to dask. The goal is to load all the data, do some EDA and preprocessing, write the processed data to disk, then move on to modeling.

This is the code I have so far (very minimal):

import dask
import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=32)
notes_df = dd.read_parquet(data_dir)
print(notes_df.npartitions)

Without setting a custom temp directory, this code runs in 30.4 seconds and reads in all the parquet partitions, 51075 in total. However, when I set the custom temp directory with dask.config.set(temporary_directory=project_dir), it just hangs. Specifically, the line client = Client(n_workers=32) just hangs. I think you are right about this being a distributed file system, and that’s why this is painfully slow (or even hanging).

Now, with the default temporary directory, I execute the following code which causes spillage:

import numpy as np
import pandas as pd

def replace_empty(text, replace=np.nan):
    if pd.isnull(text):
        return text
    elif text.isspace() or text == '':
        return replace
    return text

notes_df['ReportText'] = notes_df.map_partitions(lambda df: df['ReportText'].apply(replace_empty), meta=('Text', 'object')).compute()

This is the line that eventually causes spillage. It says there is no space left in ‘/tmp’ (which is only about 4.3GB). After about 5 minutes of running, I get an error:

DATE - distributed.spill - ERROR - Spill to disk failed; keeping data in memory
OSError: [Errno 28] No space left on device

It still continues to run after the error shows up. Sorry I can’t post the entire traceback; the machines are hosted in a remote enclave where there is no copy-and-paste functionality (for good reason).

Thanks.

Thanks for all the details @sudarshan85, this is really helpful.

Actually, that first snippet doesn’t really read all the parquet partitions. Dask is lazy: it just browses the files and builds the Dask DataFrame structure in memory, along with its associated task graph. If you print notes_df at this point, you will only see pointers to data.

When you call compute() at the end of the map_partitions line, you are asking Dask to apply the function to all partitions and return a consolidated Pandas DataFrame to your client. So basically, you are asking Dask to load all the data into memory and then serialize it to your client process, which, as you noted above, is not possible due to the dataset size.
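A quick way to see the difference, with notes_df being the Dask DataFrame from your snippet:

notes_df.head()     # computes only the first partition: cheap and safe
notes_df.compute()  # materializes ALL partitions as one Pandas DataFrame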

This is a really common mistake when using Dask DataFrame; it took me some time to understand what compute() really means, especially if you come from Spark.

I think what you want to do instead is apply different transformations to the Dask DataFrame, filters and so on. At some point, you’ll want either to save it back to disk using to_parquet() or equivalent, or to consolidate it as a single Pandas DataFrame using compute(), but not before filtering it down a lot or taking only a sample of the original dataset.

Bottom line is: don’t call compute() on a DataFrame too large to fit in memory.
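For example, patterns like these are safe, because the result handed back to the client is small (the column name and sample fraction are just illustrative):

counts = notes_df['NoteType'].value_counts().compute()  # small aggregate result
sample = notes_df.sample(frac=0.001).compute()          # small random sample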

Thank you. The whole point of using dask here is so that I can process this huge dataset in parallel. It seems that dask will not serve the purpose I need, unless I’m understanding this wrong.

I need to preprocess all the text in my data. What’s the solution here? Just read in enough data to fit in my memory? Couldn’t I do that with Pandas? What is dask giving me here?

For example, I could read in the data in a loop on a per-month basis, which would fit in memory. I could just do that in Pandas. Why would I need dask? Maybe for parallel processing?

Dask will do that for you, with the same syntax as Pandas. You don’t have to bother with for loops and splitting your data: Dask will automatically split your data into pieces that fit in RAM and process each piece/chunk of the data.
Moreover, as you said, it will do this in parallel automatically, using your 32 or 64 cores, giving you extra speed!

But basically yes, Dask DataFrame is a means to process datasets bigger than memory by dividing them into smaller chunks and processing them independently (with parallelization).
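You can inspect and adjust that chunking yourself if needed, for instance (the target size here is just illustrative):

print(notes_df.npartitions)  # how many chunks Dask created
notes_df = notes_df.repartition(partition_size='100MB')  # rebalance chunk sizes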

I was assuming that this was what I was doing? Basically, all the data is in data_dir, which contains 3 folders (2020, 2021, and 2022), each of which contains 12 folders, one per month of the year. Within these month folders I have a bunch of parquet files spit out by Spark (this was done by someone else; I actually haven’t worked with Spark :slight_smile:).

My understanding was that if I just point to the data_dir folder, Dask will do the rest as you mentioned. Since that is not happening, I’m thinking I have to read in each month’s folder separately and do the processing. Am I missing something here?

My next step is to read in one month of data, process it using Dask, write it to disk, and then proceed to the next month, using a for loop iterating over all the months for all 3 years.

Sorry, I was probably unclear in my earlier post. The problem you have is the compute call.

Just do something like:

notes_df['ReportText'] = notes_df.map_partitions(
    lambda df: df['ReportText'].apply(replace_empty), meta=('Text', 'object')
)  # no compute: this stays lazy
notes_df.to_parquet(...)  # this triggers the computation, partition by partition

compute will result in computing all the partitions and trying to load them into memory.
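If you do need the transformed data in memory for further steps, a middle ground is persist(), which materializes the partitions in the workers’ distributed memory without gathering them back to your client process:

notes_df = notes_df.persist()  # computed on the workers, stays distributed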

Thanks, I didn’t realize it was this easy. I think I’m finally getting how to use Dask. The parallel/distributed computing paradigm requires me to think of the problem in a different way, and I’m starting to get that :slight_smile:
