How do I modularize functions that work with dask?

I’m trying to modularize my functions that use Dask, but I keep encountering the error "No module named 'my_module'". I can’t import any local module whose functions are used with Dask, so currently everything I do with Dask has to live in a single .py file. How do I properly modularize functions that use Dask?

Here’s an example of my setup:

  • main.py: Contains functions that utilize Dask.
  • my_module.py: A local module I created, which also contains functions using Dask.

I’m attempting to import the my_module module in main.py like this:

import logging

from dask.distributed import Client, WorkerPlugin

from my_module import *

class CustomModulePlugin(WorkerPlugin):
    def setup(self, worker):
        import my_module

    def start(self, worker):
        logging.warning("Plugin started and added to the worker.")

client = Client('tcp://127.0.0.1:8786')
client.register_worker_plugin(CustomModulePlugin())

But I keep getting the following error:

No module named ‘my_module’

I also tried client.register_plugin(UploadDirectory("src/workers/brasil_setup.py"), nanny=True), but I got another error saying I had passed an unexpected argument, nanny=True.

I have tested the solution with upload_file and it did not work in my case. I work with a system of backup workers that are automatically created when the current VMs go down. Because of this, I cannot rely on upload_file, as I need something that works both for the workers that have already been created and for those that will be created in the future. Does anyone have suggestions on how to dynamically synchronize the environment so that it also covers future workers?

I’m using the latest Dask version, published two months ago.

Hi @2dask-crls, welcome to the Dask community!

First, can you explain in more detail how your Workers are started: on which nodes/infrastructure, and in which Python environment (is it different from the Client’s)?

Using Distributed means all components must use equivalent environments.

Plugins are a good solution to this.

Note how plugins work (see Plugins — Dask.distributed 2024.8.2+7.gb28822b documentation):

Run when the plugin is attached to a worker. This happens when the plugin is registered and attached to existing workers, or when a worker is created after the plugin has been registered.

So both UploadDirectory and UploadFile should work. I’m a bit surprised about the error; could you print the stack trace?
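For reference, here is a minimal sketch of how I would expect the built-in upload plugins to be registered (the scheduler address and paths are placeholders):

from dask.distributed import Client
from distributed.diagnostics.plugin import UploadDirectory, UploadFile

client = Client("tcp://127.0.0.1:8786")  # placeholder scheduler address

# UploadFile is a WorkerPlugin: it copies the file to every current worker
# and to any worker that joins later.
client.register_plugin(UploadFile("my_module.py"))

# UploadDirectory is a NannyPlugin: it copies a whole directory, so pass a
# directory rather than a single .py file.
client.register_plugin(UploadDirectory("src/workers"))

If I remember correctly, with register_plugin the plugin type determines where it runs, which is why a nanny= argument is not accepted.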

We are using Google Cloud to run a cluster of VMs for parallel video processing with FFmpeg. Each VM contains a container running Dask. The worker directory of the container is an NFS file system shared among all the Dask workers and the scheduler.

Inside the worker I get this:

2024-09-11 18:15:39,018 - root - WARNING - Plugin registered successfully.
/data/superresolution/src/workers/superresolution_dask.py:795: DeprecationWarning: `Client.register_worker_plugin` has been deprecated; please use `Client.register_plugin` instead
  client.register_worker_plugin(CustomModulePlugin())
2024-09-11 18:15:39,020 - root - WARNING - Splitting video into pieces...
--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/logging/__init__.py", line 1100, in emit
    msg = self.format(record)
  File "/usr/local/lib/python3.10/logging/__init__.py", line 943, in format
    return fmt.format(record)
  File "/usr/local/lib/python3.10/logging/__init__.py", line 678, in format
    record.message = record.getMessage()
  File "/usr/local/lib/python3.10/logging/__init__.py", line 368, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
  File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 83, in _worker
    work_item.run()
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 409, in execute_callback
    fn(fut)
  File "/data/superresolution/src/workers/superresolution_dask.py", line 288, in check_future
    logging.error("Error logging command:", f.exception())
Message: 'Error logging command:'
Arguments: (RuntimeError('Error during deserialization of the task graph. This frequently\noccurs if the Scheduler and Client have different environments.\nFor more information, see\nhttps://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments\n'),)

Inside the scheduler I get the “No module named ‘my_module’” error:

startup-script: Dask client initialized.
startup-script: Executing command: /usr/bin/ffmpeg7.0.1/bin/ffmpeg -i SD/pillarbox/video.mxf -ss 0.0 -to 60.027 -c copy -map 0:v -map 0:a -y temp_segments/segment_000.mxf
startup-script:     graph = deserialize(graph_header, graph_frames).data
startup-script:   File \"/usr/local/lib/python3.10/site-packages/distributed/protocol/serialize.py\", line 449, in deserialize
startup-script:     return loads(header, frames)
startup-script:   File \"/usr/local/lib/python3.10/site-packages/distributed/protocol/serialize.py\", line 111, in pickle_loads
startup-script:     return pickle.loads(pik, buffers=buffers)
startup-script:   File \"/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 96, in loads
startup-script:     return pickle.loads(x)
startup-script: ModuleNotFoundError: No module named 'my_module'

Given this setup, any insights on what might be causing this?

Hi, I see you posted the same question on Stack Overflow and got a nice answer from @martindurant. Did you try his suggestions?

How is Dask installed in the environment? Can you mount a directory with your module? Is your module only on the client side? Could you deploy it in the containers?

Yes, we tested the suggestions, but unfortunately, they didn’t solve the issue.

How is Dask installed in the environment?
Dask is installed via pip inside the container.

Can you mount a directory with your module?
We aren’t building the containers. We are running the container with a volume attached to the shared NFS directory.

Is your module only on the client side?
No, the module is used by both the scheduler and the worker.

Could you deploy it in the containers?
We are using containers on virtual machines within Google Cloud, and the machines are managed by our code.

When a local module isn’t involved in Dask operations, everything works fine. The issue only arises when we try to modularize a function that utilizes Dask.
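To illustrate, here is a simplified version of the pattern that fails for us (the function name is just an example):

# main.py
from dask.distributed import Client
from my_module import process_segment  # example function defined in my_module.py

client = Client("tcp://127.0.0.1:8786")

# The task graph is pickled with a reference to "my_module", so the scheduler
# and workers must be able to import it to deserialize the task.
future = client.submit(process_segment, "temp_segments/segment_000.mxf")
print(future.result())  # fails if my_module cannot be imported on the other side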

Can’t you also install your module inside the container? Or is this module in constant evolution?

Then, since you are mounting a directory, can you put your module in that directory?

I don’t think this answers the question. You could do that and deploy the code in the containers.

The question was: from where is the module accessible?

Using a plugin or publishing your code as a package should work anyway, and is compatible with your needs.
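For example, since the module already lives on the NFS mount that every container sees, a small custom plugin could simply add that directory to sys.path on every current and future worker (the path below is a placeholder based on your logs):

import sys

from dask.distributed import Client, WorkerPlugin

class AddSharedPath(WorkerPlugin):
    """Example plugin: make a shared directory importable on each worker."""

    def __init__(self, path):
        self.path = path

    def setup(self, worker):
        # Runs on every worker already connected when the plugin is registered,
        # and on every worker created afterwards.
        if self.path not in sys.path:
            sys.path.insert(0, self.path)

client = Client("tcp://127.0.0.1:8786")
client.register_plugin(AddSharedPath("/data/superresolution/src/workers"))

Note this only covers the Workers; given your Scheduler traceback, the module must also be importable on the Scheduler side (for instance via PYTHONPATH in that container).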

We suspect that the issue is related to the serialization of some functions with pickle. Although the error message points to a missing module, the logs suggest that the actual problem is a serialization failure.

Additionally, using upload_file worked, but we needed to provide the full path of the file from the container’s perspective (e.g., “/data/project/src/workers/my_module.py”). This raises a few questions:

  • Why is it necessary to upload the file if it’s already present in the environment?
  • Will upload_file work for any worker that is created in the future?

From what we understand, upload_file is more useful in a development environment where the code needs to be updated frequently. In our case, the environment is fixed and should not change often.
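For reference, the upload_file call that worked was roughly:

client.upload_file("/data/project/src/workers/my_module.py")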

That’s probably linked: it can’t be deserialized because the module is unknown on the Worker side.

In which environment is it present? Client? Scheduler? Worker?

Yes, it should.

Totally. If the environment is fixed, just install it in the Docker image or make a wheel package.
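If you go the wheel route, a minimal packaging sketch could be as small as this (name and version are placeholders), which you then pip install in the Docker image:

# setup.py
from setuptools import setup

setup(
    name="my_module",
    version="0.1.0",
    py_modules=["my_module"],  # packages the single my_module.py file
)

Running pip install . (or pip wheel . to build the wheel) inside the image then makes my_module importable everywhere, Scheduler and Workers alike.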