I am running a Dask cluster on HPC resources using distributed and dask-jobqueue. I separated the scheduler into its own SLURM job, and the clients connect to the address of the scheduler. Workers are requested from SLURM by the scheduler as needed.
My issue is that, for some reason, the scheduler is being overwhelmed and has trouble keeping up with the load. This results in the dask dashboard being very slow or unavailable, and tasks end up being dropped due to lost connections to the scheduler. In the dask dashboard, GIL contention on the scheduler is 90% to 100%, which would explain this slowdown. Looking at the system resource usage with htop on the node running the scheduler, I see that one core is used at 100% and the other cores are idle. I currently have 5 running worker nodes with 24 cores each, with a maximum of 32 threads used by dask.
Firstly, it seems somewhat strange that this fairly reasonable load (?) results in a very slow scheduler process. The network is supposed to be quite fast and I’m not dealing with huge volumes of data anyway. I’m not sure how to figure out if I’m doing something wrong which would explain this.
Also, is it possible to launch a scheduler with multiple processes so that it can handle more operations in parallel? There is a “multiprocess scheduler” mentioned in a couple of places in the documentation (Here, for example), but it seems like it could just be multiple processes for the dask workers.
Any idea would be very much appreciated!
dask==2024.8.2
dask-jobqueue==0.9.0
distributed==2024.8.2
What do you mean by fairly reasonable load? The scheduler process is single-threaded. When it is using 100% of a CPU core, that is its maximum, and not a good sign.
Well, we need to have at least a grasp of your workflow, but this kind of load means either lots of workers or, more probably, lots of small tasks being submitted.
No, as said above, the scheduler process is currently single-threaded.
Exactly.
Thanks @guillaumeeb for the prompt feedback!
I was suggesting that it is a fairly reasonable load because it involves hundreds to thousands of tasks, which in itself does not seem excessive. It does depend on the tasks themselves, and it seems that it could be because of the size of the objects that I am passing, as I get this warning:
[site-packages]/distributed/client.py:3358: UserWarning: Sending large graph of size 12.59 MiB.
This is strange, because I can pickle/cloudpickle each task object and get a file around 10 KB. Digging further, it’s probably because some of the objects passed to the scheduler are SQLAlchemy objects. pympler gives me a reading of 2-4 MB per task object, with the overwhelming majority of it being SQLAlchemy-related fields.
Is it possible that these SQLAlchemy-related fields, which evidently don’t get pickled, are passed through the scheduler, thus massively increasing the volume of data to process?
Expunging or expiring the SQLAlchemy objects doesn’t seem to change their size in memory according to pympler. I will try pickling and unpickling the objects when submitting tasks to verify that this solves the issue. I assume that Distributed already does something similar though?
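For reference, a check of the serialized size per task can be sketched roughly as follows. This is only an illustration under stated assumptions: the task objects are hypothetical plain dicts rather than my real objects, and it uses the standard library pickle instead of cloudpickle or pympler so that it runs on its own.

```python
import pickle


def pickled_size(obj) -> int:
    """Return the number of bytes obj occupies once pickled."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


# Hypothetical task objects: plain dicts standing in for the real ones.
tasks = [{"name": f"task-{i}", "params": {"index": i}} for i in range(100)]

# Measure each task separately, then sum to estimate the graph payload.
sizes = [pickled_size(task) for task in tasks]
total = sum(sizes)
print(f"largest task: {max(sizes)} bytes, total: {total} bytes")
```

Comparing the total against the size reported in the "Sending large graph" warning gives a rough idea of whether the per-task measurement reflects what is actually sent.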
Also, I know that it is recommended to load data from the workers themselves when possible, but I was having load issues with my database. This is a last resort currently.
Okay, I tried having an explicit cloudpickling before the submission to the scheduler and before returning the results from the worker. There doesn’t seem to be a significant change: the scheduler still gets overwhelmed rather quickly. I also still get the warning that large graphs are being sent.
Some more details about my setup: I have been using this exact way of submitting and running tasks for around a year now. I’ve had various issues, but nothing like this. The different workflows that I run can have objects of variable size and involve more tasks in total, so it seems that this could be linked to the problem.
Here’s a screenshot of the dashboard around the time when it started getting overwhelmed. There were around 700 tasks on 7 workers, with tasks taking a couple minutes to an hour. Almost all the bytes stored by worker are unmanaged.
The worker bandwidth seems fairly low, although I don’t have a concrete point of reference.
Here are the stats for the scheduler. The bandwidth is also quite reasonable, but the CPU usage is at 100%. The number of file descriptors is suspicious; I’m not sure what the scheduler is doing there.
I will try to produce a minimal reproducible example to narrow down the issue further.
Okay, I think that I figured out what’s happening.
The issue is not with serializing SQLAlchemy objects per se. Digging more into how the objects change during the runtime, I found out that each task object can grow larger, in the range of 100-200 KB and possibly more depending on the run parameters.
The unexpectedly large size is partly because of a heterogeneous data structure that I’m using, namely arrays of arrays of strings and numpy arrays. It turns out that this data structure has a very large overhead, especially when serialized. By separating it into arrays of strings and a multidimensional numpy array, the size is cut down by more than a factor of 3.
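The effect of the layout on serialized size can be illustrated with a small sketch. The shapes, labels and row count here are made up for the example and are not my actual data, and the ratio will differ from one dataset to another.

```python
import pickle

import numpy as np

n_rows = 1000
rng = np.random.default_rng(0)

# Heterogeneous layout: a list of [label, small array] pairs.
nested = [[f"label-{i}", rng.random(3)] for i in range(n_rows)]

# Split layout: one list of strings plus one 2D array holding all values.
labels = [row[0] for row in nested]
values = np.stack([row[1] for row in nested])

nested_size = len(pickle.dumps(nested))
split_size = len(pickle.dumps((labels, values)))
print(f"nested: {nested_size} bytes, split: {split_size} bytes")
```

Every small array in the nested layout carries its own pickling overhead, whereas the split layout pays that overhead once for the whole 2D array.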
I will further optimize the data that ends up being transferred in order to reduce the load on the scheduler. Already with this change, there is a noticeable improvement in scheduler responsiveness. I must not have bumped into this issue before because I was using less data in this problematic data structure and running at a smaller scale.
Final update on the issue. Reducing the size of the SQLAlchemy objects did help a bit, but the issue remained significant. Looking into the scheduler profile, I saw that it spent most of its time serializing and deserializing SQLAlchemy objects, with some calls to SQLAlchemy functions accounting for most of the runtime.
I changed up my code to cache the required fields from the SQLAlchemy objects and not pass those objects themselves to the dask scheduler. The GIL contention now sits at 0.0% with several thousand jobs and a hundred nodes. The difference is massive.
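A minimal sketch of that approach, for illustration only: FakeRecord, extract_fields and process are hypothetical names, and FakeRecord merely stands in for an ORM-mapped object so that the example runs without SQLAlchemy or a cluster.

```python
class FakeRecord:
    """Hypothetical stand-in for an ORM-mapped object."""

    def __init__(self, id, name, path):
        self.id = id
        self.name = name
        self.path = path
        # Mimics ORM bookkeeping that should never travel with the task.
        self._sa_instance_state = object()


def extract_fields(record) -> dict:
    """Copy only the plain values a task needs into a dict of builtins."""
    return {"id": record.id, "name": record.name, "path": record.path}


def process(fields: dict) -> str:
    """Hypothetical task body that works on plain data only."""
    return f"{fields['id']}:{fields['name']}"


record = FakeRecord(1, "sample", "/data/sample")
payload = extract_fields(record)
result = process(payload)
# With a distributed Client, the plain dict is what gets submitted:
#     future = client.submit(process, payload)
```

The point is that only builtin types reach the scheduler, so serialization never has to touch the ORM machinery.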
TLDR: don’t make dask serialize SQLAlchemy objects, even if they are small