Local Threaded Scheduler instantiates multiple ThreadPoolExecutors

This PR from 2016 changed the behavior of the local threaded scheduler to instantiate a new ThreadPoolExecutor per thread ID.

Now, when a Dask graph is invoked from a function that is itself dispatched to a ThreadPoolExecutor, it may trigger the creation of up to N new ThreadPoolExecutors, where N is the worker count of the original executor.

A common idiomatic pattern when using Dask in asyncio code is to offload the compute call to the event loop's thread pool executor via asyncio.to_thread, so as not to block the event loop. However, this puts us in exactly the situation described above.

When I run pstree on a long-lived server, I see hundreds of threads due to Dask keeping this global map of ThreadPoolExecutors.

As far as I can tell, the ThreadPoolExecutor implementation is now thread-safe (patch). It acquires a lock during both submit() and shutdown(). Is there a reason to still instantiate a different ThreadPoolExecutor per thread ID?

Having this multiplicative factor on the number of running threads is problematic in some environments, like Kubernetes, where cgroup throttling can occur due to thread oversubscription. It also changes the memory-footprint dynamics of the application, since the total number of active worker threads can be a multiple of what dask.config.set(num_workers) says it is.
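To make the multiplicative factor concrete, a back-of-the-envelope worst case (the figure of 8 is just an example, not from the thread):

```python
# Worst case: each of N outer worker threads lazily creates its own
# N-thread pool, on top of the N threads the configuration asked for.
num_workers = 8  # e.g. dask.config.set(num_workers=8)
outer_threads = num_workers
nested_pool_threads = num_workers * num_workers
worst_case = outer_threads + nested_pool_threads
print(worst_case)  # 72 threads instead of the configured 8
```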

Hi @y4n9squared, welcome to Dask community!

I would recommend creating an issue directly in the Dask GitHub issue tracker for this; it's a bit beyond my knowledge.

But it’s also indicated in the PR that these ThreadPoolExecutors are cleaned up at some point, aren’t they?

The ThreadPoolExecutors created by Dask have an atexit handler registered so they are cleaned up on process termination. However, in my case, I am running a web server so these threads just idle.

I’ll follow up with a GitHub issue. Thanks.

EDIT: Local Threaded Scheduler instantiates multiple ThreadPoolExecutors · Issue #12121 · dask/dask · GitHub