Hi,
I want to avoid re-executing a task, so I use a cache to store the data.
My question is: why does the cache not store the executed results?
The following is the code:
### BEGIN ###
import dask
from dask.distributed import Client

# Start a Dask client
client = Client()

# Dictionary to store results
results_cache = {}

# Check whether a task has already been executed
def is_task_executed(key):
    return key in results_cache

# Define a delayed function
@dask.delayed
def my_task(x):
    if is_task_executed(x):
        return results_cache[x]  # Return cached result if available
    else:
        result = x + 1
        results_cache[x] = result  # Cache the result
        print("= results_cache size =", len(results_cache))
        return result

# Example usage
result1 = my_task(10)
result2 = my_task(20)

# Trigger computation
result1.compute()
result2.compute()

# Check if a task has already been executed
print(">>> ", is_task_executed(10))  # I expect True, but it returns False
print(">>> ", is_task_executed(20))
### END ###
Output:
False
False
I am new to Dask. Any idea? Thanks
Todd
Hi @tclin1998, welcome to Dask community!
Since you started a LocalCluster through client = Client(), you are in a distributed setting. The my_task function is executed in a different worker process than your main one, so the local results_cache variable won't be updated in your process. Such a cache would work with the threaded scheduler, but not beyond that. Handling a distributed cache correctly is not that easy; do you really need such a feature?
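As a minimal sketch of what I mean (your my_task logic kept as-is, but with no Client started, so the threaded scheduler is used and everything runs in one process):

import dask

results_cache = {}

@dask.delayed
def my_task(x):
    if x in results_cache:
        return results_cache[x]
    result = x + 1
    results_cache[x] = result
    return result

# No Client is created here: compute(scheduler="threads") runs the
# task in threads of this same process, so the module-level dict
# really is shared and gets updated.
my_task(10).compute(scheduler="threads")
print(10 in results_cache)  # True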
Hi Guillaume:
Thank you for your reply.
Yes, I do need such a distributed cache to be handled correctly, because
each task takes hours to complete. I do not want the same task to be executed twice.
Any pointers on this?
Thanks
Todd
Well, it depends on your workflow and your result size.
You could handle all this logic on the Client side, saving results in a dict if they are not too big.
Or you could persist the results using your client, and keep only a reference to the objects stored in distributed memory.
But Dask also provides some intelligent caching, depending on how your workflow is built: if a result is a dependency of a following computation, it should automatically be kept in memory. What is your overall workflow?
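As a rough sketch of the persist option (the cached_task wrapper is just an illustration, not a Dask API):

import dask
from dask.distributed import Client

client = Client()

@dask.delayed
def my_task(x):
    return x + 1  # stand-in for your hours-long computation

# Client-side cache: task argument -> persisted result.
# The dict holds lightweight references; the actual data stays
# in the workers' distributed memory.
cache = {}

def cached_task(x):
    if x not in cache:
        # persist() triggers the computation once and keeps the
        # result alive on the cluster while a reference exists.
        cache[x] = client.persist(my_task(x))
    return cache[x]

r1 = cached_task(10)
r2 = cached_task(10)  # same reference, nothing is recomputed
print(r1.compute())   # just fetches the already-computed result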
Thank you for your reply.
My workflow looks like the following:

  B   C
 / \ / \
D   E   F

Task B depends on D and E.
Task C depends on E and F.
I do not want E to be executed twice.
Thanks
Todd
"Well, it depends on your workflow and your result size.
You could handle all this logic on the Client side, saving results in a dict if they are not too big.
Or you could persist the results using your client, and keep only a reference to the objects stored in distributed memory.
But Dask also provides some intelligent caching, depending on how your workflow is built: if a result is a dependency of a following computation, it should automatically be kept in memory."
The above general description is very vague to me. I am new to Dask.
Is there an example available?
Thanks
Todd
Could you first provide a minimal reproducer of your workflow? Are you using the Delayed API, or just Futures?
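For what it's worth, if you are on the Delayed API, a shared dependency like your E is computed only once when B and C are computed in a single call. A rough sketch (the task and combine helpers are just placeholders for your real functions):

import dask

@dask.delayed
def task(name):
    print("running", name)  # printed once per task execution
    return name

@dask.delayed
def combine(left, right):
    return (left, right)

d, e, f = task("D"), task("E"), task("F")

b = combine(d, e)  # B depends on D and E
c = combine(e, f)  # C depends on E and F

# Computing B and C in a single call merges their graphs, and since
# e is the same Delayed object in both, E runs exactly once.
print(dask.compute(b, c))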