Scheduler importing modules meant for the workers

Issue explanation

Hey y’all,

I am using Dask-Jobqueue to deploy Dask on a PBS cluster. Due to various cluster limitations, I run the scheduler in a different, minimal environment than the workers and client. I achieve this by specifying the Python interpreter within the cluster instance:

from dask_jobqueue import PBSCluster

cluster = await PBSCluster(python="singularity exec image.simg python")

where the workers and client run a singularity container of the same image.simg image.

However, I face an issue where the scheduler (whose environment does not contain scipy) attempts and fails to deserialise tasks meant for the workers:

import numpy as np
from scipy.linalg import eig


def func(x):
    mat = np.random.randint(100, size=(10, 10))
    return eig(mat)

# client is a distributed.Client connected to the scheduler above
result_ungathered = client.map(func, [0])
client.gather(result_ungathered)

Traceback

---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/scheduler.py:4578, in update_graph()

File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/serialize.py:434, in deserialize()

File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/serialize.py:100, in pickle_loads()

File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/pickle.py:96, in loads()

ModuleNotFoundError: No module named 'scipy'

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Cell In[9], line 2
      1 result_ungathered = client.map(func, x)
----> 2 client.gather(result_ungathered)

File /opt/conda/lib/python3.11/site-packages/distributed/client.py:2393, in Client.gather(self, futures, errors, direct, asynchronous)
   2390     local_worker = None
   2392 with shorten_traceback():
-> 2393     return self.sync(
   2394         self._gather,
   2395         futures,
   2396         errors=errors,
   2397         direct=direct,
   2398         local_worker=local_worker,
   2399         asynchronous=asynchronous,
   2400     )

File /opt/conda/lib/python3.11/site-packages/distributed/client.py:2253, in Client._gather(self, futures, errors, direct, local_worker)
   2251         exc = CancelledError(key)
   2252     else:
-> 2253         raise exception.with_traceback(traceback)
   2254     raise exc
   2255 if errors == "skip":

RuntimeError: Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments

where client is a distributed.Client connected to the scheduler.

Additional comments

Because the scheduler runs in a different, minimal environment that does not have scipy, it seems to try to import scipy after deserialising the task. However, according to the distributed protocol documentation, that should not happen. After some digging, I narrowed the issue down to the following line of code, which deserialises the graph_frames object.
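As a side note, the import-on-deserialise behaviour is inherent to pickle itself, not something Dask adds: a pickle stream stores global references as a module name plus an attribute name, and pickle.loads must import that module to resolve the reference. A minimal stdlib-only sketch (the hand-crafted byte string below is an illustration, not Dask's actual payload) reproduces the same ModuleNotFoundError the scheduler hits:

```python
import pickle

# Protocol-0 GLOBAL opcode ("c"): module name, then attribute name, then
# STOP (".").  pickle.loads must import the named module to resolve the
# reference; if it is absent in the deserialising environment, loads
# raises ModuleNotFoundError -- the same failure the scheduler hits
# when a task graph references scipy.
payload = b"cdefinitely_not_installed_module\nsome_function\n."

try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    print(exc)  # No module named 'definitely_not_installed_module'
```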

Questions

  • Why does deserialising graph_frames attempt to import the modules meant for the workers?
  • Is there some workaround which prevents the scheduler from trying to import modules meant for the workers?

Hi @Kostusas,

Does it change something if you do the imports inside the function?

def func(x):
    import numpy as np
    from scipy.linalg import eig
    mat = np.random.randint(100, size=(10, 10))
    return eig(mat)

I’m not sure if this is still valid.

Indeed, we know that importing inside a function is a workaround.

The code that deserializes the frames indeed suggests that the protocol description is no longer valid. This seems problematic, because it implies one of two outcomes, both suboptimal in my opinion:

  • Using a scheduler with a minimal environment is not considered a supported use case anymore
  • Code written for use with distributed must be rewritten to inline all imports, against standard Python practice

Is my understanding correct?

I’ll let others with better knowledge of the Dask scheduler mechanisms answer. cc @fjetter @crusaderky.

This is a conscious, somewhat recent (https://github.com/dask/distributed/pull/7564) design choice. Everything is unpickled on the scheduler; you can’t use a minimal environment on the scheduler VM.

The recommendation is to have the exact same environment on the client, scheduler, and all workers.
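One way to verify that recommendation holds is Client.get_versions, which compares package and Python versions across the client, scheduler, and workers; with check=True it raises on mismatch instead of letting the inconsistency surface later as an opaque deserialisation error. A sketch (the scheduler address is hypothetical, and this assumes a reachable cluster, so it is not runnable stand-alone):

```python
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # hypothetical address

# Compare package versions on client, scheduler, and all workers.
# check=True raises an error up front if they disagree.
versions = client.get_versions(check=True)
print(versions["scheduler"]["packages"].get("scipy"))
```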


Because the scheduler runs in a different, minimal environment that does not have scipy, it seems to try to import scipy after deserialising the task. However, according to the distributed protocol documentation, that should not happen. After some digging, I narrowed the issue down to the following line of code, which deserialises the graph_frames object.

This paragraph is obsolete. Thanks for spotting it!
(remedial work tracked at Document unpickle on scheduler changes · Issue #8304 · dask/distributed · GitHub)
