Understanding Client.submit and fire_and_forget

I am currently using dask with a flask webapp server. When the user sends a request to the flask app it uses client.scatter() to put a large dataset sent by the user into shared memory, then client.submit() with the large dataset future in a Variable to submit scheduler_job to the scheduler and then it immediately sends a HTTP response back to the user to say that their long running task has begun and the function returns.

scheduler_job actually does not perform much work, but it schedules a further 1-6 new tasks depending on the options sent by the user. These further tasks actually contain the expensive computations, though scheduler_job returns immediately once these have all been submitted. We will call the expensive tasks training_job

If I have understood correctly, scheduler_job should secede() at the beginning to make space for other more intensive/important tasks. Secondly, I am running into a few bugs/errors which I have an idea about but do not fully understand what the solution should be.

The first one is that as training_jobs begin to get submitted, the first one or two will succeed then the rest disappear. I think this is because the client goes out of scope after it has submitted the training_jobs but we will note this idea down temporarily.

To avoid this I tried using the fire_and_forget method which says I can ensure a future gets executed atleast once even if the client ‘dies’ so to speak. So instead of submitting each task from scheduler_job and returning, I submit each task, collect the futures into an array then use fire_and_forget onto that array at the end of scheduler_job. After using this method I noticed what started to happen instead was that a small selection of training_jobs end up permanently locked and never return. I see this stack trace from the dask dashboard on my currently stalled task.


Call Stack: regress-288716a7-7352-44ad-80be-5d13c4ab41ba

Worker: tcp://127.0.0.1:36603

Key: regress-288716a7-7352-44ad-80be-5d13c4ab41ba

File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 966, in _bootstrap self._bootstrap_inner()

File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1009, in _bootstrap_inner self.run()

File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 946, in run self._target(*self._args, **self._kwargs)

File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker task.run()

File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run result = self.fn(*self.args, **self.kwargs)

File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 3076, in apply_function msg = apply_function_simple(function, args, kwargs, time_delay)

File "/home/user/project/.venv/lib/python3.10/site-packages/distributed/worker.py", line 3098, in apply_function_simple result = function(*args, **kwargs)

File "/home/user/project/app/jobs/linear_regressions/regress_job.py", line 11, in regress ModelFactory(linear(), params, data_frame).train_and_respond()

File "/home/user/project/app/helpers/model_factory.py", line 23, in train_and_respond training_result = self.__train()

File "/home/user/project/app/helpers/model_factory.py", line 49, in __train trained_model = cross_validate(

File "/home/user/project/app/services/cross_validation_service.py", line 12, in cross_validate cross_validator = model_selection.cross_validate(

File "/home/user/project/.venv/lib/python3.10/site-packages/sklearn/model_selection/_validation.py", line 266, in cross_validate results = parallel(

File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/parallel.py", line 1008, in __call__ n_jobs = self._initialize_backend()

File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/parallel.py", line 775, in _initialize_backend n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self,

File "/home/user/project/app/helpers/observed.py", line 48, in configure return super().configure(n_jobs, parallel, prefer, require, idle_worker_timeout, **memmappingexecutor_args)

File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/_parallel_backends.py", line 506, in configure self._workers = get_memmapping_executor(

File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/executor.py", line 20, in get_memmapping_executor return MemmappingExecutor.get_memmapping_executor(n_jobs, **kwargs)

File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/executor.py", line 52, in get_memmapping_executor _executor, executor_is_reused = super().get_reusable_executor(

File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/externals/loky/reusable_executor.py", line 159, in get_reusable_executor executor.shutdown(wait=True, kill_workers=kill_workers)

File "/home/user/project/.venv/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 1199, in shutdown executor_manager_thread.join()

File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1089, in join self._wait_for_tstate_lock()

File "/home/user/.asdf/installs/python/3.10.2/lib/python3.10/threading.py", line 1109, in _wait_for_tstate_lock if lock.acquire(block, timeout):

Within the parameters for scheduler_job it holds a Variable containing a previously scattered future of a potentially large dataset. I have learned that when you send futures through a submit they get resolved to their values. I need to send this large dataset from scheduler_job to training_job without burdening the scheduler so I keep the future in a Variable whilst it is passed to scheduler_job. Instead of calling .get() then .result() on the Variable in each training_job I call .get() to bring it down to the Future level in scheduler_job, then allow it to naturally get resolved in each of training_job so that I do not need to repeat .get().result() in each of the training_jobs which should be more efficient?

Usually this stack trace causes my workers to return the ShutdownExecutionError but in these cases they just eternally lock. What could be the cause of this issue? My guess is that I am using bad dask practice but I am not sure what else I could be missing…

Hi @Niphemy,

Okay, this is all a bit hard to follow, maybe it would be better with some code snippet of what you are doing, or even better: a small reproducible example using a LocalCluster. Anyway, I’ve got some questions/remarks to better understand:

Why do you use a Dask task in order to do this? Couldn’t you be doing it directly within the Flask callback? This would avoid the need to secede() and the following problems also probably.

Well, if scheduler_job returns immediately and do not wait for results, there is no need to secede, however, you’ll run into your next problem if you don’t use fire_and_forget.

If you don’t keep your Client or a reference to future objects, the tasks will be cancelled. As noted in Futures — Dask documentation

Dask will stop work that doesn’t have any active futures

Well, this seems like a correct solution, I see no reason why training_jobs wouldn’t be executed, and the stack trace doesn’t really help. Did you whatch the Dashboard to have some idea of what was happening?

Well, at least that should work, even if it is a bit complex and I never tested this.

Anyway, you should try to simplify things and avoid using secede without rejoin.

Thanks for getting back to me @guillaumeeb. Apologies for the complicated OP I see that I made it somewhat obscure

  1. You are probably right, The scheduling is now done in the flask callback which makes it less complex.
  2. fire_and_forget has fixed the dropping of tasks issue.
  3. Somehow I seemed to fix the issue caused in the traceback which TLDR; was to related to the use of n_jobs=-1 and verbose=100 with the LokyBackend from the cross_validation and GridSearch methods from sklearn on a less related note to dask… Although I observed this to only occur when the methods ran on the cluster. With verbose set to 50 everything seemed to clear up.

Overall everything seems to be resolved now fire_and_forget carrying out the effect I wanted.

1 Like