I am using Dask-Jobqueue to deploy Dask on a PBS cluster. Due to various cluster limitations, I run the scheduler in a different, minimal environment from the workers and client. I achieve this by specifying the Python interpreter when constructing the cluster instance:
```python
cluster = await PBSCluster(python="singularity exec image.simg python")
```

where the workers and client run inside the same Singularity container.
However, I face an issue with the scheduler (whose environment does not contain scipy) attempting and failing to deserialise tasks meant for the workers:
```python
import numpy as np
from scipy.linalg import eig

def func(x):
    mat = np.random.randint(100, size=(10, 10))
    return eig(mat)

result_ungathered = client.map(func, x)
client.gather(result_ungathered)
```
```pytb
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/scheduler.py:4578, in update_graph()
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/serialize.py:434, in deserialize()
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/serialize.py:100, in pickle_loads()
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/pickle.py:96, in loads()
ModuleNotFoundError: No module named 'scipy'

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Cell In, line 2
      1 result_ungathered = client.map(func, x)
----> 2 client.gather(result_ungathered)

File /opt/conda/lib/python3.11/site-packages/distributed/client.py:2393, in Client.gather(self, futures, errors, direct, asynchronous)
   2390     local_worker = None
   2392 with shorten_traceback():
-> 2393     return self.sync(
   2394         self._gather,
   2395         futures,
   2396         errors=errors,
   2397         direct=direct,
   2398         local_worker=local_worker,
   2399         asynchronous=asynchronous,
   2400     )

File /opt/conda/lib/python3.11/site-packages/distributed/client.py:2253, in Client._gather(self, futures, errors, direct, local_worker)
   2251     exc = CancelledError(key)
   2252 else:
-> 2253     raise exception.with_traceback(traceback)
   2254 raise exc
   2255 if errors == "skip":

RuntimeError: Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
```
Here, `client` is the `Client` instance connected to the scheduler.
Because the scheduler runs in a different, minimal environment that does not have scipy, it seems to try to import scipy after deserialising the task. However, according to the distributed protocol, that should not happen. After doing some digging, I narrowed the issue down to the line in `Scheduler.update_graph` that deserialises `graph_frames`.
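To illustrate why deserialisation can trigger imports at all, here is a minimal stdlib sketch (my own illustration, not Dask code): unpickling resolves every object that was pickled by reference through `Unpickler.find_class`, which imports the module that defines it.

```python
import io
import pickle

class TracingUnpickler(pickle.Unpickler):
    """Record every (module, name) pair the unpickler resolves."""
    def __init__(self, file):
        super().__init__(file)
        self.resolved = []

    def find_class(self, module, name):
        # find_class() imports `module` before looking up `name`,
        # which is exactly where a missing package blows up.
        self.resolved.append((module, name))
        return super().find_class(module, name)

payload = pickle.dumps(len)  # a by-reference pickle of builtins.len
unpickler = TracingUnpickler(io.BytesIO(payload))
unpickler.load()
print(unpickler.resolved)  # [('builtins', 'len')]
```

So any process that unpickles the graph, including the scheduler, must be able to import the modules those payloads reference.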
- Why does deserialising `graph_frames` attempt to import the modules meant for the workers?
- Is there some workaround which prevents the scheduler from trying to import modules meant for the workers?
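One workaround I would expect to help (a sketch I have not verified on my cluster): move the scipy import inside the task function, so the pickled function carries no global reference to scipy and the import only runs where the function is actually called. Illustrated here with the stdlib `fractions` module standing in for scipy:

```python
import sys

# Sketch: defer the heavy import into the task body. Defining (and
# pickling) `func` then requires nothing extra; only *calling* it does.
def func(x):
    from fractions import Fraction  # stand-in for `from scipy.linalg import eig`
    return Fraction(x, 3)

sys.modules.pop("fractions", None)      # simulate a minimal environment
assert "fractions" not in sys.modules   # defining func imported nothing
print(func(1))                          # 1/3 -- the import runs at call time
assert "fractions" in sys.modules
```

Applied to my original example, `func` would contain `from scipy.linalg import eig` in its body, with `client.map(func, x)` unchanged (assuming numpy is either available on the scheduler or deferred the same way).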