Parallel programming for dynamic programming on a nice tree decomposition?

I am trying to implement an algorithm in Sage/Python for counting graph homomorphisms from a graph G to a graph H, with dynamic programming on a nice tree decomposition.

I have completed this algorithm, and now I wish to parallelise the program with Dask (I tried concurrent.futures, but ran into pickling issues). Note that dynamic programming on a nice tree decomposition has inherent dependencies: we can only start computing the result of a parent node after we have the result(s) of its child node(s).

Parallel with Dask:

def process_node(self, node):
    node_type = self.dir_labelled_TD.get_vertex(node)
    match node_type:
        case 'intro':
            result = self._add_intro_node_parallel(node)
        case 'forget':
            result = self._add_forget_node_parallel(node)
        case 'join':
            result = self._add_join_node_parallel(node)
        case _:
            result = self._add_leaf_node_parallel(node)

    node_index = get_node_index(node)
    self.DP_table[node_index] = result
    return result

def count_homomorphisms_parallel(self):
    # Dictionary to store all futures/promises
    self.futures = {}
    for node in reversed(self.dir_labelled_TD.vertices()):
        # Delaying each node process and storing in futures
        self.futures[node] = self.process_node(node)

    print("Futures: ", self.futures)

    # Compute all results, respecting the inherent dependencies among them
    results = dask.compute(*self.futures.values())
    print("Results: ", [f.compute() for f in results])
    return self.DP_table[0][0]

When I try to run this program in a notebook with the following example

par_counter = ParallelGraphHomomorphismCounter(graph, three_grid)
par_count = par_counter.count_homomorphisms_parallel()
print(par_count)

The output is shown below. I am relatively new to concurrency (in Python) and have browsed the relevant documentation, but still could not figure it out.

So I was wondering if you have any suggestions or ideas; thank you for your time! If you are curious, please see the end for the non-parallel version.

Futures:  {(6, {}): Delayed('process_node-0b571dcd-00e5-4871-871a-ef52e16b4ffb'), (5, {2}): Delayed('process_node-0fbc0886-3368-4d0e-8b09-751cce606ffe'), (4, {0, 2}): Delayed('process_node-0187a2da-aba7-42f7-83ab-497f62ea6b1f'), (3, {0}): Delayed('process_node-18729eea-99f4-45ee-af15-0de45395f181'), (2, {0, 1}): Delayed('process_node-8c85c333-301d-4e49-b9d4-c01bc20c05ae'), (1, {0}): Delayed('process_node-7e528db6-4636-4b31-8eb1-6f807ac32627'), (0, {}): Delayed('process_node-6bb90670-11b4-4e40-b7bc-cefb2fef6479')}

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In [3], line 28
     23 # colour_counter = GraphHomomorphismCounter(square, three_grid, 2, square_clr, three_grid_clr, colourful=True)
     24 # colourful_count = colour_counter.count_homomorphisms_best()
     25 # print(colourful_count)
     27 par_counter = ParallelGraphHomomorphismCounter(graph, three_grid)
---> 28 par_count = par_counter.count_homomorphisms_parallel()
     29 print(par_count)

File ~/github/local-hom-count/local_hom_count_best_parallel.py:118, in ParallelGraphHomomorphismCounter.count_homomorphisms_parallel(self)
    116 # Compute all results, respecting the inherent dependencies among them
    117 results = dask.compute(*self.futures.values())
--> 118 print("Results: ", [f.compute() for f in results])
    119 # Since the results are integrated into the DP_table in each process_node call,
    120 # you can simply access the final result:
    121 return self.DP_table[0][0]

File ~/github/local-hom-count/local_hom_count_best_parallel.py:118, in <listcomp>(.0)
    116 # Compute all results, respecting the inherent dependencies among them
    117 results = dask.compute(*self.futures.values())
--> 118 print("Results: ", [f.compute() for f in results])
    119 # Since the results are integrated into the DP_table in each process_node call,
    120 # you can simply access the final result:
    121 return self.DP_table[0][0]

File ~/.sage/local/lib/python3.11/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File ~/.sage/local/lib/python3.11/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/github/local-hom-count/local_hom_count_best_parallel.py:239, in ParallelGraphHomomorphismCounter._add_intro_node_parallel(self, node)
    235 child_DP_entry = self.DP_table[child_node_index]
    236 # print("INTRO child DP entry: ", child_DP_entry)
    237 # print("\n")
--> 239 for mapped in range(len(child_DP_entry)):
    240     # Neighborhood of the mapped vertices of intro vertex in the target graph
    241     mapped_intro_nbhs = [extract_bag_vertex(mapped, vtx, self.actual_target_size) for vtx in intro_vtx_nbhs]
    242     # print("mapped: ", mapped)
    243     # print("mapped nbhs in target: ", mapped_intro_nbhs)

File ~/.sage/local/lib/python3.11/site-packages/dask/delayed.py:635, in Delayed.__len__(self)
    633 def __len__(self):
    634     if self._length is None:
--> 635         raise TypeError("Delayed objects of unspecified length have no len()")
    636     return self._length

TypeError: Delayed objects of unspecified length have no len()

Non-parallel:

def count_homomorphisms_best(self):
    r"""
    Return the number of homomorphisms from the graph `G` to the graph `H`.

    A homomorphism from a graph `G` to a graph `H` is a function
    `\varphi : V(G) \mapsto V(H)`, such that for any edge `uv \in E(G)` the
    pair `\varphi(u) \varphi(v)` is an edge of `H`.

    For more information, see the :wikipedia:`Graph_homomorphism`.

    ALGORITHM:

    This is an implementation based on the proof of Prop. 1.6 in [CDM2017]_.

    OUTPUT:

    - an integer, the number of homomorphisms from `graph` to `target_graph`

    EXAMPLES::

        sage: graph = graphs.CompleteBipartiteGraph(1, 4)
        sage: target_graph = graphs.CompleteGraph(4)
        sage: from sage.graphs.hom_count_best import count_homomorphisms_best
        sage: count_homomorphisms_best(graph, target_graph)
        324
    """
    # Whether it's BFS or DFS, every node below join node(s) would be
    # computed first, so we can safely go bottom-up.
    for node in reversed(self.dir_labelled_TD.vertices()):
        node_type = self.dir_labelled_TD.get_vertex(node)
        # print("\nNode: ", node, node_type)

        match node_type:
            case 'intro':
                self._add_intro_node_best(node)
            case 'forget':
                self._add_forget_node_best(node)
            case 'join':
                self._add_join_node_best(node)

            case _: 
                self._add_leaf_node_best(node)

    return self.DP_table[0][0]

Hi @hedgehog0, welcome to the Dask Discourse Forum!

It seems you are using Delayed inside Delayed functions, which is generally not a good idea. I cannot tell for sure, because I see no Delayed API calls or decorators in the code you provided, but since you are calling compute twice and seeing this error, that is probably the cause, or at least it lies in the way you are building your graph.

Judging from the stack trace, it seems you have not provided all parts of your code (the *_parallel functions).

The best way for us to help you would be to provide a minimal reproducible example.
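In the meantime, here is a minimal sketch (with made-up functions, not your actual code) of the pattern that typically produces this exact TypeError, and of the usual fix of passing the child's Delayed result as an argument so Dask resolves it before the function body runs:

import dask
from dask import delayed

DP_table = {}

@delayed
def child_task():
    return [1, 2, 3]

@delayed
def parent_task_bad(child_index):
    # DP_table[child_index] still holds a Delayed at execution time
    # (Dask never resolves values reached through globals or attributes),
    # so len() raises "Delayed objects of unspecified length have no len()".
    child_entry = DP_table[child_index]
    return len(child_entry)

@delayed
def parent_task_good(child_entry):
    # child_entry arrives as a plain list because it was passed as an
    # argument: Dask computes the child first and injects its result.
    return len(child_entry)

DP_table[0] = child_task()
# parent_task_bad(0).compute()                   # raises the TypeError above
print(parent_task_good(DP_table[0]).compute())   # prints 3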

Hi @guillaumeeb , thank you for the suggestion!

I also looked into the best practices and now have a better idea of how to handle this algorithm.

However, as I want to follow the best practices, I do have one question: it is mentioned that we should avoid relying on global state. But in our case, the DP (dynamic programming) table is a “global state”, namely self.DP_table:

class ParallelGraphHomomorphismCounter:
    def __init__(self, graph, target_graph, density_threshold=0.5, graph_clr=None, target_clr=None, colourful=False):
        # everything else
        self.DP_table = [{} for _ in range(len(self.dir_labelled_TD))]

    def process_node(self, node):
        pass

    # everything else

In this new version, process_node would not have @delayed, while each of the *_parallel functions would, e.g.:

    @delayed
    def _add_leaf_node_parallel(self, node):
        # computation
        return result

So I was wondering if you could suggest how I should pass the DP table around, since in my non-parallel code each _add_* function modifies self.DP_table.

Thank you very much! (For the code, if you do not mind, I have invited you on GitHub; please see local_hom_count_best_parallel.py. Thank you!)

Well, you can’t, or it would be really tricky. This is probably the problem you hit with concurrent.futures, and you might run into it with Dask as well if you use the multiprocessing scheduler or distributed.

The question is, do you really need to share this attribute between functions? Are the functions processing nodes getting values updated by other functions?

I tried to look at the code, but it’s not so easy getting into it. Could you just submit functions and update the DP_table on your main process side as results are being computed?
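Something along the lines of this rough sketch (the node order, children map, and per-node kernel below are toy placeholders, not your real code): each task only receives its children's entries and returns a new entry, and the DP table is filled on the main-process side from the computed results.

import dask
from dask import delayed

# Toy tree: node 0 is the root, listed first as in dir_labelled_TD.vertices()
tree_nodes = [0, 1, 2]
children = {0: [1], 1: [2], 2: []}

def kernel_for(node):
    def kernel(*child_entries):
        # stand-in for the real leaf/intro/forget/join computation
        return sum(child_entries) + 1
    return kernel

lazy = {}
for node in reversed(tree_nodes):                       # bottom-up, as in the sequential version
    child_entries = [lazy[c] for c in children[node]]
    # the task never touches a shared DP table; it only sees its arguments
    lazy[node] = delayed(kernel_for(node))(*child_entries)

# one compute for the whole tree; the DP table lives only on the main process
values = dask.compute(*lazy.values())
DP_table = dict(zip(lazy.keys(), values))
print(DP_table[0])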

There are some lines of code that I should have removed. In the latest version, concurrent.futures, the multiprocessing scheduler, and distributed are not used; I believe we only use dask.delayed and dask.compute.

In some sense, yes. Suppose we have a nice tree decomposition C --> B --> A; the results of computing C would be used for computing B. Similarly, the results of computing B would be used to compute A. Each result of computing C/B/A is written to the DP table (i.e. self.DP_table).

Thank you for the suggestion. I am relatively new to parallel programming; do you mean that each worker process sends its result to the main process, which in turn updates the DP table? Can I do that with Dask?

I’m also thinking about some recursive approach, so that each computed result could feed into the next function/method.


Sure, but at some point, if you want to go toward multiprocessing or distributed, you’ll be blocked. Multithreading in Python is bound by the GIL; if your code is pure Python, this often means no parallelization at all.

Well, this kind of computation graph can be expressed by chaining Delayed calls; I think you should be able to do it.

You can also do this using Distributed Futures: see Launch Tasks from Tasks — Dask.distributed 2024.4.1 documentation.
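For the futures route, a minimal sketch of that “launch tasks from tasks” pattern (again with toy placeholders rather than your real node kernels):

from dask.distributed import Client, worker_client

# Toy tree: node 0 is the root; children maps each node to its child nodes
children = {0: [1], 1: [2], 2: []}

def process_subtree(node):
    with worker_client() as client:
        # submit the children from inside this task and wait for them,
        # so the dependency order of the tree decomposition is respected
        futures = [client.submit(process_subtree, child) for child in children[node]]
        child_entries = client.gather(futures)
    return sum(child_entries) + 1        # placeholder for the per-node DP step

if __name__ == "__main__":
    client = Client()                    # local distributed cluster
    print(client.submit(process_subtree, 0).result())   # prints 3
    client.close()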

Would delayed and compute automatically make the code parallel?

Thanks, will check it out!

This enables it, but you rely on Python underneath, so if you use a multithreaded scheduler, your Python code might not run in parallel, just concurrently, which often means sequentially if using pure Python.
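To make that concrete, here is a small self-contained experiment (unrelated to your code) comparing the threaded and process-based schedulers on pure-Python delayed tasks:

import time
import dask
from dask import delayed

@delayed
def busy(n):
    # pure-Python loop: it holds the GIL, so threads cannot execute it in parallel
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    tasks = [busy(2_000_000) for _ in range(4)]
    for scheduler in ("threads", "processes"):
        start = time.time()
        dask.compute(*tasks, scheduler=scheduler)
        print(scheduler, time.time() - start)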

Thank you, Guillaume! I have used recursion in my latest code, and it now runs correctly without errors (e.g., it creates Dask task graphs)! However, I now encounter a new issue (see the timing screenshot on Imgur):

In the picture, the sequential time is obtained by running count_homomorphisms_best in local_hom_count_best.py, and the parallel time by running count_homomorphisms_parallel() followed by .compute() in local_hom_count_best_parallel.py.

As you can see, even though the parallel version should run faster, it does not, and is sometimes even slower than the sequential version. In the following code, three_grid is a 3×3 grid graph and H is a random graph of order 20.

Sequential:

counter = GraphHomomorphismCounter(three_grid, H)
%timeit -r 3 counter.count_homomorphisms_best()

2.87 s ± 172 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

Parallel:

par_counter = ParallelGraphHomomorphismCounter(three_grid, H)
par_count = par_counter.count_homomorphisms_parallel()
%timeit -r 3 par_count.compute()

2.59 s ± 45 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

You can see from here that they take roughly the same time; however, with the %prun magic below, the sequential version is significantly slower, whereas the parallel one is not much faster.

When running with Client(), from the dashboard it seems that the _add_join_X function takes most of the time in the parallel version (I have experimented with either implementation below, and the difference is not too significant). So I was wondering if you could give me some hints on where I should look? For instance, should I use Dask Arrays instead of Python lists in my code?

Thank you.

    def _add_join_node_parallel(self, left_child_result, right_child_result):
        """
        Add the join node to the DP table and update it accordingly.
        Assumes left_child_result and right_child_result are the computed results
        from the two child nodes.
        """
        mappings_count = [left * right for left, right in zip(left_child_result, right_child_result)]
        print("Left length: ", len(left_child_result))
        print("Right length: ", len(right_child_result))
        return mappings_count


    def _add_join_node_parallel(self, left_child_result, right_child_result):
        """
        Add the join node to the DP table and update it accordingly.
        Assumes left_child_result and right_child_result are the computed results
        from the two child nodes, provided as Python lists.
        """
        # Convert the Python lists to NumPy arrays
        np_left = np.array(left_child_result)
        np_right = np.array(right_child_result)

        # Create Dask arrays from NumPy arrays
        # You can adjust the chunk sizes to optimize performance
        dask_left = da.from_array(np_left, chunks=(5000))
        dask_right = da.from_array(np_right, chunks=(5000))

        # Perform element-wise multiplication using Dask
        result_dask = dask_left * dask_right

        # Compute the result to get a NumPy array back
        # Note: compute() triggers actual computation and should be used judiciously
        result_np = result_dask.compute()

        # Optionally, convert the result back to a Python list if needed
        result_list = result_np.tolist()

        return result_list

Running prun with sequential version:

counter = GraphHomomorphismCounter(three_grid, H)
%prun counter.count_homomorphisms_best()

 

         6488789 function calls (6488782 primitive calls) in 7.096 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   697260    1.503    0.000    4.377    0.000 help_functions.py:59(is_valid_mapping)
       17    1.445    0.085    6.490    0.382 local_hom_count_best.py:139(_add_intro_node_best)
  1495300    0.980    0.000    2.161    0.000 help_functions.py:64(<genexpr>)
   798069    0.707    0.000    1.181    0.000 generic_graph.py:12445(has_edge)
  1110596    0.590    0.000    0.611    0.000 generic_graph.py:11306(vertex_iterator)
   697261    0.543    0.000    2.558    0.000 {built-in method builtins.all}
   798069    0.473    0.000    0.473    0.000 {method 'has_edge' of 'sage.graphs.base.sparse_graph.SparseGraphBackend' objects}
        9    0.327    0.036    0.504    0.056 local_hom_count_best.py:229(_add_forget_node_best)
   697320    0.170    0.000    0.170    0.000 {built-in method builtins.isinstance}
        2    0.101    0.051    0.101    0.051 local_hom_count_best.py:277(<listcomp>)
       17    0.069    0.004    0.069    0.004 local_hom_count_best.py:174(<listcomp>)
    34863    0.061    0.000    0.095    0.000 local_hom_count_best.py:200(<listcomp>)
    52884    0.058    0.000    0.058    0.000 help_functions.py:91(add_vertex_into_mapping)
    52040    0.034    0.000    0.034    0.000 help_functions.py:84(extract_bag_vertex)
    52885    0.021    0.000    0.021    0.000 {method 'iterator_verts' of 'sage.graphs.base.c_graph.CGraphBackend' objects}
       17    0.005    0.000    0.008    0.000 generic_graph.py:4054(density)
        9    0.002    0.000    0.002    0.000 local_hom_count_best.py:242(<listcomp>)
     16/9    0.001    0.000    0.002    0.000 homset.py:86(Hom)
        9    0.000    0.000    0.001    0.000 homset.py:611(__init__)
......

Running prun with parallel version:

par_counter = ParallelGraphHomomorphismCounter(three_grid, H)
par_count = par_counter.count_homomorphisms_parallel()
%prun par_count.compute()

         236227 function calls (230883 primitive calls) in 3.099 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      142    2.443    0.017    2.443    0.017 {method 'acquire' of '_thread.lock' objects}
      170    0.076    0.000    0.076    0.000 {method 'read' of '_io.BufferedReader' objects}
      170    0.041    0.000    0.041    0.000 {built-in method marshal.loads}
    12366    0.041    0.000    0.045    0.000 reader.py:99(forward)
    327/1    0.040    0.000    3.100    3.100 {built-in method builtins.exec}
      637    0.018    0.000    0.064    0.000 scanner.py:752(scan_to_next_token)
  576/575    0.017    0.000    0.086    0.000 {built-in method builtins.__build_class__}
      658    0.016    0.000    0.016    0.000 {built-in method posix.stat}
    21068    0.013    0.000    0.013    0.000 reader.py:87(peek)
     5522    0.011    0.000    0.021    0.000 scanner.py:145(need_more_tokens)
      170    0.009    0.000    0.009    0.000 {built-in method io.open_code}
     3418    0.008    0.000    0.139    0.000 scanner.py:113(check_token)
      364    0.008    0.000    0.029    0.000 scanner.py:1270(scan_plain)
    24789    0.008    0.000    0.010    0.000 {built-in method builtins.isinstance}
     5717    0.008    0.000    0.008    0.000 scanner.py:279(stale_possible_simple_keys)
      161    0.008    0.000    0.008    0.000 {built-in method builtins.compile}
  160/124    0.008    0.000    0.010    0.000 {built-in method __new__ of type object at 0x10e8146f0}
      249    0.006    0.000    0.036    0.000 <frozen importlib._bootstrap_external>:1604(find_spec)
      876    0.006    0.000    0.017    0.000 typing.py:168(_type_check)
      953    0.005    0.000    0.010    0.000 scanner.py:135(get_token)
      441    0.005    0.000    0.025    0.000 parser.py:273(parse_node)
       12    0.005    0.000    0.005    0.000 {built-in method posix.listdir}
     2708    0.005    0.000    0.009    0.000 typing.py:1274(__setattr__)
       48    0.005    0.000    0.073    0.002 dataclasses.py:884(_process_class)
18090/17934    0.005    0.000    0.005    0.000 {built-in method builtins.len}
     3987    0.004    0.000    0.005    0.000 {built-in method builtins.getattr}
      637    0.004    0.000    0.114    0.000 scanner.py:156(fetch_more_tokens)
        3    0.004    0.001    0.004    0.001 {built-in method _imp.create_dynamic}
      170    0.004    0.000    0.149    0.001 <frozen importlib._bootstrap_external>:1007(get_code)
     1576    0.004    0.000    0.005    0.000 reader.py:114(get_mark)
      181    0.004    0.000    0.047    0.000 <frozen importlib._bootstrap>:1056(_find_spec)
      340    0.003    0.000    0.010    0.000 <frozen importlib._bootstrap_external>:437(cache_from_source)
       30    0.003    0.000    0.003    0.000 local.py:242(release_data)
    80/37    0.003    0.000    0.009    0.000 _parser.py:507(_parse)
     5080    0.003    0.000    0.003    0.000 scanner.py:264(next_possible_simple_key)
      365    0.003    0.000    0.009    0.000 scanner.py:1311(scan_plain_spaces)
     1417    0.003    0.000    0.007    0.000 <frozen importlib._bootstrap_external>:126(_path_join)
      371    0.003    0.000    0.017    0.000 typing.py:1330(__init__)
......

As said above, from what I saw in your code, with the default multithreaded scheduler you won’t really run things in parallel, and adding the Dask layer will probably slow things down a bit.

When running with a Client and multiple workers, you should see computations in parallel. If you have enough independent tasks that take long enough (e.g. longer than 1 s), you should definitely see some speedup.

Well, it’s a bit difficult to tell, but the way you use Dask Array in the code above is probably not good, especially since the enclosing function is already submitted as a task through other Dask APIs.
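As an illustration of the simpler alternative (a sketch, not necessarily what you should ship): if the join function already runs as a Dask task, the element-wise product can be done directly with NumPy, without building a nested Dask array graph and calling compute() inside the task:

    import numpy as np

    def _add_join_node_parallel(self, left_child_result, right_child_result):
        # plain NumPy inside the task: no nested Dask graph, no inner compute()
        return (np.asarray(left_child_result) * np.asarray(right_child_result)).tolist()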

Thank you for your explanations!

Since you mentioned “pure Python” twice (and more afterwards), do you mean that I should use a multiprocess scheduler in this case, and/or avoid pure Python (e.g., use Cython instead)? Would you suggest other approaches?

It’s possibly due to my shallow understanding of parallel/concurrent programming and Dask; I just thought that using dask.delayed and dask.compute would automatically enable parallelism (or concurrency, in the presence of the Python GIL).

Yes, with a Client running in the background, I did/do see computations running in parallel, and the node statuses change in the task graph.

In my specific case, I think we may have sufficiently independent tasks, but not necessarily long enough ones. At least from what I saw in the dashboard, the _add_join_* function takes most of the time, which is why I have the somewhat involved implementation for it (I want to enable parallelism/concurrency inside this function as well). I will check how to use Dask arrays more efficiently.

With plain Python code, in general, you should use the multiprocessing scheduler or a distributed cluster. You could also try Cython, but that is another level and you would have to release the GIL properly; I’m not an expert there.
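For example (a sketch, assuming your graphs and delayed functions serialize cleanly to the workers), something like this should push the existing task graph onto a process-based local cluster instead of the default threaded scheduler:

from dask.distributed import Client, LocalCluster

# one process per worker, one thread each, so pure-Python tasks can overlap
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

par_counter = ParallelGraphHomomorphismCounter(three_grid, H)
par_count = par_counter.count_homomorphisms_parallel()
print(par_count.compute())   # with a Client active, compute() uses the distributed scheduler

client.close()
cluster.close()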

Yes, concurrency, which means concurrent operations when possible, but in your case, I don’t think there will be much.