Issue explanation
Hey y’all,
I am using Dask-Jobqueue to deploy Dask on a PBS cluster. Due to various cluster limitations, I run the scheduler in a different, minimal environment from the one used by the workers and client. I achieve this by specifying the workers' Python interpreter when creating the cluster instance:
```python
from dask_jobqueue import PBSCluster

cluster = await PBSCluster(python="singularity exec image.simg python")
```
where the workers and client run inside a Singularity container built from the same `image.simg` image.
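For context, the full setup looks roughly like the sketch below; the resource values and `scale` call are placeholders for illustration, not my exact configuration:

```python
# Rough sketch of the deployment; cores/memory/scale values are placeholders.
from dask.distributed import Client
from dask_jobqueue import PBSCluster

async def start() -> Client:
    # The scheduler runs here, in the local minimal environment; the
    # `python` argument only changes the interpreter that launches workers.
    cluster = await PBSCluster(
        python="singularity exec image.simg python",
        cores=1,
        memory="2GB",
        asynchronous=True,
    )
    cluster.scale(jobs=1)
    return await Client(cluster, asynchronous=True)
```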
However, I face an issue with the scheduler (whose environment does not contain `scipy`) attempting and failing to deserialise tasks meant for the workers:
```python
import numpy as np
from scipy.linalg import eig

def func(x):
    mat = np.random.randint(100, size=(10, 10))
    return eig(mat)

result_ungathered = client.map(func, [0])
client.gather(result_ungathered)
```
Traceback
```
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/scheduler.py:4578, in update_graph()
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/serialize.py:434, in deserialize()
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/serialize.py:100, in pickle_loads()
File /home/kostas/mambaforge/envs/quantumtinkerer/lib/python3.11/site-packages/distributed/protocol/pickle.py:96, in loads()
ModuleNotFoundError: No module named 'scipy'
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
Cell In[9], line 2
1 result_ungathered = client.map(func, x)
----> 2 client.gather(result_ungathered)
File /opt/conda/lib/python3.11/site-packages/distributed/client.py:2393, in Client.gather(self, futures, errors, direct, asynchronous)
2390 local_worker = None
2392 with shorten_traceback():
-> 2393 return self.sync(
2394 self._gather,
2395 futures,
2396 errors=errors,
2397 direct=direct,
2398 local_worker=local_worker,
2399 asynchronous=asynchronous,
2400 )
File /opt/conda/lib/python3.11/site-packages/distributed/client.py:2253, in Client._gather(self, futures, errors, direct, local_worker)
2251 exc = CancelledError(key)
2252 else:
-> 2253 raise exception.with_traceback(traceback)
2254 raise exc
2255 if errors == "skip":
RuntimeError: Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
```
where `client` is a `Client` instance connected to the scheduler.
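One way to confirm the mismatch from the client side is `Client.get_versions`, which only reports what each process can see (it does not fix anything); the `packages` list here is just my choice of modules to check:

```python
# Report package versions seen by the client, scheduler, and workers.
versions = client.get_versions(packages=["numpy", "scipy"])
print(versions["client"]["packages"])     # scipy is present here
print(versions["scheduler"]["packages"])  # scipy is absent here
```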
Additional comments
Because the scheduler is in a different, minimal environment that does not have `scipy`, it seems to try to import `scipy` after deserialising the task. However, according to the distributed protocol, that should not happen. After doing some digging, I narrowed the issue down to the line of code in `update_graph` (`distributed/scheduler.py`, visible in the traceback above) that deserialises the `graph_frames` object.
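A standalone illustration of the mechanism as I understand it: cloudpickle stores `eig` as a reference to `scipy.linalg.eig`, so whichever process unpickles the function has to re-import `scipy`:

```python
# Minimal reproduction of the import-on-deserialise behaviour, outside Dask.
import pickle

import cloudpickle
from scipy.linalg import eig

def func(x):
    # `eig` is a global reference; cloudpickle serialises it as
    # "import scipy.linalg and fetch eig", not by value.
    return eig(x)

blob = cloudpickle.dumps(func)

# In an environment without scipy, the next line raises
# ModuleNotFoundError: No module named 'scipy'
restored = pickle.loads(blob)
```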
Questions
- Why does deserialising `graph_frames` attempt to import modules meant for the workers?
- Is there a workaround that prevents the scheduler from trying to import modules meant for the workers?