How to avoid re-execution

Hi,
I want to avoid re-executing a task by using a cache to store the results.
My question is: why doesn't the cache hold the results after execution?
The following is the code:

### BEGIN ###
import dask
from dask.distributed import Client

# Start a Dask client
client = Client()

# Dictionary to store results
results_cache = {}

# Define a function to check if a task has already been executed
def is_task_executed(key):
    return key in results_cache

# Define a delayed function
@dask.delayed
def my_task(x):
    if is_task_executed(x):
        return results_cache[x]  # Return cached result if available
    else:
        result = x + 1
        results_cache[x] = result  # Cache the result
        print("= results_cache size =", len(results_cache))
        return result

# Example usage
result1 = my_task(10)
result2 = my_task(20)

# Trigger computation
result1.compute()
result2.compute()

# Check if a task has already been executed
print(">>> ", is_task_executed(10))  #  I expect the answer is TRUE; but it returned false
print(">>> ", is_task_executed(20))  # 

### END ###

Output:

>>>  False
>>>  False

I am new to Dask. Any ideas? Thanks

Todd

Hi @tclin1998, welcome to Dask community!

Since you started a LocalCluster through client = Client(), you are in a distributed setting: the my_task function is executed in a different worker process than your main one, so the local results_cache variable is never updated on the client side. Such a cache would work with the threaded scheduler, where all tasks share the main process's memory, but not beyond that (see the sketch below). Handling a distributed cache correctly is not that easy; do you really need such a feature?
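
For illustration, here is a minimal sketch of the threads-only variant, where your cache dict does work (reusing the names from your snippet):

```python
import dask

# A plain dict works with the threaded scheduler because every task
# runs inside the main process and sees the same memory.
results_cache = {}

@dask.delayed
def my_task(x):
    if x in results_cache:
        return results_cache[x]  # cache hit: skip the work
    result = x + 1
    results_cache[x] = result
    return result

# Force the single-process threaded scheduler instead of the
# distributed one started by Client().
print(my_task(10).compute(scheduler="threads"))  # 11
print(10 in results_cache)                       # True this time
```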

Hi Guillaume,
Thank you for your reply.
Yes, I do need such a distributed cache to be handled correctly, because each task takes hours to complete. I do not want the same task to be executed twice.
Any pointers on this?
Thanks
Todd

Well, it depends on your workflow and your result size.

You could handle all this logic on the Client side, saving results in a dict if they are not too big.
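
For example, a client-side cache could look like the sketch below (slow_task and submit_once are made-up names, not Dask API):

```python
from dask.distributed import Client

client = Client()
results_cache = {}  # lives only in the client process

def slow_task(x):
    return x + 1  # stand-in for hours of work

def submit_once(x):
    # Check the client-side cache before sending anything to the cluster.
    if x not in results_cache:
        future = client.submit(slow_task, x)
        results_cache[x] = future.result()  # pull the (small) result back
    return results_cache[x]

print(submit_once(10))  # computed on a worker
print(submit_once(10))  # served from the local dict, no recomputation
```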

Or you could persist the result using your client, and keep only a small reference to the object stored in distributed memory.
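
A minimal sketch of that approach, assuming a single delayed task:

```python
import dask
from dask.distributed import Client

client = Client()

@dask.delayed
def my_task(x):
    return x + 1  # stand-in for an expensive computation

# persist() starts the computation and keeps the result in the
# workers' memory; the client only holds a small reference to it.
e = client.persist(my_task(10))

# Later uses of `e` fetch the value already sitting in distributed
# memory instead of recomputing it.
print(e.compute())
print(e.compute())  # no recomputation, just a gather
```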

But Dask also provides some intelligent caching of results, depending on how your workflow is built: if a result is a dependency of a following computation, it is automatically kept in memory and computed only once; see the sketch below. What is your overall workflow?
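
Here is a minimal sketch of that shared-dependency behaviour with Delayed (step is a made-up name):

```python
import dask

@dask.delayed
def step(x):
    print("running step", x)  # printed once per executed task
    return x + 1

shared = step(0)      # one delayed object, referenced twice below
out_a = step(shared)
out_b = step(shared)

# A single compute() call merges both graphs: `shared` is one node,
# so it runs exactly once even though two results depend on it.
print(dask.compute(out_a, out_b))
```

Note that calling out_a.compute() and out_b.compute() separately would run the shared task twice; computing both outputs in one dask.compute() call is what lets the graph share it.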

Thank you for your reply.
My workflow looks like the following:

  B   C
 / \ / \
D   E   F

Task B depends on D and E; task C depends on E and F.
I do not want E to be executed twice.

Thanks
Todd

"Well, it depends on your workflow and your result size. You could handle all this logic on the Client side, saving results in a dict if they are not too big. Or you could persist the result using your client, and keep only a small reference to the object stored in distributed memory. But Dask also provides some intelligent caching of results, depending on how your workflow is built: if a result is a dependency of a following computation, it is automatically kept in memory."

The above general description is very vague to me. I am new to Dask.
Is there an example available?
Thanks
Todd

Could you first provide a minimal reproducer of your workflow? Are you using the Delayed API, or just Futures?