Collecting artifacts before worker teardown

My Dask task outputs a bunch of files which I’d like to zip up and upload to AWS S3.
I’ve created an EC2Cluster and implemented my own WorkerPlugin (below).

I can see the “test_setup.txt” file being created on the worker server, but I don’t see “test_teardown.txt” being created.

import datetime

# WorkerPlugin lives in distributed; aws_utils, bz2_folder and S3_BUCKET are
# project-specific helpers/constants not shown here.
from distributed.diagnostics.plugin import WorkerPlugin


class DaskWorkHandler(WorkerPlugin):
    """Worker life-cycle handler."""

    def __init__(self, user: str, name: str, output_path: str):
        self.user = user
        self.name = name
        self.output_path = output_path
        self.worker_id = None

    def setup(self, worker):
        self.worker_id = worker.id

        with open("test_setup.txt", "w") as f:
            f.write(f"current time is {datetime.datetime.now()}")

    def teardown(self, worker):
        with open("test_teardown.txt", "w") as f:
            f.write(f"current time is {datetime.datetime.now()}")

        # zip up our output folder
        output_file = bz2_folder(self.output_path, f"{self.worker_id}.tar.bz2")

        # upload results to S3
        aws_utils.s3_upload_file(output_file, S3_BUCKET, f"users/{self.user}/{self.name}/{output_file}")
        print("aws s3 upload complete")

I seem to get the following exception, which looks like my code is attempting to run after the worker has been shut down:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7fef20c6a3e0>>, <Task finished name='Task-194' coro=<SpecCluster._correct_state_internal() done, defined at /home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/distributed/deploy/spec.py:314> exception=HTTPClientError('An HTTP Client raised an unhandled exception: cannot schedule new futures after shutdown')>)
Traceback (most recent call last):
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiobotocore/httpsession.py", line 172, in send
    resp = await self._session.request(
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/client.py", line 535, in _request
    conn = await self._connector.connect(
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/connector.py", line 542, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/connector.py", line 907, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/connector.py", line 1154, in _create_direct_connection
    hosts = await asyncio.shield(host_resolved)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/connector.py", line 880, in _resolve_host
    addrs = await self._resolver.resolve(host, port, family=self._family)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/resolver.py", line 33, in resolve
    infos = await self._loop.getaddrinfo(
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/asyncio/base_events.py", line 855, in getaddrinfo
    return await self.run_in_executor(
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/asyncio/base_events.py", line 813, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

During handling of the above exception, another exception occurred:

Correction: the error above is a red herring (it was caused by not calling close() on the client and the cluster).
However, I’m still curious why the teardown does not work.
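
For reference, the fix for the red herring is just to close things in order. A minimal sketch below (the EC2Cluster options are reduced to n_workers=1, and my_task plus the plugin arguments are placeholders): the context managers close the client before the cluster, so nothing tries to schedule new futures after the event loop has shut down.

from dask.distributed import Client
from dask_cloudprovider.aws import EC2Cluster

def my_task():
    # placeholder for the real work that writes files into output_path
    return None

# Closing the client before the cluster (handled here by the context managers)
# avoids the "cannot schedule new futures after shutdown" error above.
with EC2Cluster(n_workers=1) as cluster:
    with Client(cluster) as client:
        # placeholder arguments for the DaskWorkHandler defined above
        plugin = DaskWorkHandler(user="davico", name="demo", output_path="output")
        client.register_worker_plugin(plugin)
        result = client.submit(my_task).result()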

Thanks for the question @davico888! I tried your snippet locally and the log files were written as expected for both setup and teardown:

from dask.distributed import Client, LocalCluster
from distributed.diagnostics.plugin import WorkerPlugin

def my_func():
    return None

class DaskWorkHandler(WorkerPlugin):
    """Worker life-cycle handler."""
    def __init__(self):
        self.worker_id = None

    def setup(self, worker):
        self.worker_id = worker.id

        with open(f"test_setup_worker_{self.worker_id}.txt", "w") as f:
            f.write("setting up")

    def teardown(self, worker):
        with open(f"test_teardown_worker_{self.worker_id}.txt", "w") as f:
            f.write("tearing down")
            
cluster = LocalCluster(n_workers=5)
client = Client(cluster)

shutdown_handler = DaskWorkHandler()
client.register_worker_plugin(shutdown_handler)

future = client.submit(my_func)
result = future.result()

client.close()
cluster.close()

@davico888 does this work for you locally, and would you be able to share a minimal reproducer? @jacobtomlinson do you have any thoughts on why this might be happening?

Yep, this works on a LocalCluster, just oddly not on an EC2Cluster.
I would have expected the teardown behaviour to be common to LocalCluster and EC2Cluster.
I’ve attached my code below:

        aws_cfg = get_ec2_config(instance_type, ami_image, docker_image, vpc, subnet, security_group)
        ec2_cluster = EC2Cluster(**aws_cfg)
        client = Client(ec2_cluster)

        worker_handler = DaskWorkHandler(task.name)
        client.register_worker_plugin(worker_handler)
        result = client.submit(task.run).result()

        return result
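
One workaround worth trying here, sketched below under the assumption that the installed distributed version has Client.unregister_worker_plugin (not verified on an EC2Cluster, and the plugin name "artifact-uploader" is arbitrary): register the plugin under an explicit name and unregister it once the work is done, which should run teardown() on each worker while the EC2 instances are still up, and only then close the client and the cluster.

aws_cfg = get_ec2_config(instance_type, ami_image, docker_image, vpc, subnet, security_group)
ec2_cluster = EC2Cluster(**aws_cfg)
client = Client(ec2_cluster)

# Register under an explicit name so the plugin can be unregistered later.
worker_handler = DaskWorkHandler(task.name)
client.register_worker_plugin(worker_handler, name="artifact-uploader")

result = client.submit(task.run).result()

# Unregistering should trigger teardown() on every live worker now, while the
# EC2 instances still exist, rather than during cluster shutdown.
client.unregister_worker_plugin("artifact-uploader")

# Close in order: client first, then the cluster.
client.close()
ec2_cluster.close()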