I have a data analysis pipeline with many functions that consume and produce DataFrames in a DAG-like way.
What I’m trying to accomplish
- I want dask to save the output of any DataFrame-returning task (each `delayed`?) as a parquet file on a file system I specify (e.g. a path in an `fsspec` filesystem)
  - I’m guessing I’d provide `serialize`/`deserialize` methods for `DataFrame`s?
- if I re-run a pipeline…
  - I’d like dask to reuse saved results
  - if I delete results in the file system, I’d like dask to re-compute them
In other words:
- I want to memoize tasks
- if you’ve used Luigi: I want Luigi’s result persistence and fault tolerance functionality (without the bulky `Target` API)
What’s a good way to accomplish this? (Is there a way?)
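To make that concrete, the behavior I’m after is roughly the following, hand-rolled outside of dask. The `cached_parquet` helper and the key scheme are placeholders I made up, not anything dask provides:

```python
import fsspec
import pandas as pd


def cached_parquet(func, key: str, fs: fsspec.AbstractFileSystem, root: str):
    """Hypothetical memoizer: reuse a saved parquet result when it exists."""

    def wrapper(*args, **kwargs) -> pd.DataFrame:
        # in practice the key would need to encode the function and its
        # arguments, the way dask's task keys do
        path = f"{root}/{key}.parquet"
        if fs.exists(path):
            # a saved result exists: load it instead of re-computing
            with fs.open(path, "rb") as f:
                return pd.read_parquet(f)
        df = func(*args, **kwargs)
        with fs.open(path, "wb") as f:
            df.to_parquet(f)
        return df

    return wrapper
```

Deleting the parquet file would naturally force a re-compute, which is the Luigi-like behavior I mean. What I don’t know is how to get dask to apply this to every DataFrame-returning task, instead of wrapping each function by hand.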
After reading a lot of GitHub issues and Stack Overflow questions, it sounds like I might want to:
- make a scheduler plugin
- make a custom `MutableMapping` for worker data, overriding how task results are handled
  - related: Create MutableMapping for automatic compression #3656
  - related: github.com/dask/zict (2 link limit, sorry)
- maybe a custom DataFrame store?
  - related: John Kirkham’s DistributedArrayStore: gist.github.com/jakirkham/56982b7c845906cfe6fb2e9d92af48a0 (2 link limit, sorry)
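For the scheduler-plugin option, the skeleton I’d start from is something like the following (based on `distributed.diagnostics.plugin.SchedulerPlugin`). My worry is that the scheduler only tracks task state, not the result data itself, so this hook may never see the DataFrames:

```python
from distributed.diagnostics.plugin import SchedulerPlugin


class PersistResultsPlugin(SchedulerPlugin):
    """Skeleton only: react to task state transitions on the scheduler."""

    def transition(self, key, start, finish, *args, **kwargs):
        if start == "processing" and finish == "memory":
            # a task just finished somewhere -- this is where I'd want to
            # persist its output to parquet, but the result itself lives on
            # a worker, not on the scheduler (I think?)
            print(f"task {key} is now in memory")


# registration, if I understand the docs correctly:
# client.register_scheduler_plugin(PersistResultsPlugin())
```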
Simple example…
here’s a DataFrame flow that I’d like to “daskify”:
```python
import numpy as np
import pandas as pd
from dask import compute, delayed
from distributed import Client


def make_df(seed: int = 0) -> pd.DataFrame:
    print("running computation")
    rng = np.random.default_rng(seed)
    return pd.DataFrame(
        data=rng.standard_normal((3, 2)),
        columns=list("AB"),
    )


def summarize_dataframes(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1, df2], axis=0).describe()


if __name__ == "__main__":
    client = Client()

    df_1 = delayed(make_df)(1)
    df_2 = delayed(make_df)(2)
    df_aggregated = delayed(summarize_dataframes)(df_1, df_2)

    print(compute(df_aggregated))
```
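For reference, with a hand-rolled wrapper like the `cached_parquet` sketch above, the call sites would turn into something like this (hand-written keys, and assuming gcsfs is installed for the gs:// protocol). But I’d rather dask handle the saving/loading transparently:

```python
fs = fsspec.filesystem("gs")
root = "my_bucket/some_folder"

# each DataFrame-returning task gets wrapped and keyed by hand
df_1 = delayed(cached_parquet(make_df, "make_df-1", fs, root))(1)
df_2 = delayed(cached_parquet(make_df, "make_df-2", fs, root))(2)
df_aggregated = delayed(summarize_dataframes)(df_1, df_2)
```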
MutableMapping?
I started implementing a `MutableMapping`, but I’m not sure (a) how the internals are supposed to work and (b) how to make a worker use it:
```python
from collections.abc import MutableMapping

import pandas as pd
import upath
from distributed import Worker


class SavedDataFrameMutableMapping(MutableMapping):
    def __init__(self, path_root: upath.UPath):
        # what do I need here?
        self.path_root = path_root
        self.storage = {}

    def __setitem__(self, key, value):
        # DataFrame results get written to parquet; the mapping keeps the path
        if isinstance(value, pd.DataFrame):
            path = self.path_root / str(key)
            value.to_parquet(path)
            value = path
        self.storage[key] = value

    def __getitem__(self, key):
        # if the stored value is a path, the DataFrame was saved to parquet
        value = self.storage[key]
        if isinstance(value, upath.UPath):
            return pd.read_parquet(value)
        return value

    def __delitem__(self, key):
        del self.storage[key]

    def __iter__(self):
        return iter(self.storage)

    def __len__(self):
        return len(self.storage)


where_to_save_stuff = upath.UPath("gs://my_bucket/some_folder/")
my_mapping = SavedDataFrameMutableMapping(where_to_save_stuff)

with Worker(data=my_mapping):
    # how does this work?
    ...
```
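And here’s my best guess at the wiring, based on the `data=` keyword documented on `Worker` and reusing the class and functions from above. I haven’t verified that this is the intended way to plug in custom storage, or that results still flow back to the client correctly:

```python
import asyncio

import upath
from dask import delayed
from distributed import Client, Scheduler, Worker


async def main():
    async with Scheduler() as scheduler:
        # the custom mapping becomes the worker's task storage (I think?)
        my_mapping = SavedDataFrameMutableMapping(
            upath.UPath("gs://my_bucket/some_folder/")
        )
        async with Worker(scheduler.address, data=my_mapping):
            async with Client(scheduler.address, asynchronous=True) as client:
                df_1 = delayed(make_df)(1)
                df_2 = delayed(make_df)(2)
                df_aggregated = delayed(summarize_dataframes)(df_1, df_2)
                print(await client.compute(df_aggregated))


if __name__ == "__main__":
    asyncio.run(main())
```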