ThreadPool -> ProcessPool switch for one UDF leads to subsequent UDFs also executing on the ProcessPool

I used the changes suggested in Advanced Dask Customization | Customizing Dask Workers | Matt Rocklin - YouTube to move a GIL-holding UDF to a ProcessPoolExecutor. However, I noticed that the subsequent UDFs which are not annotated with

with dask.annotate(executor="processes"):

might also be executed in the process pool (based on the placement of the UDFs in the task stream and the downstream error [1]). Would anyone here know of a mechanism to confirm whether this hypothesis is true? Also, if the hypothesis is true, would it be possible to ensure that the stages which don't need the process pool are executed in the thread pool? A simplified sketch of the pattern I mean follows the error below.

[1]:
loky.process_executor.TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker.
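
For reference, the pattern is roughly this. gil_heavy_udf and light_udf are placeholders for the real UDFs, and the compute assumes a distributed Client whose workers already have a "processes" executor registered via the worker plugin from the video:

import dask
from dask import delayed


@delayed
def gil_heavy_udf(x):
    # GIL-holding, CPU-bound stage that I want routed to the "processes" executor
    return x * 2


@delayed
def light_udf(x):
    # Downstream stage that I expect to stay on the default thread pool
    return x + 1


with dask.annotate(executor="processes"):
    heavy = gil_heavy_udf(10)

final = light_udf(heavy)  # not annotated

# optimize_graph=False can help keep the layer annotations from being
# dropped when delayed tasks are fused during optimization
result = final.compute(optimize_graph=False)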

I was able to confirm that the other stages weren't being executed in the loky.process_executor by continuously monitoring CPU usage, which shifts to the main worker process rather than the loky processes (e.g. python -m loky.backend.popen_loky_posix).
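
A more direct check I considered is recording the PID each stage runs in and comparing it with the workers' main PIDs. This is an untested sketch: run_in is a placeholder helper, and client is assumed to be an existing dask.distributed.Client.

import os


def run_in(x):
    # Return the PID this task actually executed in; in practice the UDFs
    # could return it alongside their real result.
    return os.getpid()

# Comparing against the workers' main PIDs, e.g.
#   worker_pids = client.run(os.getpid)          # {worker_address: pid, ...}
#   task_pid = client.submit(run_in, 1).result()
# a matching PID means the task ran inside the worker process (thread pool),
# while a different PID means it ran in one of the loky child processes.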

I am curious whether it would be possible to make the unexpected termination of these processes not affect the main Dask process.

Hi @vij, welcome to the Dask Discourse forum!

OK, so this answers your first question and the one in the title of this topic, I guess. Nice job verifying this!

Could you be a bit clearer: do you want the termination/error occurring in the loky ProcessPoolExecutor not to affect the Dask Worker process? Could you post some code snippets showing how you set up this executor and a simple example of how you are using it?

Thanks for the response @guillaumeeb.

loky.process_executor.TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker.

Looking at this error message, I initially thought the TerminatedWorkerError was the root cause ultimately leading to the cluster shutdown, and I wanted to ensure that the worker nodes were more robust to ProcessPool failures.

However, over the last couple of days I noticed that this error shows up during cluster shutdown irrespective of the actual cause of the failure in the pipeline (e.g. UDF errors, hardware issues, networking issues). So I think this might just be a non-graceful shutdown issue in the ProcessPool plugin.

I have created the ProcessPool according to @mrocklin's example, like this:

import dask.distributed
from dask.distributed import WorkerPlugin
from loky import ProcessPoolExecutor


class AddProcessPool(WorkerPlugin):
    """Attach a loky ProcessPoolExecutor to each worker under the name "processes"."""

    def setup(self, worker):
        super().setup(worker)
        # One loky process per worker thread; tasks annotated with
        # executor="processes" get routed to this pool.
        executor = ProcessPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors["processes"] = executor


def add_process_pool(client: dask.distributed.Client):
    client.register_worker_plugin(AddProcessPool())
    # Sanity check: each worker should now report a "processes" executor.
    return client.run(lambda dask_worker: str(dask_worker.executors))
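
For completeness, this is roughly how I exercise it locally (simplified sketch; the real setup runs against an existing cluster):

from dask.distributed import Client

if __name__ == "__main__":
    # Hypothetical local test cluster, not the production deployment.
    client = Client(n_workers=2, threads_per_worker=2)
    add_process_pool(client)

    # Each worker should now list a "processes" executor alongside the
    # default thread pool.
    print(client.run(lambda dask_worker: list(dask_worker.executors)))
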

Hi @vij, were you able to confirm that? If so, in order to avoid this error, maybe the WorkerPlugin should be improved.

Yes, I am almost sure this is a shutdown issue. I have seen the same stack trace replicated across successful and unsuccessful jobs, and across unsuccessful jobs which failed due to different errors. I am currently looking at mechanisms to handle this in the WorkerPlugin, and will post an update once I figure out a reasonable plugin.
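
Roughly what I have in mind (an untested sketch, so treat it as an assumption rather than a verified fix) is to shut the pool down explicitly in the plugin's teardown hook, so the loky children exit before the worker does:

from dask.distributed import WorkerPlugin
from loky import ProcessPoolExecutor


class AddProcessPool(WorkerPlugin):
    def setup(self, worker):
        super().setup(worker)
        worker.executors["processes"] = ProcessPoolExecutor(
            max_workers=worker.state.nthreads
        )

    def teardown(self, worker):
        # Shut the loky pool down while the worker is closing, so its child
        # processes exit cleanly instead of being killed mid-shutdown.
        executor = worker.executors.get("processes")
        if executor is not None:
            executor.shutdown(wait=True)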
