Hello,
I am experimenting with Dask and Dask-XGBoost to reduce the compute time of my machine learning pipeline. My setup is a single machine with 192 cores and 1.5 TB of memory, and the task involves reading CSV data (10 to 30 GB) and performing heavy preprocessing.
The pipeline currently uses pandas, but a large share of the runtime goes to reading, preprocessing, and training, which is why I am evaluating Dask. So far, though, I have not managed to beat the training times of my pandas baseline.
Here are the details of my current setup and code:
Data Loading with Dask
```python
import dask.dataframe as dd

def load_data_dask() -> dd.DataFrame:
    order_data = dd.read_csv(order_path, parse_dates=['date'])
    lineitem_data = dd.read_csv(lineitem_path)
    product_data = dd.read_csv(product_path)
    # Join orders -> line items -> products
    data = order_data.merge(lineitem_data, left_on='o_order_id', right_on='li_order_id')
    data = data.merge(product_data, left_on='li_product_id', right_on='p_product_id')
    # 'trip_type' is only present in the labeled training data
    if 'trip_type' in data.columns:
        return data[['o_order_id', 'date', 'department', 'quantity', 'trip_type']]
    else:
        return data[['o_order_id', 'date', 'department', 'quantity']]
```
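One thing I am unsure about is whether I should control partition sizes and dtypes at read time. Something along these lines is what I had in mind (the `blocksize` value and the dtype are guesses, not settings I have validated):

```python
# Hypothetical variant: explicit dtypes avoid a second pass for type
# inference, and blocksize controls how many partitions Dask creates.
# The 64 MB block size and the dtype choice are assumptions to tune.
order_data = dd.read_csv(
    order_path,
    parse_dates=['date'],
    blocksize='64MB',
    dtype={'o_order_id': 'int64'},
)
```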
Preprocessing with Dask
```python
import pandas as pd

def custom_agg(group):
    # Collapse one order's line items into a single feature row
    return pd.Series({
        'scan_count': group['scan_count'].sum(),
        'scan_count_abs': group['scan_count'].abs().sum(),
        'weekday': group['weekday'].iloc[0],
        'trip_type': group['trip_type'].iloc[0] if 'trip_type' in group.columns else None,
    })
```
```python
import dask.array as da
from typing import Optional, Tuple

def pre_process_dask(raw_data: dd.DataFrame) -> Tuple[Optional[da.Array], dd.DataFrame]:
    has_labels = label_column in raw_data.columns
    raw_data['scan_count'] = raw_data['quantity']
    raw_data['weekday'] = raw_data['date'].dt.day_name()
    # Known categories let Dask build the pivot tables without scanning the data
    weekday_categories = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
                          'Friday', 'Saturday', 'Sunday']
    raw_data['weekday'] = raw_data['weekday'].astype('category').cat.set_categories(weekday_categories)
    features_scan_count = raw_data.groupby('o_order_id').apply(custom_agg, meta={
        'scan_count': 'f8',
        'scan_count_abs': 'f8',
        'weekday': 'category',
        'trip_type': 'f8',  # float so missing labels become NaN
    })
    weekdays = raw_data.pivot_table(index='o_order_id', columns='weekday',
                                    values='scan_count', aggfunc='count').fillna(0)
    department_categories = department_columns
    raw_data['department'] = raw_data['department'].astype('category').cat.set_categories(department_categories)
    departments = raw_data.pivot_table(index='o_order_id', columns='department',
                                       values='scan_count', aggfunc='sum').fillna(0)
    final_data = features_scan_count.drop(columns=['weekday']).join(weekdays).join(departments)
    # Ensure all expected one-hot columns exist even if absent from this dataset
    for c in set(weekday_columns) - set(weekdays.columns):
        final_data[c] = 0
    for c in set(department_columns) - set(departments.columns):
        final_data[c] = 0
    if has_labels:
        final_data = final_data[final_data['trip_type'] != 14]
        final_data[label_column] = final_data['trip_type'].map(encode_label, meta=('trip_type', 'int64'))
        # lengths=True computes chunk sizes so XGBoost sees known partition lengths
        labels = final_data[label_column].to_dask_array(lengths=True)
        features = final_data[featureColumns]
        return labels, features
    else:
        return None, final_data[featureColumns]
```
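I suspect the `groupby().apply(custom_agg)` call is the main bottleneck, since `apply` runs a generic Python function per group instead of Dask's built-in reductions. Would rewriting it with built-in aggregations, roughly like the sketch below, be the right direction? (Untested against my data; taking `'first'` assumes `weekday` and `trip_type` are constant within an order, and the sketch covers only the labeled case.)

```python
# Rough sketch of an apply-free aggregation. Assumptions: weekday and
# trip_type are constant per order; drop 'trip_type' for unlabeled data.
raw_data['scan_count_abs'] = raw_data['scan_count'].abs()
features_scan_count = raw_data.groupby('o_order_id').agg({
    'scan_count': 'sum',
    'scan_count_abs': 'sum',
    'weekday': 'first',
    'trip_type': 'first',
})
```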
Dask Cluster Configuration
```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=8,               # 8 worker processes
    threads_per_worker=2,      # 2 threads each -> 16 threads total
    memory_limit='32GB',       # memory limit per worker
    dashboard_address=':8787', # dashboard port
)
client = Client(cluster)       # the client used by the training code below
```
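With 8 workers × 2 threads this uses only 16 of the 192 cores, so I assume most of the machine sits idle. Would scaling up along these lines be sensible? The worker/thread split below is a guess I would tune against the dashboard:

```python
# Hypothetical scaled-up configuration for the 192-core box:
# 24 processes x 8 threads = 192 threads, ~62 GB of the 1.5 TB per worker.
# These numbers are assumptions, not benchmarked values.
cluster = LocalCluster(
    n_workers=24,
    threads_per_worker=8,
    memory_limit='62GB',
    dashboard_address=':8787',
)
client = Client(cluster)
```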
Training with Dask-XGBoost
```python
from xgboost import dask as dxgb

def train_dask(training_data: dd.DataFrame, labels: da.Array, num_rounds: int):
    params = {
        'objective': 'multi:softprob',
        'max_depth': 16,
        'eta': 0.01,
        'subsample': 0.5,
        'min_child_weight': 1,
        'tree_method': 'hist',
        'grow_policy': 'lossguide',
        'num_class': len(label_to_index),
    }
    print('Finished initialization')
    dtrain = dxgb.DaskDMatrix(client, training_data, labels)
    print('Before training')
    # 'nround' is not an XGBoost parameter; the round count is passed here instead
    output = dxgb.train(client, params, dtrain, num_boost_round=num_rounds)
    booster = output['booster']  # dxgb.train returns {'booster': ..., 'history': ...}
    print(booster)
    return booster
```
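For completeness, this is roughly how I intend to wire it together for inference (`test_features` is a placeholder for a Dask DataFrame with the same feature columns):

```python
# Hypothetical end-to-end usage; test_features is a placeholder name.
labels, features = pre_process_dask(load_data_dask())
booster = train_dask(features, labels, num_rounds=100)
predictions = dxgb.predict(client, booster, test_features)  # distributed predict
print(predictions.compute())
```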
Questions and Request for Tips
- Is using Dask-ML a viable option for my use case?
- What settings can I fine-tune to achieve better performance?
- Are there any specific practices or configurations to optimize Dask for my high-core machine?
I’d appreciate any tips or recommendations that could help improve the performance of my pipeline.
Thank you!