Distributed XGBoost with Dask

Dask is a parallel computing library built on Python. Dask allows easy management of distributed workers and excels handling large distributed data science workflows. The implementation in XGBoost originates from dask-xgboost with some extended functionalities and a different interface. Right now it is still under construction and may change (with proper warnings) in the future.


Dask is trivial to install using either pip or conda. See here for official install documentation. For accelerating XGBoost with GPU, dask-cuda is recommended for creating GPU clusters.


There are 3 different components in dask from a user’s perspective, namely a scheduler, bunch of workers and some clients connecting to the scheduler. For using XGBoost with dask, one needs to call XGBoost dask interface from the client side. A small example illustrates the basic usage:

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

dtrain = xgb.dask.DaskDMatrix(client, X, y)  # X and y are dask dataframes or arrays

output = xgb.dask.train(client,
                        {'verbosity': 2,
                         'nthread': 1,
                         'tree_method': 'hist'},
                        num_boost_round=4, evals=[(dtrain, 'train')])

Here we first create a cluster in signle-node mode wtih distributed.LocalCluster, then connect a client to this cluster, setting up environment for later computation. Similar to non-distributed interface, we create a DMatrix object and pass it to train along with some other parameters. Except in dask interface, client is an extra argument for carrying out the computation, when set to None XGBoost will use the default client returned from dask.

There are two sets of APIs implemented in XGBoost. The first set is functional API illustrated in above example. Given the data and a set of parameters, train function returns a model and the computation history as Python dictionary

{'booster': Booster,
 'history': dict}

For prediction, pass the output returned by train into xgb.dask.predict

prediction = xgb.dask.predict(client, output, dtrain)

Or equivalently, pass output['booster']:

prediction = xgb.dask.predict(client, output['booster'], dtrain)

Here prediction is a dask Array object containing predictions from model.

Another set of API is a Scikit-Learn wrapper, which mimics the stateful Scikit-Learn interface with DaskXGBClassifier and DaskXGBRegressor. See xgboost/demo/dask for more examples.

Why is the initialization of DaskDMatrix so slow and throws weird errors

The dask API in XGBoost requires construction of DaskDMatrix. With Scikit-Learn interface, DaskDMatrix is implicitly constructed for each input data during fit or predict. You might have observed its construction is taking incredible amount of time, and sometimes throws error that doesn’t seem to be relevant to DaskDMatrix. Here is a brief explanation for why. By default most of dask’s computation is lazy, which means the computation is not carried out until you explicitly ask for result, either by calling compute() or wait(). See above link for details in dask, and this wiki for general concept of lazy evaluation. The DaskDMatrix constructor forces all lazy computation to materialize, which means it’s where all your earlier computation actually being carried out, including operations like dd.read_csv(). To isolate the computation in DaskDMatrix from other lazy computations, one can explicitly wait for results of input data before calling constructor of DaskDMatrix. Also dask’s web interface can be used to monitor what operations are currently being performed.


Basic functionalities including training and generating predictions for regression and classification are implemented. But there are still some other limitations we haven’t addressed yet.

  • Label encoding for Scikit-Learn classifier.

  • Ranking

  • Callback functions are not tested.

  • To use cross validation one needs to explicitly train different models instead of using a functional API like xgboost.cv.