Unable to register a worker plugin

Hi everyone,

I was encountering problems with the unmanaged memory of my workers. I’ve read this article and I tried adding a client.run(gc.collect) at the end of my code. And indeed, all of the unmanaged memory disappeared.

So, as advised in the article, I want to set up a worker plugin to collect the unmanaged memory each time my workers start to execute a task. I wrote a GarbageCollector class:

from distributed import WorkerPlugin
import gc


class GarbageCollector(WorkerPlugin):
    def __init__(self, client):
        self.client = client

    def transition(self, key, start, finish, **kwargs):
        if start == 'executing':
            self.client.run(gc.collect)

And I register my plugin like this:

    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
    client = Client(cluster)
    plugin = garbage_collector.GarbageCollector(client)
    client.register_worker_plugin(plugin)
    return client

I thought this was the correct way to set up my plugin, but I get this error when I add the client.register_worker_plugin(plugin) line:

TypeError: cannot pickle '_asyncio.Task' object

I really don’t understand why this error occurs when I pass a client to my GarbageCollector class, while passing another type of variable (a str, for example) causes no problem.

Can someone help me, please?

Regards,

Clément

@ClementAlba Since the WorkerPlugin runs on the workers, you don’t need to reference the client. The following should work:

from distributed import WorkerPlugin, LocalCluster, Client
import gc


class GarbageCollector(WorkerPlugin):
    def __init__(self):
        pass

    def transition(self, key, start, finish, **kwargs):
        if start == 'executing':
            gc.collect()


cluster = LocalCluster()
client = Client(cluster)

plugin = GarbageCollector()

client.register_worker_plugin(plugin)
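
As background on the TypeError in the original post: the plugin object is serialized so it can be sent to the workers, and everything stored on it has to be picklable. The message suggests that pickling reached a running asyncio task through the client attribute. The snippet below is a minimal sketch of that failure mode using only the standard library; the two SimpleNamespace objects are hypothetical stand-ins for a plugin's state and are not the real Client or WorkerPlugin classes.

```python
import asyncio
import pickle
from types import SimpleNamespace


async def try_pickle():
    # A live asyncio task, standing in for the kind of object a client
    # may hold internally (an assumption made for illustration only).
    task = asyncio.ensure_future(asyncio.sleep(0))

    candidates = {
        "holds_task": SimpleNamespace(client_like=task),  # like GarbageCollector(client)
        "plain_state": SimpleNamespace(label="gc"),       # like a plugin with no client
    }

    results = {}
    for name, obj in candidates.items():
        try:
            pickle.dumps(obj)
            results[name] = "ok"
        except TypeError as exc:
            results[name] = f"TypeError: {exc}"

    await task  # let the task finish cleanly before the loop closes
    return results


results = asyncio.run(try_pickle())
print(results)
```

An object with only plain state pickles fine, which matches the observation that passing a str works. The plugin above avoids the problem because it holds no client at all.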

A note about this approach: the transition hook is called on each worker for every task transition, and gc.collect() runs each time a task leaves the 'executing' state. This can get expensive if you have a lot of small tasks. If that happens, consider running the collection every few seconds instead.
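
One possible way to approximate "every few seconds" is to keep the same hook but throttle it, so that a collection runs at most once per interval. This is a sketch of a variation rather than a built-in feature: the class name ThrottledGarbageCollector, the min_interval parameter, and the collections counter are all hypothetical names introduced here. The import fallback is only there so the snippet can run without distributed installed.

```python
import gc
import time

try:
    from distributed import WorkerPlugin
except ImportError:
    # Stand-in base class so the sketch runs without distributed installed.
    WorkerPlugin = object


class ThrottledGarbageCollector(WorkerPlugin):
    """Collect garbage when a task leaves 'executing', at most once per interval."""

    def __init__(self, min_interval=5.0):
        # min_interval (hypothetical): minimum number of seconds between collections.
        self.min_interval = min_interval
        self._last_collect = None  # monotonic timestamp of the last collection
        self.collections = 0       # how many collections have actually run

    def transition(self, key, start, finish, **kwargs):
        # Only react when a task leaves the 'executing' state.
        if start != "executing":
            return
        now = time.monotonic()
        due = (
            self._last_collect is None
            or now - self._last_collect >= self.min_interval
        )
        if due:
            gc.collect()
            self._last_collect = now
            self.collections += 1
```

It would be registered the same way as the plugin above, for example client.register_worker_plugin(ThrottledGarbageCollector(min_interval=5.0)). The right interval depends on the workload, so 5 seconds is only a placeholder.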

Hi @pavithraes,

I had put this question aside for a while and now I’m thinking about it again.

Your solution works great; the error no longer occurs. Perfect! But I still have unmanaged memory issues (memory not released back to the OS). I’ve read this article, and it provides solutions for Linux and macOS workers only. Are there any solutions for Windows workers?

Regards.