Hi, dask developers and experts,
Recently I have been using Dask for distributed computation, but I am always disturbed by what I guess is unmanaged memory. Since my HPC runs in non-interactive mode, the only thing I can see is that the last output before the job dies is always a warning about the percentage of unmanaged memory, whenever I run joblib.Parallel(n_jobs=24).
When I run the following code on 6 nodes with 4 worker processes and 16 threads in total (4 threads per worker), the code works.
(Node 0 is the scheduler, node 1 runs the client script, and nodes 2-5 are the workers.)
# for 6 nodes version
import os
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
import joblib
from joblib import Parallel, delayed
import dask_mpi as dm
from dask.distributed import Client, progress
dm.initialize(local_directory=os.getcwd(), nthreads=4, memory_limit=0.99)
client = Client()
from sklearn.datasets import make_regression
X, y = make_regression(n_samples = 100000, n_features = 50, random_state=1)
model = RandomForestRegressor(n_estimators=1000, oob_score=True,
                              random_state=1, max_features=11, n_jobs=-1)
with joblib.parallel_backend("dask"):
    model.fit(X, y)
# SHAP
import dalex as dx
model_rf_exp = dx.Explainer(model, X, y, label = "RF Pipeline")
def singleSHAPprocess(obs_num):
    test_obs = X[obs_num:obs_num+1, :]
    shap_test = model_rf_exp.predict_parts(test_obs, type='shap',
                                           B=5, N=5000)
    result = shap_test.result[shap_test.result.B == 0]
    result = result[['contribution', 'variable_name']]
    result = result.transpose()
    result = result.rename(columns=result.iloc[1])
    result = result.drop(['variable_name'], axis=0)
    result = result.reset_index(drop=True)
    return result

with joblib.parallel_backend('dask'):
    results_bag = joblib.Parallel(n_jobs=16, verbose=100)(
        joblib.delayed(singleSHAPprocess)(int(obs_num))
        for obs_num in np.linspace(0, 1399, 1400))
Command on Linux:
mpirun -np 6 -ppn 1 -machinefile ${PJM_O_NODEINF} -launcher-exec /bin/pjrsh python for6nodes.py
However, when I run the same code on 8 nodes with 6 worker processes and 24 threads in total (4 threads per worker), the code fails.
Command on Linux:
mpirun -np 8 -ppn 1 -machinefile ${PJM_O_NODEINF} -launcher-exec /bin/pjrsh python for6nodes.py
with the Python code changed to:
with joblib.parallel_backend('dask'):
    results_bag = joblib.Parallel(n_jobs=24, verbose=100)(
        joblib.delayed(singleSHAPprocess)(int(obs_num))
        for obs_num in np.linspace(0, 1399, 1400))
The unmanaged memory warnings appear and the job just fails. I only added more machines; nothing changed on each individual worker. So I wonder whether the memory warnings come from the scheduler rather than the workers.
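Because the job is non-interactive I cannot open the dashboard, so something like the following is what I was thinking of to print per-worker memory from the client process (an untested sketch; I am assuming Client.scheduler_info() exposes memory_limit and a "memory" metric per worker):

# Untested sketch: dump per-worker memory from the client process.
# I assume scheduler_info() returns a "workers" dict whose values carry
# "memory_limit" and a "metrics" dict with the current "memory" usage.
info = client.scheduler_info()
for addr, w in info["workers"].items():
    print(addr,
          "used:", w.get("metrics", {}).get("memory"),
          "limit:", w.get("memory_limit"))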
Can I solve this problem with client.scatter([model_rf_exp, X])?
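This is roughly what I have in mind (an untested sketch that bypasses joblib and uses dask.distributed directly; singleSHAPprocess_scattered is just my renamed version of the function above, changed to take the explainer and the data as explicit arguments):

# Untested sketch: broadcast the explainer and the data to every worker once,
# then pass the futures into each task so they are not re-serialized per call.
exp_future, X_future = client.scatter([model_rf_exp, X], broadcast=True)

def singleSHAPprocess_scattered(obs_num, explainer, X_data):
    test_obs = X_data[obs_num:obs_num+1, :]
    shap_test = explainer.predict_parts(test_obs, type='shap', B=5, N=5000)
    result = shap_test.result[shap_test.result.B == 0]
    result = result[['contribution', 'variable_name']]
    result = result.transpose()
    result = result.rename(columns=result.iloc[1])
    result = result.drop(['variable_name'], axis=0)
    return result.reset_index(drop=True)

futures = [client.submit(singleSHAPprocess_scattered, obs_num,
                         exp_future, X_future)
           for obs_num in range(1400)]
results_bag = client.gather(futures)

Would that help with the unmanaged memory, or is there a better way to do this with the joblib backend?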
Thank you for your time.