Unmanaged Memory of Scheduler Causes Failure

Hi, dask developers and experts,

Recently, I use dask to do the distributed computation but alway disturbed by the unmanaged memory (I guess). Since my HPC is non-interactive-mode, now the only things I know the latest output warning is always about the percentage of unmanaged memory, when the job lib.Parallel(n_jobs=24).

When I run the following code on 6 nodes, using 4 processes, and 16 threads (4 threads per worker), the code works.
(node 0 is scheduler, node 1 is for mpi, nodes 2-5 are for worker.)

# for 6 nodes version
import os
import pandas as pd
import numpy as np

from sklearn.ensemble import RandomForestRegressor
import joblib
from joblib import Parallel, delayed

import dask_mpi as dm
from dask.distributed import Client, progress

dm.initialize(local_directory=os.getcwd(),  nthreads=4, memory_limit=0.99)
client = Client()

from sklearn.datasets import make_regression
X, y = make_regression(n_samples = 100000, n_features = 50, random_state=1)

model = RandomForestRegressor(n_estimators=1000, oob_score=True, 
                               random_state=1, max_features = 11, n_jobs=-1)
with joblib.parallel_backend("dask"): model.fit(X, y)

# SHAP
import dalex as dx

model_rf_exp = dx.Explainer(model, X, y, label = "RF Pipeline")

def singleSHAPprocess(obs_num):
    test_obs = X[obs_num:obs_num+1,:]
    shap_test = model_rf_exp.predict_parts(test_obs, type = 'shap', 
                                           B = 5, N = 5000)
    result = shap_test.result[shap_test.result.B == 0]
    result = result[['contribution', 'variable_name']]
    result = result.transpose()
    result = result.rename(columns=result.iloc[1])
    result = result.drop(['variable_name'], axis=0)
    result = result.reset_index(drop=True)
    return result

with joblib.parallel_backend('dask'):
    results_bag = joblib.Parallel(n_jobs=16, verbose=100)(
        joblib.delayed(singleSHAPprocess)(int(obs_num))
        for obs_num in np.linspace(0, 1399, 1400))

Commend on Linux:

mpirun  -np 6  -ppn 1  -machinefile ${PJM_O_NODEINF}  -launcher-exec /bin/pjrsh python for6nodes.py

When I run the following code on 8 nodes, using 4 processes, and 24 threads (4 threads per worker), the code works.
Commend on Linux:

mpirun  -np 8  -ppn 1  -machinefile ${PJM_O_NODEINF}  -launcher-exec /bin/pjrsh python for6nodes.py

and
python code:

with joblib.parallel_backend('dask'):
    results_bag = joblib.Parallel(n_jobs=24, verbose=100)(
        joblib.delayed(singleSHAPprocess)(int(obs_num))
        for obs_num in np.linspace(0, 1399, 1400))

The unmanaged memory warnings come, and the code just fails.

I only try to use more machines, but nothing changes in each worker.
So I wonder whether the memory warnings is from the scheduler.

Can I solve this problem by client.scatter([model_rf_exp, X])?

Thank you for your time.

Add some supplementary information.
When using 4 workers and n_jobs=16, the warning message form the HPC is:

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 145.00 GiB -- Worker memory limit: 185.56 GiB

But when using 6 workers and n_jobs=24, the HPC returns LIMIT OVER MEMORY.

I clearly set in the dm.initialize(local_directory=os.getcwd(), nthreads=4, memory_limit=0.99). So, from my understanding every worker should have 4 threads. Is it right?

Or I just need to set joblib.Parallel(n_jobs=4, verbose=100), rather than joblib.Parallel(n_jobs=16, verbose=100)?

Thanks

Well, I will keep updating these questions to provide information, until I solve it or someone help me to solve.

Now, what I am believing (maybe wrong) are:

  1. with joblib.parallel_backend('dask'): joblib.Parallel(n_jobs=24), this n_jobs is the nthreads of each worker.
    In this way, I just need to have more workers to increase the computing speed. Right?

However, when I keep nothing changed, just an increase in the loop number causes a failure!
The last message from my HPC is:

distributed.utils_perf - INFO - full garbage collection released 31.68 MiB from 920 reference cycles (threshold: 9.54 MiB)

Failure reason is still LIMIT OVER MEMOR. However, I do not know the memory of what, worker? scheduler?

Hi @MichaelChaoLi-cpu,

