#ifndef EPOLL_H #define EPOLL_H #include #include #include #include #include #include #include #include typedef int uv_os_sock_t; static const int UV_READABLE = EPOLLIN; static const int UV_WRITABLE = EPOLLOUT; namespace uS { struct Poll; struct Timer; extern std::recursive_mutex cbMutex; extern void (*callbacks[16])(Poll *, int, int); extern int cbHead; struct Timepoint { void (*cb)(Timer *); Timer *timer; std::chrono::system_clock::time_point timepoint; int nextDelay; }; struct Loop { int epfd; int numPolls = 0; bool cancelledLastTimer; int delay = -1; // delay to next timer expiry, or -1 if no timers pending epoll_event readyEvents[1024]; std::chrono::system_clock::time_point timepoint; std::vector timers; std::vector> closing; void (*preCb)(void *) = nullptr; void (*postCb)(void *) = nullptr; void *preCbData, *postCbData; Loop(bool defaultLoop) { epfd = epoll_create1(EPOLL_CLOEXEC); timepoint = std::chrono::system_clock::now(); } static Loop *createLoop(bool defaultLoop = true) { return new Loop(defaultLoop); } void destroy() { ::close(epfd); delete this; } void doEpoll(int epollTimeout); void run(); void poll(); int getEpollFd() { return epfd; } }; struct Timer { Loop *loop; void *data; Timer(Loop *loop) { this->loop = loop; } void start(void (*cb)(Timer *), int timeout, int repeat) { loop->timepoint = std::chrono::system_clock::now(); std::chrono::system_clock::time_point timepoint = loop->timepoint + std::chrono::milliseconds(timeout); Timepoint t = {cb, this, timepoint, repeat}; loop->timers.insert( std::upper_bound(loop->timers.begin(), loop->timers.end(), t, [](const Timepoint &a, const Timepoint &b) { return a.timepoint < b.timepoint; }), t ); loop->delay = -1; if (loop->timers.size()) { loop->delay = std::max(std::chrono::duration_cast(loop->timers[0].timepoint - loop->timepoint).count(), 0); } } void setData(void *data) { this->data = data; } void *getData() { return data; } // always called before destructor void stop() { auto pos = loop->timers.begin(); for (Timepoint &t : loop->timers) { if (t.timer == this) { loop->timers.erase(pos); break; } pos++; } loop->cancelledLastTimer = true; loop->delay = -1; if (loop->timers.size()) { loop->delay = std::max(std::chrono::duration_cast(loop->timers[0].timepoint - loop->timepoint).count(), 0); } } void close() { delete this; } }; // 4 bytes struct Poll { protected: struct { int fd : 28; unsigned int cbIndex : 4; } state = {-1, 0}; Poll(Loop *loop, uv_os_sock_t fd) { fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); state.fd = fd; loop->numPolls++; } // todo: pre-set all of callbacks up front and remove mutex void setCb(void (*cb)(Poll *p, int status, int events)) { cbMutex.lock(); state.cbIndex = cbHead; for (int i = 0; i < cbHead; i++) { if (callbacks[i] == cb) { state.cbIndex = i; break; } } if (state.cbIndex == cbHead) { callbacks[cbHead++] = cb; } cbMutex.unlock(); } void (*getCb())(Poll *, int, int) { return callbacks[state.cbIndex]; } void reInit(Loop *loop, uv_os_sock_t fd) { state.fd = fd; loop->numPolls++; } void start(Loop *loop, Poll *self, int events) { epoll_event event; event.events = events; event.data.ptr = self; epoll_ctl(loop->epfd, EPOLL_CTL_ADD, state.fd, &event); } void change(Loop *loop, Poll *self, int events) { epoll_event event; event.events = events; event.data.ptr = self; epoll_ctl(loop->epfd, EPOLL_CTL_MOD, state.fd, &event); } void stop(Loop *loop) { epoll_event event; epoll_ctl(loop->epfd, EPOLL_CTL_DEL, state.fd, &event); } bool fastTransfer(Loop *loop, Loop *newLoop, int events) { stop(loop); start(newLoop, this, events); loop->numPolls--; // needs to lock the newLoop's numPolls! newLoop->numPolls++; return true; } bool threadSafeChange(Loop *loop, Poll *self, int events) { change(loop, self, events); return true; } void close(Loop *loop, void (*cb)(Poll *)) { state.fd = -1; loop->closing.push_back({this, cb}); } public: bool isClosed() { return state.fd == -1; } uv_os_sock_t getFd() { return state.fd; } friend struct Loop; }; // this should be put in the Loop as a general "post" function always available struct Async : Poll { void (*cb)(Async *); Loop *loop; void *data; Async(Loop *loop) : Poll(loop, ::eventfd(0, EFD_CLOEXEC)) { this->loop = loop; } void start(void (*cb)(Async *)) { this->cb = cb; Poll::setCb([](Poll *p, int, int) { uint64_t val; if (::read(((Async *) p)->state.fd, &val, 8) == 8) { ((Async *) p)->cb((Async *) p); } }); Poll::start(loop, this, UV_READABLE); } void send() { uint64_t one = 1; if (::write(state.fd, &one, 8) != 8) { return; } } void close() { Poll::stop(loop); ::close(state.fd); Poll::close(loop, [](Poll *p) { delete (Async *) p; }); } void setData(void *data) { this->data = data; } void *getData() { return data; } }; } #endif // EPOLL_H