Signalling that a task should be rescheduled at a later time

Note: I’ve asked this previously on GitHub, but agree that this is likely better discussed (first?) on Discourse. Now that I’m here, I’d like to re-state this question for discussion:

I’m wondering how to best handle reads from high-latency storage (e.g. tape) or other tasks which may take a long time without making any progress themselves (i.e. the task has to wait for something else). In this setting, I may have a task graph containing many parallel reads of different chunks, each of which may either be only on tape or already available on disk cache. If I start reading data which is still on tape, this read will encounter a large delay. But maybe that time could be used better if the read attempt triggers loading from tape while the dask scheduler continues running other tasks whose data may already be available. If none of the data is readily available, this would lead to a quick first pass across all necessary tape reads, and a second pass would benefit from all data being available on disk.

So I’m wondering if it would be possible to notify the scheduler from within an already running task that the task itself should be rescheduled at a later time, e.g.:

def some_read_function():
    if not_yet_available:  # e.g. the chunk is still on tape
        # hypothetical exception which a scheduler could understand
        raise RetryLater(estimated_delay=5 * 60)  # seconds; could also be np.timedelta64() etc.

A scheduler could catch this exception and might adjust its scheduling strategy accordingly.
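To illustrate the semantics I have in mind, here is a minimal, dask-free sketch of such a retry loop. `RetryLater`, `run_with_retries`, and the task functions are all made-up names, and a real scheduler would of course run other ready tasks instead of sleeping:

```python
import heapq
import time

class RetryLater(Exception):
    """Hypothetical exception: ask the scheduler to re-run this task later."""
    def __init__(self, estimated_delay):
        self.estimated_delay = estimated_delay

def run_with_retries(tasks):
    """Toy scheduler loop: run tasks, re-enqueueing any that raise RetryLater."""
    # priority queue of (ready_at, task_id, fn); everything is ready immediately at first
    queue = [(0.0, i, task) for i, task in enumerate(tasks)]
    heapq.heapify(queue)
    results = {}
    while queue:
        ready_at, i, task = heapq.heappop(queue)
        now = time.monotonic()
        if now < ready_at:
            time.sleep(ready_at - now)  # a real scheduler would run other tasks here
        try:
            results[i] = task()
        except RetryLater as e:
            # re-enqueue the task to run again after the estimated delay
            heapq.heappush(queue, (time.monotonic() + e.estimated_delay, i, task))
    return results
```

This only shows how the exception could drive rescheduling; the interesting part for dask would be filling the waiting time with other runnable tasks.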

Is something like this behaviour already possible? Are there better options? Would it be feasible to implement something like this using dask?

I’m tempted to think about something like an async task at this point, but that would probably be a bad idea, because a suspended async task keeps its state in memory, while an aborted and rescheduled task wouldn’t require any memory to be kept.

@d70-t Thanks for the question! I’m wondering if annotations can help – for example, you can reserve certain workers for the read operation while the rest are free for other tasks?

Hi @pavithraes, thanks for the reply!

I don’t think annotations (or reserving individual workers for reading) would be the right thing here, but maybe I got it wrong.

The use case I have in mind is a storage system which starts loading data into cache in the background once someone asks for it. The storage system can fetch many pieces of data concurrently, but needs to know which pieces to fetch. Thus, I want to request all (or very many) pieces concurrently such that the storage system preloads its cache; this could be done from all workers concurrently.
However, I can’t really open all the data concurrently, because that would overflow my RAM, so I don’t want all my opening tasks to actually run in parallel. It’s a bit like asynchronously fetching the data, except that the tasks shouldn’t resume as soon as the data is ready, but only once the scheduler allows it.

If it were possible to completely abort a task once it takes too long (and prevent it from being rescheduled for a while), this would open a spot to start the next loading task (which may or may not be aborted as well). That way, it should be possible to limit the number of concurrently running tasks (which take up RAM) while triggering many more concurrent loads into the storage system’s cache behind the scenes. And once the previously aborted tasks are rescheduled, they would be more likely to find the data in cache quickly.
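To make the intended two-pass behaviour concrete, here is a self-contained toy simulation. `cache`, `trigger_fetch`, `try_read`, and `FETCH_TIME` are all hypothetical stand-ins for the storage system, and returning `None` stands in for raising the proposed `RetryLater`:

```python
import time

cache = set()            # chunks already staged on disk cache (simulated)
fetch_started_at = {}    # chunk -> time the background tape fetch was triggered
FETCH_TIME = 0.05        # pretend tape-to-cache latency in seconds

def trigger_fetch(chunk):
    """Hypothetical: ask the storage system to stage a chunk into cache."""
    fetch_started_at.setdefault(chunk, time.monotonic())

def try_read(chunk):
    """Return the chunk's data if cached, else trigger the fetch and defer."""
    if chunk in fetch_started_at and time.monotonic() - fetch_started_at[chunk] >= FETCH_TIME:
        cache.add(chunk)  # the simulated background fetch has completed
    if chunk in cache:
        return f"data:{chunk}"
    trigger_fetch(chunk)
    return None  # stands in for raising RetryLater

def read_all(chunks):
    """Pass 1 triggers all fetches quickly; later passes pick up cached data."""
    results = {}
    pending = list(chunks)
    while pending:
        still_pending = []
        for chunk in pending:
            data = try_read(chunk)
            if data is None:
                still_pending.append(chunk)  # deferred; fetch runs in background
            else:
                results[chunk] = data
        pending = still_pending
        if pending:
            time.sleep(0.01)  # in dask, other ready tasks would run here
    return results
```

The first pass over all chunks finishes almost instantly and kicks off every fetch concurrently, so the total runtime is roughly one `FETCH_TIME` rather than one per chunk, which is the behaviour I’d like to get from the scheduler.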