So you’re mixing Joblib and Dask, which is a good option when using sklearn models (so for the part where you train the ML Model:

However, this is a bit more unusual when just distributing work

And indeed, you’re having trouble understanding how Joblib interacts with Dask. Joblib just submit its jobs on Dask cluster. Each Joblib job will use one core of a Dask cluster.

No. As you can read in joblib.Parallel — joblib 1.3.0.dev0 documentation, n_jobs is The maximum number of concurrently running jobs. I’d say you should always set it to -1 when using Dask, meanings: just use all my Dask cluster cores. You shouldn’t have specific code for 6 or 8 nodes.

I didn’t take the time to analyze well your code, so it’s difficult here to get what could be using a lot of memory. I don’t know what an Explainer is and what is does.

Dask is complaining that there are 145gb of memory it knows nothing about. That’s an enormous amount; a healthy worker will typically have 0.5~2gb. Either the tasks you’re submitting are allocating 35gb+ worth of heap each (4 threads per worker = 4 tasks running in parallel), in which case you have to break down your problem more, or you have a severe memory leak in your code or in the libraries you’re using.

Thank you, @crusaderky

After I read @guillaumeeb 's comments, I guess I know what happen to my calculation.
In dm.initialize(local_directory=os.getcwd(), nthreads=4, memory_limit=0.99), I set the nthreads=4, which means each worker has 4 cores. However, in the joblib.Parallel(n_jobs=24, verbose=100), the memory will divide into 24 parts and only 4 parts are directly used. Therefore, only 1/6 of the total memory is used, around 40GB.

In the next step, I will try to set joblib.Parallel(n_jobs=-1, verbose=100) first to see what will happen.
Then I will try to increase the dm.initialize(nthreads) to make the job more efficient.

Do you think my guess is right?

Thank you.

Thank you @guillaumeeb

After reading your comments, I seem to know how to make the job more efficient.

I will try to set the joblib.Parallel(n_jobs=24) and

in the next step.

Any new information, I will write it in this log.

Thank you, sir.

I don’t think this is true. Joblib with Dask backend will just use the slots available in your Dask cluster. Here, up to 24 threads across all the cluster. It won’t change the memory per thread.

But as @crusaderky said, you should try to understand why there is so much memory used, and so much unmanaged memory.

Thank you, trying to understand where the memory used is the things I am doing.

When I run the code on PC with joblib threading backend, the program consumes less memory, no more than 30 GB. And the speed is almost the same as the HPC with 8 nodes, which shocks me.

During the test, I find another thing. The speed is significantly faster than the PC when I use one node with 36 cores and 180 GB to run this code:

results_bag = joblib.Parallel(n_jobs=-1, verbose=20000, backend="multiprocessing")(
    joblib.delayed(singleSHAPprocess)(int(obs_num))
    for obs_num in np.linspace(0, 9999, 10000))

And there is no memory error.

I think I do a lot of things wrong to use dask. Sir, I read a lot of things on the dask website. But I hardly find the example codes to use dask to parallelize the loops. Unfortunately, I am not a computer science major, so sometimes it is really hard for me to find the necessary information. However, because of you guys’ help, I am able to solve some problems.

Here is the issue: I want to parallelize this loop and use more cores in the HPC:

Do you have any suggestions? Thank you.

Hi, experts. @guillaumeeb and @crusaderky.

Always thanks for your kindness. I still do not know where the unmanaged memory goes. However, the dask_ml.model_selection.GridSearchCV inspires me. When we use GridSearchCV, the code is like this maybe:

from sklearn.ensemble import RandomForestRegressor
base_estimator = RandomForestRegressor(oob_score=True, random_state=1,
                                       n_estimators = 1000, n_jobs=36)

from dask_ml.model_selection import GridSearchCV
search = GridSearchCV(base_estimator, param_grid, n_jobs=14, cv=10)
search.fit(X, y)

This part base_estimator = RandomForestRegressor(oob_score=True, random_state=1, n_estimators = 1000, n_jobs=36) would run within a worker, and the parallel backend may be multiprocessing. The n_jobs=36 is the core within a worker, right?

Then, in search = GridSearchCV(base_estimator, param_grid, n_jobs=14, cv=10), n_jobs is the number of workers, this part’s parallel backend is dask. Since I am not a CS-related major people, can I understand the structure like this?

If this is right, this part will not work:

I should rewrite them into two parts, within-worker and between-workers:

# within-worker
def multiprocessingLayer(loop_time):
    results_bag = joblib.Parallel(n_jobs=36, backend="multiprocessing")(
        joblib.delayed(singleSHAPprocess)(obs_num)
        for obs_num in list(range(1000*loop_time, 1000*(loop_time+1), 1)))
    return results_bag

# between-workers
with joblib.parallel_backend('dask'):
    results_bag_dask = joblib.Parallel(n_jobs=-1, verbose=100)(
        joblib.delayed(multiprocessingLayer)(int(loop_time))
        for loop_time in list(range(6)))

In the within-part, they might be backend="multiprocessing", like running the code on a single node. Between-workers part relies on dask.

Does this make sense?

Sorry, sir. I have very limited HPC resources, and still waiting in line. If you have any comments, I really appreciate it.

I don’t recommend mix-n-matching dask multiprocessing and dask distributed. It will mess with dask distributed’s load balancing, cost you a lot more memory overall (because all data needs to travel through the main process), and not give you any benefit to begin with if you have so many processors.

You must split your problem in smaller chunks. Your current chunks require 35+ GB heap each which is horrendous; use instead many more chunks that consume no more than ~4GB each (that’s 5% of your target threshold of 140GB * 60%). Then experiment reducing further; normally it’s very healthy to have 100~250MB chunks.

I set the nthreads=4 , which means each worker has 4 cores. However, in the joblib.Parallel(n_jobs=24, verbose=100) , the memory will divide into 24 parts and only 4 parts are directly used. Therefore, only 1/6 of the total memory is used, around 40GB.

That’s not how it works.
If you have 4 threads per worker and 24 chunks, your grand total computation will be split in 24 parts. Then, 4 parts at a time will be fed to the workers. When one is done, another is picked from the queue, thus maximising both memory efficiency and CPU usage.

Change n_jobs=24 to 1000.

Hi, Thank you for your reply.

It really does not work:

Another question is that when I use backend="multiprocessing" and n_jobs=36 on a single node, it seems to get 36 results at a time without any memory error.

However, when we come to dask, if I set dm.initialize(local_directory=os.getcwd(), nthreads=36, memory_limit=0), the code is always killed by LIMIT OVER MEMORY. Here, the nthreads should be no more than 4. Why is that?

Could you provide some hints? Thank you very much.

I fully approve @crusaderky, don’t try to mix Dask multiprocessing and Dask on a distributed cluster. I would say more, if you can, don’t mix joblib with Dask. The case of model fitting with sklearn should be the only exception, because sklearn needs joblib to talk to Dask.

However, in the second part of your code, I’ll just don’t use joblib, so instead of

with joblib.parallel_backend('dask'):
    results_bag = joblib.Parallel(n_jobs=16, verbose=100)(
        joblib.delayed(singleSHAPprocess)(int(obs_num))
        for obs_num in np.linspace(0, 1399, 1400))

I would go with something like:

delayed_results = [dask.delayed(singleSHAPprocess)(int(obs_num))  for obs_num in np.linspace(0, 1399, 1400))]
results = client.compute(*delayed_results)

