Out of memory error faced with Dask

Hi,
I am Anurag, a developer at an HFT firm. My team uses Dask to combine features (reindexing and forward filling against the unique timestamps) and then fit a linear regression. We load the feature data through a config that holds the feature hashes, and at the moment we are running a fit for 30 features over 30 dates.

I am working on a server with 378 GB of RAM but am still hitting an out-of-memory error, which I wanted to discuss with you given all your contributions on the Dask forum, which I really appreciate.

This is the dummy code we are using; it usually breaks at the to_dask_array operation, when all the data gets loaded into memory.

Could you please go through the snippet below? I would be very grateful for any help in tackling this issue.

import os

import dask.dataframe as dd
import pandas as pd


def combine_feature_data(feature_hashes, objective_paths, dates, base_dir):
    # Build one Dask DataFrame per feature from that feature's per-date Parquet files.
    combined_feature_dfs = []
    for feature_hash in feature_hashes:
        parquet_files = [
            os.path.join(base_dir, f"indicator_{feature_hash}_{date}/part.0.parquet")
            for date in dates
        ]
        feature_df = read_parquet_files(parquet_files)  # helper (not shown) that returns a Dask DataFrame
        combined_feature_dfs.append(feature_df)

    # Append the objective (target) data and concatenate everything on the timestamp index.
    objective_dfs = [dd.read_parquet(path) for path in objective_paths]
    combined_feature_dfs.extend(objective_dfs)
    combined_features = dd.concat(combined_feature_dfs, axis=0, interleave_partitions=True)
    combined_features = forward_fill_combined_dataframe(combined_features)
    return combined_features

def forward_fill_combined_dataframe(combined_df):
    # Reindex onto the union of unique timestamps, then forward fill the gaps.
    all_indices = sorted(combined_df.index.unique().compute())
    index_df = pd.DataFrame(all_indices, columns=['timestamp'])
    index_ddf = dd.from_pandas(index_df, npartitions=1).set_index('timestamp')
    reindexed_df = index_ddf.merge(combined_df, left_index=True, right_index=True, how='left').ffill().fillna(0)
    reindexed_df = reindexed_df.drop_duplicates()
    return reindexed_df


X = combine_feature_data(feature_hashes, objective_paths, dates, base_dir)  # already a Dask DataFrame
x = X.iloc[:, :-1].to_dask_array(lengths=True)  # all columns except the last are features
y = X.iloc[:, -1].to_dask_array(lengths=True)   # the last column is the objective

model = LinearRegression()
model.fit(x, y)

In the above snippet:

  1. We are able to combine the feature data with the objective data successfully (this stays within around 30 GB of RAM).

  2. But before fitting, when we load it into memory, it hits the out-of-memory (OOM) error (RAM usage shoots up to about 350 GB).

Basically, my question boils down to: how can I run a Dask fit without loading all of the data into memory? I would also appreciate any general comments and suggestions on the script!

Hi @faze-geek, welcome to the Dask community!

Could you describe the input dataset a bit more? How many Parquet files are there, and what is their total volume?
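For instance, something along these lines (just a sketch, reusing base_dir and the indicator_<hash>_<date> path layout from your snippet) would give the file count and the volume on disk:

import glob
import os

# Count the feature Parquet files and sum their size on disk.
files = glob.glob(os.path.join(base_dir, "indicator_*_*", "part.0.parquet"))
total_bytes = sum(os.path.getsize(f) for f in files)
print(f"{len(files)} files, {total_bytes / 1e9:.1f} GB on disk")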

I’m not sure I have a clear view of the first step, which combines the feature data with the objective data. What is it referring to in your code snippet?

There is no import for LinearRegression in your code snippet: which class do you use? And how big are x and y?
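If it helps, something like this (again just a sketch, reusing the X, x and y names from your snippet) would report the numbers I’m after:

# Approximate in-memory size of the combined DataFrame and of the arrays passed to fit.
print(X.npartitions)
print(X.memory_usage(deep=True).sum().compute() / 1e9, "GB estimated for X")
print(x.shape, x.dtype, x.nbytes / 1e9, "GB for x")
print(y.shape, y.dtype, y.nbytes / 1e9, "GB for y")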