might also be executed in the process pool (based on the placement of the UDFs in the task stream and the downstream error [1]). Would anyone here know a mechanism to confirm whether this hypothesis is true? Also, if it is true, would it be possible to ensure that the stages which don’t need the process pool are executed in the thread pool?
[1]: loky.process_executor.TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker.
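A more direct check than watching CPU usage is to compare process IDs. Code running in a process pool reports a different `os.getpid()` than the process that submitted it, while code in a thread pool reports the same one. The sketch below shows the idea using the standard-library `ThreadPoolExecutor` and `ProcessPoolExecutor` as stand-ins for the Dask worker's default thread pool and the loky pool (an assumption; loky itself is not used here). Inside a Dask task, the same comparison could be made against the worker PIDs returned by `client.run(os.getpid)`.

```python
import multiprocessing
import os
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def runs_in_separate_process(executor: Executor) -> bool:
    """Return True if work submitted to `executor` runs in another process."""
    # os.getpid is a builtin, so it pickles cleanly for a process pool.
    worker_pid = executor.submit(os.getpid).result()
    return worker_pid != os.getpid()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=1) as threads:
        print("thread pool:", runs_in_separate_process(threads))  # False
    # "fork" is POSIX-only; it avoids re-importing this module in the child.
    fork_ctx = multiprocessing.get_context("fork")
    with ProcessPoolExecutor(max_workers=1, mp_context=fork_ctx) as procs:
        print("process pool:", runs_in_separate_process(procs))  # True
```

Running it prints `thread pool: False` followed by `process pool: True`.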
I was able to confirm that the other stages weren’t being executed in the loky.process_executor by continuously monitoring CPU usage: during those stages the load shifts to the main process rather than to the loky processes (e.g. python -m loky.backend.popen_loky_posix).
I am curious whether it would be possible to prevent the unexpected termination of these processes from affecting the main Dask process.
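With the standard-library pool, when one pool process dies (segfault, OOM kill) the parent process normally survives, but the pool is marked broken and every later submission to it fails. One way to keep a worker usable is to replace the broken pool and then re-raise the error, so the failing task is still reported (and could be retried, e.g. through Dask's `retries=` option). The sketch below is a hypothetical `RestartingProcessPool` wrapper around the standard-library `ProcessPoolExecutor`; it is not a drop-in `worker.executors` entry, since it does not implement the `Executor` interface. It also assumes that loky's `TerminatedWorkerError` derives from `concurrent.futures.process.BrokenProcessPool`, so the same `except` clause would catch it.

```python
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


class RestartingProcessPool:
    """Hypothetical wrapper that replaces its pool after a worker process dies."""

    def __init__(self, max_workers: int = 1):
        self._max_workers = max_workers
        # "fork" is POSIX-only; chosen so the demo runs without re-importing __main__.
        self._ctx = multiprocessing.get_context("fork")
        self._pool = self._new_pool()
        self.restarts = 0

    def _new_pool(self) -> ProcessPoolExecutor:
        return ProcessPoolExecutor(max_workers=self._max_workers, mp_context=self._ctx)

    def run(self, fn, *args):
        """Run fn(*args) in the pool; if a worker dies, rebuild the pool and re-raise."""
        try:
            return self._pool.submit(fn, *args).result()
        except BrokenProcessPool:
            # A broken pool rejects all new work, so swap in a fresh one
            # before propagating the error to the caller.
            self._pool.shutdown(wait=False)
            self._pool = self._new_pool()
            self.restarts += 1
            raise

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


def crash() -> None:
    """Simulate a segfault or OOM kill by exiting the child process abruptly."""
    os._exit(1)


def square(x: int) -> int:
    return x * x


if __name__ == "__main__":
    pool = RestartingProcessPool()
    try:
        pool.run(crash)
    except BrokenProcessPool as exc:
        print("task failed:", type(exc).__name__)
    print(pool.run(square, 7))  # 49, served by the replacement pool
    pool.shutdown()
```

Re-raising instead of silently retrying is a deliberate choice here: a task that crashes its process will likely crash it again, so the decision to retry is left to the caller.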
OK, so this answers your first question, and the title of this topic, I guess. Nice job verifying this!
Could you be a bit clearer: you want a termination or error occurring in the loky ProcessPoolExecutor not to affect the Dask Worker process? Could you post some code snippets showing how you set up this executor, and a simple example of how you are using it?
loky.process_executor.TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker.
Looking at this error message, I initially thought the TerminatedWorkerError was the root cause that ultimately led to the cluster shutdown, so I wanted to make the worker nodes more robust to ProcessPool failures.
However, over the last couple of days I noticed that this error shows up during cluster shutdown irrespective of the actual cause of the pipeline failure (e.g. UDF errors, hardware issues, networking issues). So I think this might just be a non-graceful shutdown issue in the ProcessPool plugin.
I created the ProcessPool following @mrocklin’s example, like this:
```python
import dask.distributed
from dask.distributed import WorkerPlugin
from loky import ProcessPoolExecutor


class AddProcessPool(WorkerPlugin):
    """Attach a loky process pool to each worker under the name "processes"."""

    def setup(self, worker):
        super().setup(worker)
        # One pool process per worker thread
        executor = ProcessPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors["processes"] = executor


def add_process_pool(client: dask.distributed.Client):
    client.register_worker_plugin(AddProcessPool())
    # Sanity check: report each worker's executors
    return client.run(lambda dask_worker: str(dask_worker.executors))
```
Yes, I am almost sure this is a shutdown issue. I have seen the same stack trace replicated across successful and unsuccessful jobs, and across unsuccessful jobs that failed due to different errors. I am currently looking at mechanisms to handle this in the WorkerPlugin and will post an update once I have a reasonable plugin.
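One possible direction for the plugin is a `teardown` hook that shuts the process pool down explicitly when the worker closes, so in-flight pool tasks can finish before the pool's processes go away. This is only a sketch: whether it removes the `TerminatedWorkerError` seen at shutdown is untested, and recent `distributed` versions may already shut down the executors in `worker.executors` when a worker closes, which would make the explicit teardown partly redundant. To keep the sketch runnable without Dask or loky installed, it falls back to `object` and to the standard-library `ProcessPoolExecutor` when those imports fail; these fallbacks are assumptions for illustration only.

```python
try:
    from dask.distributed import WorkerPlugin
except ImportError:  # fallback only so the sketch runs without Dask installed
    WorkerPlugin = object

try:
    from loky import ProcessPoolExecutor
except ImportError:  # standard-library stand-in for loky's executor
    from concurrent.futures import ProcessPoolExecutor


class AddProcessPool(WorkerPlugin):
    """Attach a process pool to each worker and shut it down with the worker."""

    def setup(self, worker):
        executor = ProcessPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors["processes"] = executor

    def teardown(self, worker):
        # Detach the pool from the worker, then block until running pool
        # tasks finish so the processes exit cleanly instead of mid-task.
        executor = worker.executors.pop("processes", None)
        if executor is not None:
            executor.shutdown(wait=True)
```

Registration would stay the same as in the `add_process_pool` helper above (`client.register_worker_plugin(AddProcessPool())`).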