Hello,
I’m running into a problem where Dask Distributed workers are not shut down cleanly after completing their tasks. The tasks finish successfully, but the scheduler keeps waiting for task completion and the whole run stays stuck, even though the worker itself has already been removed.
Context:
I am using Dask Distributed to parallelize the processing of video segments. Once a worker has finished its tasks, I want it to shut down cleanly so that its resources are released and costs are reduced. However, after the worker is retired or deleted, the process does not terminate: it stays stuck as if waiting for task completion, even though no tasks are still being processed.
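At a high level, the flow I am aiming for is roughly the following (a simplified sketch: `process_segment` stands for our own processing function, `segment_paths` for the list of segment files, and the scheduler address is a placeholder):

```python
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder address for the cluster on GCP

# 1. Fan the video segments out across the workers.
futures = [client.submit(process_segment, path) for path in segment_paths]

# 2. Wait for every segment to finish.
results = client.gather(futures)

# 3. Once a worker has no more work, it should shut down, and the container/VM
#    hosting it should go away with it. This last step is the part that fails.
```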
What I have tried:
- Shutting down workers with `client.retire_workers()`:
  I initially tried `client.retire_workers(workers=[worker_address], close_workers=True)` to remove the workers after task completion. However, this did not result in a clean shutdown: the process remains stuck, as if it were still waiting for task completion.
- Forcing shutdown with `sys.exit(0)`:
  I then added a forced shutdown with `sys.exit(0)` after calling `retire_workers`, but the process still did not complete and remained stuck. If I instead trigger the shutdown right after the `worker.close()` call, the process does terminate correctly, but then the container is not shut down, and shutting down the container is the primary goal (we want to reduce costs by stopping the containers once the tasks are done). A stripped-down sketch of these first two attempts is included after the code below.
- Attempt using an async `close_gracefully_and_exit()`:
  I tried the following approach to shut down the worker gracefully and make the process exit. The task completes successfully and the worker disconnects normally, but the container does not shut down afterward, so the VM keeps running and no costs are saved.
Here’s the code I used:
```python
import logging
import os
import sys

from dask.distributed import get_client, get_worker


async def close_gracefully_and_exit():
    logging.warning(f"Shutting down the worker {os.environ.get('HOSTNAME')}.")
    worker = get_worker()
    try:
        # Close the Dask worker cleanly
        await worker.close()
        logging.warning(f"Worker {os.environ.get('HOSTNAME')} has been shut down.")
        # Try to end the container as well
        if os.getpid() == 1:
            logging.warning("Exiting PID 1 (container).")
            sys.exit(0)
        else:
            logging.warning(f"Exiting normal process {os.getpid()}.")
            sys.exit(0)
    except Exception as e:
        logging.error(f"Error shutting down worker: {e}")
        sys.exit(1)


def process_and_shutdown_segment(segment_path, frame_rate, ffmpeg_commands, id_pipeline, ai_model_path):
    process_segment(segment_path, frame_rate, ffmpeg_commands, id_pipeline, ai_model_path)
    # After the segment is processed, submit the shutdown task so the worker closes itself
    client = get_client()
    client.submit(close_gracefully_and_exit)


def process_video_segments(client, temp_folder, frame_rate_in, ffmpeg_commands, pipeline, ai_model_path):
    all_futures = []
    for segment in os.listdir(temp_folder):
        segment_path = os.path.join(temp_folder, segment)
        # Send each video segment to a worker that will be shut down after the task
        future = client.submit(
            process_and_shutdown_segment,
            segment_path, frame_rate_in, ffmpeg_commands, pipeline, ai_model_path,
        )
        all_futures.append(future)
    client.gather(all_futures)  # Wait for the tasks to finish
```
In this approach:
- The worker successfully completes its tasks and is shut down, but the container doesn’t exit even though `sys.exit(0)` is called, which keeps the VM running and prevents the cost savings.
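For reference, the first two attempts looked roughly like this, run after `client.gather(...)` has returned. This is a simplified reconstruction rather than the exact production code: the scheduler address is a placeholder, and the loop over `scheduler_info()` just stands in for however the worker addresses are obtained.

```python
import sys

from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder scheduler address

# ... all segment futures have already been submitted and gathered at this point ...

# Attempt 1: retire each worker once its tasks are done.
# The call itself returns, but the run then stays stuck, as if the scheduler
# were still waiting on tasks.
for worker_address in list(client.scheduler_info()["workers"]):
    client.retire_workers(workers=[worker_address], close_workers=True)

# Attempt 2: additionally force the driver process to exit.
# Even with this, the run does not terminate.
sys.exit(0)
```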
Current behavior:
- If I shut down the worker immediately after `worker.close()`, the worker is deleted and the process terminates, but the container is not shut down, even though that is what our cost-reduction strategy requires.
- If I wait for the tasks to finish and then shut down the worker, the process gets stuck, as if it were waiting for tasks that have already been processed.
Expected behavior:
- Workers should be shut down properly after all tasks are completed.
- The scheduler should not be stuck after the worker is removed.
- The container hosting the worker should be shut down after tasks are completed to save costs.
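To make the first two expectations concrete: after everything has been retired, a quick check like the one below (purely illustrative, not part of the original pipeline; `client` and `all_futures` are the objects from `process_video_segments` above) should report no remaining workers and only finished futures.

```python
import time

# Retire every worker the scheduler still knows about.
client.retire_workers(workers=list(client.scheduler_info()["workers"]), close_workers=True)
time.sleep(2)  # give the scheduler a moment to update its state

print("remaining workers:", list(client.scheduler_info()["workers"]))  # expected: []
print("future states:", {f.status for f in all_futures})               # expected: {'finished'}
```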
Actual behavior:
- Workers are deleted, but the scheduler remains stuck waiting for task completion.
- Even after calling `sys.exit(0)`, the process does not terminate as expected.
- When the worker is shut down immediately after `worker.close()`, the container is not shut down.
Environment:
- Dask version: 2024.7.1
- Deployment: Running Dask Distributed in containers on VMs on GCP
- Task type: video segment processing via `client.submit`
Goal:
The primary goal is to reduce costs by shutting down workers (and their containers) once their tasks are completed. However, every approach so far ends with either the process getting stuck or the container staying up.
Any insights or suggestions on how to handle this would be greatly appreciated! Is there something I’m missing in properly shutting down the workers and containers after task completion?
Thank you!