My goal is to write a workflow framework using dask graph.
Both serial and parallel tasks work well.
But when I encounter an exclusive gateway element, I don’t know how to implement it.
I have a crude version here.
# task graphs。 a |-on_success-> b means: a -> b or a -> c
|-on_error-> c
def a(n):
try:
n = 1/n
except Exception as e:
return 'error', 0
return 'success', n
def b(n):
print('b', n)
def c(n):
print('c', n)
def func_condition(ret, branch):
branch.get(ret[0])(ret[1])
if __name__ == '__main__':
client = Client('xx:8786')
dsk = {
'a': (a, 0),
'z': (func_condition, 'a', {'success':b, 'error':c}),
}
print(client.get(dsk, 'z'))
The above can simply meet the demand.
But if there is more path depth in the branch?
a -> b -> d or a -> c -> e
Is there a better way to achieve it?