Hello,
TLDR: Are there any best practices for ensuring deterministic dask partitioning so that machine learning experiments are reproducible?
I have been using xgboost with dask and noticed that I cannot get deterministic results even when setting the random seed parameter. I created an issue on the xgboost github here. Someone mentioned that it’s actually an issue with dask and not having the same partitioning every time the model is fit. I am hoping there are some best practices in ensuring deterministic partitioning that someone would be willing to share.
I am currently setting the index to a date column and then repartitioning my data before I fit my model, but I am letting dask partition it automatically into 500 MB chunks. I originally assumed this process would partition the data the same way every time, but based on what I’m seeing that does not seem to be the case. I could switch to a specific number of partitions if that would help ensure consistency, but I am not sure what would be best.
Thank you in advance for any help or clarification!
I’m not sure whether Dask’s partitioning algorithm is deterministic, especially when specifying partition_size as you described.
Maybe you could try the divisions kwarg of repartition, if you know your dataset well? That should make the partitioning deterministic!
However, I’m also wondering whether the order in which the model is fit to the partitions plays a role here. I’m not an XGBoost expert… I just think that getting deterministic results from an ML algorithm on a distributed cluster is probably challenging!
Hi @guillaumeeb,
Thank you for your response! I tried using the divisions kwarg, thinking along the same lines as you, but my dask dataframe is created from a delayed object, so the divisions are None to start with, which results in an error when I try to set the divisions afterwards. I can revisit this, though, to see if specifying them at the beginning in from_delayed would allow me to accomplish this.
According to the reply on that xgboost post I linked, if the partitions are exactly the same every time, xgboost should be reproducible. So right now I am printing out the sizes of my partitions at each step to see where they start to vary.
I have also noticed that even when the input partitions are exactly the same, calling set_index() without any other parameters seems to produce different resulting partitions (I haven’t tried using auto npartitions yet). This is counterintuitive to me, considering that it sorts and partitions the data. It could be that the sort being used is not stable, but I am not sure yet.
I would hope that given the same data with the same partitions that dask would repartition the data deterministically the same way every time. If dask is to be used for data science (dask-ml etc.) I do think it is extremely important that the results be reproducible.
Thank you so much for your thoughts on this problem!! I am open to trying anything at this point so please let me know if you have any other ideas or if I am mistaken in anything I have said so far!
Would it be possible to share a bit of code and a stack trace, so I can better understand the problem?
As you are setting the index to a date column, maybe you could use the divisions kwarg from set_index?
I’m not an expert in this area, but here is what the documentation says:
Under normal operation this function does an initial pass over the index column to compute approximate quantiles to serve as future divisions
Maybe the approximate quantiles have some randomness in them? I had a look at the source code, and I’m under the impression that this is not the case. On the contrary, I saw several things indicating an intent to have non-random partitions, like here.
The best thing to do would be to have a reproducible example. Do you think you could build some toy example here?
Hello, @guillaumeeb thank you so much for your willingness to help me out! I seriously appreciate your thoughts and insights! Apologies for such a late reply but work got very busy and I didn’t get a chance to circle back to this.
After spending a lot more time with dask and understanding the various factors contributing to my process not being reproducible I was able to make it as reproducible as possible. By this I mean that the metrics resulting from the ML model (AUC, etc.) are within ±0.01 of each other and the exact same rows are getting the exact same predictions each time the job is run.
To try and help others the same way that you helped me I have compiled a list of best practices that applied to my problem:
(Almost) Ensuring Reproducibility:
To ensure reproducibility with dask for machine learning purposes you have to take extra care to make sure that the data in each partition is exactly the same.
Ensure reproducibility from the start. It is extremely painful to diagnose a data pipeline that is already created instead of ensuring it is reproducible as you build it initially.
The most obvious one: always specify a random seed when sampling data, training models, etc.
When validating reproducibility, always create a brand new dask cluster, then connect to it and send it work. If you keep an existing cluster up and running and submit multiple jobs to it, the results will be different, because the cluster already has data on each of the workers, etc. Starting fresh each time ensures better reproducibility.
Be extremely careful with sorting. Make sure whatever sorting you are doing is both deterministic and stable.
Even ordered data coming out of Trino is not stable, so if you are sorting on a date column and there are ties, rows can come out in a different order each time you run the query.
If you are querying supplemental data from a database, make sure you order your query explicitly. Otherwise, data coming out of the DB may not be ordered the same way from run to run.
For this, it is important to have a unique identifier without duplicates that you can order by. Date columns will not be enough if their values repeat.
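A small pandas sketch of the tie-breaking idea: sorting on the date plus a unique identifier gives the same order no matter how the rows arrive. The row_id column is a hypothetical unique key, and the same principle applies to an ORDER BY clause in a query.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "date": pd.to_datetime(
            ["2023-01-02", "2023-01-01", "2023-01-02", "2023-01-01"]
        ),
        "row_id": [4, 2, 3, 1],  # hypothetical unique identifier
        "value": [10, 20, 30, 40],
    }
)

# Sorting on date alone leaves the order of tied rows up to the input order.
# Adding the unique key as a tie-breaker makes the result fully determined.
ordered = df.sort_values(["date", "row_id"]).reset_index(drop=True)

# The same rows arriving in a different order sort to the same result.
shuffled = df.sample(frac=1, random_state=0)
reordered = shuffled.sort_values(["date", "row_id"]).reset_index(drop=True)

print(ordered.equals(reordered))
```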
Be careful when using sets in general, but specifically when using them to create column lists. When a set is created or converted back to a list, it loses all ordering information.
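Two order-safe alternatives to list(set(...)) for de-duplicating column names, as a minimal sketch. The column names are made up for illustration.

```python
columns = ["date", "amount", "customer_id", "amount", "date"]

# list(set(columns)) can come back in a different order from one Python
# process to the next, because string hashing is randomized per process.

# Option 1: de-duplicate while keeping first-seen order.
unique_in_order = list(dict.fromkeys(columns))

# Option 2: de-duplicate and impose a canonical (sorted) order.
unique_sorted = sorted(set(columns))

print(unique_in_order)
print(unique_sorted)
```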
Just because each partition has the same row count does not mean that the data within each partition is in the same order. For debugging, you can save a small amount of data from each run as a parquet file, then loop over the partitions, compute each one, and compare them using df.compare() or df.equals().
Disable work stealing. This was a very important one.
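One way to turn work stealing off is through the dask configuration, sketched below. This assumes the setting is applied in the process that starts the scheduler and before the cluster is created; how it is propagated in a given deployment is not covered here.

```python
import dask

# Disable work stealing in the scheduler configuration.
# Set this before the scheduler starts, then create a fresh cluster
# and client afterwards so the setting is picked up.
dask.config.set({"distributed.scheduler.work-stealing": False})

work_stealing = dask.config.get("distributed.scheduler.work-stealing")
print(work_stealing)
```

The same key can also be set in a dask YAML config file or through the DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING environment variable.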