I am currently using Dask with a Flask web app server. When the user sends a request, the Flask app calls `client.scatter()` to put the large dataset sent by the user into distributed memory, wraps the resulting future in a `Variable`, and passes that to `client.submit()` to send `scheduler_job` to the scheduler. The endpoint then immediately returns an HTTP response telling the user that their long-running task has begun.
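For concreteness, a minimal sketch of this flow. The names (`/train`, `payload["data"]`, the placeholder `scheduler_job` body) are illustrative, and the in-process cluster stands in for my real scheduler:

```python
from distributed import Client, Variable
from flask import Flask, jsonify, request

app = Flask(__name__)
# In-process cluster for illustration only; the real app connects to a
# running scheduler, e.g. Client("tcp://scheduler:8786").
client = Client(processes=False)

def scheduler_job(data_var, options):
    # Placeholder: the real scheduler_job submits the training tasks.
    return "scheduled"

@app.route("/train", methods=["POST"])
def train():
    payload = request.get_json()
    # scatter() treats a list as a collection of items, so wrap the dataset
    # to get back one future for the whole object.
    [data_future] = client.scatter([payload["data"]])
    # Wrap the future in a Variable so submit() does not resolve it to
    # its value when scheduler_job is sent to the scheduler.
    data_var = Variable()
    data_var.set(data_future)
    # Hand off to the coordinating task and respond straight away.
    client.submit(scheduler_job, data_var, payload.get("options", {}))
    return jsonify({"status": "started"}), 202
```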
`scheduler_job` itself does not perform much work, but it submits a further 1-6 new tasks depending on the options sent by the user. These further tasks contain the actual expensive computations; `scheduler_job` returns immediately once they have all been submitted. We will call these expensive tasks `training_jobs`. Firstly, if I have understood correctly, `scheduler_job` should call `secede()` at the beginning to make space for other more intensive/important tasks. Secondly, I am running into a few bugs/errors which I have an idea about but do not fully understand what the solution should be.
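To show concretely what I mean, a sketch of the seceding coordinator, with `training_job` as a placeholder for one of the expensive tasks:

```python
from distributed import get_client, secede

def training_job(data_var, opts):
    # Placeholder for one of the expensive computations.
    return opts

def scheduler_job(data_var, options):
    # Leave the worker's thread pool so this coordinating task does not
    # hold a slot that one of the training_jobs could be using.
    secede()
    client = get_client()
    futures = [client.submit(training_job, data_var, opts) for opts in options]
    return [f.key for f in futures]
```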
The first issue is that as the `training_jobs` begin to get submitted, the first one or two succeed and then the rest simply disappear. I think this is because the client goes out of scope after it has submitted the `training_jobs`, but let's note that idea down for now.
To avoid this I tried the `fire_and_forget` function, whose docs say it ensures a future gets executed at least once even if the client "dies", so to speak. So instead of submitting each task from `scheduler_job` and returning, I submit each task, collect the futures into a list, and call `fire_and_forget` on that list at the end of `scheduler_job`. After this change, what happens instead is that a small selection of the `training_jobs` end up permanently locked and never return. This is the call stack the Dask dashboard shows for a currently stalled task:
```
Call Stack: regress-288716a7-7352-44ad-80be-5d13c4ab41ba
Worker: tcp://127.0.0.1:36603
Key: regress-288716a7-7352-44ad-80be-5d13c4ab41ba

File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 966, in _bootstrap
  self._bootstrap_inner()
File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
  self.run()
File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 946, in run
  self._target(*self._args, **self._kwargs)
File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker
  task.run()
File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
  result = self.fn(*self.args, **self.kwargs)
File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 3076, in apply_function
  msg = apply_function_simple(function, args, kwargs, time_delay)
File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 3098, in apply_function_simple
  result = function(*args, **kwargs)
File "/home/user/project/app/jobs/linear_regressions/regress_job.py", line 11, in regress
  ModelFactory(linear(), params, data_frame).train_and_respond()
File "/home/user/project/app/helpers/model_factory.py", line 23, in train_and_respond
  training_result = self.__train()
File "/home/user/project/app/helpers/model_factory.py", line 49, in __train
  trained_model = cross_validate(
File "/home/user/project/app/services/cross_validation_service.py", line 12, in cross_validate
  cross_validator = model_selection.cross_validate(
File "/home/user/project/.venv/lib/python3.10/site-packages/sklearn/model_selection/_validation.py", line 266, in cross_validate
  results = parallel(
File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/parallel.py", line 1008, in __call__
  n_jobs = self._initialize_backend()
File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/parallel.py", line 775, in _initialize_backend
  n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self,
File "/home/user/project/app/helpers/observed.py", line 48, in configure
  return super().configure(n_jobs, parallel, prefer, require, idle_worker_timeout, **memmappingexecutor_args)
File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/_parallel_backends.py", line 506, in configure
  self._workers = get_memmapping_executor(
File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/executor.py", line 20, in get_memmapping_executor
  return MemmappingExecutor.get_memmapping_executor(n_jobs, **kwargs)
File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/executor.py", line 52, in get_memmapping_executor
  _executor, executor_is_reused = super().get_reusable_executor(
File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/externals/loky/reusable_executor.py", line 159, in get_reusable_executor
  executor.shutdown(wait=True, kill_workers=kill_workers)
File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 1199, in shutdown
  executor_manager_thread.join()
File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1089, in join
  self._wait_for_tstate_lock()
File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1109, in _wait_for_tstate_lock
  if lock.acquire(block, timeout):
```
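For reference, the fire-and-forget variant described above looks roughly like this, again with a placeholder `training_job`:

```python
from distributed import fire_and_forget, get_client, secede

def training_job(data_var, opts):
    # Placeholder for one of the expensive computations.
    return opts

def scheduler_job(data_var, options):
    secede()
    client = get_client()
    futures = [client.submit(training_job, data_var, opts) for opts in options]
    # Ask the scheduler to run these tasks at least once even after all
    # client-side references to the futures are gone.
    fire_and_forget(futures)
```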
Among the parameters of `scheduler_job` is a `Variable` containing a previously scattered future of a potentially large dataset. I have learned that when you send futures through `submit` they get resolved to their values. I need to send this large dataset to each `training_job` without burdening the scheduler, so I keep the future wrapped in a `Variable` while it is passed to `scheduler_job`. Then, instead of calling `.get().result()` on the `Variable` in each `training_job`, I call `.get()` once in `scheduler_job` to bring it down to the Future level, and let it get resolved naturally as it is passed into each `training_job`. That way I do not need to repeat `.get().result()` in each of the `training_jobs`, which should be more efficient?
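In code, the handoff I am describing looks something like this (simplified; this sketch gathers the results at the end only so it can be checked, which the real `scheduler_job` does not do):

```python
from distributed import get_client, secede

def training_job(data, opts):
    # By the time we get here, `data` has been resolved from a future
    # to the concrete dataset.
    return len(data)

def scheduler_job(data_var, options):
    secede()
    client = get_client()
    # One .get() pulls the Variable down to the underlying future; passing
    # that future to submit() resolves it to the actual dataset on the
    # worker, so no training_job has to call .get().result() itself.
    data_future = data_var.get()
    futures = [client.submit(training_job, data_future, opts) for opts in options]
    return [f.result() for f in futures]
```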
Usually a stack trace like this ends with my workers raising a `ShutdownExecutionError`, but in these cases they just lock up forever. What could be the cause of this issue? My guess is that I am using bad Dask practice, but I am not sure what else I could be missing…