Need help with efficient parallelization [local machine]

Hello,
I’m just getting started with Dask and am running into trouble trying to make parallelization improve my efficiency.

In short, my observations so far are that:

  • When using Dask in a low-data test environment, the overhead is tremendous, making it about 15x slower than just using pandas.
  • When I step up to a somewhat larger (but still not huge) dataset, I get a warning about large data size and performance becomes very poor.

Some specifics about my goals:
I have a process called run_trial() that gets run about 500-1500 times in a given routine.
In a low-data environment (working with a table of 20,000 rows), pandas takes about 20 seconds to do 500 of these run_trial() calls.

The time required is roughly linear in the amount of data. At 100,000 rows, pandas takes about 70 seconds to do 500 calls to run_trial().

I thought this might be a good place to use Dask, but in a low-data environment my attempts at parallelization are performing very poorly: it takes something like 250 seconds to make 500 calls to run_trial().
And when I step up to just 100,000 rows I get a warning about the size of memory in the execution graph, and performance is very poor.

Here is the code I’m trying to use to parallelize:

    def traverse_features(self, indexes_to_test, bundle_provider):
        cohorts = []
        temp_cohorts = []
        for idx in indexes_to_test:
            trials = bundle_provider.get_bundle(new_indexes=[idx])
            for trial in trials:
                this_result = delayed(self.run_trial)(trial, idx)
                temp_cohorts.append(this_result)
            if len(temp_cohorts) > 30:  # Trying to prevent too many tasks scheduled at once.
                cohorts.extend(list(compute(*temp_cohorts)))
                temp_cohorts = []
        cohorts.extend(list(compute(*temp_cohorts)))
        joint = pd.concat(cohorts, axis=0)
        return joint

In the above, a trial object simply holds a set of train/test data as pandas Series and DataFrames (2 DataFrames for test/train X, 2 Series for test/train labels, 2 Series for test/train weights).

The number of dimensions is tiny, and in my testing it is just a single feature, so the total payload is just 3 × the total number of rows (since every row is in either the training set or the test set).

In the 100,000-row case, each trial holds a total of 300,000 numbers, which does not seem like a lot to me, but I get a warning (“UserWarning: Large object of size 4.58 MiB detected in task graph”), and performance is bad to the point of not running.

So any help would be appreciated:
— How do I reduce the large overhead?
— How do I avoid the large performance hit when using a relatively small (4.5 MiB) dataframe? The warning mentions client.scatter(), but I have no idea whether that is necessary or how to use it in this case.

Since these dataframes are so small (relative to my total RAM), I thought using pandas objects here was the right way to go.

@DRudel Thanks for the question! Would you be able to share a minimal reproducible version of your workflow, perhaps with some sample/toy data? It’ll allow us to help you better.

A general note about the traverse_features function: it looks like you’re calling compute within the for-loop, which is generally not considered good practice with Dask.
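
For illustration, here's a rough sketch of how that loop could be restructured to build up all of the delayed calls first and then call compute once at the end. It keeps your run_trial and bundle_provider exactly as they are, so treat it as an untested outline rather than a drop-in replacement:

    from dask import delayed, compute
    import pandas as pd


    def traverse_features(self, indexes_to_test, bundle_provider):
        tasks = []
        for idx in indexes_to_test:
            trials = bundle_provider.get_bundle(new_indexes=[idx])
            for trial in trials:
                # Only build the task graph here; nothing runs yet.
                tasks.append(delayed(self.run_trial)(trial, idx))
        # A single compute call lets the scheduler see the whole graph at once.
        cohorts = compute(*tasks)
        return pd.concat(cohorts, axis=0)

Computing in batches of 30 forces the scheduler to stop and wait at every batch boundary, so it never gets a chance to keep all of your workers busy.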

> Since these dataframes are so small (relative to my total RAM), I thought using pandas objects here was the right way to go.

Indeed, I’d also suggest you stick with pandas because this fits in memory. :smile:

Thanks @pavithraes for offering to take a look!
Basically, I’m implementing a hand-rolled version of sklearn’s sequential feature selection, to allow greater flexibility in how the selection progresses.

Below is a minimal version of non-production code that is intended to show the process for a single round. (Each round depends on the results of the last.)
You can vary the n_samples parameter to try out different data sizes.

You can assume the data for X fits into memory. (Though memory could still be a concern if too many copies of pieces of X get created in parallel.)

    import pandas as pd
    from sklearn.datasets import make_classification
    import lightgbm as ltb
    from sklearn.model_selection import KFold, cross_val_predict
    from sklearn.metrics import log_loss
    from dask.distributed import Client
    from dask import delayed, compute


    def main():
        # Create random data and label
        X, y = make_classification(n_samples=500000, n_features=200, n_informative=25, n_redundant=5, n_classes=2,
                                   n_clusters_per_class=10, weights=None, flip_y=0.1, class_sep=1.0, hypercube=False)

        X = pd.DataFrame(data=X)

        my_splitter = KFold(n_splits=5, shuffle=True, random_state=173)

        @delayed
        def run_trial(these_features, feature_name):
            X_this = X[these_features]
            my_model = ltb.LGBMClassifier(boosting_type='dart', importance_type='gain', n_estimators=50, num_leaves=7)
            predictions = cross_val_predict(my_model, X=X_this, y=y, cv=my_splitter, method='predict_proba')
            score = log_loss(y, predictions)
            summary = pd.Series([feature_name, score], index=['name', 'score'])
            return summary

        summaries = []
        # Emulate that we have already found 12 features we like and we are going to add to them.
        established_features = list(X.columns[:12])

        for feature_name in X.columns:
            if feature_name in established_features:
                continue
            print(feature_name)
            these_features = established_features + [feature_name]
            summary = run_trial(these_features, feature_name)
            summaries.append(summary)

        summaries = compute(*summaries)

        full_summary = pd.DataFrame(summaries, columns=['name', 'score'])
        return full_summary


    if __name__ == "__main__":
        client = Client()
        results = main()

In this version I no longer get the warning I described earlier, because I’m pushing the indexes (rather than the data) into the run_trial() method.

But when I run the above, an error is raised when some internal pickling is attempted:

    AttributeError: Can't pickle local object 'main.<locals>.run_trial'
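
If it helps with diagnosis: my guess is that this comes from run_trial being defined inside main(), since the standard pickle machinery can't serialize functions that are local to another function. Here is an untested sketch of the direction I'm considering, with run_trial moved to module level, X and y shipped to the workers once via client.scatter() (as the earlier warning suggested), and client.submit/client.gather used instead of delayed/compute. I have no idea yet whether this is the right pattern, so please treat it as a rough outline only:

    import pandas as pd
    import lightgbm as ltb
    from sklearn.datasets import make_classification
    from sklearn.model_selection import KFold, cross_val_predict
    from sklearn.metrics import log_loss
    from dask.distributed import Client


    # Module-level, so it can be pickled by reference rather than by value.
    def run_trial(X, y, splitter, these_features, feature_name):
        X_this = X[these_features]
        my_model = ltb.LGBMClassifier(boosting_type='dart', importance_type='gain',
                                      n_estimators=50, num_leaves=7)
        predictions = cross_val_predict(my_model, X=X_this, y=y, cv=splitter, method='predict_proba')
        score = log_loss(y, predictions)
        return pd.Series([feature_name, score], index=['name', 'score'])


    def main(client):
        X, y = make_classification(n_samples=500000, n_features=200, n_informative=25, n_redundant=5,
                                   n_classes=2, n_clusters_per_class=10, flip_y=0.1, class_sep=1.0,
                                   hypercube=False)
        X = pd.DataFrame(data=X)
        my_splitter = KFold(n_splits=5, shuffle=True, random_state=173)

        # Send the large objects to the workers once, instead of embedding them in every task.
        X_future = client.scatter(X, broadcast=True)
        y_future = client.scatter(y, broadcast=True)

        established_features = list(X.columns[:12])
        futures = []
        for feature_name in X.columns:
            if feature_name in established_features:
                continue
            these_features = established_features + [feature_name]
            # Futures passed as arguments are resolved to the scattered data on the workers.
            futures.append(client.submit(run_trial, X_future, y_future, my_splitter,
                                         these_features, feature_name))

        summaries = client.gather(futures)
        return pd.DataFrame(summaries, columns=['name', 'score'])


    if __name__ == "__main__":
        client = Client()
        results = main(client)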