/*!
 * \file threading_utils.h
 * \brief Threading utilities for XGBoost.
 *
 * NOTE(review): the original copyright header and Doxygen navigation text were
 * lost in extraction; restore the project's standard header when merging.
 */
4 #ifndef XGBOOST_COMMON_THREADING_UTILS_H_
5 #define XGBOOST_COMMON_THREADING_UTILS_H_
6 
#include <dmlc/common.h>
#include <dmlc/omp.h>

#include <algorithm>    // std::min, std::max, std::fill_n
#include <cstdlib>      // malloc, free
#include <limits>
#include <new>          // std::bad_alloc
#include <type_traits>  // std::is_signed
#include <vector>

#include "xgboost/logging.h"
16 
17 #if !defined(_OPENMP)
18 extern "C" {
19 inline int32_t omp_get_thread_limit() __GOMP_NOTHROW { return 1; } // NOLINT
20 }
21 #endif // !defined(_OPENMP)
22 
23 // MSVC doesn't implement the thread limit.
24 #if defined(_OPENMP) && defined(_MSC_VER)
25 extern "C" {
26 inline int32_t omp_get_thread_limit() { return std::numeric_limits<int32_t>::max(); } // NOLINT
27 }
28 #endif // defined(_MSC_VER)
29 
30 namespace xgboost {
31 namespace common {
32 
33 // Represent simple range of indexes [begin, end)
34 // Inspired by tbb::blocked_range
35 class Range1d {
36  public:
37  Range1d(size_t begin, size_t end): begin_(begin), end_(end) {
38  CHECK_LT(begin, end);
39  }
40 
41  size_t begin() const { // NOLINT
42  return begin_;
43  }
44 
45  size_t end() const { // NOLINT
46  return end_;
47  }
48 
49  private:
50  size_t begin_;
51  size_t end_;
52 };
53 
54 
55 // Split 2d space to balanced blocks
56 // Implementation of the class is inspired by tbb::blocked_range2d
57 // However, TBB provides only (n x m) 2d range (matrix) separated by blocks. Example:
58 // [ 1,2,3 ]
59 // [ 4,5,6 ]
60 // [ 7,8,9 ]
61 // But the class is able to work with different sizes in each 'row'. Example:
62 // [ 1,2 ]
63 // [ 3,4,5,6 ]
64 // [ 7,8,9]
65 // If grain_size is 2: It produces following blocks:
66 // [1,2], [3,4], [5,6], [7,8], [9]
67 // The class helps to process data in several tree nodes (non-balanced usually) in parallel
68 // Using nested parallelism (by nodes and by data in each node)
69 // it helps to improve CPU resources utilization
71  public:
72  // Example of space:
73  // [ 1,2 ]
74  // [ 3,4,5,6 ]
75  // [ 7,8,9]
76  // BlockedSpace2d will create following blocks (tasks) if grain_size=2:
77  // 1-block: first_dimension = 0, range of indexes in a 'row' = [0,2) (includes [1,2] values)
78  // 2-block: first_dimension = 1, range of indexes in a 'row' = [0,2) (includes [3,4] values)
79  // 3-block: first_dimension = 1, range of indexes in a 'row' = [2,4) (includes [5,6] values)
80  // 4-block: first_dimension = 2, range of indexes in a 'row' = [0,2) (includes [7,8] values)
81  // 5-block: first_dimension = 2, range of indexes in a 'row' = [2,3) (includes [9] values)
82  // Arguments:
83  // dim1 - size of the first dimension in the space
84  // getter_size_dim2 - functor to get the second dimensions for each 'row' by row-index
85  // grain_size - max size of produced blocks
86  template<typename Func>
87  BlockedSpace2d(size_t dim1, Func getter_size_dim2, size_t grain_size) {
88  for (size_t i = 0; i < dim1; ++i) {
89  const size_t size = getter_size_dim2(i);
90  const size_t n_blocks = size/grain_size + !!(size % grain_size);
91  for (size_t iblock = 0; iblock < n_blocks; ++iblock) {
92  const size_t begin = iblock * grain_size;
93  const size_t end = std::min(begin + grain_size, size);
94  AddBlock(i, begin, end);
95  }
96  }
97  }
98 
99  // Amount of blocks(tasks) in a space
100  size_t Size() const {
101  return ranges_.size();
102  }
103 
104  // get index of the first dimension of i-th block(task)
105  size_t GetFirstDimension(size_t i) const {
106  CHECK_LT(i, first_dimension_.size());
107  return first_dimension_[i];
108  }
109 
110  // get a range of indexes for the second dimension of i-th block(task)
111  Range1d GetRange(size_t i) const {
112  CHECK_LT(i, ranges_.size());
113  return ranges_[i];
114  }
115 
116  private:
117  void AddBlock(size_t first_dimension, size_t begin, size_t end) {
118  first_dimension_.push_back(first_dimension);
119  ranges_.emplace_back(begin, end);
120  }
121 
122  std::vector<Range1d> ranges_;
123  std::vector<size_t> first_dimension_;
124 };
125 
126 
127 // Wrapper to implement nested parallelism with simple omp parallel for
128 template <typename Func>
129 void ParallelFor2d(const BlockedSpace2d& space, int nthreads, Func func) {
130  const size_t num_blocks_in_space = space.Size();
131  CHECK_GE(nthreads, 1);
132 
133  dmlc::OMPException exc;
134 #pragma omp parallel num_threads(nthreads)
135  {
136  exc.Run([&]() {
137  size_t tid = omp_get_thread_num();
138  size_t chunck_size =
139  num_blocks_in_space / nthreads + !!(num_blocks_in_space % nthreads);
140 
141  size_t begin = chunck_size * tid;
142  size_t end = std::min(begin + chunck_size, num_blocks_in_space);
143  for (auto i = begin; i < end; i++) {
144  func(space.GetFirstDimension(i), space.GetRange(i));
145  }
146  });
147  }
148  exc.Rethrow();
149 }
150 
154 struct Sched {
155  enum {
160  } sched;
161  size_t chunk{0};
162 
163  Sched static Auto() { return Sched{kAuto}; }
164  Sched static Dyn(size_t n = 0) { return Sched{kDynamic, n}; }
165  Sched static Static(size_t n = 0) { return Sched{kStatic, n}; }
166  Sched static Guided() { return Sched{kGuided}; }
167 };
168 
169 template <typename Index, typename Func>
170 void ParallelFor(Index size, int32_t n_threads, Sched sched, Func fn) {
171 #if defined(_MSC_VER)
172  // msvc doesn't support unsigned integer as openmp index.
173  using OmpInd = std::conditional_t<std::is_signed<Index>::value, Index, omp_ulong>;
174 #else
175  using OmpInd = Index;
176 #endif
177  OmpInd length = static_cast<OmpInd>(size);
178  CHECK_GE(n_threads, 1);
179 
180  dmlc::OMPException exc;
181  switch (sched.sched) {
182  case Sched::kAuto: {
183 #pragma omp parallel for num_threads(n_threads)
184  for (OmpInd i = 0; i < length; ++i) {
185  exc.Run(fn, i);
186  }
187  break;
188  }
189  case Sched::kDynamic: {
190  if (sched.chunk == 0) {
191 #pragma omp parallel for num_threads(n_threads) schedule(dynamic)
192  for (OmpInd i = 0; i < length; ++i) {
193  exc.Run(fn, i);
194  }
195  } else {
196 #pragma omp parallel for num_threads(n_threads) schedule(dynamic, sched.chunk)
197  for (OmpInd i = 0; i < length; ++i) {
198  exc.Run(fn, i);
199  }
200  }
201  break;
202  }
203  case Sched::kStatic: {
204  if (sched.chunk == 0) {
205 #pragma omp parallel for num_threads(n_threads) schedule(static)
206  for (OmpInd i = 0; i < length; ++i) {
207  exc.Run(fn, i);
208  }
209  } else {
210 #pragma omp parallel for num_threads(n_threads) schedule(static, sched.chunk)
211  for (OmpInd i = 0; i < length; ++i) {
212  exc.Run(fn, i);
213  }
214  }
215  break;
216  }
217  case Sched::kGuided: {
218 #pragma omp parallel for num_threads(n_threads) schedule(guided)
219  for (OmpInd i = 0; i < length; ++i) {
220  exc.Run(fn, i);
221  }
222  break;
223  }
224  }
225  exc.Rethrow();
226 }
227 
228 template <typename Index, typename Func>
229 void ParallelFor(Index size, int32_t n_threads, Func fn) {
230  ParallelFor(size, n_threads, Sched::Static(), fn);
231 }
232 
233 inline int32_t OmpGetThreadLimit() {
234  int32_t limit = omp_get_thread_limit();
235  CHECK_GE(limit, 1) << "Invalid thread limit for OpenMP.";
236  return limit;
237 }
238 
239 int32_t GetCfsCPUCount() noexcept;
240 
241 inline int32_t OmpGetNumThreads(int32_t n_threads) {
242  if (n_threads <= 0) {
243  n_threads = std::min(omp_get_num_procs(), omp_get_max_threads());
244  }
245  n_threads = std::min(n_threads, OmpGetThreadLimit());
246  n_threads = std::max(n_threads, 1);
247  return n_threads;
248 }
249 
250 
256 template <typename T, size_t MaxStackSize>
258  public:
259  explicit MemStackAllocator(size_t required_size) : required_size_(required_size) {
260  if (MaxStackSize >= required_size_) {
261  ptr_ = stack_mem_;
262  } else {
263  ptr_ = reinterpret_cast<T*>(malloc(required_size_ * sizeof(T)));
264  }
265  if (!ptr_) {
266  throw std::bad_alloc{};
267  }
268  }
269  MemStackAllocator(size_t required_size, T init) : MemStackAllocator{required_size} {
270  std::fill_n(ptr_, required_size_, init);
271  }
272 
274  if (required_size_ > MaxStackSize) {
275  free(ptr_);
276  }
277  }
278  T& operator[](size_t i) { return ptr_[i]; }
279  T const& operator[](size_t i) const { return ptr_[i]; }
280 
281  private:
282  T* ptr_ = nullptr;
283  size_t required_size_;
284  T stack_mem_[MaxStackSize];
285 };
286 } // namespace common
287 } // namespace xgboost
288 
289 #endif // XGBOOST_COMMON_THREADING_UTILS_H_
xgboost::common::Sched::kAuto
@ kAuto
Definition: threading_utils.h:156
xgboost::common::BlockedSpace2d::Size
size_t Size() const
Definition: threading_utils.h:100
xgboost::common::Sched::kGuided
@ kGuided
Definition: threading_utils.h:159
xgboost::common::MemStackAllocator::~MemStackAllocator
~MemStackAllocator()
Definition: threading_utils.h:273
xgboost::common::Index
Optionally compressed gradient index. The compression works only with dense data.
Definition: hist_util.h:207
xgboost::common::OmpGetNumThreads
int32_t OmpGetNumThreads(int32_t n_threads)
Definition: threading_utils.h:241
xgboost::common::BlockedSpace2d::GetFirstDimension
size_t GetFirstDimension(size_t i) const
Definition: threading_utils.h:105
xgboost::common::Range1d::begin
size_t begin() const
Definition: threading_utils.h:41
xgboost::common::ParallelFor
void ParallelFor(Index size, int32_t n_threads, Sched sched, Func fn)
Definition: threading_utils.h:170
xgboost::common::Sched::Auto
static Sched Auto()
Definition: threading_utils.h:163
xgboost::omp_ulong
dmlc::omp_ulong omp_ulong
define unsigned long for openmp loop
Definition: base.h:271
xgboost::common::MemStackAllocator::operator[]
T const & operator[](size_t i) const
Definition: threading_utils.h:279
xgboost::common::BlockedSpace2d
Definition: threading_utils.h:70
omp_get_thread_limit
int32_t omp_get_thread_limit() __GOMP_NOTHROW
Definition: threading_utils.h:19
xgboost::common::MemStackAllocator::operator[]
T & operator[](size_t i)
Definition: threading_utils.h:278
xgboost::common::BlockedSpace2d::BlockedSpace2d
BlockedSpace2d(size_t dim1, Func getter_size_dim2, size_t grain_size)
Definition: threading_utils.h:87
xgboost::common::BlockedSpace2d::GetRange
Range1d GetRange(size_t i) const
Definition: threading_utils.h:111
xgboost::common::Sched::Static
static Sched Static(size_t n=0)
Definition: threading_utils.h:165
xgboost::common::Sched::Guided
static Sched Guided()
Definition: threading_utils.h:166
xgboost::common::GetCfsCPUCount
int32_t GetCfsCPUCount() noexcept
xgboost::common::MemStackAllocator::MemStackAllocator
MemStackAllocator(size_t required_size)
Definition: threading_utils.h:259
xgboost::common::MemStackAllocator::MemStackAllocator
MemStackAllocator(size_t required_size, T init)
Definition: threading_utils.h:269
xgboost::common::Sched::sched
enum xgboost::common::Sched::@0 sched
xgboost::common::Range1d
Definition: threading_utils.h:35
xgboost::common::OmpGetThreadLimit
int32_t OmpGetThreadLimit()
Definition: threading_utils.h:233
xgboost::common::Range1d::Range1d
Range1d(size_t begin, size_t end)
Definition: threading_utils.h:37
xgboost::common::Sched::kStatic
@ kStatic
Definition: threading_utils.h:158
xgboost::common::MemStackAllocator
A C-style array with in-stack allocation. As long as the array is smaller than MaxStackSize,...
Definition: threading_utils.h:257
xgboost::common::Sched
Definition: threading_utils.h:154
xgboost::common::ParallelFor2d
void ParallelFor2d(const BlockedSpace2d &space, int nthreads, Func func)
Definition: threading_utils.h:129
xgboost::common::Sched::kDynamic
@ kDynamic
Definition: threading_utils.h:157
xgboost::common::Range1d::end
size_t end() const
Definition: threading_utils.h:45
xgboost::common::Sched::Dyn
static Sched Dyn(size_t n=0)
Definition: threading_utils.h:164
xgboost::common::Sched::chunk
size_t chunk
Definition: threading_utils.h:161
xgboost
namespace of xgboost
Definition: base.h:110