Simulating Federated Learning with XGBoost and Dask (simulated local servers)

Hey everyone, I’m trying to implement a federated learning simulation with XGBoost. I don’t need any privacy measures (because I’m just simulating federated learning). I was wondering if I could use Dask XGBoost (Distributed XGBoost with Dask — xgboost 1.6.2 documentation) for this.

Essentially, I want to create a model that combines all the data from the different simulated local servers without any local server ever seeing another server’s data. From what I understand, distributed training with Dask doesn’t share the raw data between nodes; the only statistics communicated between the workers are the gradients and Hessians of the tree model. I wanted to see if the same principle could be used for federated learning.

I have split a large dataset into 8 local datasets within my script, and I want to create a federated XGBoost model with these 8 datasets. I assume that with distributed learning the data itself isn’t shared between the simulated local ‘servers’. Is there some code showing how I can implement this in a Python script?

Here’s how I currently have the data set up in my Python script.

from sklearn.model_selection import train_test_split

# dictionaries to hold the training and testing data for each simulated local server
X_training_dictionary = {}
X_testing_dictionary = {}
y_training_dictionary = {}
y_testing_dictionary = {}

# each hosp is considered a simulated local server
for hosp in data_dictionary:
    # data_dictionary was initialized earlier and contains the complete data for each local hospital
    data = data_dictionary[hosp]

    # last column is the target, the remaining columns are the features
    X = data[:, :-1]
    y = data[:, -1]
    # rs is a random seed defined earlier in the script
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=rs)
    X_training_dictionary[hosp] = X_train
    X_testing_dictionary[hosp] = X_test
    y_training_dictionary[hosp] = y_train
    y_testing_dictionary[hosp] = y_test

Thanks!

Hi @surajraj99 and welcome to Discourse!

For many users, the magic of using XGBoost with Dask is the ability to train one model in parallel on all of the data, without having to downsample it or fit it all on a single machine. For instance, in this abbreviated example from the XGBoost with Dask docs, all chunks of X and y are used to train one model:

...
X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))

dtrain = xgb.dask.DaskDMatrix(client, X, y)

output = xgb.dask.train(
    client,
    {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"},
    dtrain,
...

If I’m understanding correctly, you’d like to separately train models on different datasets (the “local servers”), then combine those model results into a new model. This is a bit different, and I think you could use XGBoost, with some assistance from Dask Futures or Dask Delayed. Here’s a very simple example, using client.map from Futures:

from dask.distributed import LocalCluster, Client
import numpy as np

cluster = LocalCluster(n_workers=2)
client = Client(cluster)

# this can be any function, e.g. xgb.train
def my_func(x):
    return len(x)

data = [np.arange(10), np.arange(20)]

# applies my_func to each element in data
list_of_futures = client.map(my_func, data)
results = client.gather(list_of_futures)
print(results)
# prints [10, 20]

I showed an example using Futures rather than Delayed, anticipating that you may want more freedom in what happens to each result (e.g. using it as an output, as a parameter to another function, or as a condition that controls what happens next).
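
To make that more concrete for your case, here’s a rough, untested sketch of the same pattern applied to your per-hospital dictionaries. The train_local_model helper and the parameters I picked (e.g. the objective) are just placeholders to illustrate the idea, not a prescribed API:

import xgboost as xgb
from dask.distributed import LocalCluster, Client

cluster = LocalCluster(n_workers=8)
client = Client(cluster)

# hypothetical helper: each simulated local server trains only on its own data
def train_local_model(X_train, y_train):
    dtrain = xgb.DMatrix(X_train, label=y_train)
    params = {"tree_method": "hist", "objective": "binary:logistic"}  # adjust to your task
    return xgb.train(params, dtrain, num_boost_round=50)

futures = [
    client.submit(train_local_model,
                  X_training_dictionary[hosp],
                  y_training_dictionary[hosp])
    for hosp in X_training_dictionary
]
# one trained Booster per simulated local server
local_boosters = client.gather(futures)

Each call only ever receives the arrays for its own hospital, so the raw data stays separate; what you then do with the returned Boosters (averaging predictions, continued training, etc.) is up to you.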

You’ve probably already thought about this, but it’s also worth noting that if your individual datasets aren’t very large, you could always train the models serially and might not need Dask at all.
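
In that case, a plain loop over the same dictionaries would look something like this (again just a sketch with made-up parameters):

import xgboost as xgb

# serial version: no Dask, just a loop over the simulated local servers
local_boosters = {}
for hosp in X_training_dictionary:
    dtrain = xgb.DMatrix(X_training_dictionary[hosp],
                         label=y_training_dictionary[hosp])
    local_boosters[hosp] = xgb.train(
        {"tree_method": "hist", "objective": "binary:logistic"},  # adjust to your task
        dtrain,
        num_boost_round=50,
    )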

Hope that helps!

Thanks @scharlottej13!

This method looks interesting; I’ll definitely explore it a bit more.

Just a couple of questions. In my case, would client.gather() provide the functionality needed to combine the local models? And would this combined model be stored in ‘results’?

When you say ‘combine model results’, will these models be sharing the data that was used to train them on the backend? Or will the models simply be combined by aggregating model parameters such as gradients and Hessians?

Thanks again!

No problem @surajraj99! Thinking about your use case more, I would suggest you try to simulate a federated model without Dask first, maybe on a subset of your data. Then, if your results seem plausible, you could try moving to Dask to make the computation more efficient.

To answer your questions:

In my case, would client.gather() provide the functionality needed to combine the local models? And would this combined model be stored in ‘results’?

No, client.gather() just collects the results of the futures; it’s part of the parallel computing machinery, not the model estimation itself.

When you say ‘combine model results’, will these models be sharing the data that was used to train them on the backend?

That depends entirely on the function you use! If each call only sees its own local dataset, the raw data won’t be shared between the models. I’m not an XGBoost expert, but perhaps there is some functionality for this?
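
For what it’s worth, one concrete thing you could experiment with is the xgb_model argument to xgb.train, which continues boosting from an existing Booster. Feeding the local datasets through in sequence is continued training rather than true federated aggregation, so treat the following as a rough, untested baseline for your simulation rather than the federated algorithm itself:

import xgboost as xgb

# sketch: continue boosting from one "local" model to the next
booster = None
for hosp in X_training_dictionary:
    dtrain = xgb.DMatrix(X_training_dictionary[hosp],
                         label=y_training_dictionary[hosp])
    booster = xgb.train(
        {"tree_method": "hist", "objective": "binary:logistic"},  # adjust to your task
        dtrain,
        num_boost_round=10,
        xgb_model=booster,  # None on the first pass, then the previous model
    )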