How do I make dask memoize intermediate task results on a file system?

I have a data analysis pipeline with many functions that consume and produce DataFrames in a DAG-like way.

What I’m trying to accomplish

  • I want dask to save the output of any DataFrame-returning task (each delayed call?) as a Parquet file on a file system I specify (e.g. a path in a fsspec.FileSystem)
    • I’m guessing I’d need to provide serialize / deserialize methods for DataFrames?
  • If I re-run a pipeline…
    • I’d like dask to reuse saved results
    • and if I delete results from the file system, I’d like dask to re-compute them

In other words:

  • I want to memoize tasks 🙂
  • If you’ve used Luigi: I want Luigi’s result persistence and fault-tolerance functionality (without the bulky Target API)

What’s a good way to accomplish this? (Is there a way?)

After reading a lot of GitHub issues and Stack Overflow questions, it sounds like I might want to…

Simple example…

Here’s a DataFrame flow that I’d like to “daskify”:

import numpy as np
import pandas as pd
from dask import compute, delayed
from distributed import Client


def make_df(seed: int = 0) -> pd.DataFrame:
    print("running computation")
    rng = np.random.default_rng(seed)
    return pd.DataFrame(
        data=rng.standard_normal((3, 2)),
        columns=list("AB"),
    )


def summarize_dataframes(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1, df2], axis=0).describe()


if __name__ == "__main__":
    client = Client()
    # two independent DataFrame-producing tasks
    df_1 = delayed(make_df)(1)
    df_2 = delayed(make_df)(2)
    # a downstream task that consumes both results
    df_aggregated = delayed(summarize_dataframes)(df_1, df_2)
    print(compute(df_aggregated))
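
To make the goal concrete: the behavior I’m after is roughly what this hand-rolled wrapper does. (The memoize_to_parquet helper and the ./cache directory below are hypothetical, just to illustrate the semantics I want; they’re not an existing dask feature.)

import functools
from pathlib import Path

import pandas as pd


def memoize_to_parquet(cache_dir: Path):
    """Hypothetical decorator: cache a DataFrame-returning function as parquet."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args):
            cache_dir.mkdir(parents=True, exist_ok=True)
            # naive cache key built from the function name and its arguments
            path = cache_dir / f"{func.__name__}-{'-'.join(map(str, args))}.parquet"
            if path.exists():
                # a saved result exists: reuse it instead of recomputing
                return pd.read_parquet(path)
            # first run (or the file was deleted): compute and save
            result = func(*args)
            result.to_parquet(path)
            return result

        return wrapper

    return decorator


# same pipeline as above, but make_df results are persisted and reused
cached_make_df = delayed(memoize_to_parquet(Path("./cache"))(make_df))
df_1 = cached_make_df(1)
df_2 = cached_make_df(2)
df_aggregated = delayed(summarize_dataframes)(df_1, df_2)
print(compute(df_aggregated))  # a second run loads df_1 / df_2 from ./cache

Hand-rolling this for every function (and coming up with robust cache keys, especially for tasks whose inputs are other DataFrames) is exactly the part I’m hoping dask can handle for me.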

MutableMapping?

I started implementing a MutableMapping, but I’m not sure (a) how the internals are supposed to work and (b) how to make a worker use it:

from collections.abc import MutableMapping

import pandas as pd
import upath


class SavedDataFrameMutableMapping(MutableMapping):
    def __init__(self, path_root: upath.UPath):
        # what do I need here?
        self.path_root = path_root
        # maps each key to either its in-memory value or the parquet
        # path its DataFrame was saved to
        self.storage = {}

    def __setitem__(self, key, value):
        if isinstance(value, pd.DataFrame):
            # persist DataFrames to parquet and keep only the path
            path = self.path_root / str(key)
            value.to_parquet(path)
            value = path
        self.storage[key] = value

    def __getitem__(self, key):
        value = self.storage[key]
        if isinstance(value, upath.UPath):
            # the stored value is a saved DataFrame: load it back
            return pd.read_parquet(value)
        return value

    # MutableMapping also requires these:
    def __delitem__(self, key):
        del self.storage[key]

    def __iter__(self):
        return iter(self.storage)

    def __len__(self):
        return len(self.storage)
...
from distributed import Worker

where_to_save_stuff = upath.UPath("gs://my_bucket/some_folder/")
my_mapping = SavedDataFrameMutableMapping(where_to_save_stuff)
with Worker(data=my_mapping):
    # how does this work?
    ...
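
Independent of the worker question, here’s the roundtrip behavior I’d expect from the mapping on its own. (A quick local sanity check; the key name and the ./cache directory are placeholders, and I’m assuming local paths still come back as UPath instances in universal_pathlib.)

import pandas as pd
import upath

# check the mapping by itself, outside of any worker
root = upath.UPath("./cache")
root.mkdir(parents=True, exist_ok=True)
mapping = SavedDataFrameMutableMapping(root)

df = pd.DataFrame({"A": [1.0, 2.0], "B": [3.0, 4.0]})

# storing a DataFrame should write ./cache/some-key as parquet and keep only the path
mapping["some-key"] = df

# reading it back should load an equal DataFrame from the parquet file
restored = mapping["some-key"]
pd.testing.assert_frame_equal(df, restored)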