xgboost
threading_utils.h
Go to the documentation of this file.
1 
6 #ifndef XGBOOST_COMMON_THREADING_UTILS_H_
7 #define XGBOOST_COMMON_THREADING_UTILS_H_
8 
9 #include <dmlc/common.h>
10 #include <vector>
11 #include <algorithm>
12 #include "xgboost/logging.h"
13 
14 namespace xgboost {
15 namespace common {
16 
17 // Represent simple range of indexes [begin, end)
18 // Inspired by tbb::blocked_range
19 class Range1d {
20  public:
21  Range1d(size_t begin, size_t end): begin_(begin), end_(end) {
22  CHECK_LT(begin, end);
23  }
24 
25  size_t begin() const { // NOLINT
26  return begin_;
27  }
28 
29  size_t end() const { // NOLINT
30  return end_;
31  }
32 
33  private:
34  size_t begin_;
35  size_t end_;
36 };
37 
38 
39 // Split 2d space to balanced blocks
40 // Implementation of the class is inspired by tbb::blocked_range2d
41 // However, TBB provides only (n x m) 2d range (matrix) separated by blocks. Example:
42 // [ 1,2,3 ]
43 // [ 4,5,6 ]
44 // [ 7,8,9 ]
45 // But the class is able to work with different sizes in each 'row'. Example:
46 // [ 1,2 ]
47 // [ 3,4,5,6 ]
48 // [ 7,8,9]
49 // If grain_size is 2: It produces following blocks:
50 // [1,2], [3,4], [5,6], [7,8], [9]
51 // The class helps to process data in several tree nodes (non-balanced usually) in parallel
52 // Using nested parallelism (by nodes and by data in each node)
53 // it helps to improve CPU resources utilization
55  public:
56  // Example of space:
57  // [ 1,2 ]
58  // [ 3,4,5,6 ]
59  // [ 7,8,9]
60  // BlockedSpace2d will create following blocks (tasks) if grain_size=2:
61  // 1-block: first_dimension = 0, range of indexes in a 'row' = [0,2) (includes [1,2] values)
62  // 2-block: first_dimension = 1, range of indexes in a 'row' = [0,2) (includes [3,4] values)
63  // 3-block: first_dimension = 1, range of indexes in a 'row' = [2,4) (includes [5,6] values)
64  // 4-block: first_dimension = 2, range of indexes in a 'row' = [0,2) (includes [7,8] values)
65  // 5-block: first_dimension = 2, range of indexes in a 'row' = [2,3) (includes [9] values)
66  // Arguments:
67  // dim1 - size of the first dimension in the space
68  // getter_size_dim2 - functor to get the second dimensions for each 'row' by row-index
69  // grain_size - max size of produced blocks
70  template<typename Func>
71  BlockedSpace2d(size_t dim1, Func getter_size_dim2, size_t grain_size) {
72  for (size_t i = 0; i < dim1; ++i) {
73  const size_t size = getter_size_dim2(i);
74  const size_t n_blocks = size/grain_size + !!(size % grain_size);
75  for (size_t iblock = 0; iblock < n_blocks; ++iblock) {
76  const size_t begin = iblock * grain_size;
77  const size_t end = std::min(begin + grain_size, size);
78  AddBlock(i, begin, end);
79  }
80  }
81  }
82 
83  // Amount of blocks(tasks) in a space
84  size_t Size() const {
85  return ranges_.size();
86  }
87 
88  // get index of the first dimension of i-th block(task)
89  size_t GetFirstDimension(size_t i) const {
90  CHECK_LT(i, first_dimension_.size());
91  return first_dimension_[i];
92  }
93 
94  // get a range of indexes for the second dimension of i-th block(task)
95  Range1d GetRange(size_t i) const {
96  CHECK_LT(i, ranges_.size());
97  return ranges_[i];
98  }
99 
100  private:
101  void AddBlock(size_t first_dimension, size_t begin, size_t end) {
102  first_dimension_.push_back(first_dimension);
103  ranges_.emplace_back(begin, end);
104  }
105 
106  std::vector<Range1d> ranges_;
107  std::vector<size_t> first_dimension_;
108 };
109 
110 
111 // Wrapper to implement nested parallelism with simple omp parallel for
112 template <typename Func>
113 void ParallelFor2d(const BlockedSpace2d& space, int nthreads, Func func) {
114  const size_t num_blocks_in_space = space.Size();
115  nthreads = std::min(nthreads, omp_get_max_threads());
116  nthreads = std::max(nthreads, 1);
117 
118  dmlc::OMPException exc;
119 #pragma omp parallel num_threads(nthreads)
120  {
121  exc.Run([&]() {
122  size_t tid = omp_get_thread_num();
123  size_t chunck_size =
124  num_blocks_in_space / nthreads + !!(num_blocks_in_space % nthreads);
125 
126  size_t begin = chunck_size * tid;
127  size_t end = std::min(begin + chunck_size, num_blocks_in_space);
128  for (auto i = begin; i < end; i++) {
129  func(space.GetFirstDimension(i), space.GetRange(i));
130  }
131  });
132  }
133  exc.Rethrow();
134 }
135 
136 template <typename Index, typename Func>
137 void ParallelFor(Index size, size_t nthreads, Func fn) {
138  dmlc::OMPException exc;
139 #pragma omp parallel for num_threads(nthreads) schedule(static)
140  for (Index i = 0; i < size; ++i) {
141  exc.Run(fn, i);
142  }
143  exc.Rethrow();
144 }
145 
146 template <typename Index, typename Func>
147 void ParallelFor(Index size, Func fn) {
148  ParallelFor(size, omp_get_max_threads(), fn);
149 }
150 
151 /* \brief Configure parallel threads.
152  *
153  * \param p_threads Number of threads, when it's less than or equal to 0, this function
154  * will change it to number of process on system.
155  *
156  * \return Global openmp max threads before configuration.
157  */
158 inline int32_t OmpSetNumThreads(int32_t* p_threads) {
159  auto& threads = *p_threads;
160  int32_t nthread_original = omp_get_max_threads();
161  if (threads <= 0) {
162  threads = omp_get_num_procs();
163  }
164  omp_set_num_threads(threads);
165  return nthread_original;
166 }
167 inline int32_t OmpSetNumThreadsWithoutHT(int32_t* p_threads) {
168  auto& threads = *p_threads;
169  int32_t nthread_original = omp_get_max_threads();
170  if (threads <= 0) {
171  threads = nthread_original;
172  }
173  omp_set_num_threads(threads);
174  return nthread_original;
175 }
176 
177 } // namespace common
178 } // namespace xgboost
179 
180 #endif // XGBOOST_COMMON_THREADING_UTILS_H_
xgboost::common::BlockedSpace2d::Size
size_t Size() const
Definition: threading_utils.h:84
xgboost::common::OmpSetNumThreadsWithoutHT
int32_t OmpSetNumThreadsWithoutHT(int32_t *p_threads)
Definition: threading_utils.h:167
xgboost::common::Index
Definition: hist_util.h:142
xgboost::common::BlockedSpace2d::GetFirstDimension
size_t GetFirstDimension(size_t i) const
Definition: threading_utils.h:89
xgboost::common::Range1d::begin
size_t begin() const
Definition: threading_utils.h:25
xgboost::common::BlockedSpace2d
Definition: threading_utils.h:54
xgboost::common::BlockedSpace2d::BlockedSpace2d
BlockedSpace2d(size_t dim1, Func getter_size_dim2, size_t grain_size)
Definition: threading_utils.h:71
xgboost::common::BlockedSpace2d::GetRange
Range1d GetRange(size_t i) const
Definition: threading_utils.h:95
xgboost::common::ParallelFor
void ParallelFor(Index size, size_t nthreads, Func fn)
Definition: threading_utils.h:137
xgboost::common::Range1d
Definition: threading_utils.h:19
xgboost::common::Range1d::Range1d
Range1d(size_t begin, size_t end)
Definition: threading_utils.h:21
xgboost::common::ParallelFor2d
void ParallelFor2d(const BlockedSpace2d &space, int nthreads, Func func)
Definition: threading_utils.h:113
xgboost::common::OmpSetNumThreads
int32_t OmpSetNumThreads(int32_t *p_threads)
Definition: threading_utils.h:158
xgboost::common::Range1d::end
size_t end() const
Definition: threading_utils.h:29
xgboost
namespace of xgboost
Definition: base.h:110