Convert to run on existing hardware

Hi, I would like to know if this piece of code could be converted to run out-of-core, on data larger than memory. The data has 68 million rows, and the code runs out of memory at 250,000 rows with 30 GB of RAM. The hard disk is 73 GB. Would it be possible to train on all the data? Thanks. See the code below:

!pip install rdkit

!pip install duckdb

import duckdb
import pandas as pd

train_path = '/kaggle/input/leash-predict-chemical-bindings/train.parquet'
test_path = '/kaggle/input/leash-predict-chemical-bindings/test.parquet'

con = duckdb.connect()

df = con.query(f"""(SELECT *
                        FROM parquet_scan('{train_path}')
                        WHERE binds = 0
                        ORDER BY random()
                        LIMIT 200000)
                        UNION ALL
                        (SELECT *
                        FROM parquet_scan('{train_path}')
                        WHERE binds = 1
                        ORDER BY random()
                        LIMIT 200000)""").df()



"""## Feature Preprocessing

Let's grab the SMILES for the fully assembled molecule `molecule_smiles` and generate ECFPs for it. We could choose a different radius or number of bits, but 2 and 1024 is pretty standard.
"""

from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import average_precision_score
from sklearn.preprocessing import OneHotEncoder

# Convert SMILES to RDKit molecules
df['molecule'] = df['molecule_smiles'].apply(Chem.MolFromSmiles)

# Generate ECFPs
def generate_ecfp(molecule, radius=2, bits=1024):
    if molecule is None:
        return None
    return list(AllChem.GetMorganFingerprintAsBitVect(molecule, radius, nBits=bits))

df['ecfp'] = df['molecule'].apply(generate_ecfp)

"""## Train Model"""

# One-hot encode the protein_name
onehot_encoder = OneHotEncoder(sparse_output=False)
protein_onehot = onehot_encoder.fit_transform(df['protein_name'].values.reshape(-1, 1))

# Combine ECFPs and one-hot encoded protein_name
X = [ecfp + protein for ecfp, protein in zip(df['ecfp'].tolist(), protein_onehot.tolist())]
y = df['binds'].tolist()

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create and train the random forest model
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)

# Make predictions on the test set
y_pred_proba = rf_model.predict_proba(X_test)[:, 1]  # Probability of the positive class

# Calculate the mean average precision
map_score = average_precision_score(y_test, y_pred_proba)

print(f"Mean Average Precision (mAP): {map_score:.2f}")

import os

# Process the test file chunk by chunk
test_file = '/kaggle/input/leash-predict-chemical-bindings/test.csv'
output_file = 'submission.csv'  # Specify the path and filename for the output file

# Read the test CSV file in chunks of 100,000 rows
for df_test in pd.read_csv(test_file, chunksize=100000):

    # Generate ECFPs for the molecule_smiles
    df_test['molecule'] = df_test['molecule_smiles'].apply(Chem.MolFromSmiles)
    df_test['ecfp'] = df_test['molecule'].apply(generate_ecfp)

    # One-hot encode the protein_name
    protein_onehot = onehot_encoder.transform(df_test['protein_name'].values.reshape(-1, 1))

    # Combine ECFPs and one-hot encoded protein_name
    X_test = [ecfp + protein for ecfp, protein in zip(df_test['ecfp'].tolist(), protein_onehot.tolist())]

    # Predict the probabilities
    probabilities = rf_model.predict_proba(X_test)[:, 1]

    # Create a DataFrame with 'id' and 'binds' columns
    output_df = pd.DataFrame({'id': df_test['id'], 'binds': probabilities})

    # Save the output DataFrame to a CSV file
    output_df.to_csv(output_file, index=False, mode='a', header=not os.path.exists(output_file))

Thanks & Best Regards

Hi @MichaelSchroter, welcome to the Dask Discourse forum!

You’ll be able to read the Parquet files by chunk using Dask, and also use Dask-ml to apply some transformations by chunk.

However, I don’t think you can train a RandomForestClassifier without feeding it the whole dataset at once. To be trained incrementally on chunks with Dask-ML, an estimator must implement a partial_fit method, and RandomForestClassifier does not.

In any case, take a look at Dask DataFrame read_parquet and at Dask-ML to understand what you could do.


train_df['molecule'] = train_df['molecule_smiles'].apply(Chem.MolFromSmiles, meta=('object')) # Dask converted code

Hi, thanks for the reply. The issue arises when doing these transformations. It gives a hashing error as per below.

TokenizationError: Object <Boost.Python.function object at 0x587ec4b3a6e0> cannot be deterministically hashed. See for more information.

Then I changed the code to:

# Convert SMILES to RDKit molecules
from dask import delayed
train_df['molecule'] = train_df['molecule_smiles'].apply(delayed(Chem.MolFromSmiles), meta=('object'))

# Generate ECFPs
def generate_ecfp(molecule, radius=2, bits=1024):
    if molecule is None:
        return None
    return list(AllChem.GetMorganFingerprintAsBitVect(molecule, radius, nBits=bits))

train_df['ecfp'] = train_df['molecule'].apply(delayed(generate_ecfp), meta=('object'))

Now I get:

/opt/conda/lib/python3.10/site-packages/dask/dataframe/ UserWarning: Dask currently has limited support for converting pandas extension dtypes to arrays. Converting string to object dtype.
ValueError                                Traceback (most recent call last)
Cell In[3], line 32
     30 # One-hot encode the protein_name
     31 onehot_encoder = OneHotEncoder(sparse_output=False)
---> 32 protein_onehot = onehot_encoder.fit_transform(train_df['protein_name'].values.reshape(-1, 1))
     34 # Combine ECFPs and one-hot encoded protein_name
     35 X = [ecfp + protein for ecfp, protein in zip(train_df['ecfp'].tolist(), protein_onehot.tolist())]

File /opt/conda/lib/python3.10/site-packages/dask/array/, in Array.reshape(self, merge_chunks, limit, *shape)
   2217 if len(shape) == 1 and not isinstance(shape[0], Number):
   2218     shape = shape[0]
-> 2219 return reshape(self, shape, merge_chunks=merge_chunks, limit=limit)

File /opt/conda/lib/python3.10/site-packages/dask/array/, in reshape(x, shape, merge_chunks, limit)
    216     if len(shape) == 1 and x.ndim == 1:
    217         return x
--> 218     missing_size = sanitize_index(x.size / reduce(mul, known_sizes, 1))
    219     shape = tuple(missing_size if s == -1 else s for s in shape)
    221 if np.isnan(sum(x.shape)):

File /opt/conda/lib/python3.10/site-packages/dask/array/, in sanitize_index(ind)
     66     return slice(
     67         _sanitize_index_element(ind.start),
     68         _sanitize_index_element(ind.stop),
     69         _sanitize_index_element(ind.step),
     70     )
     71 elif isinstance(ind, Number):
---> 72     return _sanitize_index_element(ind)
     73 elif is_dask_collection(ind):
     74     return ind

File /opt/conda/lib/python3.10/site-packages/dask/array/, in _sanitize_index_element(ind)
     24 """Sanitize a one-element index."""
     25 if isinstance(ind, Number):
---> 26     ind2 = int(ind)
     27     if ind2 != ind:
     28         raise IndexError("Bad index.  Must be integer-like: %s" % ind)

ValueError: cannot convert float NaN to integer

Would anyone be able to help me with this, please?
Thanks & Best Regards

You shouldn’t wrap the function in delayed when passing it to DataFrame.apply. Instead, see if you can find a workaround using the link in the error. What object type does the generate_ecfp function return?

I don’t know if the second error is related or not.