Cross-graph dependencies and "starting" criteria

Hi all,

I intend to use Dask to realize rather complicated workflows involving multiple external programs/APIs. I will feed the scheduler pre-defined workflows, i.e., graphs, where sometimes graph B (whose computation I request now) depends on a task in graph A (which I submitted 20 minutes earlier).

One example with a pool of 4 workers:

  • I request computation of graph A at 12:00pm: worker 1 is busy with some program, and worker 2 is busy with another program, X.
  • I request computation of graph B (similar to A) at 12:10pm: worker 3 is now busy with some program, and worker 4 is free to handle program X, but is supposed to wait until graph A's program X session has been closed.

What is required here is something that can be considered a “starting criterion”. Any ideas for a (perhaps even elegant) solution to this? It seems that a task of an earlier graph cannot be referenced (e.g., using wait_on), since the graphs are submitted independently. I saw the coordination primitives, but don’t see how they’d apply in this particular situation (see dummy versions of the functions I currently use below).

from dask import delayed

@delayed
def start_programX():
    return programX.start()

@delayed
def stop_program(session):
    return session.stop()

Thanks a lot in advance!

Thanks for the question @hayi! One option could be to use client.persist, which allows you to keep a result in memory if you need it at a later stage in your workflow. If you don’t use persist, the scheduler will clean up results once no tasks depend on them. The docs have more information on using client.persist for managing computation.
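For example, here’s a minimal sketch of that idea, reusing the dummy functions from your question (the Client setup and the second graph are assumptions on my part):

from distributed import Client
from dask import delayed

client = Client()  # assumes a running distributed cluster (or starts a local one)

# graph A: start the program and keep the session result in cluster memory
session = client.persist(start_programX())

# ... later, an independently-built graph B can reuse the persisted result
result = stop_program(session).compute()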

It’d also be great to understand your workflow a bit better; I’m not sure I understand why you are looking for a “starting criterion” based on your example.

Thanks for the prompt response, @scharlottej13!

Sure, let me try to explain a bit better. I would like to establish communication between graphs that have been submitted independently of each other (i.e., where directly referencing graph A’s task X from graph B is not possible).

Example: Let’s say graph A entails opening a session of some program with a node open_session, doing some work with it, and eventually closing it with a node close_session. A few minutes later, when I submit graph B with the same node open_session, that node should not start as long as graph A’s session is open, even if there are enough resources to handle a second session. That’s what I meant by “starting criterion”.

I hope that’s a bit clearer. Thanks again!

Of course! Thanks for providing some additional explanation. I think the best way to do this is to make graph B dependent on graph A; then graph B won’t start until graph A is complete. Perhaps this is not possible, and that’s what you meant by “directly referencing graph A’s task X from graph B is not possible”? If so, could you elaborate on why this is not possible? I’d expect it to work even if the graphs were submitted independently of one another.
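For instance, one common pattern is to pass graph A’s terminal task into graph B’s first task as an extra, ignored argument, which forces the ordering. A minimal sketch using the dummy functions from the question (the *deps argument is just an illustrative convention):

from dask import delayed

@delayed
def open_session(*deps):
    # *deps is never used; passing graph A's final task here only
    # makes Dask schedule this task after graph A has finished
    return programX.start()

a_session = start_programX()        # graph A
a_closed = stop_program(a_session)

b_session = open_session(a_closed)  # graph B starts only after graph A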

Sure! In my example, graphs A and B would be exactly the same (i.e., both are supposed to open and close a session of the same program). I am planning on using Dask to build these graphs (with different parameters) and “launch” them asynchronously. At 12pm I could trigger computation of graph A, and a few minutes later that of graph B, so a compile-time dependency between the graphs is not feasible (please let me know if that’s not true). What is needed instead is a run-time dependency, where a node of graph B (open_session) can “sense” when a node of a different graph (graph A’s close_session) has finished.

Thanks for the additional explanation @hayi! I think you could use graph manipulation to do this. Specifically, bind to add implicit dependencies to a Dask collection, and wait_on to ensure dependents of a collection wait on another unrelated collection. Here’s a small toy example:

import dask
from dask import delayed
from dask.graph_manipulation import bind, wait_on

@delayed
def start_program():
    pass

@delayed
def do_computation(x):
    return x

@delayed
def stop_program():
    pass

# create task graphs with dependencies
graph_a = bind(stop_program(), bind(do_computation(5), start_program()))
graph_b = bind(stop_program(), bind(do_computation(10), start_program()))

# wait_on returns clones of graph_a and graph_b whose results only
# become available once *both* graphs have completed
wait_graph = wait_on(graph_a, graph_b)

You can inspect the task graph for graph_a with dask.visualize(graph_a), and likewise the task graph for wait_graph, to see the dependencies added by bind and wait_on.

[task graph visualizations omitted]

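To run both graphs, compute the clones that wait_on returns, e.g.:

# wait_on returns one clone per input collection
graph_a2, graph_b2 = wait_graph
dask.compute(graph_a2, graph_b2)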
It sounds like you’re trying to achieve some form of synchronization across tasks, where there’s some external resource that Dask doesn’t know about, and you need to ensure that only one task can access it at a time. As you mentioned in your original post, Dask’s synchronization primitives are probably the way to do this.

from distributed import Lock
from dask import delayed

session_lock = Lock('session_lock")

@delayed
def run_program_A():
    session_lock.acquire(timeout="1min")  # always set a timeout!
    try:
        session = open_session()
        result = do_stuff(session)
        close_session(session)
        return result
    finally:
        session_lock.release()

It’s important to be aware that Dask’s synchronization primitives can cause deadlocks. They’re just thin wrappers around standard Python synchronization primitives, so they’re missing features that would be necessary for distributed synchronization:

  • They have no lease period (a lock won’t revert back to un-held after a certain amount of time).
  • Dask doesn’t do anything smart around tracking which task/worker holds a lock, so:
      ◦ If a task fails and doesn’t release its lock via a try/finally, the lock will remain locked forever.
      ◦ If a worker dies/disconnects mid-task, the lock will remain locked forever.

Obviously, the lock being locked forever is a big problem (no other tasks can run). That’s why calling acquire with a timeout is important: that way, at least your tasks will fail instead of hanging forever!
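If lease expiry matters for your workload, one possible alternative (an assumption on my part, not something covered above) is distributed.Semaphore with a single lease, since its leases are validated by the scheduler and eventually time out if the holding worker dies:

from distributed import Semaphore
from dask import delayed

# a Semaphore with one lease acts like a lock, but the lease expires if
# the holding worker dies (see the distributed.scheduler.locks.lease-timeout
# configuration value)
session_sem = Semaphore(max_leases=1, name="session")

@delayed
def run_program_A():
    with session_sem:  # acquired on entry, released on exit
        session = open_session()
        try:
            return do_stuff(session)
        finally:
            close_session(session)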

One other thing to note is that it’s not really possible to correctly share a lock between multiple tasks that are running in parallel. For example:

@delayed
def get_session():
    session_lock.acquire(timeout="1min")
    try:
        return open_session()
    except Exception:
        # release on failure, then let the error propagate
        session_lock.release()
        raise

@delayed
def process_foo(session):
    # assume we already have the lock
    try:
        return foo(session)
    except Exception:
        # but if something goes wrong, we need to release it
        session_lock.release()
        raise

@delayed
def process_bar(session):
    # assume we already have the lock
    try:
        return bar(session)
    except Exception:
        # but if something goes wrong, we need to release it
        session_lock.release()
        raise

@delayed
def finish(foo_result, bar_result):
    try:
        close_session()
    finally:
        # we're done with the session!
        session_lock.release()
    return process(foo_result, bar_result)

s = get_session()
f, b = process_foo(s), process_bar(s)
result = finish(f, b).compute()

The first task would acquire the lock, but all the tasks need logic to release the lock if they error. However, what if foo errors, but bar doesn’t? Then, since the lock was released, another get_session task might be able to run while bar is still using the session. There would need to be some way for process_foo to communicate with process_bar and say “hey, I failed, so you should stop what you’re doing, and when we’re both done, we’ll release session_lock together”. It might be possible to construct something like this with Events or Pub/Sub, but it would be quite complex, and I’m not sure it would work.
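For illustration only, here’s a minimal sketch of just the signalling half of that idea with distributed.Event (the event name is made up, and this deliberately does not attempt the hard part, coordinating the shared release):

from distributed import Event

# a peer task can check this event to learn that a sibling failed
peer_failed = Event("session-peer-failed")

@delayed
def process_foo(session):
    try:
        return foo(session)
    except Exception:
        peer_failed.set()  # tell process_bar that we failed
        raise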

Of course, if you don’t have anything like process_foo and process_bar that need to run in parallel, then this pattern will still work (minus the general caveats about Locks deadlocking).
