I have been using Dask for a while to run long simulations. Usually a simulation has two steps:

    def determine_arguments(config):
        ...

    def run_job(args):
        ...

    args = []
    for i in range(100):
        # takes 1-3 seconds each
        f = client.submit(determine_arguments, i)
        args.append(f)

    # I am forced to gather because the args contain priority information
    args = client.gather(args)
    # gets blocked here by waiting for the gather
    # but workers don't let the args flow through
    # and it seems they are stuck behind long running jobs

    futures = []
    for arg in args:
        # could take 10m-8hr depending on job type
        f = client.submit(run_job, arg)
        futures.append(f)

However, if I have set up multiple sets of simulations, or other long-running jobs are going on, some of the
determine_arguments futures get stuck behind long-running jobs even though there are open workers. The gather then basically stays blocked until some random long-running job finishes.
Is there a way to move futures from one worker to another manually? Could I replicate them to specific workers that I know are free, so they don’t get blocked? I am open to anything! I currently have a cluster with around 250 workers and don’t want half of it sitting idle.
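To make the question concrete, here is a minimal sketch of one possible workaround, not a confirmed fix. It uses `as_completed` from `dask.distributed` so that each `run_job` is submitted as soon as its arguments are ready, instead of blocking on a single `gather`, and it passes the `priority=` keyword of `client.submit` so the scheduler orders queued work. The bodies of `determine_arguments` and `run_job`, the `"priority"` key in the returned dict, the value `100`, and the helper name `run_simulations` are all assumptions made up for illustration. Note that priorities only affect the order of queued tasks; they do not preempt a job that is already running on a worker.

```python
from dask.distributed import Client, as_completed


def determine_arguments(config):
    # Hypothetical stand-in: the real function takes 1-3 seconds.
    return {"config": config, "priority": config % 3}


def run_job(args):
    # Hypothetical stand-in: the real job runs for 10 minutes to 8 hours.
    return args["config"] * 2


def run_simulations(client, configs):
    # Submit the short tasks with a high priority (assumed value) so the
    # scheduler prefers them over queued long-running work.
    arg_futures = [
        client.submit(determine_arguments, c, priority=100) for c in configs
    ]

    job_futures = []
    # Handle each result as soon as it is ready instead of waiting for all
    # of them; one stuck future no longer holds back the others.
    for _, args in as_completed(arg_futures, with_results=True):
        job_futures.append(
            client.submit(run_job, args, priority=args["priority"])
        )

    # Results come back in submission (completion) order, not config order.
    return client.gather(job_futures)
```

With an existing `client`, this would be called as `results = run_simulations(client, range(100))`. If all arguments really must be known before any job is submitted (for example to sort them globally), this sketch does not apply as written.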