I am currently using Dask with a Flask web app. When a user sends a request, the Flask app calls `client.scatter()` to put the large dataset sent by the user into memory on the cluster, wraps the resulting future in a `Variable`, and then calls `client.submit()` to send `scheduler_job` to the scheduler. The request handler immediately returns an HTTP response telling the user that their long-running task has begun.
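To make the setup concrete, here is a stripped-down sketch of the endpoint; the route name, payload parsing, and scheduler address are placeholders rather than my actual code, and `scheduler_job` is described just below:

```python
import pandas as pd
from flask import Flask, request, jsonify
from distributed import Client, Variable

app = Flask(__name__)
client = Client("tcp://127.0.0.1:8786")  # placeholder scheduler address

@app.route("/train", methods=["POST"])
def train():
    payload = request.get_json()
    data_frame = pd.DataFrame(payload["data"])   # the potentially large user dataset
    data_future = client.scatter(data_frame)     # move the data onto the cluster
    data_var = Variable()
    data_var.set(data_future)                    # wrap the future so it survives submit()
    client.submit(scheduler_job, data_var, payload["options"])
    return jsonify({"status": "training started"}), 202  # respond immediately
```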
`scheduler_job` itself does not perform much work: it submits a further 1-6 new tasks depending on the options sent by the user. These further tasks contain the actual expensive computations, and `scheduler_job` returns as soon as they have all been submitted. We will call the expensive tasks `training_job`.
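Roughly, `scheduler_job` looks like this; `training_job` and the shape of `options` are simplified stand-ins for my real code:

```python
from distributed import get_client

def training_job(data_frame, opt):
    ...  # the expensive model training happens here

def scheduler_job(data_var, options):
    client = get_client()            # the client available from inside a worker task
    data_future = data_var.get()     # Variable -> Future pointing at the scattered dataset
    for opt in options:              # 1-6 tasks, depending on the user's options
        client.submit(training_job, data_future, opt)
    return "submitted"               # returns as soon as everything is submitted
```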
Firstly, if I have understood correctly, `scheduler_job` should call `secede()` at the beginning to make room for other, more intensive/important tasks.
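This is where I imagine the `secede()` call going; whether a matching `rejoin()` is needed before returning is part of what I am unsure about:

```python
from distributed import rejoin, secede

def scheduler_job(data_var, options):
    secede()   # hand this worker thread back to the pool while we submit
    ...        # submit the training_jobs exactly as in the sketch above
    rejoin()   # re-acquire a thread before returning (I am guessing here)
```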
Secondly, I am running into a few bugs/errors that I have some ideas about, but I do not fully understand what the solution should be. The first one is that as the `training_job`s begin to get submitted, the first one or two succeed and then the rest disappear. I think this is because the client goes out of scope after it has submitted the `training_job`s, but let us note that idea down for now.
To avoid this I tried using `fire_and_forget`, whose documentation says it ensures a future gets executed at least once even if the client 'dies', so to speak. So instead of submitting each task from `scheduler_job` and returning, I submit each task, collect the futures into a list, and call `fire_and_forget` on that list at the end of `scheduler_job`.
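Reconstructed from memory, the `fire_and_forget` version looks roughly like this (same simplified names as above):

```python
from distributed import fire_and_forget, get_client

def scheduler_job(data_var, options):
    client = get_client()
    data_future = data_var.get()
    futures = [client.submit(training_job, data_future, opt) for opt in options]
    fire_and_forget(futures)   # ask the scheduler to finish these even after my references are gone
    return "submitted"
```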
After switching to this approach I noticed something different: a small selection of `training_job`s end up permanently locked and never return. I see this stack trace on the Dask dashboard for one of my currently stalled tasks:
```
Call Stack: regress-288716a7-7352-44ad-80be-5d13c4ab41ba
Worker: tcp://127.0.0.1:36603
Key: regress-288716a7-7352-44ad-80be-5d13c4ab41ba

  File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 966, in _bootstrap
    self._bootstrap_inner()
  File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker
    task.run()
  File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 3076, in apply_function
    msg = apply_function_simple(function, args, kwargs, time_delay)
  File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 3098, in apply_function_simple
    result = function(*args, **kwargs)
  File "/home/user/project/app/jobs/linear_regressions/regress_job.py", line 11, in regress
    ModelFactory(linear(), params, data_frame).train_and_respond()
  File "/home/user/project/app/helpers/model_factory.py", line 23, in train_and_respond
    training_result = self.__train()
  File "/home/user/project/app/helpers/model_factory.py", line 49, in __train
    trained_model = cross_validate(
  File "/home/user/project/app/services/cross_validation_service.py", line 12, in cross_validate
    cross_validator = model_selection.cross_validate(
  File "/home/user/project/.venv/lib/python3.10/site-packages/sklearn/model_selection/_validation.py", line 266, in cross_validate
    results = parallel(
  File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/parallel.py", line 1008, in __call__
    n_jobs = self._initialize_backend()
  File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/parallel.py", line 775, in _initialize_backend
    n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self,
  File "/home/user/project/app/helpers/observed.py", line 48, in configure
    return super().configure(n_jobs, parallel, prefer, require, idle_worker_timeout, **memmappingexecutor_args)
  File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/_parallel_backends.py", line 506, in configure
    self._workers = get_memmapping_executor(
  File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/executor.py", line 20, in get_memmapping_executor
    return MemmappingExecutor.get_memmapping_executor(n_jobs, **kwargs)
  File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/executor.py", line 52, in get_memmapping_executor
    _executor, executor_is_reused = super().get_reusable_executor(
  File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/externals/loky/reusable_executor.py", line 159, in get_reusable_executor
    executor.shutdown(wait=True, kill_workers=kill_workers)
  File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 1199, in shutdown
    executor_manager_thread.join()
  File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1089, in join
    self._wait_for_tstate_lock()
  File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1109, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
```
Among the parameters of `scheduler_job` is a `Variable` holding the previously scattered future of a potentially large dataset. I have learned that when you pass futures through a `submit()` call, they get resolved to their values. I need to get this large dataset from `scheduler_job` to `training_job` without burdening the scheduler, so I keep the future inside a `Variable` while it is passed to `scheduler_job`. Then, rather than calling `.get()` followed by `.result()` on the `Variable` in each `training_job`, I call `.get()` once in `scheduler_job` to bring it back down to a `Future`, and let that future be resolved naturally when each `training_job` runs. That way I do not have to repeat `.get().result()` in every `training_job`, which should be more efficient?
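To spell out the two alternatives I am weighing (option B is what I currently do; the names are the same simplified stand-ins as above):

```python
from distributed import get_client

# Option A: pass the Variable all the way down and unwrap it in every task
def training_job_a(data_var, opt):
    data_frame = data_var.get().result()   # .get() then .result(), repeated 1-6 times
    ...

# Option B (what I do now): unwrap once, let submit() resolve the Future
def scheduler_job(data_var, options):
    client = get_client()
    data_future = data_var.get()           # Variable -> Future, done once
    for opt in options:
        client.submit(training_job_b, data_future, opt)

def training_job_b(data_frame, opt):
    ...                                    # receives the materialised DataFrame directly
```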
Usually this kind of stack trace causes my workers to raise a `ShutdownExecutorError`, but in these cases they just lock up forever. What could be the cause of this issue? My guess is that I am following bad Dask practice somewhere, but I am not sure what else I could be missing…