Dear all,
I am trying to launch a workflow on Slurm-allocated resources to write an HDF5 file. The workflow is composed of several tasks that themselves launch other tasks. As this raised several questions and issues (detailed below), I am looking for recommendations on best practices for doing this with dask. I first describe the exact context, then give a minimal example, and finally the issues and things I have tried.
Context
I am creating a dataset of astronomical information that will be stored as an HDF5 file. The raw data lie in a number of FITS-formatted files that are read as tables with `astropy`. The table information is grouped by region of the sky. For each region, several processes are launched to handle sub-regions independently. For each sub-region, a process loads images from disk and loops over the objects in the image. This process matches the FITS files to the images and writes the output to an HDF5 file. The process is expected to fail in some cases, where files are missing. We would like to report the missing files (and errors in general) without impacting the rest of the workflow.
Minimal Example
Below, I tried to provide a minimal example of the workflow.
```python
import random

from dask.distributed import (
    Client,
    LocalCluster,
    Lock,
    worker_client,
)


def read_input(n: int):
    """Read the input files and submit new tasks for each region in the file."""
    futures = []
    with worker_client() as client:
        for i in range(100):
            future = client.submit(process_region, i)
            futures.append(future)
    return futures


def process_region(i: int):
    """Process a region. For each subregion in the region submit a new task."""
    futures = []
    with worker_client() as client:
        for k in range(1, i + 1):
            future = client.submit(process_subregion, k)
            futures.append(future)
    return futures


def process_subregion(k: int):
    """Process a subregion."""
    # The lock mimics what is necessary to write concurrently in the HDF5
    # output (HDF5 supports parallel writing only with MPI).
    with Lock(name=str(k)):
        div = random.randint(0, 3)
        # ZeroDivisionError when div == 0 mimics the expected failures
        # (e.g. missing files) in the real workflow.
        return k / div


if __name__ == "__main__":
    # n_proc would be the number of processes allocated for the Slurm task
    n_proc = 4
    # Assume the local cluster can work with the resources allocated by Slurm
    cluster = LocalCluster(n_workers=n_proc, processes=True)
    client = Client(cluster)
    print(f"Successfully created client with {n_proc} workers: {client}")
    print(f"{client.dashboard_link}")

    inputs = list(range(50))
    futures = client.map(read_input, inputs)
    results = client.gather(futures)

    client.shutdown()
```
Errors
Launching the minimal example above produces a series of warnings:
distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name="execute('process_region-e3efeea7b6d3b6e346d65cbf5c4dc7a1')" coro=<Worker.execute() done, defined at python3.10/site-packages/distributed/worker_state_machine.py:3615>> ended with CancelledError
and
WARNING - Removing worker 'tcp://127.0.0.1:43281' caused the cluster to lose already computed task(s)
In practice, launching the real workflow on a single node of my cluster leads to similar warnings, in addition to the following errors:
ERROR - cannot schedule new futures after shutdown
distributed.lock - ERROR -
Traceback (most recent call last):
  File "python3.10/site-packages/distributed/lock.py", line 48, in acquire
    await future
  File "python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "python3.10/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "python3.10/site-packages/distributed/lock.py", line 56, in acquire
    assert event is event2
AssertionError
What I have tried
- Instead of returning the futures directly from the different functions, I tried storing the futures in a queue. Indeed, I am only interested in the `process_subregion` futures, in order to know whether they failed. However, this still raises the errors and warnings above.
- Instead of returning the futures, I tried calling `fire_and_forget`. I still obtained the same kinds of errors.
- I also tried calling `client.gather` within the context manager of each function. In combination with catching the error directly in `process_subregion`, this seems to fix the issues. However, I understand that `client.gather` actually triggers the computation of the functions, and it seems to result in much slower computation due to poor overlap between the tasks.
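For reference, here is a minimal sketch of that last variant (gathering inside the `worker_client` context instead of returning futures). The use of `errors="skip"` to drop failed subregions, and `pure=False` because the sketched failures are random, are my choices for this sketch rather than the exact code of the real workflow:

```python
import random

from dask.distributed import Client, LocalCluster, worker_client


def process_subregion(k: int):
    """Process a subregion; may fail, mimicking missing files."""
    div = random.randint(0, 3)
    return k / div  # ZeroDivisionError when div == 0


def process_region(i: int):
    """Submit the subregion tasks and wait for them before returning."""
    with worker_client() as client:
        # pure=False because the sketched failures are random; otherwise
        # identical calls would be deduplicated into a single task.
        futures = client.map(process_subregion, range(1, i + 1), pure=False)
        # errors="skip" drops failed subregions instead of raising, so one
        # missing file does not take down the rest of the workflow; the
        # failed futures could be inspected separately for reporting.
        results = client.gather(futures, errors="skip")
    return len(results)


if __name__ == "__main__":
    # Threaded workers keep the sketch self-contained; the real setup
    # would use processes as in the minimal example above.
    cluster = LocalCluster(n_workers=2, processes=False)
    client = Client(cluster)
    counts = client.gather(client.map(process_region, range(5)))
    print(counts)
    client.shutdown()
```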
Questions
- What would be the correct approach for a hierarchical workflow that triggers new tasks? Which `dask` functions must be called to ensure the proper parallel execution of the tasks? How can the warnings and issues observed above be fixed?
- Working in a Slurm environment, is it OK to assume that, once the resources have been allocated, dask can work with the `LocalCluster`?
- Regarding writing output to an HDF5 file in parallel, what would be the best approach? Are there any additional precautions to take when using a lock in `dask` (e.g. a timeout to check whether the worker is still alive while waiting for the lock)?
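To make the last question concrete, here is a minimal sketch of serializing writes with a single cluster-wide `Lock`. The plain-text append is only a stand-in for the actual HDF5 write, and the single lock name is an assumption on my part: with one shared output file, every writer would need to take the same lock (the per-subregion lock names in the minimal example would not serialize access to a shared file):

```python
from dask.distributed import Client, LocalCluster, Lock

OUTPUT = "output.csv"  # stand-in for the shared HDF5 output file


def write_result(k: int):
    """Compute a result and append it to the shared output under a lock."""
    result = k * k
    # A single cluster-wide lock name serializes all writers of the file.
    with Lock(name="hdf5-output"):
        with open(OUTPUT, "a") as f:
            f.write(f"{k},{result}\n")
    return k


if __name__ == "__main__":
    # Threaded workers keep the sketch self-contained; the real setup
    # would use the processes allocated by Slurm.
    cluster = LocalCluster(n_workers=2, processes=False)
    client = Client(cluster)
    client.gather(client.map(write_result, range(10)))
    client.shutdown()
```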