What's an idiomatic way to create a resilient, distributed counter?

I’m using Dask to distribute nodes (Python tasks) in a Kedro data pipeline (i.e. a DAG). Each task can have inputs and outputs, and outputs from one node can be used as inputs to another node.

Since an output dataset can be an input to several nodes, I need to know when a dataset has been loaded X times, where X is the number of nodes that the dataset is an input for; once it’s been loaded X times, I can release it from memory.

Each node is executed on a worker, so the simplest way to accomplish this is to wait for a node to complete and report back to the scheduler, and have the scheduler keep track of load counts and release datasets at the right time. However, this isn’t necessarily the most efficient approach: the node has to load its input datasets, perform whatever calculations/logic, and save its output datasets before it reports back to the scheduler and any of its inputs can potentially be released.

Input datasets that we were completely done with could have been released right after they were read. However, that shifts the responsibility for releasing datasets onto the workers, which means they need access to the counters. And since multiple workers could be loading and then modifying the load count for a given dataset concurrently, we’d also need to lock each counter.

Is there a standard pattern wherein we can manage a set of counters to be modified by workers? I saw the Counter example using actors (Actors — Dask.distributed 2022.8.1 documentation), but I’m not sure if actors are the right approach here, mostly because they’re not resilient.
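
For reference, the actor-based counter from the docs looks roughly like the following (my paraphrase of the documentation example); the counter’s state lives on a single worker, which is where my resilience concern comes from:

from dask.distributed import Client

class Counter:
    """Manage an incrementing count on a single worker."""

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

client = Client()

counter_future = client.submit(Counter, actor=True)  # create a Counter on a worker
counter = counter_future.result()  # proxy to the remote object

future = counter.increment()  # runs on the worker holding the actor
print(future.result())  # 1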

Here is the simple approach I currently have for releasing datasets:

...

        for i, (_, node) in enumerate(
            as_completed(node_futures.values(), with_results=True)
        ):
            self._logger.info("Completed node: %s", node.name)
            self._logger.info("Completed %d out of %d tasks", i + 1, len(nodes))
            # Decrement load counts, and release any datasets we
            # have finished with. This is particularly important
            # for the shared, default datasets we created above.
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
                if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                    catalog.release(data_set)

For more context, if necessary: Document distribution of Kedro pipelines with Dask by deepyaman · Pull Request #1131 · kedro-org/kedro · GitHub

Thanks @deepyaman for the question! There are ways to create a distributed counter, but based on your description it sounds like what you’re really looking for is a way to execute a release function once your workers have finished. This can be done with graph manipulation, where you explicitly add a release task that depends on your input tasks. Here is a snippet with a simple example:

import dask
from dask import delayed
from dask.graph_manipulation import bind

@delayed
def func(x):
    return x

@delayed
def release():
    pass

delayeds = [func(x) for x in range(3)]

# release() is now dependent on delayeds: it will only run
# after all three func tasks have completed
new_cleanup = bind(release(), delayeds)

Here is what dask.visualize(delayeds + [new_cleanup]) produces: a task graph with the three func tasks all feeding into the single release task.

There’s also a nice, more detailed SO post on this, if you’re interested! If you are after a distributed counter, there is the global Variable in dask.distributed that you could use, but the graph-manipulation approach seems to better fit your question.
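
For completeness, a shared counter could be sketched with a Variable plus a Lock for atomic updates. This is just a minimal illustration under that assumption (the dataset and key names below are made up, not anything Kedro-specific):

from dask.distributed import Client, Lock, Variable

def decrement_load_count(dataset_name):
    """Atomically decrement one dataset's counter; return the new value."""
    count = Variable(f"load-count-{dataset_name}")  # shared across the cluster
    with Lock(f"load-count-lock-{dataset_name}"):  # guard the read-modify-write
        new_value = count.get() - 1
        count.set(new_value)
    return new_value

client = Client()
Variable("load-count-my_dataset").set(2)  # e.g. the dataset feeds two nodes
print(client.submit(decrement_load_count, "my_dataset").result())  # 1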

Regarding using Actors, I agree that they aren’t needed here; they are indeed more experimental at the moment.

Thanks @scharlottej13! This looks like a great solution, and it even makes me think that I should look at using bind rather than passing Futures in a lot of instances where I don’t need the result of the Future (and it really is just graph dependency specification). I will give it a try and report back, whether it works or I face issues.

@deepyaman I can’t help but feel like what you’re describing here is, well… Dask. Executing a DAG across multiple machines, and releasing data when it’s no longer needed, is basically the main job of the distributed scheduler. I wonder whether, if you integrated more tightly with Dask than just using client.submit, Dask itself could end up handling these things for you. Since you already have a DAG, maybe you could translate it into Dask’s DAG, using delayed for instance? (That would also be a prerequisite to using bind like @scharlottej13 suggested.)

I’m not familiar with what a “dataset” means in your system, but getting some cleanup code to run when Dask keys are no longer needed will be the trickiest part. I think the graph manipulations Sarah mentioned will be the best solution, but there might be some other tricks too. When you get to that point, it might be worth opening another discussion for that topic, since I think it comes up somewhat frequently.

@gjoseph92 Thank you for your comment, and sorry for the delay in getting back to you! To provide a bit of context, Kedro is a popular open-source framework for defining data pipelines. Most Kedro users don’t currently use Dask, but they do frequently deploy these pipelines in a wide variety of ways, including locally (optionally with threading or multiprocessing) and on platforms like Kubeflow, AWS Batch, etc. My goal is to give Kedro users a way to distribute their pipelines with Dask, without changing their code significantly or even needing to know much about Dask.

Since you already have a DAG, maybe you could translate that into Dask’s DAG, using delayed for instance? (That would also be a prerequisite to using bind like @scharlottej13 suggested.)

Does this imply that you can’t use bind with Futures? I was planning to test it out, but haven’t had the chance to yet.

Yeah, I wasn’t at all trying to say “you should use dask instead”, or “your users should use dask instead”. What I meant was that you might get more benefit from Dask if you translated your DAG into Dask’s DAG format and then used Dask to execute the full DAG, as opposed to what you’re doing right now: traversing the DAG yourself in topological order and using Dask almost like a fancy concurrent.futures.Executor that you just submit tasks into.

Making that conversion should be quite simple: basically just swap client.submit with constructing and calling Delayed objects. (And adding binds as necessary.) At the end, you’ll end up with one Delayed object representing the entire DAG, which you can just call compute() on.
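
Here’s roughly what I’m picturing, assuming a run_node(node, catalog) helper that loads a node’s inputs, runs it, and saves its outputs (the helper and the .name/.inputs/.outputs attributes are illustrative placeholders, not actual Kedro or Dask API beyond delayed and bind):

from dask import delayed
from dask.graph_manipulation import bind

def build_dag(nodes, catalog, run_node):
    """Translate a topologically sorted list of nodes into one Delayed."""
    producer = {}  # dataset name -> Delayed of the node that saves it
    tasks = []
    for node in nodes:
        task = delayed(run_node)(node, catalog, dask_key_name=node.name)
        # Upstream tasks whose saved outputs this node will load.
        upstream = [producer[ds] for ds in node.inputs if ds in producer]
        if upstream:
            # bind adds pure ordering dependencies without passing results.
            task = bind(task, upstream)
        for ds in node.outputs:
            producer[ds] = task
        tasks.append(task)
    return delayed(list)(tasks)  # one Delayed representing the whole pipeline

# pipeline = build_dag(nodes, catalog, run_node)
# pipeline.compute()  # runs on whatever scheduler/cluster is active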

I don’t believe you can use bind with Futures, no.

I thought that if I used Delayed objects and called compute() on them, the work wouldn’t be something that could get distributed on a Dask cluster. However, looking into it further (and seeing articles like https://coiled.io/blog/how-to-parallelize-python-code-with-dask-delayed/), it seems I’m mistaken, and code using Delayed can also be run with a distributed.Client. If that’s the case, I will look into it more.
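
For anyone else landing here: the piece I’d missed is that instantiating a distributed.Client registers it as the default scheduler, so a plain .compute() on a Delayed already runs on the cluster. A minimal sketch (the scheduler address is a placeholder):

from dask import delayed
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address

@delayed
def add(x, y):
    return x + y

total = add(add(1, 2), add(3, 4))
print(total.compute())  # 10, executed on the cluster workers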

Thanks again for the help, and tolerating my newness to Dask!
