Distributed XGBoost with Ray

Ray is a general-purpose distributed execution framework. Ray can be used to scale computations from a single node to a cluster of hundreds of nodes without changing any code.

The Python bindings of Ray come with a collection of well-maintained machine learning libraries for hyperparameter optimization and model serving.

The XGBoost-Ray project provides an interface for running XGBoost training and prediction jobs on a Ray cluster. It supports distributed data representations, such as Modin dataframes, as well as distributed loading from cloud storage (e.g. Parquet files).

XGBoost-Ray integrates well with the hyperparameter optimization library Ray Tune and implements advanced fault tolerance mechanisms. With Ray, you can scale your training jobs to hundreds of nodes just by adding new nodes to a cluster. You can also use Ray for multi-GPU XGBoost training.

Installing and starting Ray

Ray can be installed from PyPI like this:

pip install ray

If you’re using Ray on a single machine, you don’t need to do anything else - XGBoost-Ray will automatically start a local Ray cluster when used.

If you want to use Ray on a cluster, you can use the Ray cluster launcher.

Installing XGBoost-Ray

XGBoost-Ray is also available via PyPI:

pip install xgboost_ray

This will install all dependencies needed to run XGBoost on Ray, including Ray itself if it hasn’t been installed before.

Using XGBoost-Ray for training and prediction

XGBoost-Ray uses the same API as core XGBoost. There are only two differences:

  1. Instead of using an xgboost.DMatrix, you’ll use an xgboost_ray.RayDMatrix object.

  2. There is an additional ray_params parameter that you can use to configure distributed training.

Simple training example

To run this simple example, you’ll need to install scikit-learn (with pip install scikit-learn).

In this example, we will load the breast cancer dataset and train a binary classifier using two actors.

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

evals_result = {}
bst = train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1))

bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
    evals_result["train"]["error"][-1]))

The only differences compared to the non-distributed API are the import statement (xgboost_ray instead of xgboost), using the RayDMatrix instead of the DMatrix, and passing a RayParams object.

The return object is a regular xgboost.Booster instance.

Simple prediction example

from xgboost_ray import RayDMatrix, RayParams, predict
from sklearn.datasets import load_breast_cancer
import xgboost as xgb

data, labels = load_breast_cancer(return_X_y=True)

dpred = RayDMatrix(data, labels)

bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=2))

print(pred_ray)

In this example, the data will be split across two actors. The predictions from both actors are combined into a single result array in the original row order.

The RayParams object

The RayParams object is used to configure various settings relating to the distributed training.

class xgboost_ray.RayParams(num_actors=0, cpus_per_actor=0, gpus_per_actor=-1, resources_per_actor=None, elastic_training=False, max_failed_actors=0, max_actor_restarts=0, checkpoint_frequency=5, distributed_callbacks=None)

Parameters to configure Ray-specific behavior.

Parameters
  • num_actors (int) – Number of parallel Ray actors.

  • cpus_per_actor (int) – Number of CPUs to be used per Ray actor.

  • gpus_per_actor (int) – Number of GPUs to be used per Ray actor.

  • resources_per_actor (Optional[Dict]) – Dict of additional resources required per Ray actor.

  • elastic_training (bool) – If True, training will continue with fewer actors if an actor fails. Default False.

  • max_failed_actors (int) – If elastic_training is True, this specifies the maximum number of failed actors with which we still continue training.

  • max_actor_restarts (int) – Number of retries when Ray actors fail. Defaults to 0 (no retries). Set to -1 for unlimited retries.

  • checkpoint_frequency (int) – How often to save checkpoints. Defaults to 5 (every 5th iteration).

  • distributed_callbacks (Optional[List[xgboost_ray.callback.DistributedCallback]]) –

Return type

None

Multi GPU training

Ray automatically detects GPUs on cluster nodes. To train on multiple GPUs, set the gpus_per_actor parameter of the RayParams object, and set num_actors to more than one so that several GPUs are used:

ray_params = RayParams(
    num_actors=4,
    gpus_per_actor=1,
)

This will train on four GPUs in parallel.

Note that it usually does not make sense to allocate more than one GPU per actor, as XGBoost relies on distributed libraries such as Dask or Ray for multi-GPU training.

Setting the number of CPUs per actor

XGBoost natively uses multithreading to speed up computations. Thus, if you are training on CPUs only, there is likely no benefit in using more than one actor per node. In that case, assuming you have a cluster of homogeneous nodes, set the number of CPUs per actor to the number of CPUs available on each node, and the number of actors to the number of nodes.

If you are using multi GPU training on a single node, divide the number of available CPUs evenly across all actors. For instance, if you have 16 CPUs and 4 GPUs available, each actor should access 1 GPU and 4 CPUs.
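
The even split described above can be sketched as a small helper. The function name gpu_actor_layout is hypothetical and not part of XGBoost-Ray; it only computes keyword arguments whose names match the RayParams parameters documented earlier, under the assumption of one GPU per actor.

```python
def gpu_actor_layout(num_cpus: int, num_gpus: int) -> dict:
    """Return RayParams-style keyword arguments for single-node multi-GPU training.

    Hypothetical helper: one actor per GPU, CPUs divided evenly across actors.
    """
    if num_gpus < 1:
        raise ValueError("need at least one GPU")
    # Integer division: any leftover CPUs are simply left unassigned.
    cpus_per_actor = num_cpus // num_gpus
    if cpus_per_actor < 1:
        raise ValueError("need at least one CPU per GPU actor")
    return {
        "num_actors": num_gpus,
        "gpus_per_actor": 1,
        "cpus_per_actor": cpus_per_actor,
    }


layout = gpu_actor_layout(num_cpus=16, num_gpus=4)
print(layout)  # {'num_actors': 4, 'gpus_per_actor': 1, 'cpus_per_actor': 4}
```

With XGBoost-Ray installed, the resulting dictionary could be passed on as RayParams(**layout).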

If you are using a cluster of heterogeneous nodes (with different numbers of CPUs), you might just want to use the greatest common divisor of the per-node CPU counts as the number of CPUs per actor. For example, if you have a cluster of three nodes with 4, 8, and 12 CPUs, respectively, you’d start 6 actors with 4 CPUs each for maximum CPU utilization.
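
As a rough sketch of the greatest-common-divisor rule, the following helper derives the actor count and CPUs per actor from a list of per-node CPU counts. The name cpu_actor_layout is hypothetical and not part of XGBoost-Ray.

```python
from functools import reduce
from math import gcd
from typing import Sequence, Tuple


def cpu_actor_layout(cpus_per_node: Sequence[int]) -> Tuple[int, int]:
    """Return (num_actors, cpus_per_actor) for a CPU-only cluster.

    Hypothetical helper: uses the GCD of the node CPU counts as the
    per-actor CPU count, so every node is filled with whole actors.
    """
    if not cpus_per_node or min(cpus_per_node) < 1:
        raise ValueError("every node needs at least one CPU")
    cpus_per_actor = reduce(gcd, cpus_per_node)
    num_actors = sum(cpus_per_node) // cpus_per_actor
    return num_actors, cpus_per_actor


print(cpu_actor_layout([4, 8, 12]))  # (6, 4)
```

For homogeneous nodes the same rule reduces to one actor per node, e.g. cpu_actor_layout([8, 8, 8]) gives 3 actors with 8 CPUs each. If the CPU counts share no common factor (say 3 and 4), the rule yields many single-CPU actors, so treat it as a starting point rather than a fixed recipe.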

Fault tolerance

XGBoost-Ray supports two fault tolerance modes. In non-elastic training, whenever a training actor dies (e.g. because the node goes down), the training job will stop, XGBoost-Ray will wait for the actor (or its resources) to become available again (this might be on a different node), and then continue training once all actors are back.

In elastic training, whenever a training actor dies, the rest of the actors continue training without the dead actor. If the actor comes back, it is re-integrated into training.

Note that with elastic training you will train on less data for some time. The benefit is that you can continue training even if a node goes away for the remainder of the training run, and you don’t have to wait until it is back up again. In practice this usually leads to a very minor decrease in accuracy but a much shorter training time compared to non-elastic training.

Both training modes can be configured using the respective RayParams parameters.
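
As an illustration, the two modes might be configured as follows. The parameter names come from the RayParams signature above; the concrete values are illustrative assumptions only. The settings are written as plain keyword-argument dictionaries here, which can be passed on as RayParams(**non_elastic) or RayParams(**elastic).

```python
# Keyword arguments for xgboost_ray.RayParams. All values are illustrative.

# Non-elastic training: when an actor dies, training pauses until the actor
# is back. Failed actors are retried up to two times.
non_elastic = {
    "num_actors": 4,
    "elastic_training": False,
    "max_actor_restarts": 2,
}

# Elastic training: the remaining actors keep training while an actor is down.
# Training continues as long as no more than one actor has failed.
elastic = {
    "num_actors": 4,
    "elastic_training": True,
    "max_failed_actors": 1,
    "max_actor_restarts": 2,
}

print(non_elastic)
print(elastic)
```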

Hyperparameter optimization

XGBoost-Ray integrates well with the hyperparameter optimization framework Ray Tune. Ray Tune uses Ray to start multiple distributed trials with different hyperparameter configurations. If used with XGBoost-Ray, these trials will then start their own distributed training jobs.

XGBoost-Ray automatically reports evaluation results back to Ray Tune. There are only a few things you need to do:

  1. Put your XGBoost-Ray training call into a function accepting parameter configurations (train_model in the example below).

  2. Create a RayParams object (ray_params in the example below).

  3. Define the parameter search space (config dict in the example below).

  4. Call tune.run():
    • The metric parameter should contain the metric you’d like to optimize. Usually this consists of the prefix passed to the evals argument of xgboost_ray.train(), and an eval_metric passed in the XGBoost parameters (train-error in the example below).

    • The mode should be either min or max, depending on whether you’d like to minimize or maximize the metric.

    • The resources_per_trial should be set using ray_params.get_tune_resources(). This makes sure that each trial has the resources it needs to start its distributed training job.

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

num_actors = 4
num_cpus_per_actor = 1

ray_params = RayParams(
    num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)

def train_model(config):
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)

    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)
    bst.save_model("model.xgb")

from ray import tune

# Specify the hyperparameter search space.
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "eta": tune.loguniform(1e-4, 1e-1),
    "subsample": tune.uniform(0.5, 1.0),
    "max_depth": tune.randint(1, 9)
}

# Make sure to use the `get_tune_resources` method to set the `resources_per_trial`
analysis = tune.run(
    train_model,
    config=config,
    metric="train-error",
    mode="min",
    num_samples=4,
    resources_per_trial=ray_params.get_tune_resources())
print("Best hyperparameters", analysis.best_config)
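
The metric name passed to tune.run() usually combines an evaluation-set name with an eval_metric, as described in step 4. A minimal sketch of that naming pattern, using a hypothetical helper tune_metric_names that is not part of XGBoost-Ray or Ray Tune:

```python
from typing import List, Sequence


def tune_metric_names(eval_names: Sequence[str],
                      eval_metrics: Sequence[str]) -> List[str]:
    """List metric names of the form "<eval name>-<eval_metric>".

    Hypothetical helper that only illustrates the naming pattern.
    """
    return [f"{name}-{metric}"
            for name in eval_names
            for metric in eval_metrics]


# Names for evals=[(train_set, "train")] and
# eval_metric=["logloss", "error"], as used in the example above.
print(tune_metric_names(["train"], ["logloss", "error"]))
# ['train-logloss', 'train-error']
```

Either of the printed names could serve as the metric argument; the example above optimizes train-error with mode set to min.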

Ray Tune supports various search algorithms and libraries (e.g. BayesOpt, Tree-Parzen estimators), smart schedulers like successive halving, and other features. Please refer to the Ray Tune documentation for more information.

Additional resources