Unable to decorate or import functions: pickle.loads raises AttributeError or ModuleNotFoundError

I’ve found that I can’t call functions that are imported from another module, or decorated with a stdlib decorator like lru_cache, when submitting tasks to run on a KubeCluster.

Here are the files and steps required to reproduce:

  1. Create a pod in your k8s cluster with debug-shell.yaml.
  2. Copy debug-scheduler.yaml, debug-worker.yaml, debug.py, and operation.py onto the pod (a rough sketch of the two Python files follows this list).
  3. Run python debug.py on the pod.
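For context, here is roughly what the two Python files might contain. The real files aren’t reproduced in this post, so treat the bodies below (and the KubeCluster.from_yaml call, which assumes the classic dask-kubernetes API) as a hypothetical sketch:

# operation.py (hypothetical sketch)
def expensive_operation(n):
    return n * n

# debug.py (hypothetical sketch)
from functools import lru_cache

from dask_kubernetes import KubeCluster
from distributed import Client

from operation import expensive_operation        # the "imported" version

# @lru_cache(maxsize=None)                        # the "decorated" version,
# def expensive_operation(n):                     # defined directly in this script
#     return n * n

cluster = KubeCluster.from_yaml("debug-worker.yaml")
cluster.scale(1)
client = Client(cluster)

x = client.submit(expensive_operation, 10)
print(x.result())   # the ModuleNotFoundError / AttributeError is raised here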

You’ll see something like:

Traceback (most recent call last):
  File "debug.py", line 29, in <module>
    print(x.result())
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 277, in result
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'operation'

Alternatively, if you use the lru_cache-decorated version, you’ll see a stack trace like this:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 277, in result
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
AttributeError: Can't get attribute 'expensive_operation' on <module '__mp_main__' from '/opt/conda/bin/dask-worker'>

I assume there’s a way to use functions from multiple modules in Dask distributed tasks, and to use stdlib decorators? It feels like I’m missing something basic!

Dask tries to serialize with pickle.dumps.
If that fails, it falls back to cloudpickle.dumps (https://github.com/cloudpipe/cloudpickle).
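The difference is easy to see locally (an illustration of the fallback, not the exact code path inside distributed/protocol/pickle.py):

import pickle
import cloudpickle

f = lambda x: x + 1              # a lambda can't be pickled by reference

try:
    pickle.dumps(f)
except Exception as exc:
    print("pickle failed:", exc)

payload = cloudpickle.dumps(f)   # cloudpickle embeds the function body itself
g = pickle.loads(payload)        # plain pickle can load a cloudpickle payload
print(g(41))                     # 42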

All functions that need to run on a worker must be importable on all workers. What you typically do is add them to the worker’s Docker image in production, and use a shared NFS filesystem in development.
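For quick experiments there is also Client.upload_file, which copies a single .py file to the connected workers so it becomes importable there. A sketch only; baking the module into the worker image is still the robust answer for production:

from distributed import Client

client = Client(cluster)             # the same cluster as in debug.py
client.upload_file("operation.py")   # ship the module to the workers

from operation import expensive_operation
fut = client.submit(expensive_operation, 10)
print(fut.result())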

There are a few notable exceptions:

  1. any function defined in your local Jupyter notebook will be serialized whole and sent to the workers
  2. anything that pickle fails to serialize but cloudpickle can handle - most notably objects defined in a local scope - will also be sent whole
  3. any module registered with cloudpickle’s register_pickle_by_value will be serialized by value with cloudpickle (https://github.com/cloudpipe/cloudpickle); see the sketch after this list
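A sketch of point 3, assuming the plain (un-decorated) operation.py from the reproduction, cloudpickle >= 2.0, and a distributed version that consults the pickle-by-value registry:

import cloudpickle
import operation                 # importable on the client only

# Ask cloudpickle to serialize everything from this module by value,
# so the workers don't need operation.py on their PYTHONPATH.
cloudpickle.register_pickle_by_value(operation)

fut = client.submit(operation.expensive_operation, 10)
print(fut.result())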