It sounds like you’re trying to achieve some form of synchronization across tasks, where there’s some external resource that Dask doesn’t know about, and you need to ensure that only one task can access it at a time. As you originally said, Dask’s synchronization primitives are probably the way to do this. For example:
```python
from distributed import Lock
from dask import delayed

session_lock = Lock("session_lock")

@delayed
def run_program_A():
    session_lock.acquire(timeout="1min")  # always set a timeout!
    try:
        session = open_session()
        result = do_stuff(session)
        close_session(session)
        return result
    finally:
        session_lock.release()
```
It’s important to be aware that Dask’s synchronization primitives can cause deadlocks. They’re just thin wrappers around standard Python synchronization primitives, so they’re missing features that would be necessary for distributed synchronization:
- They have no lease period (a lock won’t revert back to un-held after a certain amount of time).
- Dask doesn’t do anything smart around tracking which task/worker holds a lock, so:
  - If a task fails and doesn’t release its lock via a try/finally, the lock will remain locked forever.
  - If a worker dies/disconnects mid-task, the lock will remain locked forever.
Obviously, the lock being locked forever is a big problem (no other tasks can run). That’s why calling `acquire` with a timeout is important: that way, at least your tasks will fail instead of hanging forever!
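One wrinkle worth double-checking against your version of distributed: as far as I can tell, `acquire(timeout=...)` signals a timeout by returning `False` rather than raising, so a task could silently carry on without actually holding the lock. Here’s a minimal sketch of turning that into an explicit failure (the `TimeoutError` and its message are just illustrative):

```python
from distributed import Lock

session_lock = Lock("session_lock")

# If acquire() reports a timeout by returning False (rather than raising),
# turn it into a loud failure instead of proceeding without the lock.
if not session_lock.acquire(timeout="1min"):
    raise TimeoutError("could not acquire session_lock within 1 minute")
try:
    ...  # use the shared resource here
finally:
    session_lock.release()
```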
One other thing to note is that it’s not really possible to correctly share a lock between multiple tasks that are running in parallel. For example:
```python
@delayed
def get_session():
    session_lock.acquire(timeout="1min")
    try:
        return open_session()
    except Exception:
        # couldn't open the session: give the lock back and re-raise
        session_lock.release()
        raise

@delayed
def process_foo(session):
    # assume we already have the lock
    try:
        return foo(session)
    except Exception:
        # but if something goes wrong, we need to release it
        session_lock.release()
        raise

@delayed
def process_bar(session):
    # assume we already have the lock
    try:
        return bar(session)
    except Exception:
        # but if something goes wrong, we need to release it
        session_lock.release()
        raise

@delayed
def finish(session, foo_result, bar_result):
    try:
        close_session(session)
    finally:
        # we're done with the session!
        session_lock.release()
    return process(foo_result, bar_result)

s = get_session()
f, b = process_foo(s), process_bar(s)
result = finish(s, f, b).compute()
```
The first task would acquire the lock, but all the tasks need logic to release the lock if they error. However, what if `foo` errors, but `bar` doesn’t? Then, since the lock was released, a `get_session` might be able to run again while `bar` is still using the session. There would need to be some way for `process_foo` to communicate with `process_bar` and say “hey, I failed, so you should stop what you’re doing, and when we’re both done, then we’ll release `session_lock` together”. It might be possible to construct something like this with Events or Pub/Sub, but it would be quite complex, and I’m not sure it would work.
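For what it’s worth, here is a very rough, untested sketch of what the failure-signaling half might look like with a named `Event`, as a variant of the `process_foo` above. The event names (`session-abort`, `foo-done`, `bar-done`) are made up for this example, it deliberately leaves out the “release the lock together” step, and it inherits every caveat about locks above:

```python
from distributed import Event

abort = Event("session-abort")   # set by whichever task fails first
foo_done = Event("foo-done")     # set when process_foo finishes, success or not
bar_done = Event("bar-done")     # same for process_bar

@delayed
def process_foo(session):
    try:
        if abort.is_set():
            raise RuntimeError("peer task failed; not touching the session")
        return foo(session)
    except Exception:
        abort.set()              # tell process_bar to stop using the session
        raise
    finally:
        foo_done.set()           # report in, so the lock can eventually be released

# process_bar would mirror this, setting bar_done in its `finally`.
# Some final task would then need to wait on *both* foo_done and bar_done
# (Event.wait returns False on timeout, so check it) before closing the
# session and releasing session_lock -- and that's where it gets fiddly.
```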
Of course, if you don’t have anything like `process_foo` and `process_bar` that need to run in parallel, then this pattern will still work (minus the general caveats about `Lock`s deadlocking).