My Dask task outputs a bunch of files which I'd like to zip up and upload to AWS S3.
I've created an EC2Cluster and implemented my own WorkerPlugin (below).
I can see the "test_setup.txt" file being created on the worker server, but I don't see "test_teardown.txt" get created.
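For context, the driver side looks roughly like this (a minimal sketch; the cluster parameters and plugin arguments are placeholders), with the plugin class itself below:

```python
# Minimal sketch of the driver side (parameter values are placeholders).
from dask.distributed import Client
from dask_cloudprovider.aws import EC2Cluster

cluster = EC2Cluster(n_workers=2)  # real config has more options
client = Client(cluster)

# register the life-cycle handler on every worker
client.register_worker_plugin(
    DaskWorkHandler(user="me", name="job1", output_path="output")
)

# ... submit tasks that write files into output_path ...

client.close()
cluster.close()  # teardown() should run as the workers shut down
```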
```python
import datetime

from dask.distributed import WorkerPlugin

# bz2_folder, aws_utils, and S3_BUCKET are my own helpers/constants.


class DaskWorkHandler(WorkerPlugin):
    """Worker life-cycle handler."""

    def __init__(self, user: str, name: str, output_path: str):
        self.user = user
        self.name = name
        self.output_path = output_path
        self.worker_id = None

    def setup(self, worker):
        self.worker_id = worker.id
        with open("test_setup.txt", "w") as f:
            f.write(f"current time is {datetime.datetime.now()}")

    def teardown(self, worker):
        with open("test_teardown.txt", "w") as f:
            f.write(f"current time is {datetime.datetime.now()}")

        # zip up our output folder
        output_file = bz2_folder(self.output_path, f"{self.worker_id}.tar.bz2")

        # upload the results to S3
        aws_utils.s3_upload_file(
            output_file, S3_BUCKET, f"users/{self.user}/{self.name}/{output_file}"
        )
        print("aws s3 upload complete")
```
When the cluster shuts down, I get the exception below (it looks like my code is attempting to run after the worker has already shut down?):
```
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7fef20c6a3e0>>, <Task finished name='Task-194' coro=<SpecCluster._correct_state_internal() done, defined at /home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/distributed/deploy/spec.py:314> exception=HTTPClientError('An HTTP Client raised an unhandled exception: cannot schedule new futures after shutdown')>)
Traceback (most recent call last):
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiobotocore/httpsession.py", line 172, in send
    resp = await self._session.request(
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/client.py", line 535, in _request
    conn = await self._connector.connect(
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/connector.py", line 542, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/connector.py", line 907, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/connector.py", line 1154, in _create_direct_connection
    hosts = await asyncio.shield(host_resolved)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/connector.py", line 880, in _resolve_host
    addrs = await self._resolver.resolve(host, port, family=self._family)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/site-packages/aiohttp/resolver.py", line 33, in resolve
    infos = await self._loop.getaddrinfo(
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/asyncio/base_events.py", line 855, in getaddrinfo
    return await self.run_in_executor(
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/asyncio/base_events.py", line 813, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/davico/anaconda3/envs/mft/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

During handling of the above exception, another exception occurred:
```