My code works with LocalCluster but not with SLURMCluster

Hi all

Thanks for this active community of Dask users!

I was able to run the Fibonacci example fib(n) from the Dask website using futures and Slurm.
Based on that, I've written code that traverses a tree with a recursive function. I submit jobs inside the recursive function, so I have two levels of parallelisation. My code works well with LocalCluster, finishing in one minute on the test dataset. But when I use SLURMCluster and check the dashboard, it gets stuck at 0/3.

cluster = SLURMCluster(cores=1, processes=1, memory="1GB", walltime="00:04:00")
n_jobs = 3
cluster.scale(n_jobs)

When I change n_jobs=4, the dashboard says 0/4, it doesn't move forward, and no computation is done.
When I set n_jobs=100, the code works well with Slurm.
The issue is that when I use the whole dataset, 100 jobs is not enough and it gets stuck again.
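For context, the cluster is wired to a client roughly like this (a minimal sketch; `SLURMCluster` comes from the `dask_jobqueue` package, the queue/account options of a real Slurm site are omitted, and `client_dask` is the name the snippets below use):

```python
from dask.distributed import Client
from dask_jobqueue import SLURMCluster  # assumes dask-jobqueue is installed

# One single-threaded worker per Slurm job, as in the snippet above
cluster = SLURMCluster(cores=1, processes=1, memory="1GB", walltime="00:04:00")

n_jobs = 3
cluster.scale(n_jobs)          # ask Slurm for 3 worker jobs

client_dask = Client(cluster)  # tasks are submitted through this client
```

This only runs on a machine that can talk to a Slurm scheduler, so treat it as a configuration sketch rather than a runnable example.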

Python 3.8.1
dask.__version__ '2022.05.0'

The Slurm output log is almost empty:

2022-08-22 20:10:04,682 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.203.101.168:33999'
2022-08-22 20:10:06,528 - distributed.worker - INFO -       Start worker at: tcp://10.203.101.168:45407
2022-08-22 20:10:06,529 - distributed.worker - INFO -          Listening to: tcp://10.203.101.168:45407
2022-08-22 20:10:06,529 - distributed.worker - INFO -          dashboard at:       10.203.101.168:43333
2022-08-22 20:10:06,529 - distributed.worker - INFO - Waiting to connect to: tcp://10.203.101.165:38517
2022-08-22 20:10:06,529 - distributed.worker - INFO - -------------------------------------------------
2022-08-22 20:10:06,529 - distributed.worker - INFO -               Threads:                          1
2022-08-22 20:10:06,529 - distributed.worker - INFO -                Memory:                   0.93 GiB
2022-08-22 20:10:06,529 - distributed.worker - INFO -       Local Directory: /work/projects/gethog3/dask-worker-space/worker-7o29_jhw
2022-08-22 20:10:06,529 - distributed.worker - INFO - -------------------------------------------------
2022-08-22 20:10:06,538 - distributed.worker - INFO -         Registered to: tcp://10.203.101.165:38517
2022-08-22 20:10:06,538 - distributed.worker - INFO - -------------------------------------------------
2022-08-22 20:10:06,538 - distributed.core - INFO - Starting established connection

This is a simplified version of my code. Sorry if it's too much detail.

from dask.distributed import get_client  # client_dask is a Client connected to the cluster

def func1(input1, tree_node, input2):
    client = get_client()
    out_future = client.submit(func_recursive, tree_node, input2)
    output = out_future.result()
    return output

def func_recursive(tree_node, input2):  # traversing a tree
    # when it gets stuck, the code doesn't even reach this line
    if tree_node.is_leaf():
        return function3(tree_node, input2)  # once I have the results of the leaves, I can compute towards the root of the tree
    children_nodes = tree_node.children
    client = get_client()
    out_futures = client.map(func_recursive, children_nodes, [input2] * len(children_nodes))
    out = client.gather(out_futures)

    output = function4(out)  # the real computation is here
    return output

dask_out_list = []
for input1 in input1_list:
    dask_out = client_dask.submit(func1, input1, tree_node, input2)
    dask_out_list.append(dask_out)

for dask_out in dask_out_list:
    final = dask_out.result()

(My impression as a beginner: I would like my recursive function to continue until it reaches the point where the real computation is done, but the queue gets full and it doesn't continue.)

I would appreciate it if you could give me a clue how to debug this. The thing is that there is no error in the output. Thank you in advance. Regards, Sina!

Hi @smajidian,

Welcome on the forum!

First of all, you should probably try to see if you can simplify your problem and use less recursion (or none at all). Recursion is a tricky thing with Dask, I think.

Your code version is already good, but it's lacking some parts for a reproducer (what are input1_list and tree_node?).

To debug more, you should try with a LocalCluster configured to match Slurm, so with only 3 processes. Something like LocalCluster(n_workers=3, threads_per_worker=1).
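A minimal sketch of that local reproduction (the `processes=False` flag is my addition to keep the workers in one process; the deadlock depends only on the number of occupied single-threaded slots, not on whether workers are processes):

```python
from dask.distributed import Client, LocalCluster

# Mirror the Slurm configuration locally: 3 single-threaded worker slots.
cluster = LocalCluster(n_workers=3, threads_per_worker=1, processes=False)
client = Client(cluster)

# Sanity check that the cluster is usable before submitting the real work:
assert client.submit(sum, [1, 2, 3]).result() == 6
```

Running the recursive code against this client should hang the same way it does on Slurm, which makes the bug reproducible on a laptop.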

Then, you probably noticed the warning in the docs:

However, this can deadlock the scheduler if too many tasks request jobs at once. Each task does not communicate to the scheduler that they are waiting on results and are free to compute other tasks. This can deadlock the cluster if every scheduling slot is running a task and they all request more tasks.

To avoid this deadlocking issue we can use secede and rejoin. These functions will remove and rejoin the current task from the cluster respectively.

You probably want to look at this too. I think your guess at the end of your message is right.


Dear @guillaumeeb

I appreciate your fast and useful response. It helped me a lot and saved me days, if not weeks. Your idea of using the same configuration on both local and Slurm was right: when I added threads_per_worker=1, the code didn't run on LocalCluster either. Using secede() before both submit and map solved the issue (on that same day; sorry for the late reply).

Thanks again for your awesome help.
Best of luck, Sina.


Can you share the modified code so I can see where you added secede?

I added secede() before the submit() and map() calls in the two functions and it worked. I didn't use rejoin. Hope it helps in your case.

from dask.distributed import get_client, secede

def func1(input1, tree_node, input2):
    client = get_client()
    secede()
    out_future = client.submit(func_recursive, tree_node, input2)
    output = out_future.result()
    return output

def func_recursive(tree_node, input2):  # traversing a tree
    if tree_node.is_leaf():
        return function3(tree_node, input2)  # once I have the result of leaves, I can compute towards the root of the tree 
    else:
        children_nodes = tree_node.children
    client = get_client()
    secede()
    out_futures = client.map(func_recursive, children_nodes, [input2] * len(children_nodes))
    out = client.gather(out_futures)

    output = function4(out)  # the real computation is here
    return output

dask_out_list = []
for input1 in input1_list:
    dask_out = client_dask.submit(func1, input1, tree_node, input2)
    dask_out_list.append(dask_out)

for dask_out in dask_out_list:
    final = dask_out.result()

Just a note: this is a simplified version of my code, so I haven't run this exact snippet. The full code is on my GitHub page.
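As a side note (not from the thread): dask.distributed also provides a worker_client() context manager, which calls secede() on entry and rejoin() on exit, so the slot bookkeeping is handled automatically. Here is a self-contained sketch of the same recursive pattern; the Node class and the arithmetic stand in for the thread's tree_node, function3, and function4, which were never shown:

```python
from dask.distributed import Client, LocalCluster, worker_client

class Node:
    """Minimal stand-in for the thread's tree_node (hypothetical)."""
    def __init__(self, value=0, children=()):
        self.value = value
        self.children = list(children)

    def is_leaf(self):
        return not self.children

def func_recursive(tree_node, input2):
    if tree_node.is_leaf():
        return tree_node.value * input2  # stand-in for function3
    # worker_client() secedes from the worker's thread pool on entry and
    # rejoins on exit, freeing the slot while this task waits on its children.
    with worker_client() as client:
        futures = client.map(func_recursive, tree_node.children,
                             [input2] * len(tree_node.children))
        out = client.gather(futures)
    return sum(out)  # stand-in for function4

# Three single-threaded slots, as in the Slurm setup; in-process workers
# (processes=False) keep the example self-contained.
client = Client(LocalCluster(n_workers=3, threads_per_worker=1, processes=False))
tree = Node(children=[Node(1), Node(children=[Node(2), Node(3)])])
result = client.submit(func_recursive, tree, 10).result()  # 10 + 20 + 30 = 60
```

With plain get_client() and no secede(), the same tree on the same three slots can deadlock; worker_client() avoids that without sprinkling secede() calls by hand.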
