17 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <sys/ioctl.h>
22 #include <sys/socket.h>
32 #include <system_error>
33 #include <unordered_map>
// IS_MINGW() expands to the integer constant 1 when compiling with MinGW
// (__MINGW32__ is defined) and to 0 otherwise, so it can be used directly in
// `#if IS_MINGW()` conditions.
//
// It must NOT expand to `defined(__MINGW32__)`: when the `defined` token is
// produced by macro replacement inside an `#if`, the behavior is undefined per
// the C++ standard ([cpp.cond]), and GCC/Clang warn with -Wexpansion-to-defined.
#if defined(__MINGW32__)
#define IS_MINGW() 1
#else
#define IS_MINGW() 0
#endif
46 #if IS_MINGW() && !defined(POLLRDNORM) && !defined(POLLRDBAND)
61 #pragma message("Distributed training on mingw is not supported.")
62 typedef struct pollfd {
66 } WSAPOLLFD, *PWSAPOLLFD, *LPWSAPOLLFD;
69 #define POLLIN (0x0100 | 0x0200)
70 #define POLLPRI 0x0400
72 #define POLLOUT 0x0010
79 template <
typename PollFD>
80 int PollImpl(PollFD* pfd,
int nfds, std::chrono::seconds timeout) noexcept(
true) {
86 xgboost::MingWError();
89 return WSAPoll(pfd, nfds, std::chrono::milliseconds(timeout).count());
93 return poll(pfd, nfds, timeout.count() < 0 ? -1 : std::chrono::milliseconds(timeout).count());
99 if ((revents & POLLERR) != 0) {
101 auto str = strerror(err);
104 " code:" + std::to_string(err));
106 if ((revents & POLLNVAL) != 0) {
109 if ((revents & POLLHUP) != 0) {
121 #if defined(POLLRDHUP)
123 if ((revents & POLLRDHUP) != 0) {
140 pfd.events |= POLLIN;
151 pfd.events |= POLLOUT;
164 pfd.events |= POLLPRI;
174 const auto& pfd =
fds.find(fd);
175 return pfd !=
fds.end() && ((pfd->second.events & POLLIN) != 0);
186 const auto& pfd =
fds.find(fd);
187 return pfd !=
fds.end() && ((pfd->second.events & POLLOUT) != 0);
198 bool check_error =
true) {
199 std::vector<pollfd> fdset;
200 fdset.reserve(
fds.size());
201 for (
auto kv :
fds) {
202 fdset.push_back(kv.second);
204 std::int32_t ret =
PollImpl(fdset.data(), fdset.size(), timeout);
207 "Poll timeout:" + std::to_string(timeout.count()) +
" seconds.",
208 std::make_error_code(std::errc::timed_out));
209 }
else if (ret < 0) {
213 for (
auto& pfd : fdset) {
215 if (check_error && !result.OK()) {
219 auto revents = pfd.revents & pfd.events;
220 fds[pfd.fd].events = revents;
225 std::unordered_map<SOCKET, pollfd>
fds;
230 #if IS_MINGW() && !defined(POLLRDNORM) && !defined(POLLRDBAND)
TCP socket for simple communication.
Definition: socket.h:267
HandleT const & Handle() const
Return the native socket file descriptor.
Definition: socket.h:539
int PollImpl(PollFD *pfd, int nfds, std::chrono::seconds timeout) noexcept(true)
Definition: poll_utils.h:80
std::enable_if_t< std::is_integral_v< E >, xgboost::collective::Result > PollError(E const &revents)
Definition: poll_utils.h:98
Definition: poll_utils.h:76
auto Fail(std::string msg, char const *file=__builtin_FILE(), std::int32_t line=__builtin_LINE())
Return failure.
Definition: result.h:124
auto Success() noexcept(true)
Return success.
Definition: result.h:120
collective::Result FailWithCode(std::string msg)
Definition: socket.h:78
int SOCKET
Definition: poll_utils.h:40
size_t sock_size_t
Definition: poll_utils.h:41
Helper data structure to perform poll.
Definition: poll_utils.h:131
void WatchException(SOCKET fd)
Add a file descriptor to watch for exceptional conditions (priority data).
Definition: poll_utils.h:161
bool CheckWrite(xgboost::collective::TCPSocket const &socket) const
Definition: poll_utils.h:189
void WatchRead(xgboost::collective::TCPSocket const &socket)
Definition: poll_utils.h:142
xgboost::collective::Result Poll(std::chrono::seconds timeout, bool check_error=true)
Perform poll on the defined set of read, write, and exception descriptors.
Definition: poll_utils.h:197
void WatchWrite(xgboost::collective::TCPSocket const &socket)
Definition: poll_utils.h:153
bool CheckRead(SOCKET fd) const
Check if the descriptor is ready for read.
Definition: poll_utils.h:173
void WatchException(xgboost::collective::TCPSocket const &socket)
Definition: poll_utils.h:166
bool CheckWrite(SOCKET fd) const
Check if the descriptor is ready for write.
Definition: poll_utils.h:185
void WatchWrite(SOCKET fd)
Add a file descriptor to watch for write readiness.
Definition: poll_utils.h:148
bool CheckRead(xgboost::collective::TCPSocket const &socket) const
Definition: poll_utils.h:177
std::unordered_map< SOCKET, pollfd > fds
Definition: poll_utils.h:225
void WatchRead(SOCKET fd)
Add a file descriptor to watch for read readiness.
Definition: poll_utils.h:137
An error type that's easier to handle than throwing dmlc exception. We can record and propagate the s...
Definition: result.h:67