Dask distributed with very large queue

Dear group members

Previously, I asked a question here and the solution worked well.

Now, I’m working on a huge dataset. I use 50 SLURM jobs, each with a 5-hour walltime.

cluster = SLURMCluster(cores=1, processes=1, memory="20GB", walltime="05:00:00")
cluster.scale(50) 

So my queue size is 50 and I don’t use multi-threading. I submit new jobs and gather them in my code. The output of each function is written to a pickle file, which is then read by another function according to the dependency graph, so I don’t move much data between nodes.

When I have 20 inputs and submit 300 tasks for each of them with dask.submit (tasks that depend on each other inside my function infer_hogs_for_rhog_levels_recursively_future), I end up with 6,000 tasks. Because of the dependencies, some of them are not computed at first, but gradually all of them finish. This works well, completing in around one hour thanks to Dask.

But when I have 1,000 inputs, each of which needs 300 dask.submit calls, Dask starts working and then, after 30 minutes to an hour, many of the tasks end up in the erred state.
(My final goal is to run the code for 50,000 inputs, each with 900 tasks.)

The SLURM output of the node on which main.py is running:

2022-10-28 14:51:24 INFO     python code -  dask started
2022-10-28 15:17:31,147 - distributed.scheduler - ERROR - Couldn't gather keys {'infer_hogs_for_rhog_levels_recursively_future-c7e6b31e27dcba1e87e43d898aba9c49': ['tcp://10.203.101.102:35255']} state: ['memory'] workers: ['tcp://10.203.101.102:35255']
NoneType: None
2022-10-28 15:17:31,912 - distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://10.203.101.102:35255'], infer_hogs_for_rhog_levels_recursively_future-c7e6b31e27dcba1e87e43d898aba9c49
NoneType: None
2022-10-28 15:18:39,674 - distributed.scheduler - ERROR - Couldn't gather keys {'infer_hogs_for_rhog_levels_recursively_future-94108266e29785ca7cff8a9b2f221054': []} state: ['processing'] workers: []
NoneType: None
2022-10-28 15:18:39,674 - distributed.scheduler - ERROR - Workers don't have promised key: [], infer_hogs_for_rhog_levels_recursively_future-94108266e29785ca7cff8a9b2f221054
NoneType: None
2022-10-28 15:18:39,674 - distributed.scheduler - ERROR - Couldn't gather keys {'infer_hogs_for_rhog_levels_recursively_future-122e1af610164acb21a2749a4011c89f': []} state: ['processing'] workers: []
NoneType: None
2022-10-28 15:18:39,674 - distributed.scheduler - ERROR - Workers don't have promised key: [], infer_hogs_for_rhog_levels_recursively_future-122e1af610164acb21a2749a4011c89f
NoneType: None

The SLURM output of one of the compute nodes:

2022-10-28 14:51:20,449 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.203.101.102:43799'
2022-10-28 14:51:21,369 - distributed.diskutils - INFO - Found stale lock file and directory '/work/FAC/FBM/DBC/cdessim2/default/smajidi1/fastget/bird_hog/gethog3_27oct/dask-worker-space/worker-ccsaoycd', purging
2022-10-28 14:51:24,536 - distributed.worker - INFO -       Start worker at: tcp://10.203.101.102:35255
2022-10-28 14:51:24,536 - distributed.worker - INFO -          Listening to: tcp://10.203.101.102:35255
2022-10-28 14:51:24,536 - distributed.worker - INFO -          dashboard at:       10.203.101.102:35613
2022-10-28 14:51:24,536 - distributed.worker - INFO - Waiting to connect to: tcp://10.203.101.150:35647
2022-10-28 14:51:24,536 - distributed.worker - INFO - -------------------------------------------------
2022-10-28 14:51:24,536 - distributed.worker - INFO -               Threads:                          1
2022-10-28 14:51:24,536 - distributed.worker - INFO -                Memory:                  83.82 GiB
2022-10-28 14:51:24,537 - distributed.worker - INFO -       Local Directory: /work/FAC/FBM/DBC/cdessim2/default/smajidi1/fastget/bird_hog/gethog3_27oct/dask-worker-space/worker-_eqjkonw
2022-10-28 14:51:24,537 - distributed.worker - INFO - -------------------------------------------------
2022-10-28 14:51:24,552 - distributed.worker - INFO -         Registered to: tcp://10.203.101.150:35647
2022-10-28 14:51:24,552 - distributed.worker - INFO - -------------------------------------------------
2022-10-28 14:51:24,553 - distributed.core - INFO - Starting established connection
2022-10-28 14:51:29,321 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.46s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
/work/miniconda3/lib/python3.8/subprocess.py:844: RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used
  self.stdout = io.open(c2pread, 'rb', bufsize)
/work/miniconda3/lib/python3.8/subprocess.py:849: RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used
  self.stderr = io.open(errread, 'rb', bufsize)
2022-10-28 15:08:25,493 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.00s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2022-10-28 15:11:00,914 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.70s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2022-10-28 15:13:31,653 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.01s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

The output of each task is an integer, which of course is not large data; however, I do open and close files of a few MB during the run.

I guess I could change some of the Dask configuration to address this.
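For example, one thing I have in mind (just a guess, not something I have tested) is increasing the communication timeouts and retries. These are standard distributed configuration keys, but the values below are arbitrary:

import dask

dask.config.set({
    "distributed.comm.timeouts.connect": "60s",  # allow more time for workers to connect
    "distributed.comm.timeouts.tcp": "60s",      # allow more time before a TCP comm is considered dead
    "distributed.comm.retry.count": 3,           # retry failed communications a few times
})

(If set programmatically like this, it has to happen before the cluster is created; I suppose the settings may also need to go into a dask config file or environment variables so that the SLURM-launched workers pick them up.)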

I would appreciate any help with this.
Regards,
Sina

Hi @smajidian,

It is a bit hard to tell what could be the problem without a Minimum Working Example, or at least some code showing what you’re trying to do.

One suggestion: do you really need to use Future? Couldn’t you use Delayed instead? It could simplify the problem a bit.
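To illustrate what I mean (process_one here is just a placeholder, since I don’t know your code): with Delayed you build the whole graph lazily and compute it in one call, instead of managing futures yourself:

import dask
from dask import delayed

lazy_results = [delayed(process_one)(inp) for inp in inputs]  # nothing runs yet
results = dask.compute(*lazy_results)                         # the whole graph is scheduled at once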

1,000 inputs × 300 tasks, i.e. 300,000 tasks, should be fine for Dask.

50,000 × 900, i.e. 45 million tasks, will be much harder.

Dear @guillaumeeb, thanks for your response. Since I have two levels of parallelization, I don’t think I can use Delayed.

Sorry, the question was too abstract. This is a simplified version of my code (sorry it’s not a working example, but I hope it gives a general idea of what I’m doing).

from dask.distributed import get_client, secede, rejoin

def func1(input1, tree_node, input2):
    # does some other work and writes intermediate output to a file,
    # then launches the recursive traversal as a separate task
    client = get_client()
    secede()  # step out of the worker's thread pool while waiting on sub-tasks
    out_future = client.submit(func_recursive, tree_node, input2)
    output = out_future.result()
    rejoin()
    return output

def func_recursive(tree_node, input2):  # traversing a tree
    if tree_node.is_leaf():
        # once I have the result of the leaves, I can compute towards the root of the tree
        return function3(tree_node, input2)

    children_nodes = tree_node.children
    client = get_client()
    secede()
    out_futures = client.map(func_recursive, children_nodes, [input2] * len(children_nodes))
    out = client.gather(out_futures)
    rejoin()

    output = function4(out)  # the real computation is here
    return output


# client_dask: the distributed Client connected to the SLURMCluster above
dask_out_list = []
for input1 in input1_list:
    dask_out = client_dask.submit(func1, input1, tree_node, input2)
    dask_out_list.append(dask_out)

for dask_out in dask_out_list:
    final = dask_out.result()

The size of input1_list is 1,000 (or, in future, 50,000). The goal of func_recursive is to traverse a tree (in the graph-theory sense), with the computation starting from the leaves. The computation of each internal node depends on the outputs of its children.

Thank you in advance.

I had forgotten that your use case was so complex…

I have two questions coming to my mind:

  1. Why in the first place do you use the func1 wrapper around func_recursive? Couldn’t you just submit func_recursive in the for loop?
  2. I wonder if the first level of parallelization isn’t enough. Couldn’t you just parallelize the 1,000 calls (then 50,000!!) of the recursive function, and do the recursion inside the current Python process or thread, without submitting tasks to Dask?

Dear @guillaumeeb, I appreciate your time on this issue.
Regarding 1): the func1 function does some other work and writes some output to a file, which is then read by function4 inside func_recursive.
Regarding 2): that’s a good point, but multi-threading is limited by the number of CPUs in a compute node, which is 48 at our institution. Another issue is that if I request ~40 CPUs from SLURM, I have to wait in quite a long queue for a completely free node before my job starts. Otherwise, SLURM may spread the CPUs across different compute nodes.

Regards, Sina.

What I meant here is: can’t you just run the recursive part sequentially on one thread, and only parallelize over the first level of the input list? If you have between 1,000 and 50,000 inputs, this would already speed up the computation, and I’m not sure you can have Dask clusters with more than 1,000 cores anyway.

Let’s say you can book 200 cores at a time: with 1,000 tasks to parallelize, your workload should already be pretty well distributed (200 calls to func1 processed at a time). Often, trying to parallelize too deeply can be counterproductive, but there may be something I didn’t get about your problem.
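To make that concrete, here is a rough, untested sketch of what I have in mind, reusing the names from your simplified code (func_recursive_sequential and func1_flat are just names I made up; function3, function4 and client_dask are from your snippet):

def func_recursive_sequential(tree_node, input2):
    # plain Python recursion inside a single worker task, no get_client()/secede()
    if tree_node.is_leaf():
        return function3(tree_node, input2)
    out = [func_recursive_sequential(child, input2) for child in tree_node.children]
    return function4(out)

def func1_flat(input1, tree_node, input2):
    # same role as your func1 (other work, writing files), but the recursion runs locally
    return func_recursive_sequential(tree_node, input2)

futures = [client_dask.submit(func1_flat, input1, tree_node, input2) for input1 in input1_list]
results = client_dask.gather(futures)

This way each input is a single task for the scheduler, which keeps the total task count at 1,000 (or 50,000) instead of hundreds of thousands or millions.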