Hello,
I have been using Dask for a while to run long simulations. Usually a simulation has two steps:
# `client` is a dask.distributed.Client already connected to the cluster

def determine_arguments(config):
    ...

def run_job(args):
    ...

# Step 1: work out the arguments for each job
args = []
for i in range(100):
    # takes 1-3 seconds each
    f = client.submit(determine_arguments, i)
    args.append(f)

# I am forced to gather because the args contain priority information
args = client.gather(args)
# gets blocked here waiting for the gather,
# but the workers don't let the args flow through
# and it seems they are stuck behind long-running jobs

# Step 2: run the jobs
futures = []
for arg in args:
    # could take 10m-8hr depending on job type
    f = client.submit(run_job, arg)
    futures.append(f)
However, if I have set up multiple sets of simulations, or other long-running jobs are going on, some of the determine_arguments futures get stuck behind long-running jobs even though there are open workers. Then the gather basically stays blocked until some random long-running job finishes.
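To show what I mean by "stuck", here is a minimal sketch of how one could check which worker each unfinished future is assigned to. It assumes that `client.processing()` returns a mapping from worker address to the task keys assigned there, and that each future exposes `.key` and `.status`. The helper name `where_pending` is made up for illustration.

```python
def where_pending(futures, processing):
    """Map each unfinished future's key to the worker it is assigned to.

    `processing` is assumed to have the shape returned by
    `client.processing()`: {worker_address: iterable_of_task_keys}.
    Keys that are not assigned to any worker map to None.
    """
    # Invert the worker -> keys mapping into key -> worker
    key_to_worker = {
        key: addr for addr, keys in processing.items() for key in keys
    }
    # Keep only futures that have not finished yet
    return {
        f.key: key_to_worker.get(f.key)
        for f in futures
        if f.status != "finished"
    }
```

It would be called on the list of determine_arguments futures before the gather, e.g. `where_pending(args, client.processing())`, to see whether the unfinished ones all sit on workers that are busy with long jobs.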
Is there a way to move futures from one worker to another manually? Could I replicate them, or otherwise send them to specific workers that I know are open, so they don't get blocked? I am open to anything! I currently have a cluster with around 250 workers and don’t want half of it sitting idle.
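To make the question concrete, this is roughly the kind of thing I imagine. It is a sketch only, and I don't know whether it is the right approach. It assumes `client.processing()` reports the task keys assigned to each worker, and that `client.submit` accepts the `workers=` and `allow_other_workers=` keywords. The helper names `idle_workers` and `submit_to_idle` are hypothetical.

```python
def idle_workers(processing):
    """Return the addresses of workers with no tasks assigned.

    `processing` is assumed to look like the result of
    `client.processing()`: {worker_address: iterable_of_task_keys}.
    """
    return sorted(addr for addr, keys in processing.items() if not keys)


def submit_to_idle(client, func, items):
    """Submit func(item) for each item, preferring currently idle workers."""
    idle = idle_workers(client.processing())
    # Only restrict placement when some worker is idle; otherwise
    # leave the decision entirely to the scheduler.
    kwargs = {"workers": idle, "allow_other_workers": True} if idle else {}
    return [client.submit(func, item, **kwargs) for item in items]
```

The list of idle workers is only a snapshot taken at submission time, so a worker that looks open may already be busy by the time the task arrives. That is why the sketch treats the worker list as a preference (`allow_other_workers=True`) and not as a hard restriction.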
Thank you!