xgboost
threading_utils.h
Go to the documentation of this file.
1 
6 #ifndef XGBOOST_COMMON_THREADING_UTILS_H_
7 #define XGBOOST_COMMON_THREADING_UTILS_H_
8 
9 #include <dmlc/common.h>
10 #include <dmlc/omp.h>
11 
12 #include <algorithm>
13 #include <limits>
14 #include <type_traits> // std::is_signed
15 #include <vector>
16 
17 #include "xgboost/logging.h"
18 
19 #if !defined(_OPENMP)
20 extern "C" {
21 inline int32_t omp_get_thread_limit() __GOMP_NOTHROW { return 1; } // NOLINT
22 }
23 #endif // !defined(_OPENMP)
24 
25 // MSVC doesn't implement the thread limit.
26 #if defined(_OPENMP) && defined(_MSC_VER)
27 extern "C" {
28 inline int32_t omp_get_thread_limit() { return std::numeric_limits<int32_t>::max(); } // NOLINT
29 }
30 #endif // defined(_MSC_VER)
31 
32 namespace xgboost {
33 namespace common {
34 
35 // Represent simple range of indexes [begin, end)
36 // Inspired by tbb::blocked_range
37 class Range1d {
38  public:
39  Range1d(size_t begin, size_t end): begin_(begin), end_(end) {
40  CHECK_LT(begin, end);
41  }
42 
43  size_t begin() const { // NOLINT
44  return begin_;
45  }
46 
47  size_t end() const { // NOLINT
48  return end_;
49  }
50 
51  private:
52  size_t begin_;
53  size_t end_;
54 };
55 
56 
57 // Split 2d space to balanced blocks
58 // Implementation of the class is inspired by tbb::blocked_range2d
59 // However, TBB provides only (n x m) 2d range (matrix) separated by blocks. Example:
60 // [ 1,2,3 ]
61 // [ 4,5,6 ]
62 // [ 7,8,9 ]
63 // But the class is able to work with different sizes in each 'row'. Example:
64 // [ 1,2 ]
65 // [ 3,4,5,6 ]
66 // [ 7,8,9]
67 // If grain_size is 2: It produces following blocks:
68 // [1,2], [3,4], [5,6], [7,8], [9]
69 // The class helps to process data in several tree nodes (non-balanced usually) in parallel
70 // Using nested parallelism (by nodes and by data in each node)
71 // it helps to improve CPU resources utilization
73  public:
74  // Example of space:
75  // [ 1,2 ]
76  // [ 3,4,5,6 ]
77  // [ 7,8,9]
78  // BlockedSpace2d will create following blocks (tasks) if grain_size=2:
79  // 1-block: first_dimension = 0, range of indexes in a 'row' = [0,2) (includes [1,2] values)
80  // 2-block: first_dimension = 1, range of indexes in a 'row' = [0,2) (includes [3,4] values)
81  // 3-block: first_dimension = 1, range of indexes in a 'row' = [2,4) (includes [5,6] values)
82  // 4-block: first_dimension = 2, range of indexes in a 'row' = [0,2) (includes [7,8] values)
83  // 5-block: first_dimension = 2, range of indexes in a 'row' = [2,3) (includes [9] values)
84  // Arguments:
85  // dim1 - size of the first dimension in the space
86  // getter_size_dim2 - functor to get the second dimensions for each 'row' by row-index
87  // grain_size - max size of produced blocks
88  template<typename Func>
89  BlockedSpace2d(size_t dim1, Func getter_size_dim2, size_t grain_size) {
90  for (size_t i = 0; i < dim1; ++i) {
91  const size_t size = getter_size_dim2(i);
92  const size_t n_blocks = size/grain_size + !!(size % grain_size);
93  for (size_t iblock = 0; iblock < n_blocks; ++iblock) {
94  const size_t begin = iblock * grain_size;
95  const size_t end = std::min(begin + grain_size, size);
96  AddBlock(i, begin, end);
97  }
98  }
99  }
100 
101  // Amount of blocks(tasks) in a space
102  size_t Size() const {
103  return ranges_.size();
104  }
105 
106  // get index of the first dimension of i-th block(task)
107  size_t GetFirstDimension(size_t i) const {
108  CHECK_LT(i, first_dimension_.size());
109  return first_dimension_[i];
110  }
111 
112  // get a range of indexes for the second dimension of i-th block(task)
113  Range1d GetRange(size_t i) const {
114  CHECK_LT(i, ranges_.size());
115  return ranges_[i];
116  }
117 
118  private:
119  void AddBlock(size_t first_dimension, size_t begin, size_t end) {
120  first_dimension_.push_back(first_dimension);
121  ranges_.emplace_back(begin, end);
122  }
123 
124  std::vector<Range1d> ranges_;
125  std::vector<size_t> first_dimension_;
126 };
127 
128 
129 // Wrapper to implement nested parallelism with simple omp parallel for
130 template <typename Func>
131 void ParallelFor2d(const BlockedSpace2d& space, int nthreads, Func func) {
132  const size_t num_blocks_in_space = space.Size();
133  nthreads = std::min(nthreads, omp_get_max_threads());
134  nthreads = std::max(nthreads, 1);
135 
136  dmlc::OMPException exc;
137 #pragma omp parallel num_threads(nthreads)
138  {
139  exc.Run([&]() {
140  size_t tid = omp_get_thread_num();
141  size_t chunck_size =
142  num_blocks_in_space / nthreads + !!(num_blocks_in_space % nthreads);
143 
144  size_t begin = chunck_size * tid;
145  size_t end = std::min(begin + chunck_size, num_blocks_in_space);
146  for (auto i = begin; i < end; i++) {
147  func(space.GetFirstDimension(i), space.GetRange(i));
148  }
149  });
150  }
151  exc.Rethrow();
152 }
153 
// Loop schedule selector consumed by ParallelFor.
//
// `sched` picks the kind of schedule and `chunk` is the optional chunk size
// forwarded to the schedule clause; a chunk of 0 means "no explicit chunk".
struct Sched {
  enum {
    kAuto,     // no schedule clause is specified
    kDynamic,  // schedule(dynamic)
    kStatic,   // schedule(static)
    kGuided,   // schedule(guided)
  } sched;
  size_t chunk{0};

  // Factory helpers; `n` is the chunk size (0 leaves it unspecified).
  static Sched Auto() { return Sched{kAuto}; }
  static Sched Dyn(size_t n = 0) { return Sched{kDynamic, n}; }
  static Sched Static(size_t n = 0) { return Sched{kStatic, n}; }
  static Sched Guided() { return Sched{kGuided}; }
};
171 
172 template <typename Index, typename Func>
173 void ParallelFor(Index size, int32_t n_threads, Sched sched, Func fn) {
174 #if defined(_MSC_VER)
175  // msvc doesn't support unsigned integer as openmp index.
176  using OmpInd = std::conditional_t<std::is_signed<Index>::value, Index, omp_ulong>;
177 #else
178  using OmpInd = Index;
179 #endif
180  OmpInd length = static_cast<OmpInd>(size);
181 
182  dmlc::OMPException exc;
183  switch (sched.sched) {
184  case Sched::kAuto: {
185 #pragma omp parallel for num_threads(n_threads)
186  for (OmpInd i = 0; i < length; ++i) {
187  exc.Run(fn, i);
188  }
189  break;
190  }
191  case Sched::kDynamic: {
192  if (sched.chunk == 0) {
193 #pragma omp parallel for num_threads(n_threads) schedule(dynamic)
194  for (OmpInd i = 0; i < length; ++i) {
195  exc.Run(fn, i);
196  }
197  } else {
198 #pragma omp parallel for num_threads(n_threads) schedule(dynamic, sched.chunk)
199  for (OmpInd i = 0; i < length; ++i) {
200  exc.Run(fn, i);
201  }
202  }
203  break;
204  }
205  case Sched::kStatic: {
206  if (sched.chunk == 0) {
207 #pragma omp parallel for num_threads(n_threads) schedule(static)
208  for (OmpInd i = 0; i < length; ++i) {
209  exc.Run(fn, i);
210  }
211  } else {
212 #pragma omp parallel for num_threads(n_threads) schedule(static, sched.chunk)
213  for (OmpInd i = 0; i < length; ++i) {
214  exc.Run(fn, i);
215  }
216  }
217  break;
218  }
219  case Sched::kGuided: {
220 #pragma omp parallel for num_threads(n_threads) schedule(guided)
221  for (OmpInd i = 0; i < length; ++i) {
222  exc.Run(fn, i);
223  }
224  break;
225  }
226  }
227  exc.Rethrow();
228 }
229 
230 template <typename Index, typename Func>
231 void ParallelFor(Index size, size_t n_threads, Func fn) {
232  ParallelFor(size, n_threads, Sched::Static(), fn);
233 }
234 
235 // FIXME(jiamingy): Remove this function to get rid of `omp_set_num_threads`, which sets a
236 // global variable in runtime and affects other programs in the same process.
237 template <typename Index, typename Func>
238 void ParallelFor(Index size, Func fn) {
239  ParallelFor(size, omp_get_max_threads(), Sched::Static(), fn);
240 } // !defined(_OPENMP)
241 
242 
243 inline int32_t OmpGetThreadLimit() {
244  int32_t limit = omp_get_thread_limit();
245  CHECK_GE(limit, 1) << "Invalid thread limit for OpenMP.";
246  return limit;
247 }
248 
249 /* \brief Configure parallel threads.
250  *
251  * \param p_threads Number of threads, when it's less than or equal to 0, this function
252  * will change it to number of process on system.
253  *
254  * \return Global openmp max threads before configuration.
255  */
256 inline int32_t OmpSetNumThreads(int32_t* p_threads) {
257  auto& threads = *p_threads;
258  int32_t nthread_original = omp_get_max_threads();
259  if (threads <= 0) {
260  threads = omp_get_num_procs();
261  }
262  threads = std::min(threads, OmpGetThreadLimit());
263  omp_set_num_threads(threads);
264  return nthread_original;
265 }
266 
267 inline int32_t OmpSetNumThreadsWithoutHT(int32_t* p_threads) {
268  auto& threads = *p_threads;
269  int32_t nthread_original = omp_get_max_threads();
270  if (threads <= 0) {
271  threads = nthread_original;
272  }
273  threads = std::min(threads, OmpGetThreadLimit());
274  omp_set_num_threads(threads);
275  return nthread_original;
276 }
277 
278 inline int32_t OmpGetNumThreads(int32_t n_threads) {
279  if (n_threads <= 0) {
280  n_threads = omp_get_num_procs();
281  }
282  n_threads = std::min(n_threads, OmpGetThreadLimit());
283  return n_threads;
284 }
285 } // namespace common
286 } // namespace xgboost
287 
288 #endif // XGBOOST_COMMON_THREADING_UTILS_H_
xgboost::common::Sched::kAuto
@ kAuto
Definition: threading_utils.h:159
xgboost::common::BlockedSpace2d::Size
size_t Size() const
Definition: threading_utils.h:102
xgboost::common::OmpSetNumThreadsWithoutHT
int32_t OmpSetNumThreadsWithoutHT(int32_t *p_threads)
Definition: threading_utils.h:267
xgboost::common::Sched::kGuided
@ kGuided
Definition: threading_utils.h:162
xgboost::common::Index
Definition: hist_util.h:146
xgboost::common::OmpGetNumThreads
int32_t OmpGetNumThreads(int32_t n_threads)
Definition: threading_utils.h:278
xgboost::common::BlockedSpace2d::GetFirstDimension
size_t GetFirstDimension(size_t i) const
Definition: threading_utils.h:107
xgboost::common::Range1d::begin
size_t begin() const
Definition: threading_utils.h:43
xgboost::common::ParallelFor
void ParallelFor(Index size, int32_t n_threads, Sched sched, Func fn)
Definition: threading_utils.h:173
xgboost::common::Sched::Auto
static Sched Auto()
Definition: threading_utils.h:166
xgboost::omp_ulong
dmlc::omp_ulong omp_ulong
define unsigned long for openmp loop
Definition: base.h:271
xgboost::common::BlockedSpace2d
Definition: threading_utils.h:72
omp_get_thread_limit
int32_t omp_get_thread_limit() __GOMP_NOTHROW
Definition: threading_utils.h:21
xgboost::common::BlockedSpace2d::BlockedSpace2d
BlockedSpace2d(size_t dim1, Func getter_size_dim2, size_t grain_size)
Definition: threading_utils.h:89
xgboost::common::BlockedSpace2d::GetRange
Range1d GetRange(size_t i) const
Definition: threading_utils.h:113
xgboost::common::Sched::Static
static Sched Static(size_t n=0)
Definition: threading_utils.h:168
xgboost::common::Sched::Guided
static Sched Guided()
Definition: threading_utils.h:169
xgboost::common::Sched::sched
enum xgboost::common::Sched::@0 sched
xgboost::common::Range1d
Definition: threading_utils.h:37
xgboost::common::OmpGetThreadLimit
int32_t OmpGetThreadLimit()
Definition: threading_utils.h:243
xgboost::common::Range1d::Range1d
Range1d(size_t begin, size_t end)
Definition: threading_utils.h:39
xgboost::common::Sched::kStatic
@ kStatic
Definition: threading_utils.h:161
xgboost::common::Sched
Definition: threading_utils.h:157
xgboost::common::ParallelFor2d
void ParallelFor2d(const BlockedSpace2d &space, int nthreads, Func func)
Definition: threading_utils.h:131
xgboost::common::OmpSetNumThreads
int32_t OmpSetNumThreads(int32_t *p_threads)
Definition: threading_utils.h:256
xgboost::common::Sched::kDynamic
@ kDynamic
Definition: threading_utils.h:160
xgboost::common::Range1d::end
size_t end() const
Definition: threading_utils.h:47
xgboost::common::Sched::Dyn
static Sched Dyn(size_t n=0)
Definition: threading_utils.h:167
xgboost::common::Sched::chunk
size_t chunk
Definition: threading_utils.h:164
xgboost
namespace of xgboost
Definition: base.h:110