Best practices for a hierarchical workflow writing HDF5 files on Slurm-allocated resources

Dear all,

I am trying to launch a workflow on Slurm-allocated resources to write an HDF5 file. The workflow is composed of several tasks that themselves launch other tasks. As this raised several questions and issues (detailed below), I am looking for recommendations on best practices for doing this with Dask. I first describe the exact context, then a minimal example, and finally the issues and the things I have tried.

Context

I am creating a dataset of astronomical information to be stored as an HDF5 file. The raw data lie in a number of FITS-formatted files that are read as tables with astropy. The table information is grouped by region of the sky. For each region, several processes are launched to handle sub-regions independently. For each sub-region, a process loads images from disk and loops over the objects in the image. This process matches the FITS files with the images and writes the output to an HDF5 file. The process is expected to fail in some cases, where files are missing. We would like to report the missing files, and errors in general, without impacting the rest of the workflow.

Minimal Example

Below, I tried to provide a minimal example of the workflow.

import random
from dask.distributed import (
    Client,
    LocalCluster,
    Lock,
    worker_client,
)


def read_input(n: int):
    """Read the input files and submit new tasks for each region in the file."""
    futures = []
    with worker_client() as client:
        for i in range(100):
            future = client.submit(process_region, i)
            futures.append(future)
    return futures


def process_region(i: int):
    """Process a region. For each subregion in the region submit a new task."""
    futures = []
    with worker_client() as client:
        for k in range(1, i + 1):
            future = client.submit(process_subregion, k)
            futures.append(future)
    return futures


def process_subregion(k: int):
    """Process a subregion."""
    # The lock mimics what is necessary to write concurrently to the HDF5 output;
    # HDF5 supports parallel writing only with MPI.
    with Lock(name=str(k)):
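        # randint may return 0; the resulting ZeroDivisionError mimics the
        # expected failures caused by missing files.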
        div = random.randint(0, 3)
        return k / div


if __name__ == "__main__":
    # n_proc would be the number of processes allocated for the Slurm task
    n_proc = 4
    # Assume the local cluster can work with the resources allocated by Slurm
    cluster = LocalCluster(n_workers=n_proc, processes=True)
    client = Client(cluster)
    print(f"Successfully created client with {n_proc} workers: {client}")
    print(f"{client.dashboard_link}")
    inputs = list(range(50))
    futures = client.map(read_input, inputs)
    results = client.gather(futures)
    client.shutdown()

Errors

Launching the minimal example above produces a series of warnings:

distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name="execute('process_region-e3efeea7b6d3b6e346d65cbf5c4dc7a1')" coro=<Worker.execute() done, defined at python3.10/site-packages/distributed/worker_state_machine.py:3615>> ended with CancelledError

and

WARNING - Removing worker 'tcp://127.0.0.1:43281' caused the cluster to lose already computed task(s)

In practice, launching the real workflow on a single node of my cluster leads to similar warnings, in addition to the following errors:

ERROR - cannot schedule new futures after shutdown

distributed.lock - ERROR -
Traceback (most recent call last):
  File "python3.10/site-packages/distributed/lock.py", line 48, in acquire
    await future
  File "python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "python3.10/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "python3.10/site-packages/distributed/lock.py", line 56, in acquire
    assert event is event2
AssertionError

What I have tried

  • Instead of returning the futures directly from the different functions, I tried storing the futures in a queue. Indeed, I am only interested in the process_subregion futures, to know whether they failed. However, this still raises the errors and warnings above.

  • Instead of returning the futures, I tried calling fire_and_forget. I still obtained the same kinds of errors.

  • I also tried calling client.gather within the context manager of each function. In combination with catching the error directly in process_subregion, this seems to fix the issues (see the sketch below). However, I understand that client.gather will actually trigger the computation of the functions, and it seems to result in much slower computation due to poor overlapping of the tasks.
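For reference, here is a rough sketch of that third variant, adapted from the minimal example above (error handling simplified, imports repeated for completeness):

import random
from dask.distributed import Lock, worker_client


def process_subregion(k: int):
    try:
        with Lock(name=str(k)):
            # randint may return 0; the ZeroDivisionError mimics a missing file
            return k / random.randint(0, 3)
    except ZeroDivisionError as exc:
        return exc  # report the failure without breaking the workflow


def process_region(i: int):
    with worker_client() as client:
        futures = client.map(process_subregion, range(1, i + 1))
        # Wait for all subregions before this task returns, so that no
        # futures outlive the task that created them.
        return client.gather(futures)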

Questions

  • What would be the correct approach for a hierarchical workflow that triggers new tasks? Which Dask functions must be called to ensure proper parallel execution of the tasks? How can I fix the warnings and errors observed above?

  • Working in a Slurm environment, is it OK to assume that, once the resources have been allocated, Dask can work with a LocalCluster?

  • Regarding writing output in parallel to an HDF5 file, what would be the best approach? Are there any additional precautions to take when using a lock in Dask (e.g. timeout issues, or checking that the worker holding the lock is still alive)?

Hi @LTMeyer, welcome to Dask community!

At first glance at your code, you have two nested levels of future submission, but you are only waiting on one level at the end. This seems to be confirmed by the warnings you reported.

I think the errors you are seeing occur because the second level of nested futures is still executing when you shut down the cluster.

client.gather doesn't trigger the computation; the computations are already running in the background, it just waits for them to finish. This should not be a problem, but I haven't taken a close look or tried to reproduce it.

Ideally, you shouldn't need these two nested levels of computation submission. Couldn't you call read_input sequentially on the Client side? Does it take a long time to read the input and submit the computations?

Otherwise, gathering results in the process_region function seems like a good idea to me, but again this depends on your profile. Maybe a solution is simply not to submit any futures in the process_region function. How many regions do you have to process? Is there enough parallelism at this level?

You might also call another gather on the first gathered results: in your current code, you gather futures, which you'll then need to gather again to wait for their completion.
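Something like this untested sketch, based on your minimal example (each gather resolves one level of nesting; errors="skip" is just one way to tolerate the expected failures):

futures = client.map(read_input, inputs)
# First gather: each read_input task returned a list of process_region futures
region_lists = client.gather(futures)
region_futures = [f for lst in region_lists for f in lst]
# Second gather: each process_region task returned a list of process_subregion futures
subregion_lists = client.gather(region_futures)
subregion_futures = [f for lst in subregion_lists for f in lst]
# Final gather: the actual results; skip tasks that raised (e.g. ZeroDivisionError)
results = client.gather(subregion_futures, errors="skip")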

So I think I've already partly answered all of this, but I would recommend keeping it simple: parallelize only at the level that provides sufficient parallelism, and don't try to parallelize too much, especially with nested calls. Your code already ensures proper parallel execution of the tasks.
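For example, flattening everything to a single level of submission could look roughly like this (regions_in and subregions_in are hypothetical helpers standing in for your real file-reading logic):

futures = []
for n in inputs:
    # Read each input file sequentially on the client side; only the
    # leaf-level work is submitted to the cluster.
    for i in regions_in(n):         # hypothetical helper listing regions
        for k in subregions_in(i):  # hypothetical helper listing subregions
            futures.append(client.submit(process_subregion, k))
results = client.gather(futures, errors="skip")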

I'm not sure what you mean here, but you can perfectly well submit a Slurm job that uses a LocalCluster inside the allocation. SLURMCluster allows you to take advantage of multiple Slurm nodes.
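For reference, a minimal SLURMCluster sketch with dask-jobqueue (the resource values are purely illustrative, adapt them to your site):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=8, memory="16GB", walltime="02:00:00")
cluster.scale(jobs=4)  # submits 4 Slurm jobs, each starting its own worker(s)
client = Client(cluster)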

Well, I would recommend either writing individual files or using another format. But if HDF5 is required, you might want to use distributed Locks, keeping in mind that write performance won't be high.
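As a sketch of the lock-based approach, assuming h5py (the lock name and timeout are illustrative; the timeout also addresses your question about waiting on a lock held by a dead worker):

import h5py
from dask.distributed import Lock


def write_subregion(path: str, key: str, data) -> None:
    # A single named lock serializes all writers of the shared HDF5 file.
    lock = Lock("hdf5-write")
    # A timeout avoids blocking forever if the lock holder has died.
    if not lock.acquire(timeout="60s"):
        raise TimeoutError("could not acquire the HDF5 write lock")
    try:
        with h5py.File(path, "a") as f:
            f.create_dataset(key, data=data)
    finally:
        lock.release()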