See Embarrassingly parallel Workloads — Dask Examples documentation for some more details examples as the second part of your code looks embarrassingly parallel.

The way you launch your Dask cluster with dask-mpi should give you 4 (or 6) workers processes with 4 threads each, so up to 16 (or 24) tasks in parallel for Dask. np.linspace(0, 1399, 1400) gives you 1400 tasks, so Dask should be processing 24 tasks at a time. I don’t really now what each call to singleSHAPprocess does, but you should use the Dashboard to see if each call is consuming too much memory. Or maybe just profile a single code to this function.

In this case, you launch 36 calls of singleSHAPprocess at a time, so more than with your Dask cluster. And you don’t run into memory problem, which sound strange. Could you share the job script you used on top of the mpirun call?

Hi, thank you.

Correction:

This code does NOT work. I am so sorry.

They should be:

from sklearn.ensemble import RandomForestRegressor
base_estimator = RandomForestRegressor(oob_score=True, random_state=1,
                                       n_estimators = 1000) ### here, n_jobs should be default.

from dask_ml.model_selection import GridSearchCV
search = GridSearchCV(base_estimator, param_grid, n_jobs=-1, cv=10)
search.fit(X, y)

Several days before, I have already rewritten the code into this style:

To further improve the performance, I scatter the data to workers.
Now, the code is as follows:

X_scattered = client.scatter(X)
model_rf_exp_scattered = client.scatter(model_rf_exp)

def singleSHAPprocess(obs_num, X, model_rf_exp):
    test_obs = X[obs_num:obs_num+1,:]
    shap_test = model_rf_exp.predict_parts(test_obs, type = 'shap', 
                                           B = 10, N = 900)
    result = shap_test.result[shap_test.result.B == 0]
    result = result[['contribution', 'variable_name']]
    result = result.transpose()
    result = result.rename(columns=result.iloc[1])
    result = result.drop(['variable_name'], axis=0)
    result = result.reset_index(drop=True)
    return result

results = []
for obs_num in list(range(1000)):
    results.append(dask.delayed(singleSHAPprocess)(obs_num, X_scattered, model_rf_exp_scattered))

