ModuleNotFoundError with CLI distributed workers in pipenv virtualenv

I am facing a familiar problem: a function call is submitted to a worker as a task, but the worker fails to run it because it does not have access to the modules it needs.

My project structure is roughly as follows:

Project/
├─ app/
│  ├─ jobs/
│  │  ├─ neural_net_regressions/
│  │  │  ├─ regress_job.py
│  ├─ helper_code/
│  │  ├─ helper.py
├─ .venv/

(There is an empty __init__.py in each subfolder.)

From inside the Project directory we run the following commands to deploy the scheduler and local workers:

FLASK_ENV=development exec pipenv run dask scheduler --host="localhost:8786"
FLASK_ENV=development exec pipenv run dask worker --nworkers=8 --nthreads=1 "localhost:8786"

There is code within regress_job.py that goes something like this:

from dask.distributed import Client

from app.helper_code.helper import helper_function

def regress_job():
  # Workers must be able to import app.helper_code.helper to deserialize regress
  Client("address").submit(regress)

def regress():
  helper_function()

Yet when regress_job is called we see this error:
ModuleNotFoundError: No module named 'app'

The full stack trace:

2023-02-01 12:48:47,560 - distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x95;\x00\x00\x00\x00\x00\x00\x00\x8c+app.jobs.neural_net_regressions.regress_job\x94\x8c\x07regress\x94\x93\x94.'
Traceback (most recent call last):
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 2885, in loads_function
    result = cache_loads[bytes_object]
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/collections.py", line 24, in __getitem__
    value = super().__getitem__(key)
  File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/collections/__init__.py", line 1102, in __getitem__
    raise KeyError(key)
KeyError: b'\x80\x05\x95;\x00\x00\x00\x00\x00\x00\x00\x8c+app.jobs.neural_net_regressions.regress_job\x94\x8c\x07regress\x94\x93\x94.'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'app'
2023-02-01 12:48:47,561 - distributed.worker - ERROR - Could not deserialize task regress-4027ca92571b3c202720a04a6d5ae1f9
Traceback (most recent call last):
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 2885, in loads_function
    result = cache_loads[bytes_object]
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/collections.py", line 24, in __getitem__
    value = super().__getitem__(key)
  File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/collections/__init__.py", line 1102, in __getitem__
    raise KeyError(key)
KeyError: b'\x80\x05\x95;\x00\x00\x00\x00\x00\x00\x00\x8c+app.jobs.neural_net_regressions.regress_job\x94\x8c\x07regress\x94\x93\x94.'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 2232, in execute
    function, args, kwargs = await self._maybe_deserialize_task(ts)
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 2204, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*ts.run_spec)
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 2896, in _deserialize
    function = loads_function(function)
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 2887, in loads_function
    result = pickle.loads(bytes_object)
  File "/home/user/Project/.venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'app'

What I have tried:

  • Ensuring the working directory of all the workers is the Project folder
  • Making sure the same pipenv virtual env deploys the workers (see the commands above)
  • Making sure the app folder has an __init__.py so Python recognises it as a package
  • Adding the --preload="app" argument to the CLI (the workers still fail to find the app module)
  • Adding the project root to Python's default sys.path by running sys.path.append('project_root') from a preload argument

Hi @Niphemy, thanks for the question.

Working with your own modules on a shared file system is always a bit difficult to get right at first.

There is some documentation about all of this here.

In your case, I would say that somehow the Workers don't get the same PYTHONPATH as your original process. Could you try setting it explicitly?

Some other things to try:

  • Use a LocalCluster object directly in your main code; does it work then?
  • Look at the worker environment, using code like client.run(lambda: sys.path) (see the sketch after this list)
  • Add some packaging tooling (e.g. a setup.py) to your module and install it into your venv
  • Try running a function on the Workers that only imports your module
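To illustrate the second and last points, a quick diagnostic could look something like the sketch below. The scheduler address localhost:8786 is taken from your commands above; everything else is just an illustration.

import sys
from dask.distributed import Client

client = Client("localhost:8786")

# Compare the client's import path with each worker's.
print("local sys.path:", sys.path)
print("worker sys.path:", client.run(lambda: sys.path))

# Run an import-only function on every worker; it raises
# ModuleNotFoundError on workers that cannot see the package.
def try_import():
  import app
  return app.__file__

print(client.run(try_import))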

In the end, if your Worker Python environment is the same as the one running your main code, it should work, but that's not always easy to achieve, especially if your distributed cluster is launched externally (FLASK_ENV?).

@guillaumeeb Thanks for your help. Deploying the cluster from the pipenv environment by executing this Python script, which sits inside the project, was successful:

from dask.distributed import LocalCluster

if __name__ == "__main__":
  LocalCluster(...)

We now execute this script at launch from the command line, but we have yet to get around the issue of the program exiting immediately and preventing the cluster from fully launching, short of adding something like sleep(99999) so that the cluster stays alive (sketched below).
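For context, the keep-alive version looks roughly like this (the keyword arguments here are illustrative, not our exact values):

from time import sleep
from dask.distributed import LocalCluster

if __name__ == "__main__":
  # Illustrative kwargs; the port matches the address the jobs connect to.
  cluster = LocalCluster(scheduler_port=8786, n_workers=8, threads_per_worker=1)
  sleep(99999)  # crude keep-alive so the process (and the cluster) doesn't exit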

I’m glad you found a solution, but this looks more like a workaround to me.

I think you should either use LocalCluster in the same script where you call client.submit, or find a better way to launch a Dask cluster in another process, e.g. by finding the right setup for running the dask worker CLI.
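For the first option, a minimal sketch (reusing the names from your earlier snippet, and assuming the script is run from the Project root so the app import works locally) could be:

from dask.distributed import Client, LocalCluster

from app.helper_code.helper import helper_function

def regress():
  return helper_function()

if __name__ == "__main__":
  # Cluster and client are created from the same pipenv environment
  # as the code submitting the task.
  cluster = LocalCluster(n_workers=8, threads_per_worker=1)
  client = Client(cluster)
  print(client.submit(regress).result())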

Do you have errors when you use the --preload="app" argument?

I think this is definitely a PYTHONPATH problem; see "dask worker cannot import module" or "Start dask worker with local directory" on Stack Overflow.
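If you go back to the dask worker CLI, a preload module along these lines should put the project root on sys.path for every worker. This is only a sketch: the file name preload_paths.py and the /home/user/Project path are assumptions, not taken from your setup.

# preload_paths.py (hypothetical name), used as:
#   dask worker "localhost:8786" --preload /home/user/Project/preload_paths.py
import sys

def dask_setup(worker):
  # Called by Dask when the worker starts; make the project root importable.
  sys.path.insert(0, "/home/user/Project")  # placeholder path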