I’m currently facing an issue where the msgpack library used for serialization in Dask hits a hard recursion limit when running a large computation on a distributed cluster. The issue has already been reported, but there is no solution so far.
One possible way to avoid this issue is to change the default serializers / deserializers used by Dask to pickle. This is easily done when creating the Client object, as Client(cluster_address, serializers=['pickle'], deserializers=['pickle']). However, my computation is complex enough that some workers need to launch new tasks on their own, which they do by getting a Client through the worker_client method. Alas, that client is configured with the default serializers / deserializers, and thus tragedy ensues. worker_client exposes no serialization parameters, so my guess is that the Client is being built at some earlier step that I cannot reach.
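To illustrate, this is roughly the pattern those workers follow (parent_task and child_task are placeholder names, not my actual code):

from dask.distributed import worker_client

def child_task(item):
    return item  # stand-in for the real per-item work

def parent_task(chunk):
    # worker_client() yields a Client connected to the same scheduler,
    # but it offers no way to override serializers / deserializers,
    # so the nested tasks go through the default msgpack-based protocol.
    with worker_client() as client:
        futures = client.map(child_task, chunk)
        return client.gather(futures)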
Therefore, my question is: how can I configure the serializers / deserializers in the clients used by the workers?
I could not find a way to set the serializers globally within Dask.
Did you try to just build a Client yourself by getting the Scheduler URL through the get_worker().scheduler attribute? Don’t forget to consider possible Scheduler deadlock with this approach though.
Thanks a lot Guillaume! I hadn’t thought of that approach, and it does work! The only quirk I found is that get_worker().scheduler turned out to be some kind of rpc connection object rather than a plain URL, so I ended up with the following code:
from dask.distributed import Client, get_worker

worker = get_worker()
scheduler = worker.scheduler
# scheduler may be an rpc connection object rather than a plain address string
scheduler_url = scheduler if isinstance(scheduler, str) else scheduler.address
client = Client(scheduler_url, serializers=['pickle'], deserializers=['pickle'])
With that, the msgpack serializer no longer seems to be used by that worker; at least I’m no longer hitting the recursion limit I mentioned above. I’m still facing other memory issues that I think are related to the worker trying to serialize too much data, but since that is outside the scope of my original question, I have accepted your answer.
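For reference, here is roughly how that snippet sits inside one of my tasks; the parent_task / child_task names and the explicit close() are just for illustration, and Guillaume’s deadlock caveat still applies:

from dask.distributed import Client, get_worker

def child_task(item):
    return item  # stand-in for the real per-item work

def parent_task(chunk):
    worker = get_worker()
    scheduler = worker.scheduler
    # scheduler may be an rpc connection object rather than a plain address string
    scheduler_url = scheduler if isinstance(scheduler, str) else scheduler.address
    # this client uses pickle in both directions, which avoids the
    # msgpack recursion limit I was hitting
    client = Client(scheduler_url, serializers=['pickle'], deserializers=['pickle'])
    try:
        futures = client.map(child_task, chunk)
        return client.gather(futures)
    finally:
        client.close()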