Hello,
I’m just getting started with Dask and am running into trouble trying to make parallelization improve my efficiency.
In short, my observations so far are that:
- When using Dask in a testing, low-data environment, there is tremendous overhead, making it about 15x slower than just using pandas.
- When I step up to a slightly larger-data (but still not huge) environment, I get a warning about large data size and performance degrades badly.
Some specifics about my goals:
I have a process called run_trial() that gets run about 500-1500 times in a given routine. In a low-data environment (working with a table of 20,000 rows), pandas takes about 20 seconds to do 500 of these run_trial() calls. The time required is roughly linear in the length of the data: at 100,000 rows, pandas takes about 70 seconds to do 500 calls to run_trial().
I thought this might be a good place to use Dask, but in the low-data environment my efforts at parallelization are performing very poorly: it is taking something like 250 seconds to make 500 calls to run_trial(). And when I step up to just 100,000 rows, I get a warning about large object sizes in the task graph, and performance is very poor.
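For reference, the plain pandas baseline is essentially just this serial loop (a simplified sketch; run_trial() returns a DataFrame per call, which I concatenate the same way in the real code):

import pandas as pd

def traverse_features_serial(self, indexes_to_test, bundle_provider):
    # Simplified sketch of the serial loop I'm comparing against.
    cohorts = []
    for idx in indexes_to_test:
        for trial in bundle_provider.get_bundle(new_indexes=[idx]):
            cohorts.append(self.run_trial(trial, idx))
    return pd.concat(cohorts, axis=0)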
Here is the code I’m trying to use to parallelize:
from dask import compute, delayed
import pandas as pd

def traverse_features(self, indexes_to_test, bundle_provider):
    cohorts = []
    temp_cohorts = []
    for idx in indexes_to_test:
        trials = bundle_provider.get_bundle(new_indexes=[idx])
        for trial in trials:
            this_result = delayed(self.run_trial)(trial, idx)
            temp_cohorts.append(this_result)
            if len(temp_cohorts) > 30:  # Trying to prevent too many tasks scheduled at once.
                cohorts.extend(list(compute(*temp_cohorts)))
                temp_cohorts = []
    cohorts.extend(list(compute(*temp_cohorts)))
    joint = pd.concat(cohorts, axis=0)
    return joint
In the above, a trial object simply holds a set of train/test data as pandas Series and DataFrames (two DataFrames for the train/test X, two Series for the train/test labels, and two Series for the train/test weights).
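Roughly, a trial looks something like this (a simplified sketch with illustrative field names, not my actual class):

from dataclasses import dataclass
import pandas as pd

@dataclass
class Trial:
    # Illustrative sketch of what a trial holds.
    X_train: pd.DataFrame
    X_test: pd.DataFrame
    y_train: pd.Series
    y_test: pd.Series
    w_train: pd.Series
    w_test: pd.Series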
The number of dimensions is tiny, and in my testing there is just a single feature, so the total payload is just 3 x the total number of rows (since every row is in either the training or the test set). In the 100,000-row case each trial therefore holds a total of 300,000 numbers, which does not seem like a lot to me (300,000 float64 values is about 2.3 MiB, and I assume the index carried by each Series/DataFrame accounts for the rest), but I get the warning “UserWarning: Large object of size 4.58 MiB detected in task graph”, and performance is bad to the point of not running.
So any help would be appreciated:
- How do I reduce the large overhead?
- How do I stop the large performance hit when using a relatively small (4.5 MiB) dataframe? The warning mentions “client.scatter()”, but I have no idea whether that is necessary or how to use it in this case.
Since these dataframes are so small (relative to my total RAM), I thought using pandas objects here was the right way to go.
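In case it helps, here is roughly how I imagined “client.scatter()” being used, based on the warning text and the docs; the pattern below is my guess, and I don't know whether it is right:

from dask import compute, delayed
from dask.distributed import Client
import pandas as pd

client = Client()  # assuming a local distributed client

def traverse_features_scattered(self, indexes_to_test, bundle_provider):
    # Hypothetical variant: scatter each trial to the workers first,
    # then pass the resulting future into the delayed run_trial call.
    results = []
    for idx in indexes_to_test:
        for trial in bundle_provider.get_bundle(new_indexes=[idx]):
            trial_future = client.scatter(trial)
            results.append(delayed(self.run_trial)(trial_future, idx))
    return pd.concat(compute(*results), axis=0)

Is something like that what the warning is suggesting, or is there a better pattern for this kind of many-small-tasks workload?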