Hi everyone,
I was encountering problems with the unmanaged memory of my workers. I've read this article and tried adding a client.run(gc.collect) at the end of my code, and indeed, all of the unmanaged memory disappeared.
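For reference, here is a minimal sketch of that `client.run(gc.collect)` call. It assumes a small in-process `LocalCluster` (`processes=False`, two single-threaded workers), chosen only so the example starts quickly. `Client.run` executes the function once on every worker and returns a dict keyed by worker address. Here each value is the number of unreachable objects that `gc.collect()` found.

```python
import gc

from distributed import Client, LocalCluster

# Small in-process cluster (an assumption for this sketch): two workers, one thread each.
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
    with Client(cluster) as client:
        # Run gc.collect() once on every worker; the result maps each
        # worker address to the number of unreachable objects found.
        counts = client.run(gc.collect)

print(counts)
```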
So, as the article advises, I want to set up a worker plugin that runs garbage collection each time one of my workers starts executing a task. I wrote a GarbageCollector class:
from distributed import WorkerPlugin
import gc

class GarbageCollector(WorkerPlugin):
    def __init__(self, client):
        # Keep a reference to the client so the plugin can call client.run
        self.client = client

    def transition(self, key, start, finish, **kwargs):
        # Called by the worker on every task state change
        if start == 'executing':
            self.client.run(gc.collect)
And I register the plugin like this:
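from distributed import Client, LocalCluster
import garbage_collector  # module containing the GarbageCollector class above

def create_client(n_workers, threads_per_worker):  # hypothetical name; the original function header was not shown
    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
    client = Client(cluster)
    plugin = garbage_collector.GarbageCollector(client)
    client.register_worker_plugin(plugin)
    return client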
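For comparison, here is a sketch of a plugin variant that keeps no client at all. It rests on two assumptions. First, the plugin object is pickled and shipped to every worker, so it should only hold picklable state. Second, the `transition` hook already runs inside the worker, so calling `gc.collect()` directly collects that worker's process without going through `client.run`. Because `start` is the previous state and `finish` the new one, "a task starts executing" corresponds to `finish == "executing"`; note that the original check `start == 'executing'` instead fires when a task *leaves* that state. The class name `CollectOnTaskStart` and its `collections` counter are hypothetical, added only to make the behaviour observable.

```python
import gc
import pickle

from distributed import WorkerPlugin


class CollectOnTaskStart(WorkerPlugin):
    """Run gc.collect() in the worker whenever a task starts executing.

    Holds no Client, so the plugin can be pickled and sent to the workers.
    """

    def __init__(self):
        super().__init__()
        self.collections = 0  # hypothetical counter, for illustration only

    def transition(self, key, start, finish, **kwargs):
        # `start` is the previous task state and `finish` the new one,
        # so a task that begins running transitions *into* "executing".
        if finish == "executing":
            gc.collect()  # collects in the worker's own process
            self.collections += 1


# Unlike a plugin that stores a Client, this object round-trips through pickle.
restored = pickle.loads(pickle.dumps(CollectOnTaskStart()))
```

It would then be registered with `client.register_worker_plugin(CollectOnTaskStart())`. Recent `distributed` releases also offer `client.register_plugin` as the newer name. Keep in mind that a full collection before every task can noticeably slow down workloads made of many small tasks.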
I thought this was the correct way to set up the plugin, but as soon as I add the client.register_worker_plugin(plugin) line, I get this error:
TypeError: cannot pickle '_asyncio.Task' object
I really don't understand why this error occurs when I pass a client to my GarbageCollector class, whereas passing another type of variable (a str, for example) works without any problem.
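A plausible explanation, offered as an assumption rather than a confirmed diagnosis: registering a worker plugin serializes (pickles) it so it can be sent to the workers, and pickling an object also pickles its attributes. A `Client` holds live event-loop state, and the error message shows that an `asyncio.Task` is part of it, which cannot be pickled. The following standard-library-only sketch reproduces that mechanism. `HoldsTask` and `HoldsStr` are hypothetical stand-ins for a plugin that stores a client and one that stores a string.

```python
import asyncio
import pickle


class HoldsTask:
    """Stand-in for a plugin that stores an object with a live asyncio.Task."""

    def __init__(self, task):
        self.task = task


class HoldsStr:
    """Stand-in for a plugin that only stores a plain string."""

    def __init__(self, value):
        self.value = value


def can_pickle(obj):
    # pickle.dumps recurses into the object's attributes; an unpicklable
    # attribute such as an asyncio.Task raises TypeError.
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


async def main():
    task = asyncio.create_task(asyncio.sleep(0))
    try:
        return can_pickle(HoldsTask(task)), can_pickle(HoldsStr("hello"))
    finally:
        await task  # let the task finish cleanly


task_ok, str_ok = asyncio.run(main())
print(task_ok, str_ok)  # prints "False True"
```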
Can someone help me, please?
Regards,
Clément