Testing a Dask pipeline (fixture generation, non-regression tests)

I plan to write a blog post on testing data pipelines, with a focus on Dask for the DAG evaluation, and on pytest fixtures for providing the values of the different tasks (or task types) in the pipeline to the test functions.

My first question is: are you aware of any posts or talks on the subject?

What I plan to cover is:

  1. how to generate the fixtures by just calling the task functions directly
  2. how to generate the fixtures with a complete evaluation of the pipeline (a minimal sketch of options 1 and 2 follows right after this list)
  3. how to run a non-regression test on each task of the pipeline
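
To give an idea, here is a minimal sketch of the first two options (the load and process task functions are hypothetical, just for illustration):

import pytest
from dask import delayed


def load():
    return 1


def process(x):
    return x + 1


# Option 1: call the task functions directly, without dask
@pytest.fixture
def processed_value():
    return process(load())


# Option 2: build the delayed pipeline and evaluate it completely
@pytest.fixture
def processed_value_from_pipeline():
    pipeline = delayed(process)(delayed(load)())
    return pipeline.compute()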

Regarding the non-regression tests, I’d like to re-run each task of the pipeline, given the values of its parent tasks taken from a reference run. Here I have a more technical question: is it possible to substitute the parent nodes of a delayed object with concrete values? I mean, something like:

import pytest
from dask import delayed


def inc(x):
    return x + 1


@pytest.fixture
def pipeline():
    a = delayed(inc)(1)
    b = delayed(inc)(a)
    return b


def test_compute(pipeline):
    assert pipeline.compute() == 3


def test_compute_partial(pipeline):
    # TODO: now I'd like to evaluate b given a=4 - is this possible?
    assert pipeline.compute(a=4) == 5

Thanks @mwouts for the questions! I don’t know of any posts/talks on this subject, sounds like an interesting use of graph manipulation. Here’s the API, but I’m not sure it has what you’re looking for. I can do some more digging and get back to you!

I would write the fixture as returning a closure:

@pytest.fixture
def pipeline_gen():
    def _(value=1):
        a = delayed(inc)(value)
        b = delayed(inc)(a)
        return b
    return _

def test_compute(pipeline_gen):
    pipeline = pipeline_gen(40)
    assert pipeline.compute() == 42

Or alternatively, see Parametrizing tests — pytest documentation:

@pytest.fixture
def pipeline(request):
    a = delayed(inc)(request.param)
    b = delayed(inc)(a)
    return b

@pytest.mark.parametrize("pipeline", [5, 0, 12345], indirect=True)
def test_compute(pipeline):
    assert pipeline.compute() > 0

Thank you @scharlottej13 for your answer! I’ll share the post here when it’s ready :slight_smile:

I also found a local Dask expert in the person of Emmanuel Serié, and he showed me how to evaluate a delayed task with custom inputs:

# Use task.dask.dependencies to identify the inputs of a task
assert b.dask.dependencies["b"] == {"a"}

# Create a new task with the same function but with custom inputs
b_mod = Delayed("b", dict(b.dask, a=4))

# The new output is the expected one (a+1=5)
assert b_mod.compute() == 5

Below is the full test, for completeness:

import pytest
from dask.delayed import Delayed, delayed


def inc(x):
    return x + 1


@pytest.fixture
def b():
    a = delayed(inc)(1, dask_key_name="a")
    return delayed(inc)(a, dask_key_name="b")


def test_compute(b):
    assert b.compute() == 3


def test_compute_partial(b):
    assert b.dask.dependencies["b"] == {"a"}
    b_mod = Delayed("b", dict(b.dask, a=4))
    assert b_mod.compute() == 5

Thank you @fjetter for your comment - sorry, maybe I wasn’t clear with my two questions :laughing:

The first one was about turning dask tasks into pytest fixtures - I will cover this in the article, and I was curious to know whether people had seen blog posts about this.

The second question was how to evaluate a task given custom inputs. From the answer above I understand that one has to make a copy of the task (as tasks are immutable) with the custom values for the inputs. Also, the tasks must have explicit names (set with dask_key_name), otherwise we could not use a=4 in

# Create a new task with the same function but with custom inputs
b_mod = Delayed("b", dict(b.dask, a=4))
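
To make this concrete, here is roughly what happens at the graph level in this example (this relies on dask’s classic tuple representation of tasks, an internal detail that may change between versions):

# dict(b.dask) flattens the graph into a plain mapping of named tasks,
# roughly {"a": (inc, 1), "b": (inc, "a")}
graph = dict(b.dask)

# Overriding the "a" entry with a literal replaces the parent task with
# a concrete value, so "b" now computes inc(4)
graph["a"] = 4
assert Delayed("b", graph).compute() == 5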

Thanks for your inputs! Also, if anyone is willing to help me review the upcoming blog post, I can open the repo to them (I expect to have a draft ready by tomorrow - please share your GitHub user name).

The idea of hotswapping the leaf node(s) of the graph is something I’m familiar with. Namely, it is something desirable when the definition of the graph itself is a very CPU-intensive affair. I fiddled with this idea back when I had a graph with ~12 million nodes and building the graph (not computing it) took in excess of 30 minutes - and because of its nature, it was extremely hard to parallelise.

Hotswapping a node in the graph requires a deep understanding of the low-level internal implementation of your collections and of how they use the HighLevelGraph. As it is an implementation detail, it is bound to break without warning at any new release of dask. So unless you’re desperate for it (as in my use case above), it’s something that should be avoided.
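
For illustration, with the b fixture from above, this is the kind of internal state you end up relying on (the attribute names below are HighLevelGraph implementation details):

hlg = b.__dask_graph__()  # the HighLevelGraph behind the Delayed object
print(hlg.layers)         # one layer per delayed call, keyed "a" and "b"
print(hlg.dependencies)   # {"a": set(), "b": {"a"}}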

I second @fjetter’s suggestion that you should just regenerate the graph from scratch if that’s not onerous in terms of CPU.


Hi @crusaderky, thanks for the advice! Maybe I should recall the context for evaluating a task with custom inputs: I want to run a non-regression test on all the nodes of the graph, and for every node, evaluate it given the inputs from a reference run, to be able to detect any breaking change in the code.

In particular I am not building a complex graph, but many independent graphs made of just one node.

Indeed I could evaluate the node function rather than building another Delayed object - is the function available somewhere in node.dask?
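
For instance, I would expect something like this to work (untested, and relying again on the tuple representation of tasks, which is an internal detail):

task = node.dask[name]  # e.g. (inc, "a") - a (function, *arguments) tuple
func = task[0]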

Currently my code for the non-regression tests looks like this - do you think we could make it neater?

import pytest
from dask.delayed import Delayed
from deepdiff import DeepDiff


def node_iterator():
    """Iterate over the nodes in the pipeline"""
    for name, node in get_non_regression_pipeline().items():
        yield name, node


@pytest.mark.parametrize("name,node", node_iterator())
def test_non_regression(name, node, non_regression_data):
    """For each node in the data pipeline, load the inputs from a
    reference run, evaluate the node, and compare the new output with
    the output from the reference run"""
    expected = non_regression_data[name]

    # Load the inputs for the given node from the reference non-reg data
    inputs = {
        input_name: non_regression_data[input_name]
        for input_name in node.dask.dependencies[name]
    }

    # And evaluate the node given the inputs above
    node_with_inputs_from_non_reg_data = Delayed(name, dict(node.dask, **inputs))
    actual = node_with_inputs_from_non_reg_data.compute()

    # ######################################
    # ### There should be no difference! ###
    # ######################################
    diff = DeepDiff(actual, expected)
    assert not diff

You could replace node_iterator() with get_non_regression_pipeline().items().
Other than that, I can’t see a way to improve things given your constraints.
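
That is, something like (same behaviour, one helper less):

@pytest.mark.parametrize("name,node", get_non_regression_pipeline().items())
def test_non_regression(name, node, non_regression_data):
    ...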


It looks like @mwouts’s blog post is out - it’s a good read!


Thank you @ian - indeed, the post is ready! It is available on Medium, and also on GitHub/CFMTech. All the examples are tested (and under an MIT license); I hope you find it useful! Please let me know if you have remarks - this is the approach I used, but I’m sure some aspects can still be improved!
