I’m using Dask to distribute nodes (Python tasks) in a Kedro data pipeline (i.e. a DAG). Each task can have inputs and outputs, and outputs from one node can be used as inputs to another node.
Since an output dataset can be an input to several nodes, I need to know when a dataset has been loaded X times, where X is the number of nodes that consume it as an input; once that count is reached, the dataset can be released from memory.
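Building those load counts up front is straightforward; here is a minimal sketch, assuming each node exposes its input dataset names via node.inputs (as Kedro nodes do):

from collections import Counter
from itertools import chain

# Count how many nodes consume each dataset as an input; a dataset can be
# released once it has been loaded this many times.
load_counts = Counter(chain.from_iterable(node.inputs for node in nodes))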
Each node is executed on a worker, so the simplest way to accomplish this is to wait for a node to complete and report back to the scheduler, which keeps track of the load counts and releases datasets at the right time. However, this isn't necessarily the most efficient: it means a node loads its input datasets, performs its calculations/logic, and saves its output datasets before we report back to the scheduler and potentially release some of that node's inputs.
The input datasets that we are completely done with could instead be released right after they are read. However, that shifts the responsibility for releasing datasets onto the workers, which means they need access to the counters. Multiple workers could also be reading and then modifying the load count for the same dataset, so each counter would need to be locked; a rough sketch of what that might look like follows.
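This is only to illustrate the idea; a minimal sketch of such worker-side bookkeeping using distributed.Lock and distributed.Variable (the helper name, the counter naming scheme, and the assumption that each counter Variable is initialized by the scheduler before the run are all mine, not existing Kedro/Dask API):

from distributed import Lock, Variable, get_client


def decrement_and_maybe_release(dataset_name, catalog):
    # Hypothetical helper a worker would call right after loading `dataset_name`.
    client = get_client()  # the client is available from inside a running task
    with Lock(f"load-count/{dataset_name}"):  # guard the read-modify-write
        counter = Variable(f"load-count/{dataset_name}", client=client)
        remaining = counter.get() - 1  # assumes the scheduler set the initial count
        counter.set(remaining)
    if remaining < 1:
        catalog.release(dataset_name)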
Is there a standard pattern for managing a set of counters that workers can modify? I saw the Counter example using actors (Actors — Dask.distributed 2022.8.1 documentation), but I'm not sure actors are the right approach here, mostly because they're not resilient: if the worker hosting the actor dies, its state is lost.
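To make the question concrete, here is roughly the kind of thing I have in mind, adapted from that docs example; the LoadCounter class and the way it is wired up are just a sketch, not anything that exists in Kedro or Dask (as I understand it, actor method calls are handled one at a time by the worker hosting the actor, so no explicit locking is needed, but the counts are lost if that worker dies):

from distributed import Client


class LoadCounter:
    # Actor holding the per-dataset load counts on a single worker.
    def __init__(self, load_counts):
        self.load_counts = dict(load_counts)

    def decrement(self, dataset_name):
        self.load_counts[dataset_name] -= 1
        return self.load_counts[dataset_name]


client = Client()
counter = client.submit(LoadCounter, load_counts, actor=True).result()

# Inside a task, right after loading `dataset_name` (the actor handle can be
# passed to the task as an argument):
remaining = counter.decrement(dataset_name).result()
if remaining < 1:
    catalog.release(dataset_name)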
Here is the simple, scheduler-side approach that I currently have for releasing datasets:
...
for i, (_, node) in enumerate(
    as_completed(node_futures.values(), with_results=True)
):
    self._logger.info("Completed node: %s", node.name)
    self._logger.info("Completed %d out of %d tasks", i + 1, len(nodes))
    # Decrement load counts, and release any datasets we
    # have finished with. This is particularly important
    # for the shared, default datasets we created above.
    for data_set in node.inputs:
        load_counts[data_set] -= 1
        if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
            catalog.release(data_set)
    for data_set in node.outputs:
        if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
            catalog.release(data_set)
For more context, if necessary: Document distribution of Kedro pipelines with Dask by deepyaman · Pull Request #1131 · kedro-org/kedro · GitHub