Efficient task distribution for data movement in a cluster (feature store example)

Hello!

I am experimenting with using Dask to parallelize a data-download → data-prep → feature-extraction → train workflow.

Let’s say our cluster has 6 nodes. I’d like to submit two types of tasks to this cluster:

  • Task 1: data-download → data prep → feature extraction for 4 different model types. This task is fast and repeated periodically as new data is available.
  • Task 2: ML pipeline (normalization, outlier detection/removal, training, model saving) for a single model type. This task is slow and is repeated as often as possible.
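
Roughly, I picture submitting the two task types like this; the scheduler address, model-type names, and pipeline functions below are placeholders, not code I actually have:

```python
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder scheduler address


def run_feature_pipeline(model_type):
    """Task 1: data download -> data prep -> feature extraction (placeholder)."""
    ...


def run_ml_pipeline(model_type):
    """Task 2: normalization -> outlier removal -> training -> saving (placeholder)."""
    ...


model_types = ["model_a", "model_b", "model_c", "model_d"]

# Task 1: fast, resubmitted periodically whenever new data arrives
feature_futures = [
    client.submit(run_feature_pipeline, m, pure=False) for m in model_types
]

# Task 2: slow, resubmitted as often as possible for a single model type
train_future = client.submit(run_ml_pipeline, "model_a", pure=False)
```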

What is the most efficient way to store the features created by Task 1 to be partially consumed by Task 2? After studying the dask documentation, I have found that there are two obvious solutions:

Slowest and simplest:

  • Solution 1: write the features to disk at the end of Task 1, and then load the features from disk in Task 2.
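
Here is a rough sketch of what I mean by Solution 1, assuming the features end up in a Dask DataFrame and every node can reach a shared filesystem (the Parquet path is a placeholder):

```python
import dask.dataframe as dd

FEATURE_PATH = "/shared/features/model_a"  # placeholder shared location


def end_of_task_1(features: dd.DataFrame) -> None:
    # Persist the features somewhere every node can read them back later.
    features.to_parquet(FEATURE_PATH, write_index=False)


def start_of_task_2() -> dd.DataFrame:
    # Reload the features lazily; only the partitions Task 2 touches get read.
    return dd.read_parquet(FEATURE_PATH)
```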

Closer to storing features in RAM locally:

  • Solution 2: Publish the dataset to cluster memory with client.publish_dataset() at the end of Task 1, then use client.get_dataset() inside Task 2 to pull the features from cluster memory.
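
What I have in mind for Solution 2 looks roughly like this, assuming the features are a Dask DataFrame; the dataset name and scheduler address are placeholders:

```python
import dask.dataframe as dd
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder scheduler address

DATASET_NAME = "features_model_a"  # placeholder dataset name


def end_of_task_1(features: dd.DataFrame) -> None:
    # Keep the partitions in cluster memory and register them under a name.
    features = features.persist()
    if DATASET_NAME in client.list_datasets():
        client.unpublish_dataset(DATASET_NAME)  # drop the stale version first
    client.publish_dataset(**{DATASET_NAME: features})


def start_of_task_2() -> dd.DataFrame:
    # Any client connected to the same scheduler can retrieve the collection.
    return client.get_dataset(DATASET_NAME)
```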

Meanwhile, there seems to be a third solution, which I am hesitant to build due to the complexity of tracking where the data lives:

Most complex but likely most efficient:

  • Solution 3: Minimize data movement as much as possible, following the Data Locality documentation. Try to “prepare” nodes with Task 1, then send Task 2 to a prepared node that still holds the features in memory.
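
If I understand the Data Locality docs correctly, Solution 3 would amount to pinning both tasks to the same worker, something like the following; the worker address and function bodies are placeholders:

```python
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder scheduler address


def prepare_features(model_type):
    """Task 1: data download -> data prep -> feature extraction (placeholder)."""
    ...


def train(features):
    """Task 2: normalization -> outlier removal -> training (placeholder)."""
    ...


target_worker = "tcp://10.0.0.5:40123"  # placeholder worker address

# Task 1: build the features on a chosen worker and keep the result there.
feature_future = client.submit(prepare_features, "model_a", workers=[target_worker])

# Task 2: run on the same worker so the features never leave that node.
# (Passing feature_future as an argument should already encourage the scheduler
# to run `train` where the data lives, even without the explicit workers= pin.)
train_future = client.submit(train, feature_future, workers=[target_worker])
```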

This brings up a variety of questions about client.publish_dataset(). Is it meant to be used as described in Solution 2? Is the overhead of distributing the data across cluster memory and converting to/from a Dask DataFrame smaller than the cost of simply writing to and reading from disk (Solution 1)?

Thanks for the advice,

Robert

Hi @robcaulk,

First, what is the size of the datasets we are talking about? I’d say go simple if you can.

I’ve never used client.publish_dataset(), but it looks like a good solution to your problem. I wouldn’t try to handle Data Locality yourself; leave that to Dask, which will try to do it for you while keeping the workers busy.

In general, yes, loading from cluster memory will be much more efficient, unless the next part of your workflow triggers a lot of data exchange between workers. In the end it all depends on your workflow; I would recommend trying Solution 2 first and falling back to Solution 1 (or trying it anyway, just to compare) if Solution 2 doesn’t work or isn’t efficient enough.