Is there a good way to force move blocked futures?


I have been using Dask for a while to run long simulations. Usually a simulation has two steps:

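```python
from dask.distributed import Client

client = Client()  # replace with your cluster's scheduler address

def determine_arguments(config):
    ...  # takes 1-3 seconds

def run_job(args):
    ...  # could take 10m-8hr depending on job type

args = []
for i in range(100):
    # takes 1-3 seconds each; keep the future so it can be gathered below
    args.append(client.submit(determine_arguments, i))

# I am forced to gather because the args contain priority information
args = client.gather(args)

# The script gets blocked here waiting for the gather,
# but the workers don't let the args flow through:
# the determine_arguments tasks seem to be stuck behind long-running jobs
jobs = []
for arg in args:
    # could take 10m-8hr depending on job type
    jobs.append(client.submit(run_job, arg))
```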

But if I have set up multiple sets of simulations, or other long-running jobs are going on, some of the determine_arguments futures get stuck behind long-running jobs even though there are open workers. They then basically wait until some random long-running job finishes.

Is there a way to move futures from one worker to another manually? Could I replicate them, or otherwise send them to specific workers that I know are open, so they don't get blocked? I am open to anything! I currently have a cluster with around 250 workers and don’t want half of it sitting idle.

Thank you!

I’m not sure I understand: your example doesn’t block by itself. Is it blocked because you’ve submitted other tasks to the same Dask cluster, so that some futures are stuck behind tasks submitted by another client or process?

You can always force tasks to execute on specific workers using the `workers=` keyword when submitting work.
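A minimal sketch of that approach, assuming `dask.distributed`'s `Client.processing()`, which maps each worker address to the keys of the tasks currently assigned to it: pick the workers that have nothing assigned and restrict the submission to them. The helper names `idle_workers` and `submit_to_idle_workers` are hypothetical, and the idle set is only a snapshot that may already be stale when the task is scheduled.

```python
def idle_workers(processing):
    """Return the addresses of workers with no tasks assigned.

    `processing` is a mapping of worker address -> collection of task keys,
    the shape returned by Client.processing() in dask.distributed.
    """
    return sorted(addr for addr, keys in processing.items() if len(keys) == 0)


def submit_to_idle_workers(client, func, *args, **kwargs):
    """Hypothetical helper: submit func(*args) only to currently idle workers.

    Falls back to a normal, unrestricted submit when no worker is idle.
    allow_other_workers=False makes `workers=` a hard restriction.
    """
    idle = idle_workers(client.processing())
    if not idle:
        return client.submit(func, *args, **kwargs)
    return client.submit(
        func, *args, workers=idle, allow_other_workers=False, **kwargs
    )
```

In the first loop of the question, `client.submit(determine_arguments, i)` would become `submit_to_idle_workers(client, determine_arguments, i)`. Passing `allow_other_workers=True` instead turns the restriction into a preference that the scheduler may override, which is safer if the chosen workers become busy in the meantime.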

But you shouldn’t have to do that, because Dask implements a work-stealing mechanism that moves queued tasks from busy workers to idle ones. I’m not sure why it is not happening in your case.
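To make the expected behaviour concrete, here is a toy model of work stealing in plain Python. It is a deliberate simplification and not Dask's implementation: the real scheduler decides whether to steal based on estimated task durations and data-transfer costs. The function `steal` and the worker and task names are hypothetical. It shows what should happen in the situation described in the question: tasks waiting behind a long-running job get picked up by idle workers.

```python
from collections import deque


def steal(running, queued):
    """Toy work stealing between workers (not Dask's cost-based algorithm).

    running: worker -> task currently executing (None if the worker is idle)
    queued:  worker -> deque of tasks assigned to it but not yet started
    Each completely idle worker takes one waiting task from the back of the
    longest queue owned by a busy worker. Mutates `queued` in place and
    returns a list of (task, victim, thief) tuples.
    """
    moves = []
    for thief in running:
        if running[thief] is not None or queued[thief]:
            continue  # this worker already has work
        victims = [w for w in running if running[w] is not None and queued[w]]
        if not victims:
            break  # nothing left waiting behind a busy worker
        victim = max(victims, key=lambda w: len(queued[w]))
        task = queued[victim].pop()
        queued[thief].append(task)
        moves.append((task, victim, thief))
    return moves


# Worker A is busy with a long run_job and has two argument tasks waiting
running = {"A": "run_job-1", "B": "run_job-2", "C": None, "D": None}
queued = {"A": deque(["args-1", "args-2"]), "B": deque(["args-3"]),
          "C": deque(), "D": deque()}
moves = steal(running, queued)
```

Here both idle workers C and D take a waiting task from A, the longest queue. If Dask's own stealing is not moving the stuck tasks in a similar way, it may be worth checking whether those tasks carry `workers=` restrictions or whether work stealing has been disabled in the scheduler's configuration.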