I am implementing an algorithm in Sage/Python that counts graph homomorphisms from a graph G to a graph H, using dynamic programming on a nice tree decomposition.
The sequential version is complete, and I now want to parallelise it with Dask (I tried concurrent.futures first, but ran into pickling issues).
Note that dynamic programming on a nice tree decomposition has inherent dependencies: we can only start computing the result of a parent node after the result(s) of its child node(s) are available.
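To illustrate the dependency constraint, here is a minimal sketch that uses only the standard library (it is not my actual code, and it does not use Dask). The toy tree `children`, the placeholder `process` function and `evaluate_bottom_up` are hypothetical names made up for this illustration; threads are used so that nothing has to be pickled. Nodes that end up in the same batch do not depend on each other, so they could in principle be processed in parallel:

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter

# Hypothetical toy tree: each node maps to the children it depends on.
children = {
    0: [1],     # root
    1: [2, 3],  # join-like node with two children
    2: [4],
    3: [5],
    4: [],      # leaf
    5: [],      # leaf
}


def process(node, child_results):
    # Placeholder for the real intro/forget/join/leaf update:
    # a node's value is 1 plus the sum of its children's values.
    return 1 + sum(child_results)


def evaluate_bottom_up(children):
    # TopologicalSorter expects node -> predecessors; here the
    # predecessors of a node are exactly its children.
    sorter = TopologicalSorter(children)
    sorter.prepare()
    results = {}
    batches = []
    with ThreadPoolExecutor() as pool:
        while sorter.is_active():
            # All nodes whose children have already been processed.
            ready = sorted(sorter.get_ready())
            batches.append(ready)
            futures = {
                n: pool.submit(process, n, [results[c] for c in children[n]])
                for n in ready
            }
            for n, fut in futures.items():
                results[n] = fut.result()
                sorter.done(n)
    return results, batches


results, batches = evaluate_bottom_up(children)
print(batches)     # [[4, 5], [2, 3], [1], [0]]
print(results[0])  # 6
```

Only the two branches below the join-like node can actually run side by side here; a path of intro/forget nodes is inherently sequential.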
Parallel with Dask:
def process_node(self, node):
    node_type = self.dir_labelled_TD.get_vertex(node)

    match node_type:
        case 'intro':
            result = self._add_intro_node_parallel(node)
        case 'forget':
            result = self._add_forget_node_parallel(node)
        case 'join':
            result = self._add_join_node_parallel(node)
        case _:
            result = self._add_leaf_node_parallel(node)

    node_index = get_node_index(node)
    self.DP_table[node_index] = result
    return result

def count_homomorphisms_parallel(self):
    # Dictionary to store all futures/promises
    self.futures = {}

    for node in reversed(self.dir_labelled_TD.vertices()):
        # Delaying each node process and storing in futures
        self.futures[node] = self.process_node(node)

    print("Futures: ", self.futures)

    # Compute all results, respecting the inherent dependencies among them
    results = dask.compute(*self.futures.values())
    print("Results: ", [f.compute() for f in results])

    return self.DP_table[0][0]
When I run this program in a notebook with the following example:
par_counter = ParallelGraphHomomorphismCounter(graph, three_grid)
par_count = par_counter.count_homomorphisms_parallel()
print(par_count)
it produces the output and traceback below. I am relatively new to concurrency in Python and have read the relevant documentation, but I still cannot figure out what is going wrong.
Any suggestions or ideas would be appreciated, and thank you for your time! The non-parallel version is included at the end for reference.
Futures: {(6, {}): Delayed('process_node-0b571dcd-00e5-4871-871a-ef52e16b4ffb'), (5, {2}): Delayed('process_node-0fbc0886-3368-4d0e-8b09-751cce606ffe'), (4, {0, 2}): Delayed('process_node-0187a2da-aba7-42f7-83ab-497f62ea6b1f'), (3, {0}): Delayed('process_node-18729eea-99f4-45ee-af15-0de45395f181'), (2, {0, 1}): Delayed('process_node-8c85c333-301d-4e49-b9d4-c01bc20c05ae'), (1, {0}): Delayed('process_node-7e528db6-4636-4b31-8eb1-6f807ac32627'), (0, {}): Delayed('process_node-6bb90670-11b4-4e40-b7bc-cefb2fef6479')}
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In [3], line 28
23 # colour_counter = GraphHomomorphismCounter(square, three_grid, 2, square_clr, three_grid_clr, colourful=True)
24 # colourful_count = colour_counter.count_homomorphisms_best()
25 # print(colourful_count)
27 par_counter = ParallelGraphHomomorphismCounter(graph, three_grid)
---> 28 par_count = par_counter.count_homomorphisms_parallel()
29 print(par_count)
File ~/github/local-hom-count/local_hom_count_best_parallel.py:118, in ParallelGraphHomomorphismCounter.count_homomorphisms_parallel(self)
116 # Compute all results, respecting the inherent dependencies among them
117 results = dask.compute(*self.futures.values())
--> 118 print("Results: ", [f.compute() for f in results])
119 # Since the results are integrated into the DP_table in each process_node call,
120 # you can simply access the final result:
121 return self.DP_table[0][0]
File ~/github/local-hom-count/local_hom_count_best_parallel.py:118, in <listcomp>(.0)
116 # Compute all results, respecting the inherent dependencies among them
117 results = dask.compute(*self.futures.values())
--> 118 print("Results: ", [f.compute() for f in results])
119 # Since the results are integrated into the DP_table in each process_node call,
120 # you can simply access the final result:
121 return self.DP_table[0][0]
File ~/.sage/local/lib/python3.11/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File ~/.sage/local/lib/python3.11/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/github/local-hom-count/local_hom_count_best_parallel.py:239, in ParallelGraphHomomorphismCounter._add_intro_node_parallel(self, node)
235 child_DP_entry = self.DP_table[child_node_index]
236 # print("INTRO child DP entry: ", child_DP_entry)
237 # print("\n")
--> 239 for mapped in range(len(child_DP_entry)):
240 # Neighborhood of the mapped vertices of intro vertex in the target graph
241 mapped_intro_nbhs = [extract_bag_vertex(mapped, vtx, self.actual_target_size) for vtx in intro_vtx_nbhs]
242 # print("mapped: ", mapped)
243 # print("mapped nbhs in target: ", mapped_intro_nbhs)
File ~/.sage/local/lib/python3.11/site-packages/dask/delayed.py:635, in Delayed.__len__(self)
633 def __len__(self):
634 if self._length is None:
--> 635 raise TypeError("Delayed objects of unspecified length have no len()")
636 return self._length
TypeError: Delayed objects of unspecified length have no len()
Non-parallel:
def count_homomorphisms_best(self):
    r"""
    Return the number of homomorphisms from the graph `G` to the graph `H`.

    A homomorphism from a graph `G` to a graph `H` is a function
    `\varphi : V(G) \mapsto V(H)`, such that for any edge `uv \in E(G)` the
    pair `\varphi(u) \varphi(v)` is an edge of `H`.

    For more information, see the :wikipedia:`Graph_homomorphism`.

    ALGORITHM:

    This is an implementation based on the proof of Prop. 1.6 in [CDM2017]_.

    OUTPUT:

    - an integer, the number of homomorphisms from `graph` to `target_graph`

    EXAMPLES::

        sage: graph = graphs.CompleteBipartiteGraph(1, 4)
        sage: target_graph = graphs.CompleteGraph(4)
        sage: from sage.graphs.hom_count_best import count_homomorphisms_best
        sage: count_homomorphisms_best(graph, target_graph)
        324
    """
    # Whether it's BFS or DFS, every node below join node(s) would be
    # computed first, so we can safely go bottom-up.
    for node in reversed(self.dir_labelled_TD.vertices()):
        node_type = self.dir_labelled_TD.get_vertex(node)
        # print("\nNode: ", node, node_type)

        match node_type:
            case 'intro':
                self._add_intro_node_best(node)
            case 'forget':
                self._add_forget_node_best(node)
            case 'join':
                self._add_join_node_best(node)
            case _:
                self._add_leaf_node_best(node)

    return self.DP_table[0][0]
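
As a sanity check for small inputs, the definition in the docstring above can also be evaluated by brute force. This is a minimal plain-Python sketch, independent of Sage and of my DP implementation; `count_homomorphisms_brute_force` and the vertex/edge-list representation are hypothetical helpers introduced only for this illustration. It reproduces the value from the docstring example (the star K_{1,4} mapped into K_4):

```python
from itertools import product


def count_homomorphisms_brute_force(g_vertices, g_edges, h_vertices, h_edges):
    # Treat H as undirected: store both orientations of every edge.
    h_adjacent = set(h_edges) | {(v, u) for (u, v) in h_edges}
    count = 0
    # Enumerate every function phi: V(G) -> V(H).
    for images in product(h_vertices, repeat=len(g_vertices)):
        phi = dict(zip(g_vertices, images))
        # phi is a homomorphism if every edge of G lands on an edge of H.
        if all((phi[u], phi[v]) in h_adjacent for (u, v) in g_edges):
            count += 1
    return count


# Star K_{1,4}: centre 0 joined to the leaves 1..4.
star_vertices = [0, 1, 2, 3, 4]
star_edges = [(0, leaf) for leaf in range(1, 5)]

# Complete graph K_4 on the vertices 0..3.
k4_vertices = [0, 1, 2, 3]
k4_edges = [(u, v) for u in range(4) for v in range(u + 1, 4)]

brute_count = count_homomorphisms_brute_force(
    star_vertices, star_edges, k4_vertices, k4_edges
)
print(brute_count)  # 324
```

The centre has 4 possible images and each leaf must avoid the centre's image, giving 4 * 3^4 = 324. The enumeration is exponential in |V(G)|, so it is only useful for checking tiny cases against the DP result.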