Trying to shut down workers with completed tasks in order to reduce costs

Hello,

I’m encountering a problem where Dask Distributed workers are not being properly shut down after completing their tasks. Despite successfully processing the tasks, the Dask scheduler keeps waiting for task completion, which seems to cause the entire process to remain stuck, even though the worker itself is deleted.

Context:

I am using Dask Distributed to parallelize the processing of video segments. Once a worker completes its tasks, I want it to shut down properly to free up resources and reduce costs. However, after the worker is retired or deleted, the process does not terminate and remains stuck, as if waiting for task completion, even though no visible tasks are being processed.

What I have tried:

  1. Shutting down workers with client.retire_workers():
    I initially tried using client.retire_workers(workers=[worker_address], close_workers=True) to remove the workers after task completion. However, this didn’t result in a clean shutdown, and the process remains stuck, as if waiting for task completion.

  2. Forcing shutdown with sys.exit(0):
    I then added a forced shutdown using sys.exit(0) after calling retire_workers, but the process still didn’t complete and remained stuck. If I manually trigger a shutdown on the worker right after the worker.close() command, the process does terminate correctly. However, in this case, the worker fails to shut down the container, which is the primary goal (as we want to reduce costs by shutting down the containers after tasks are completed).

  3. Attempt using asyncclose_gracefully_and_exit():
    I tried the following approach to gracefully shut down the worker and ensure the process exits correctly. The process does complete successfully, and the worker is disconnected normally. However, the container does not shut down afterward, keeping the VM running and preventing resource savings.

Here’s the code I used:

import logging
import os
import sys

from distributed import get_client, get_worker


def asyncclose_gracefully_and_exit():
    logging.warning(f"Shutting down the worker {os.environ.get('HOSTNAME')}.")

    worker = get_worker()
    
    try:
        # Close Dask Worker correctly
        worker.close()  
        logging.warning(f"Worker {os.environ.get('HOSTNAME')} has been shut down.")
        
        # trying to end the container
        if os.getpid() == 1:
            logging.warning(f"Exiting PID 1 (container).")
            sys.exit(0)
        else:
            logging.warning(f"Exiting normal process {os.getpid()}.")
            sys.exit(0)

    except Exception as e:
        logging.error(f"Error shutting down worker: {e}")
        sys.exit(1)


def process_and_shutdown_segment(segment_path, frame_rate, ffmpeg_commands, id_pipeline, ai_model_path):
    process_segment(segment_path, frame_rate, ffmpeg_commands, id_pipeline, ai_model_path)

    client = get_client()
    client.submit(asyncclose_gracefully_and_exit)


def process_video_segments(client, temp_folder, frame_rate_in, ffmpeg_commands, pipeline, ai_model_path):
    all_futures = []
    for segment in os.listdir(temp_folder):
        segment_path = os.path.join(temp_folder, segment)
        # Send each video segment to a worker that will be shutdown after the task
        future = client.submit(process_and_shutdown_segment, segment_path, frame_rate_in, ffmpeg_commands, pipeline, ai_model_path)
        all_futures.append(future)

    client.gather(all_futures)  # Waiting for the tasks

In this approach:

  • The worker successfully completes its tasks and is shut down, but the container doesn’t exit even when sys.exit(0) is called, keeping the VM running and preventing cost savings.

Current Behavior:

  • If I try to shut down the worker immediately after worker.close(), the worker is deleted, and the process terminates, but the container is not shut down, which is necessary for our cost-reduction strategy.
  • If I attempt to wait for the tasks to finish and then shut down the worker, the process gets stuck, as if it’s waiting for tasks that have already been processed.

Expected behavior:

  • Workers should be shut down properly after all tasks are completed.
  • The scheduler should not be stuck after the worker is removed.
  • The container hosting the worker should be shut down after tasks are completed to save costs.

Actual behavior:

  • Workers are deleted, but the scheduler remains stuck waiting for task completion.
  • Even after using sys.exit(0), the process doesn’t terminate as expected.
  • When trying to shut down the worker immediately after worker.close(), the container is not shut down.

Environment:

  • Dask version: 2024.7.1
  • Deployment: Running Dask Distributed in containers on VMs on GCP
  • Task type: Video segment processing using client.submit

Goal:

The primary goal is to reduce costs by shutting down workers (and their containers) after tasks are completed. However, we are encountering difficulties either with the process getting stuck or the container not shutting down.

Any insights or suggestions on how to handle this would be greatly appreciated! Is there something I’m missing in properly shutting down the workers and containers after task completion?

Thank you!

Dask is a parallel computing library designed for scaling Python computations across multiple cores or nodes, but as far as I know, it does not handle the lifecycle management of virtual machines (VMs) or containers. Therefore, when you disconnect workers from VMs, Dask will not automatically shut down those VMs or containers.

To effectively reduce costs by shutting down resources after task completion, you’ll need to integrate infrastructure management solutions alongside Dask.
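
As a rough illustration of what "infrastructure management alongside Dask" could look like, here is a minimal sketch of a container entrypoint that runs the worker as a child process and calls a teardown hook once that process exits. The names run_then_teardown and log_teardown are hypothetical, the teardown body is only a placeholder (a real one might call your cloud provider's API to stop or delete the VM), and the command shown is a stand-in for the real worker command.

```python
import subprocess
import sys


def run_then_teardown(worker_cmd, teardown):
    """Run the worker command to completion, then call the teardown hook."""
    exit_code = subprocess.call(worker_cmd)  # blocks until the child exits
    teardown(exit_code)
    return exit_code


def log_teardown(exit_code):
    # Placeholder hook (assumption): real code would stop or delete the VM here.
    print(f"worker exited with code {exit_code}; tearing down the instance")


if __name__ == "__main__":
    # Stand-in command so the sketch runs anywhere; in a real entrypoint this
    # would be the command that starts the Dask worker.
    stand_in_cmd = [sys.executable, "-c", "raise SystemExit(0)"]
    sys.exit(run_then_teardown(stand_in_cmd, log_teardown))
```

Because the wrapper is the container's main process, the container stops when the wrapper returns, so a worker that closes itself (for example after an idle timeout) also ends the container.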

That being said, there are features in Dask that try to achieve this. On Kubernetes, for example, you can scale to 0, though in my experience it does not work as expected.
You can look at the worker and client sections of the Dask API. There is client.shutdown, which does shut down the cluster, but behind the scenes I still see Kubernetes running the pods, so I have code to handle that as well.
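
On the sys.exit(0) attempts in the question: one likely reason they do not stop the container is that Dask workers run tasks in a thread pool by default, and sys.exit() only raises SystemExit in the calling thread. The sketch below uses a plain ThreadPoolExecutor as a stand-in for the worker's executor (an assumption for illustration, not Dask's actual code path) to show that the exception is captured and the process keeps running.

```python
import sys
from concurrent.futures import ThreadPoolExecutor


def task_that_calls_exit():
    # Stand-in for a task body that tries to stop the process.
    sys.exit(0)  # raises SystemExit in this thread only


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(task_that_calls_exit)
    error = future.exception()  # waits for the task, returns what it raised

# Execution reaches this point: the main thread, and so the process, is alive.
still_running = True
print(type(error).__name__, still_running)
```

So for the container to stop, its main process has to end, for example because the worker itself closes and the worker command returns.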

Thank you @Hvuj

I will try using custom instructions for the worker lifecycle, as described on the "Customize Initialization" page of the Dask documentation.
I have only just stumbled on this documentation.

Hello Dask Community,

I recently implemented a solution to automatically shut down Dask workers after they complete their assigned tasks, which has significantly helped in reducing operational costs on GCP. This approach is particularly useful in environments where resources are dynamically managed, such as cloud-based deployments.

Problem Overview

When running multiple Dask workers, some of them finish their initial tasks and are then left idle, which increases costs.

Solution: WorkerPlugin with Idle Timeout

To address this, we created a WorkerPlugin that initiates a graceful shutdown of workers after they have been idle for a specified timeout period. This ensures that workers remain available for new tasks within the timeout window before shutting down, preventing premature termination.

Implementation Steps

  1. Create the ShutdownWorkerPlugin:

    from distributed import WorkerPlugin
    import logging
    import time
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    logger.info("ShutdownWorkerPlugin is loading.")
    
    class ShutdownWorkerPlugin(WorkerPlugin):
        name = "shutdown_worker_plugin"
    
        def setup(self, worker):
            self.worker = worker
            self.idle_timeout = 30  # Idle time in seconds before shutdown
            self.last_task_time = time.time()
            self.shutdown_scheduled = False
            logger.info(f"ShutdownWorkerPlugin configured for worker {worker.address}")
    
            # Start periodic idle checks
            worker.loop.call_later(5, self.check_idle)
    
        def transition(self, key, start, finish, *args, **kwargs):
            if start == 'executing' or finish == 'executing':
                # Update last task time whenever a task starts or stops executing
                self.last_task_time = time.time()
                self.shutdown_scheduled = False
    
            if finish in ('memory', 'released'):
                logger.debug(f"Task {key} completed.")
                self.last_task_time = time.time()
    
        def check_idle(self):
            current_time = time.time()
            # Check if worker is idle and the idle timeout has been exceeded
            if not self.worker.state.executing and (current_time - self.last_task_time) > self.idle_timeout and not self.shutdown_scheduled:
                logger.warning(f"Worker {self.worker.address} shutting down after being idle for {self.idle_timeout} seconds.")
                self.shutdown_scheduled = True
                self.worker.loop.add_callback(self.worker.close_gracefully)
            
            # Schedule the next idle check
            self.worker.loop.call_later(5, self.check_idle)
    
  2. Register the Plugin in Your Dask Setup Script:

    Ensure that the plugin is registered when initializing the Dask client. You can load this plugin using the --preload option.

    import logging
    from dask.distributed import Client
    from shutdown_worker_plugin import ShutdownWorkerPlugin  # Ensure this path is correct
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    def register_shutdown_plugin(client):
        # register_plugin replaces the deprecated register_worker_plugin in recent releases
        client.register_plugin(ShutdownWorkerPlugin(), name='shutdown_worker_plugin')
        logger.info("ShutdownWorkerPlugin registered successfully.")
    
    def main():
        # Initialize Dask client
        client = Client('tcp://127.0.0.1:8786')
        logger.info("Dask client initialized.")
    
        # Register the shutdown plugin
        register_shutdown_plugin(client)
    
        # Submit your tasks as usual
        futures = client.map(your_task_function, your_data)
        results = client.gather(futures)
        logger.info("All tasks completed successfully.")
    
        # Close the client
        client.close()
        logger.info("Dask client closed.")
    
    if __name__ == '__main__':
        main()
    
  3. Load the Plugin Using --preload:

    When starting your Dask workers, use the --preload option to load the plugin script.

    dask-worker scheduler-address:port --preload /path/to/shutdown_worker_plugin.py
    

    Replace /path/to/shutdown_worker_plugin.py with the actual path to your plugin script.
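
The idle decision made in check_idle can also be tried out without a running cluster. Below is a minimal sketch of the same rule (no executing tasks, timeout exceeded, shutdown not already scheduled) as a standalone class with an injectable clock. IdleTracker and FakeClock are hypothetical names used only for this illustration; they are not part of Dask.

```python
import time


class IdleTracker:
    """Standalone version of the plugin's idle rule, for experimenting."""

    def __init__(self, idle_timeout, clock=time.monotonic):
        self.idle_timeout = idle_timeout
        self.clock = clock
        self.last_activity = clock()
        self.shutdown_scheduled = False

    def mark_activity(self):
        # Call whenever a task starts or finishes.
        self.last_activity = self.clock()

    def should_shut_down(self, executing_count):
        # Never fire twice, and never while tasks are still executing.
        if self.shutdown_scheduled or executing_count > 0:
            return False
        if self.clock() - self.last_activity > self.idle_timeout:
            self.shutdown_scheduled = True
            return True
        return False


class FakeClock:
    """Manually advanced clock so the timeout can be simulated instantly."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds


clock = FakeClock()
tracker = IdleTracker(idle_timeout=30, clock=clock)
clock.advance(10)
print(tracker.should_shut_down(executing_count=0))  # idle for 10 s only
clock.advance(25)
print(tracker.should_shut_down(executing_count=0))  # idle for 35 s
```

Passing the clock in makes it easy to check how a given idle_timeout behaves before deploying the plugin to real workers.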

Key Points:

  • Idle Timeout (idle_timeout): Defines how long (in seconds) a worker should wait after completing tasks before shutting down. Adjust this value based on your workload and performance requirements.

  • Periodic Checks (check_idle): The plugin periodically checks if the worker has been idle beyond the specified timeout and initiates a graceful shutdown if conditions are met.

  • Graceful Shutdown: Ensures that workers finish their current tasks before shutting down, preventing abrupt termination and potential data loss.

Benefits:

  • Cost Efficiency: Automatically shuts down idle workers, reducing unnecessary resource usage and costs.

  • Resource Optimization: Keeps workers available for new tasks within the idle timeout period, ensuring efficient task handling without premature shutdowns.

  • Simplicity: Easy to implement without requiring changes to the scheduler or additional plugins.

Best regards

Hi @AlverGant,

Thanks for sharing all this, but I don’t quite understand the initial problem. How were you launching your Workers? Were you using cluster managers like dask-kubernetes or dask-cloudprovider? Or something else? These two packages should already be doing what you want.

Hi @guillaumeeb,

The fact is that I am relatively new to Dask and wasn’t familiar at all with its cluster managers like dask-cloudprovider.
I am launching the VMs on Google Cloud Platform using its Compute Engine API, and it took me a lot of work to customize and manage the cluster.
I really do appreciate the suggestion and will definitely explore dask-cloudprovider from now on, as my next step is to use GPUs.

There is one particular thing I am doing that I am not sure the cluster manager will handle: I am using preemptible machines as workers. If Google preempts them, will the cluster manager automatically relaunch the evicted workers?

Thank you and best regards!

It should, at least using adaptive clusters.

Also keep in mind there are other fully packaged solutions for managing Dask clusters like Coiled or Nebari.
