How to train a large sklearn.ensemble.RandomForestRegressor on a multi-node HPC cluster

I am training a sklearn.ensemble.RandomForestRegressor:

from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

# 6,300,000 samples x 55 features: a large dataset
X, y = make_regression(n_samples=6_300_000, n_features=55, random_state=1)

model = RandomForestRegressor(n_estimators=1000, oob_score=True, random_state=1, n_jobs=36)
model.fit(X, y)

I tried it on one node of a cluster (180 GB of memory, 36 cores; I have 16 such nodes in this cluster), but it failed. However, on a node with 360 GB and 36 cores it runs smoothly (that is a different node type, and I only have one of them).
So memory is the limitation. Furthermore, training the model on one node with 36 cores is slow, even though it does work on the 360 GB node. Of course, I would like to use all the cores across the nodes.
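For scale, a rough back-of-the-envelope estimate (a sketch only, assuming float64 inputs; most of the memory likely goes to the 1000 fitted trees rather than to X itself):

n_samples, n_features, bytes_per_float = 6_300_000, 55, 8
x_gb = n_samples * n_features * bytes_per_float / 1e9
print(f"X alone: ~{x_gb:.1f} GB")  # roughly 2.8 GB; the 1000 fully grown trees need far more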

I tried to use Dask on the HPC cluster:

import joblib
from dask.distributed import Client, progress

from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

X, y = make_regression(n_samples=50_000, n_features=20, random_state=1)
client = Client(processes=False, threads_per_worker=36, n_workers=4, memory_limit='500GB')
model = RandomForestRegressor(n_estimators=1000, oob_score=True, random_state=1)
with joblib.parallel_backend('dask'):
    model.fit(X, y)

This reprex returns an error:

Traceback (most recent call last):
  File "/home/app/intel/intel2020_up1/intelpython3/lib/python3.7/site-packages/joblib/parallel.py", line 63, in _register_dask
    from ._dask import DaskDistributedBackend
  File "/home/app/intel/intel2020_up1/intelpython3/lib/python3.7/site-packages/joblib/_dask.py", line 18, in <module>
    from distributed.utils import funcname, itemgetter
ImportError: cannot import name 'itemgetter' from 'distributed.utils' (/home/usr6/q70176a/.local/lib/python3.7/site-packages/distributed/utils.py)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "test.py", line 51, in <module>
    with joblib.parallel_backend('dask'): model.fit(X, y)
  File "/home/app/intel/intel2020_up1/intelpython3/lib/python3.7/site-packages/joblib/parallel.py", line 171, in __init__
    register()
  File "/home/app/intel/intel2020_up1/intelpython3/lib/python3.7/site-packages/joblib/parallel.py", line 70, in _register_dask
    raise ImportError(msg)
ImportError: To use the dask.distributed backend you must install both the `dask` and distributed modules.

See https://dask.pydata.org/en/latest/install.html for more information.

I was following the "Scikit-Learn & Joblib" page of the dask-ml 2022.5.28 documentation.
How can I run this smoothly?
Thank you!

Hi @MichaelChaoLi-cpu,

The error below seems to indicate that you don't have the `distributed` package installed in your Python environment. Can you check that?
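One quick way to check which copies of the packages your script is actually importing (the traceback shows joblib coming from the Intel Python install but distributed from ~/.local), just a sketch:

import dask, distributed, joblib  # raises ImportError if distributed is missing
for mod in (dask, distributed, joblib):
    print(mod.__name__, mod.__version__, mod.__file__)  # versions and install locations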

I also think you need to pass more realistic arguments to the Client constructor. Considering your node configuration, use either:

client = Client(processes=False, threads_per_worker=36, memory_limit='360GB') 

or

client = Client(n_workers=4, threads_per_worker=9, memory_limit='90GB') 
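Once the client is created, you can verify how workers, threads and memory limits were actually set up, e.g. (a sketch):

client = Client(n_workers=4, threads_per_worker=9, memory_limit='90GB')
print(client)  # summary of workers, threads and memory
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["nthreads"], info["memory_limit"])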

Hi @guillaumeeb

Thank you for answering my questions.

After installing Anaconda and reinstalling the packages, it works. Thanks.

Now I have run into another problem: I am unable to allocate enough memory to each worker. I run this on Linux with four nodes (each has 36 cores and 180 GB of memory):

mpirun  -np 36  -ppn 9  -machinefile ${PJM_O_NODEINF}  -launcher-exec /bin/pjrsh python testEG.py

And in the python script:

import joblib
from dask.distributed import Client, progress
from sklearn.ensemble import RandomForestRegressor

client = Client()
model = RandomForestRegressor(n_estimators=1000, oob_score=True, random_state=1)
with joblib.parallel_backend("dask"):
    model.fit(X, y)

I expected the speed of this code to be only a little lower than that of the calculation on one node with 36 cores (same total number of workers, just with communication overhead). However, this code is about four times slower than the single-node calculation.

In the log:

......
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   5.20 GiB
distributed.worker - INFO -       Local Directory: /home/usr6/q70176a/dask-worker-space/worker-c4p61e6q
distributed.worker - INFO - -------------------------------------------------
......

So each worker has only 5.20 GiB of memory; 36 workers in total is only around 180 GB.
Or perhaps there are actually 144 workers running, sharing 180 GB × 4.

I tried other settings in the code, such as:

from dask.distributed import Client, progress, LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=9, memory_limit='700GB')
client = Client(cluster)

However, this makes the program fail right away.

Do you have any idea about this problem?

Thank you

Hi @guillaumeeb
Sorry, please ignore the above: I had forgotten to set the arguments of dask_mpi.initialize().
Now everything works.
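In case it helps others, here is roughly what the corrected script looks like (a sketch; the nthreads and memory_limit values are just example assumptions for 9 ranks per 180 GB, 36-core node):

import joblib
from dask_mpi import initialize
from dask.distributed import Client
from sklearn.ensemble import RandomForestRegressor

# Launched with: mpirun -np 36 -ppn 9 ... python script.py
# Rank 0 becomes the scheduler, rank 1 runs this client code, the remaining ranks become workers.
initialize(nthreads=4, memory_limit="20GB")  # per-worker settings (assumed values)

client = Client()  # no arguments: connects to the scheduler started by initialize()

model = RandomForestRegressor(n_estimators=1000, oob_score=True, random_state=1, n_jobs=-1)
with joblib.parallel_backend("dask"):
    model.fit(X, y)  # X, y prepared as before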

Thank you.


Glad you fixed the issue.

I was not sure at first whether you were trying to use dask-mpi or not. I just discovered that with dask-mpi's initialize() method, you can create a Client with no arguments. Nice!

Hi @guillaumeeb

Thank you! Everything works now!