Hi,
I am Anurag, a developer at an HFT firm. My team uses Dask to combine features (reindex and forward fill) over the union of unique timestamps and then run a linear regression. We load the feature data through a config that holds the feature hashes, and at the moment we are running a fit for 30 features across 30 dates.
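Concretely, by "combine" I mean something like this toy pandas example (made-up data, just to illustrate the semantics):

```python
import pandas as pd

# Two features observed at different timestamps (illustrative values)
a = pd.Series([1.0, 2.0], index=pd.to_datetime(["2024-01-01 09:00", "2024-01-01 09:02"]), name="a")
b = pd.Series([10.0], index=pd.to_datetime(["2024-01-01 09:01"]), name="b")

# Union of unique timestamps, reindex each feature onto it, then forward fill
all_ts = a.index.union(b.index)  # sorted, unique
combined = pd.concat([a.reindex(all_ts), b.reindex(all_ts)], axis=1).ffill().fillna(0)
print(combined)
```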
I am working on a server with 378 GB of RAM, but I am hitting an out-of-memory (OOM) error, which I wanted to discuss with you, given all your contributions on the Dask forum, which I really appreciate.
Below is a dummy version of the code we are using; it usually breaks at the to_dask_array operation, when all the data gets loaded into memory. Could you please go through it? I would be very grateful for any help in tackling this issue.
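For reference, the inputs we pass in look like this (the hashes, dates, and paths below are placeholders, not our real config values):

```python
feature_hashes = ["a1b2c3d4", "e5f6a7b8"]  # placeholder hashes (30 in practice)
dates = ["20240101", "20240102"]           # placeholder dates (30 in practice)
base_dir = "/data/features"                # placeholder base directory
objective_paths = ["/data/objective/20240101/part.0.parquet"]  # placeholder paths
```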
```python
import os

import pandas as pd
import dask.dataframe as dd
from dask_ml.linear_model import LinearRegression  # dask-ml estimator, accepts dask arrays


def combine_feature_data(feature_hashes, objective_paths, dates, base_dir):
    combined_feature_dfs = []
    for feature_hash in feature_hashes:
        parquet_files = [
            os.path.join(base_dir, f"indicator_{feature_hash}_{date}/part.0.parquet")
            for date in dates
        ]
        # read_parquet_files is our in-house wrapper around dd.read_parquet
        feature_df = read_parquet_files(parquet_files)
        combined_feature_dfs.append(feature_df)
    objective_dfs = [dd.read_parquet(path) for path in objective_paths]
    combined_feature_dfs.extend(objective_dfs)
    combined_features = dd.concat(combined_feature_dfs, axis=0, interleave_partitions=True)
    combined_features = forward_fill_combined_dataframe(combined_features)
    return combined_features


def forward_fill_combined_dataframe(combined_df):
    # Materialize the union of unique timestamps across all features
    all_indices = sorted(set(combined_df.index.compute()))
    index_df = pd.DataFrame(all_indices, columns=["timestamp"])
    index_ddf = dd.from_pandas(index_df, npartitions=1).set_index("timestamp")
    reindexed_df = (
        index_ddf.merge(combined_df, left_index=True, right_index=True, how="left")
        .ffill()
        .fillna(0)
    )
    reindexed_df = reindexed_df.drop_duplicates()
    return reindexed_df


X = combine_feature_data(feature_hashes, objective_paths, dates, base_dir)
x = X.iloc[:, :-1].to_dask_array(lengths=True)  # this is where it usually breaks (OOM)
y = X.iloc[:, -1].to_dask_array(lengths=True)
model = LinearRegression()
model.fit(x, y)
```
In the above snippet:

- We are able to combine the feature data with the objective data successfully (this stage uses around 30 GB of RAM).
- But when we load the result into memory before fitting, we hit the OOM error (memory shoots up to about 350 GB).
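As a sanity check on the expected footprint, here is a sketch of how the materialized size of x and y could be estimated before the fit (this assumes the x and y from the snippet above; with lengths=True the chunk sizes are known, so nbytes is available without computing):

```python
from dask.utils import format_bytes

# nbytes is the size the array would occupy if fully materialized in memory
print("x:", x.shape, x.dtype, format_bytes(x.nbytes))
print("y:", y.shape, y.dtype, format_bytes(y.nbytes))
```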