test_result = dask.compute(*results) 

I test this code on local PC (windows 11) and use the dashboard. I find when the workers begin to work the memory taken reaches around 20GB, but when they run stably the memory of one worker does not exceed 8GB anymore.

Therefore I guess spilled memory is at most 20GB. Just in case, I initialize as follows:

dm.initialize(local_directory=os.getcwd(),  nthreads=1, memory_limit=0.1666)

That means every node build six worked and every one has around 30GB.

The job script is as follows:

#!/bin/bash
#PJM -L "rscunit=ito-a"
#PJM -L "rscgrp=ito-s"
#PJM -L "vnode=4"
#PJM -L "vnode-core=36"
#PJM -L "elapse=24:00:00"
#PJM -j
#PJM -X
module use /home/exp/modulefiles
module load gcc/10.2.0
mpirun  -np 24 -ppn 6  -machinefile ${PJM_O_NODEINF}  -launcher-exec /bin/pjrsh python 05_TE_DaskDalex_4nodes6workers_v0.py

However, I get LIMIT OVER MEMOR information and the code died.
I read the report, and find this:

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 45.07 GiB -- Worker memory limit: 31.23 GiB

I am wondering why the amounts of unmanaged memory are so different on the local PC and remote cluster.

Then I have reduce the worker numbers per node to three:

dm.initialize(local_directory=os.getcwd(),  nthreads=1, memory_limit=0.3333)

And the job script:

#!/bin/bash
#PJM -L "rscunit=ito-a"
#PJM -L "rscgrp=ito-s"
#PJM -L "vnode=4"
#PJM -L "vnode-core=36"
#PJM -L "elapse=24:00:00"
#PJM -j
#PJM -X
module use /home/exp/modulefiles
module load gcc/10.2.0
mpirun  -np 12 -ppn 3  -machinefile ${PJM_O_NODEINF}  -launcher-exec /bin/pjrsh python 05_TE_DaskDalex_4nodes6workers_v0.py

This script can run, but slower than

Today, I use 16 nodes to build 48 works to do the test.
I hope it works.
Thank you for reading my long log.

Hi, @guillaumeeb and @crusaderky

Thanks for your help. Yesterday, I use 16 nodes with 48 processes to run the code. The same code is three times faster than a single node. However, once the allocated memory is lower than 60GB, the code just fails. As I have mentioned on the local PC, the memory consumed is not so much when the code runs stably.

A further increase in the processes per worker would reduce consumed time. I want to ask whether there is a method to open dask DashBoard of remote HPC on local PC.

I searched a lot, only this consistent:
https://stackoverflow.com/questions/53999065/how-to-view-dask-dashboard-when-running-on-a-virtual-machine

Is there any official tutorial?

Thank you.

Best

When yo run this on your PC, how many workers do you have? How many threads per worker? How much memory in total and per worker?

This really depends also on how you launched you cluster with dask-mpi.

This means you launch 24 mpi processes, 6 by nodes on 4 nodes. And with the dask-mpi usage above, I agree that every process will have only one thread and a sixth of the memory of the given node.

But in the jobscript you don’t specify anywhere the amount of memory you want. I guess that since you ask for 4 vnodes, you have complete nodes, so the amount of memory you said.

Is this LIMIT_OVER_MEMORY output given by Dask or by your job scheduling system?

Maybe when you run one one node you use all the 36 cores as Dask threads, so up to 36 tasks in parallel.

You could try to use the following options, in your jobscript:

...
#PJM -L "vnode=2"
#PJM -L "vnode-core=36"
...

mpirun  -np 2 -ppn 1

And in Python:

dm.initialize(local_directory=os.getcwd(),  nthreads=36, memory_limit=1.0)

To have one process per node using all the available cores and memory.

For that, you might want to ask your HPC system admins for help. Be sure that you book the correct amount of memory in your job script, and ask them how to start a web browser an access a web service launched on a compute node.

You might be interested by:
https://pangeo.io/setup_guides/hpc.html

I have two workers and every worker has one thread. In total, I have 64GB, and every worker has 32GB locally.

This is from job scheduling system.

I am trying this code. However, I wonder whether two processes are okay. I remember that one process is for scheduler, and another one is for mpi in dask. So, maybe I need three nodes. Anyway, I will test your code first.

Thank you. I will contact with sysadmin to use dashboard.

Do you use LocalCluster to test locally? Only 2 processes with one thread each should take a long time!

You’re right, the code I gave will only launch one worker (my mistake). But it can also be good to test threads/memory settings at small scale.

That might be an important point: what is your scheduling system expecting in term of memory limit?