Runtime of currently processing jobs to cancel/restart long running jobs

Hi all,

Apologies if this has been answered before and I couldn’t find the search term. I using job_queue to run about 10-20 thousand fairly long running embarrassingly parallel jobs (5-10 minutes) as futures submitted to a SLURM cluster with a few thousand workers. A small and unpredictable proportion of jobs take too long to be feasible and I would like to run an alternative version of the job script for these inputs - my idea was to have the process that submits futures iterate across jobs and cancel and restart the alternative script for those that pass a threshold processing time.

I am struggling to find a way to measure the runtime of a currently running job on a given worker or future from the submitting process. What I have found already are spans and using wait or gather on the future + a timeout. My issue with spans is they appear to only update when a job is complete and I need to apply timeouts to currently running jobs. My issue with a timeout on wait or gather is the timeout appears to count from when the wait or gather is called, not from the start of the process on the worker.

Any tips on metrics I may have missed are appreciated!

Hi @pculviner, welcome to Dask Discourse!

I have to admit that I took some time to search for a solution, but I didn’t find anything really suitable… You can have a look in tasks transition log (client.cluster.scheduler.transition_log), or use a SchedulerPlugin and the transition method. However I’m not sure you’ll be able to get the real start time, but only the time where a task get assigned to a Worker. This assignment can mean the task is only queued on the Worker…

I’m a bit frustrated by this, and I’m pretty sure there might be a real way to get a task real starting time…

Another solution would be to whatch your futures list and just periodically check their state.

Another completly different solution would be to not use Dask, only Slurm, or maybe submitit.

Thank you for looking into it and for the welcome! And thank you for the tip about submitit. One of the reasons I started using dask is our cluster often has a long wait time to start a job but I’m able to keep started jobs up for long periods of time - so it’s nice to have a pool of workers that can rapidly cycle through jobs of arbitrary lengths, but I will consider this solution.

It occurred to me after writing for this particular application I might be able to just integrate a timeout into the job script and have it automatically switch to the alternative version upon timeout - no dask-side task monitoring required.

I admit I’m still curious about this though… I will take a look at transition_log; with complete enough logs, I might be able to watch the tasks tick on and off a worker and back-calculate the start times based on when other assigned jobs complete.