Dask hangs after install plugin

I was experimenting with an apt install plugin (modeled on the `PipInstall` plugin), but after the installation Dask seems to hang and does not execute the job. Any pointers?

class AptInstall(PackageInstall):
    """Install system packages with ``apt`` by piping a script into bash.

    ``install()`` runs ``sudo apt update`` followed by
    ``sudo apt install -y <packages>``.
    NOTE(review): presumably the ``PackageInstall`` base class decides when
    ``install()`` is invoked on each worker -- confirm against the installed
    ``distributed`` version.
    """

    INSTALLER = "apt"

    def __init__(self, packages: list[str], restart: bool = False):
        """Forward ``packages`` and ``restart`` unchanged to ``PackageInstall``."""
        super().__init__(packages, restart)

    def install(self) -> None:
        """Run ``apt update`` and ``apt install -y`` for ``self.packages``.

        Raises:
            RuntimeError: if the bash process exits with a non-zero status;
                the message carries the captured stderr.
        """
        import logging
        import subprocess

        logger = logging.getLogger(__name__)

        commands = f"""
        sudo apt update
        sudo apt install -y {" ".join(self.packages)}
        """
        # ``input=`` makes subprocess.run open a pipe for the child's stdin.
        # Without a stdin pipe the script is never delivered: bash reads the
        # worker's inherited stdin instead and can block forever.
        completed = subprocess.run(
            ["/bin/bash"],
            input=commands.encode("utf-8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if completed.returncode != 0:
            # errors="replace" keeps undecodable apt output from masking the
            # real failure with a UnicodeDecodeError.
            details = completed.stderr.decode(errors="replace").strip()
            msg = f"sudo apt install failed with '{details}'"
            logger.error(msg)
            raise RuntimeError(msg)

        # Use the stdlib logger here as well; the previous ``import log``
        # raised ImportError wherever that module was not importable.
        logger.info(f"Successfully installed {' '.join(self.packages)}")

The log looks something like this, and then hangs:

15:39:42 distributed.diagnostics.plugin:354 208435 The following packages have already been installed: ['protobuf-compiler']
15:39:42 distributed.scheduler:5350 208298 Receive client connection: Client-worker-ae33874e-114d-11ee-ae26-dad3d397d1d2

Thanks!

Hi @yifanwu,

I just tried your plugin and did not experience any hanging. Could you describe in a bit more detail what you are doing after registering the plugin?

I executed the following code:

from distributed.diagnostics.plugin import PackageInstall
from distributed import Client

class AptInstall(PackageInstall):
    """Install system packages with ``apt`` by piping a script into bash.

    ``install()`` runs ``sudo apt update`` followed by
    ``sudo apt install -y <packages>`` and logs the captured output.
    NOTE(review): presumably the ``PackageInstall`` base class decides when
    ``install()`` is invoked on each worker -- confirm against the installed
    ``distributed`` version.
    """

    INSTALLER = "apt"

    def __init__(self, packages: list[str], restart: bool = False):
        """Forward ``packages`` and ``restart`` unchanged to ``PackageInstall``."""
        super().__init__(packages, restart)

    def install(self) -> None:
        """Run ``apt update`` and ``apt install -y`` for ``self.packages``.

        Raises:
            RuntimeError: if the bash process exits with a non-zero status;
                the message carries the captured stderr.
        """
        import logging
        import subprocess

        # Named under "distributed.worker" rather than ``__name__``;
        # presumably so the records pass through the worker logging
        # configuration -- confirm.
        logger = logging.getLogger("distributed.worker.AptInstall")

        commands = f"""
        sudo apt update
        sudo apt install -y {" ".join(self.packages)}
        """
        # ``input=`` makes subprocess.run open a pipe for the child's stdin.
        # Without a stdin pipe the script is never delivered: bash reads the
        # worker's inherited stdin instead and can block forever.
        completed = subprocess.run(
            ["/bin/bash"],
            input=commands.encode("utf-8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        # Decode before logging so the record shows readable text instead of
        # a bytes repr full of escaped newlines.
        output = completed.stdout.decode(errors="replace")
        logger.info(f"apt install output:  {output}")
        if completed.returncode != 0:
            details = completed.stderr.decode(errors="replace").strip()
            msg = f"sudo apt install failed with '{details}'"
            logger.error(msg)
            raise RuntimeError(msg)

        logger.info(f"Successfully installed {' '.join(self.packages)}")

# Create a client with its default settings.
client = Client()

# Register the apt plugin, asking it to install "vim".
client.register_worker_plugin(AptInstall(["vim"]))

# Submit a trivial task and wait for its result to check that tasks still
# execute once the plugin has been registered.
future = client.submit(lambda x: x + 1, 10)
future.result()