Is Dask XGBoost a good option?

Hello,

I am experimenting with Dask and Dask-XGBoost to reduce compute time for my machine learning pipeline. My current setup includes a machine with 192 cores and 1.5TB of memory. The task involves processing CSV data (ranging from 10 to 30 GB) and performing heavy preprocessing steps.

Currently, I’m using pandas, but a significant amount of time is spent on reading, preprocessing, and training, so I am considering Dask to improve performance. Despite my efforts, I haven’t been able to achieve better training times compared to my pandas-based baseline.

Here are the details of my current setup and code:

Data Loading with Dask

import dask.dataframe as dd

def load_data_dask() -> dd.DataFrame:
    order_data = dd.read_csv(order_path, parse_dates=['date'])
    lineitem_data = dd.read_csv(lineitem_path)
    product_data = dd.read_csv(product_path)
    
    data = order_data.merge(lineitem_data, left_on='o_order_id', right_on='li_order_id')
    data = data.merge(product_data, left_on='li_product_id', right_on='p_product_id')
    
    if 'trip_type' in data.columns:
        return data[['o_order_id', 'date', 'department', 'quantity', 'trip_type']]
    else:
        return data[['o_order_id', 'date', 'department', 'quantity']]

Preprocessing with Dask

import pandas as pd
import dask.array as da

def custom_agg(group):
    return pd.Series({
        'scan_count': group['scan_count'].sum(),
        'scan_count_abs': group['scan_count'].abs().sum(),
        'weekday': group['weekday'].iloc[0],
        'trip_type': group['trip_type'].iloc[0] if 'trip_type' in group.columns else None
    })

def pre_process_dask(raw_data: dd.DataFrame) -> tuple[da.Array, dd.DataFrame]:
    has_labels = label_column in raw_data.columns

    raw_data['scan_count'] = raw_data['quantity']
    raw_data['weekday'] = raw_data['date'].dt.day_name()
    
    weekday_categories = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    raw_data['weekday'] = raw_data['weekday'].astype('category').cat.set_categories(weekday_categories)

    features_scan_count = raw_data.groupby('o_order_id').apply(custom_agg, meta={
        'scan_count': 'f8',
        'scan_count_abs': 'f8',
        'weekday': 'category',
        'trip_type': 'f8'
    })

    weekdays = raw_data.pivot_table(index='o_order_id', columns='weekday', values='scan_count', aggfunc='count').fillna(0)
    
    department_categories = department_columns
    raw_data['department'] = raw_data['department'].astype('category').cat.set_categories(department_categories)
    
    departments = raw_data.pivot_table(index='o_order_id', columns='department', values='scan_count', aggfunc='sum').fillna(0)

    final_data = features_scan_count.drop(columns=['weekday']).join(weekdays).join(departments)

    for c in set(weekday_columns) - set(weekdays.columns):
        final_data[c] = 0
    for c in set(department_columns) - set(departments.columns):
        final_data[c] = 0

    if has_labels:
        final_data = final_data[final_data['trip_type'] != 14]
        final_data[label_column] = final_data['trip_type'].map(encode_label, meta=('trip_type', 'int64'))
        labels = final_data[label_column].to_dask_array()
        features = final_data[featureColumns]
        return labels, features
    else:
        return None, final_data[featureColumns]

Dask Cluster Configuration

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=8,           # Number of workers
    threads_per_worker=2,  # Threads per worker
    memory_limit='32GB',   # Memory limit per worker
    dashboard_address=':8787'  # Dashboard port
)
client = Client(cluster)  # client used by the preprocessing and training functions

Training with Dask-XGBoost

from xgboost import dask as dxgb

def train_dask(training_data: dd.DataFrame, labels: da.Array, num_rounds: int):
    params = {
        'objective': 'multi:softprob',
        'max_depth': 16,
        'eta': 0.01,
        'subsample': 0.5,
        'min_child_weight': 1,
        'tree_method': 'hist',
        'grow_policy': 'lossguide',
        'num_class': len(label_to_index) 
    }
    print('Finished initialization')

    dtrain = dxgb.DaskDMatrix(client, training_data, labels)
    print('Before training')
    output = dxgb.train(client, params, dtrain, num_boost_round=num_rounds)
    bst = output['booster']  # dxgb.train returns a dict with the booster and the training history
    print(bst)

    return bst

Questions and Request for Tips

  1. Is using DaskML a viable option for my use case?
  2. What settings can I fine-tune to achieve better performance?
  3. Are there any specific practices or configurations to optimize Dask for my high-core machine?

I’d appreciate any tips or recommendations that could help improve the performance of my pipeline.

Thank you!

Hi @Aratiganesh123,

It’s a bit hard to answer your questions without knowing where your workflow is actually limited. Dask-XGBoost is a viable option, but it might not speed up your use case, especially on such a big machine: since you have more than enough RAM to hold your data, pandas might still be faster depending on your process, especially for the merges.
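If you want to compare just the loading and merging step, a plain pandas version of your load_data_dask would look roughly like the sketch below (reusing your order_path / lineitem_path / product_path globals), which lets you time the in-memory merges directly against the Dask version:

import pandas as pd

def load_data_pandas() -> pd.DataFrame:
    # Same inputs and joins as load_data_dask, but read eagerly into memory
    order_data = pd.read_csv(order_path, parse_dates=['date'])
    lineitem_data = pd.read_csv(lineitem_path)
    product_data = pd.read_csv(product_path)

    data = order_data.merge(lineitem_data, left_on='o_order_id', right_on='li_order_id')
    return data.merge(product_data, left_on='li_product_id', right_on='p_product_id')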

Your code looks fine, but do you have more context on where the time is actually spent? Have you had a look at the execution on the Dask Dashboard? Does your Dask cluster look fully utilized?
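If you haven’t opened the dashboard yet, a minimal sketch (assuming the client shown above, and an example file name) is to print its URL and record a performance report around one end-to-end run:

from dask.distributed import performance_report

print(client.dashboard_link)  # open this URL to watch the task stream while the pipeline runs

# Save a profile of one full run to an HTML file you can inspect afterwards
with performance_report(filename="dask-xgb-report.html"):
    labels, features = pre_process_dask(load_data_dask())
    bst = train_dask(features, labels, num_rounds=100)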

The only thing I can say for now is that the LocalCluster you are creating is really small compared to the amount of cores and memory you have on the machine: 8 workers × 2 threads only uses 16 of your 192 cores, and 8 × 32GB only reserves 256GB of your 1.5TB of RAM!
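As a starting point, something closer to the hardware might look like the sketch below; the exact worker/thread split is just an assumption to benchmark (more, smaller workers often help with pandas-style preprocessing, while XGBoost’s 'hist' method can use several threads per worker):

from dask.distributed import Client, LocalCluster

# Rough sizing sketch for a 192-core / 1.5TB machine: 24 processes x 8 threads = 192 threads,
# and 24 x 60GB stays below the total RAM with some headroom. Treat these numbers as a
# starting point to tune with the dashboard open, not as a recommendation.
cluster = LocalCluster(
    n_workers=24,
    threads_per_worker=8,
    memory_limit='60GB',
    dashboard_address=':8787',
)
client = Client(cluster)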