Updating packages in a Dask cluster

I read this page, and I still have (sub) questions:

In my use case, most of the custom data-analysis code resides in its own package. The primary single-processor workflow is: git clone library, cd library, emacs library, python -m library.main. I am wondering how to adapt this process to Dask. I was thinking of moving all of the packages (or maybe just the one package I am interested in editing) to NFS, mounting that on the workers, and putting it on their $PYTHONPATH.

  1. Will the workers see the updated package, when the client process is restarted? I.e., if I edit a package called x, run a script that {starts a Dask client, imports a delayed function from x, and runs it}, will the workers use the updated copy of x?
  2. Are the consistency guarantees of NFS strong enough such that if a developer updates the source code, it will change on all of the workers?
  3. How do I tell dask-worker to pass -X pycache_prefix=/path/outside/nfs to Python (also, is this even necessary)?
  4. Will using NFS to store Python package source code (not data) cause a performance hit?
  5. This could all be avoided if it were possible to tell Dask "treat module x as unimportable, like __main__; serialize it whenever it is referenced". Yes, that would take longer, but this is a package that is constantly changing, so 80% of the time the package would have to be re-sent just before runtime anyway. Is there such an option?
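For context on question 3: as far as I can tell, dask-worker does not expose a flag for interpreter options, but Python treats the PYTHONPYCACHEPREFIX environment variable as equivalent to -X pycache_prefix, so exporting it in each worker's environment should have the same effect. Here is a minimal sketch of what the option actually does (all paths are throwaway temporary directories, not real cluster paths):

    import os
    import pathlib
    import subprocess
    import sys
    import tempfile

    # Bytecode caches are written under the given prefix instead of in a
    # __pycache__ directory next to the (hypothetically NFS-hosted) source.
    with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as cache:
        pathlib.Path(src, "x.py").write_text("VALUE = 1\n")
        subprocess.run(
            [sys.executable, "-X", f"pycache_prefix={cache}", "-c", "import x"],
            cwd=src,
            check=True,
        )
        # No __pycache__ appears beside the source...
        print(os.path.exists(os.path.join(src, "__pycache__")))
        # ...the .pyc lands under the prefix instead.
        print(any(pathlib.Path(cache).rglob("*.pyc")))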

The containerization/packaging methods seem to require redeploying the cluster (i.e., docker stop dask-worker, docker rm dask-worker, docker run dask-worker:new-image on every node), which makes sense if updating the package set is rare (not my use case). Meanwhile, client.upload_file and distributed.diagnostics.plugin.PipInstall seem too ad hoc, but maybe I could put a convincing wrapper around them that hides everything?

Any answer, even to a subset of the questions, is much appreciated. If there is a consensus that explaining the operation of NFS-backed clusters is worthwhile, I will add it to this page.

Hi @charmoniumQ, welcome to Dask community!

Theoretically, NFS should work for this, but you might have to restart your Dask cluster every time you change something in your code. Consistency should be all right, though it's true that NFS is not always reliable at the sub-second level. Performance should also be sufficient, although imports might take a few seconds or more at startup time.

But anyway, I would recommend using the UploadDirectory plugin for your use case, see:

or PipInstall as you said if you can commit code changes to a repository every time you do some modifications.
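For illustration, registering either plugin might look roughly like the sketch below. This is not a verified recipe: the function name, source path, and repository URL are placeholders you would supply yourself.

    from distributed.diagnostics.plugin import PipInstall, UploadDirectory

    def register_package_plugins(client, source_dir, repo_url):
        # Option 1: ship a local source tree to every worker's local
        # directory and prepend it to sys.path (runs on the nanny).
        client.register_worker_plugin(
            UploadDirectory(source_dir, update_path=True), nanny=True
        )
        # Option 2: have every worker pip-install from a repository it
        # can reach (requires committed/pushed changes).
        client.register_worker_plugin(PipInstall(packages=[repo_url]))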


Would PipInstall/upload_file require restarting the cluster too?

Yes, at least the Workers!


Yes, when you change things in your code you will need to restart.

You will also need to make sure that when you restart or scale, your workers get things installed. In the past I've seen people use a custom nanny plugin to make sure that what you upload with UploadDirectory gets installed. Something like this:

import os
import subprocess

from distributed.diagnostics.plugin import UploadDirectory

class InstallModulePlugin(UploadDirectory):
    '''Upload a directory and pip-install it on the workers.'''

    def __init__(self, dir_path, module):
        '''Initialize the plugin.

        dir_path: str, path to the directory you want to upload
        module: str, name of the package directory inside dir_path
        '''
        super().__init__(dir_path, update_path=True)
        self.module = module

    async def setup(self, nanny):
        await super().setup(nanny)
        # Install the uploaded copy in editable mode on each worker
        path_to_module = os.path.join(nanny.local_directory, self.module)
        subprocess.call(["pip", "install", "-e", path_to_module])

plugin = InstallModulePlugin("path_to_directory", "directory_name")
client.register_worker_plugin(plugin, nanny=True)

What kind of cluster are you running things on? I know this might not help, but Coiled clusters have package sync, which will make this part of the process easier. You at least won't need a plugin. You can see how it works here: Dask Demo Day - 2023-03-16 - YouTube