I was encountering problems with the unmanaged memory of my workers. I’ve read this article and tried adding a
client.run(gc.collect) call at the end of my code. And indeed, all of the unmanaged memory disappeared.
So, as advised in the article, I want to set up a worker plugin that collects the unmanaged memory each time my workers start executing a task. I wrote a GarbageCollector class:
    from distributed import WorkerPlugin
    import gc

    class GarbageCollector(WorkerPlugin):
        def __init__(self, client):
            self.client = client

        def transition(self, key, start, finish, **kwargs):
            if start == 'executing':
                self.client.run(gc.collect)
And I register my plugin like this:
    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
    client = Client(cluster)
    plugin = garbage_collector.GarbageCollector(client)
    client.register_worker_plugin(plugin)
    return client
I thought this was the correct way to set up my plugin, but I get this error when I add the
client.register_worker_plugin(plugin) line:
TypeError: cannot pickle '_asyncio.Task' object
I really don’t understand why this error occurs when I pass a client to my GarbageCollector class, whereas passing another type of variable (a str, for example) causes no problem.
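To narrow it down, here is a minimal sketch that seems to show the same kind of failure without Dask at all. It assumes that register_worker_plugin pickles the plugin object in order to send it to the workers, and that the Client holds asyncio tasks internally; both are my assumptions, not something I have verified in the distributed source. The Holder class and the try_pickling function are hypothetical names that stand in for my plugin and the registration step.

```python
import asyncio
import pickle


class Holder:
    """Hypothetical stand-in for the plugin: it just stores what it is given."""

    def __init__(self, payload):
        self.payload = payload


async def try_pickling():
    # A live asyncio task, similar (by assumption) to what a Client keeps internally.
    task = asyncio.create_task(asyncio.sleep(0))

    results = {}
    for name, payload in [("str", "hello"), ("task", task)]:
        try:
            pickle.dumps(Holder(payload))
            results[name] = "ok"
        except TypeError as exc:
            # Pickling walks the attributes, so an unpicklable attribute fails the whole object.
            results[name] = f"TypeError: {exc}"

    await task  # let the task finish cleanly before the loop closes
    return results


results = asyncio.run(try_pickling())
print(results)
```

If that assumption holds, it would explain why a str attribute works while a client attribute does not: the object is only as picklable as everything it stores.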
Can someone help me, please?