I have multiple computations to trigger. They all based on the same input data(from a database). I trigger all the computations at the same time. Is there a good way to let a worker reuse the data from other computation instead of fetching from the database again? I read Opportunistic Caching — Dask documentation. but it does not feel reliable at all…
Also, maybe a dumb question, what’s the proper way of merging multiple computations? In my case, the data are just written back to another table, so it won’t return a dataframe, but just a delayed object. Im thinking if I merge all the delayed objects into one then call compute on that one. It’ll probably be better.
I agree, opportunistic caching probably isn’t what I’d recommend. From your description I think looking over the Dask docs on data locality would help, but it’s hard to say without more information. Could you provide more details on your specific use case, or, even better, a minimally reproducible example?
By default Dask will attempt to evict data from memory as soon as possible. If you triggered all of your computations sequentially for example, Dask would read your data in memory first. Then run your computation and immediatly evict the in-memory data. And repeat that for each iteration.
If each of your computations involves creating a dataframe from the same source data, I’m not sure if Dask will be aware that it’s the exact same Dataframe.
A simple way to enforce this behaviour is to first read in all your source data to a Dataframe, and then kick off your computations which now can use a reference to that same dataframe variable like so:
import dask.dataframe as dd
from dask.distributed import wait, progress
# .persist() makes sure this data is kept in-memory. You can use the dashboard to monitor this
df = dd.read_csv('s3://...').persist()
# .persist() will kick off reading in data, but won't block the interpreter. You can use wait() and progress() to actually wait for the entire dataframe to be in memory before moving on
progress(df) # will show a nice widget with a loading bar
wait(df) # block until data is in memory
# now move on and kick off multiple computations by passing df
df\
.do_something()\
.do_another_thing_with_same_data()\
.do_something_really_expensive_with_data()
Thanks!! @matthiasdv@scharlottej13. There are 2 use case I have in mind, I’ll explain with semi-pseudo python code in below:
Sequential use case:
def calculates_a_boolean(df: dd.DataFrame) -> Delayed:
...
def calculates_main_output(df: dd.DataFrame) -> Delayed:
# this is the punchline. The main graph cannot be built until some other computation is done.
# and they share some computation. I want to make that computation reused
flag = calculates_a_boolean(df).compute()
if flag:
# something fancy
# could be more collection API call
# delayed, map_partitions..
...
else:
# something totally different, but still fancy
# could be more collection API call
# delayed, map_partitions..
...
# return something
with Client() as client:
input_df = dd.read_sql_table(...).persist()
delayed_result = calculates_main_output(input_df)
# because we created local variable input_df that should hold the remote object in memory even after the 1st computation is done
# so hopefully this will do the trick for reusing
wait(delayed_result.persist())
parallel use case:
def fun1(df: dd.DataFrame) -> Delayed:
...
def fun2(df: dd.DataFrame) -> Delayed:
...
def fun3(df: dd.DataFrame) -> Delayed:
...
with Client() as client:
input_df = dd.read_sql_table(...).persist()
delayed_results = []
delayed_results.append(func1(input_df))
delayed_results.append(func2(input_df))
delayed_results.append(func3(input_df))
del input_df # we don't need this local variable to hold remote object because everything will be in one merged graph which is done by client.persist([...])
all = client.persist(delayed_results)
wait(all)
Thanks for providing some pseudo-code @ubw218! I think how you’ve used @matthiasdv’s suggestion for persist and wait makes sense for sending input_df to a number of different computations.
For the “sequential use case”, I’m not sure you need the wait(delayed_result.persist()), but I guess that depends on what comes next!
In the “parallel use case”, appending the delayed objects and then calling compute does indeed look reasonable for introducing parallelism. I would only add a caution for mixing Dask dataframe and Dask delayed, which can sometimes add unnecessary complexity (using map_partitions can often be all you need). Since you mention the data will be written to another table and not returned, though, delayed is probably the right way to go (for more see the docs on combining high- and low- level interfaces).