LCOV - code coverage report
Current view: top level - src/detail/epoll - scheduler.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 76.1 % 427 325
Test Date: 2026-02-04 19:19:32 Functions: 89.7 % 39 35

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_EPOLL
      13              : 
      14              : #include "src/detail/epoll/scheduler.hpp"
      15              : #include "src/detail/epoll/op.hpp"
      16              : #include "src/detail/make_err.hpp"
      17              : #include "src/detail/posix/resolver_service.hpp"
      18              : #include "src/detail/posix/signals.hpp"
      19              : 
      20              : #include <boost/corosio/detail/except.hpp>
      21              : #include <boost/corosio/detail/thread_local_ptr.hpp>
      22              : 
      23              : #include <atomic>
      24              : #include <chrono>
      25              : #include <limits>
      26              : 
      27              : #include <errno.h>
      28              : #include <fcntl.h>
      29              : #include <sys/epoll.h>
      30              : #include <sys/eventfd.h>
      31              : #include <sys/socket.h>
      32              : #include <sys/timerfd.h>
      33              : #include <unistd.h>
      34              : 
      35              : /*
      36              :     epoll Scheduler - Single Reactor Model
      37              :     ======================================
      38              : 
      39              :     This scheduler uses a thread coordination strategy to provide handler
      40              :     parallelism and avoid the thundering herd problem.
      41              :     Instead of all threads blocking on epoll_wait(), one thread becomes the
      42              :     "reactor" while others wait on a condition variable for handler work.
      43              : 
      44              :     Thread Model
      45              :     ------------
      46              :     - ONE thread runs epoll_wait() at a time (the reactor thread)
      47              :     - OTHER threads wait on wakeup_event_ (condition variable) for handlers
      48              :     - When work is posted, exactly one waiting thread wakes via notify_one()
      49              :     - This matches Windows IOCP semantics where N posted items wake N threads
      50              : 
      51              :     Event Loop Structure (do_one)
      52              :     -----------------------------
      53              :     1. Lock mutex, try to pop handler from queue
      54              :     2. If got handler: execute it (unlocked), return
      55              :     3. If queue empty and no reactor running: become reactor
      56              :        - Run epoll_wait (unlocked), queue I/O completions, loop back
      57              :     4. If queue empty and reactor running: wait on condvar for work
      58              : 
      59              :     The reactor_running_ flag ensures only one thread owns epoll_wait().
      60              :     After the reactor queues I/O completions, it loops back to try getting
      61              :     a handler, giving priority to handler execution over more I/O polling.
      62              : 
      63              :     Wake Coordination (wake_one_thread_and_unlock)
      64              :     ----------------------------------------------
      65              :     When posting work:
      66              :     - If idle threads exist: notify_one() wakes exactly one worker
      67              :     - Else if reactor running: interrupt via eventfd write
      68              :     - Else: no-op (thread will find work when it checks queue)
      69              : 
      70              :     This is critical for matching IOCP behavior. With the old model, posting
      71              :     N handlers would wake all threads (thundering herd). Now each post()
      72              :     wakes at most one thread, and that thread handles exactly one item.
      73              : 
      74              :     Work Counting
      75              :     -------------
      76              :     outstanding_work_ tracks pending operations. When it hits zero, run()
      77              :     returns. Each operation increments on start, decrements on completion.
      78              : 
      79              :     Timer Integration
      80              :     -----------------
      81              :     Timers are handled by timer_service. The reactor adjusts epoll_wait
      82              :     timeout to wake for the nearest timer expiry. When a new timer is
      83              :     scheduled earlier than current, timer_service calls interrupt_reactor()
      84              :     to re-evaluate the timeout.
      85              : */
      86              : 
      87              : namespace boost::corosio::detail {
      88              : 
      89              : namespace {
      90              : 
// One frame of the thread-local scheduler-context stack: records which
// scheduler this thread is currently running and carries a private queue
// for operations posted from this same thread (drained without locking).
struct scheduler_context
{
    epoll_scheduler const* key;     // scheduler this frame belongs to
    scheduler_context* next;        // enclosing frame (linked stack)
    op_queue private_queue;         // same-thread posts; bypasses the mutex
    long private_outstanding_work;  // number of ops pushed to private_queue

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
    {
    }
};
     105              : 
// Head of the current thread's stack of scheduler_context frames; the top
// frame is the innermost run()/poll() invocation active on this thread.
corosio::detail::thread_local_ptr<scheduler_context> context_stack;
     107              : 
// RAII guard: pushes a scheduler_context frame for the current thread on
// construction and pops it on destruction, flushing any ops still sitting
// in the private queue back to the scheduler's shared queue.
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        // Ops posted on this thread but never executed here are handed to
        // the shared queue so another thread can run them.
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};
     126              : 
     127              : scheduler_context*
     128       308076 : find_context(epoll_scheduler const* self) noexcept
     129              : {
     130       308076 :     for (auto* c = context_stack.get(); c != nullptr; c = c->next)
     131       306427 :         if (c->key == self)
     132       306427 :             return c;
     133         1649 :     return nullptr;
     134              : }
     135              : 
     136              : } // namespace
     137              : 
// Construct the scheduler: create the epoll instance, the eventfd used to
// interrupt a blocked reactor, and the timerfd that drives timer expiry;
// register the latter two with epoll and wire up auxiliary services.
// Throws system_error on any failure; descriptors created before the
// failing step are closed first so nothing leaks.
epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , reactor_running_(false)
    , reactor_interrupted_(false)
    , idle_thread_count_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    // Non-blocking eventfd: the reactor reads it to consume interrupts.
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    // CLOCK_MONOTONIC pairs with std::chrono::steady_clock in update_timerfd().
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    // data.ptr == nullptr identifies the eventfd in the reactor loop.
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    // data.ptr == &timer_fd_ identifies the timerfd in the reactor loop.
    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    // Re-arm the timerfd whenever the nearest timer expiry changes.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}
     212              : 
     213          378 : epoll_scheduler::
     214          189 : ~epoll_scheduler()
     215              : {
     216          189 :     if (timer_fd_ >= 0)
     217          189 :         ::close(timer_fd_);
     218          189 :     if (event_fd_ >= 0)
     219          189 :         ::close(event_fd_);
     220          189 :     if (epoll_fd_ >= 0)
     221          189 :         ::close(epoll_fd_);
     222          378 : }
     223              : 
// Discard all queued-but-unexecuted operations and release any threads
// still blocked, as part of execution_context teardown.
void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        // Destroy (not run) every pending op. The lock is dropped around
        // destroy() since op destructors may re-enter the scheduler.
        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;   // task sentinel is a member, not heap-allocated
            lock.unlock();
            h->destroy();
            lock.lock();
        }
    }

    outstanding_work_.store(0, std::memory_order_release);

    // Wake both blocking mechanisms: the reactor (eventfd) and any
    // condvar waiters, so all threads observe shutdown and return.
    if (event_fd_ >= 0)
        interrupt_reactor();

    wakeup_event_.notify_all();
}
     249              : 
// Schedule resumption of coroutine h on this scheduler. The wrapper op is
// heap-allocated and deletes itself after resuming (or when discarded at
// shutdown via destroy()).
void
epoll_scheduler::
post(capy::coro h) const
{
    struct post_handler final
        : scheduler_op
    {
        capy::coro h_;

        explicit
        post_handler(capy::coro h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        // Invoked by a worker thread: free ourselves first, then resume.
        void operator()() override
        {
            auto h = h_;
            delete this;
            // Acquire fence pairs with the publishing thread's release so
            // writes made before post() are visible to the resumed coro.
            std::atomic_thread_fence(std::memory_order_acquire);
            h.resume();
        }

        // Invoked when the op is discarded without running (shutdown).
        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue without locking
    if (auto* ctx = find_context(this))
    {
        outstanding_work_.fetch_add(1, std::memory_order_relaxed);
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
     299              : 
     300              : void
     301       143376 : epoll_scheduler::
     302              : post(scheduler_op* h) const
     303              : {
     304              :     // Fast path: same thread posts to private queue without locking
     305       143376 :     if (auto* ctx = find_context(this))
     306              :     {
     307       143350 :         outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     308       143350 :         ++ctx->private_outstanding_work;
     309       143350 :         ctx->private_queue.push(h);
     310       143350 :         return;
     311              :     }
     312              : 
     313              :     // Slow path: cross-thread post requires mutex
     314           26 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     315              : 
     316           26 :     std::unique_lock lock(mutex_);
     317           26 :     completed_ops_.push(h);
     318           26 :     wake_one_thread_and_unlock(lock);
     319           26 : }
     320              : 
// Note one more outstanding work item (relaxed: pure count, no data is
// published through this increment).
void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
     327              : 
// Retire one outstanding work item. The thread that takes the count from
// 1 to 0 stops the scheduler so run() can return everywhere.
void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
     335              : 
     336              : bool
     337         3089 : epoll_scheduler::
     338              : running_in_this_thread() const noexcept
     339              : {
     340         3089 :     for (auto* c = context_stack.get(); c != nullptr; c = c->next)
     341         2879 :         if (c->key == this)
     342         2879 :             return true;
     343          210 :     return false;
     344              : }
     345              : 
// Request that all event-processing loops return as soon as possible.
// Idempotent: the CAS guarantees only the first caller performs wakeups.
void
epoll_scheduler::
stop()
{
    bool expected = false;
    if (stopped_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
    {
        // Wake all threads so they notice stopped_ and exit
        {
            std::lock_guard lock(mutex_);
            wakeup_event_.notify_all();
        }
        // Separately wake the reactor thread blocked in epoll_wait().
        interrupt_reactor();
    }
}
     362              : 
// True once stop() has been called and restart() has not yet cleared it.
bool
epoll_scheduler::
stopped() const noexcept
{
    return stopped_.load(std::memory_order_acquire);
}
     369              : 
// Clear the stopped flag so a later run()/poll() can process work again.
void
epoll_scheduler::
restart()
{
    stopped_.store(false, std::memory_order_release);
}
     376              : 
     377              : std::size_t
     378          175 : epoll_scheduler::
     379              : run()
     380              : {
     381          175 :     if (stopped_.load(std::memory_order_acquire))
     382           23 :         return 0;
     383              : 
     384          304 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     385              :     {
     386           11 :         stop();
     387           11 :         return 0;
     388              :     }
     389              : 
     390          141 :     thread_context_guard ctx(this);
     391              : 
     392          141 :     std::size_t n = 0;
     393       150371 :     while (do_one(-1))
     394       150230 :         if (n != (std::numeric_limits<std::size_t>::max)())
     395       150230 :             ++n;
     396          141 :     return n;
     397          141 : }
     398              : 
     399              : std::size_t
     400            2 : epoll_scheduler::
     401              : run_one()
     402              : {
     403            2 :     if (stopped_.load(std::memory_order_acquire))
     404            0 :         return 0;
     405              : 
     406            4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     407              :     {
     408            0 :         stop();
     409            0 :         return 0;
     410              :     }
     411              : 
     412            2 :     thread_context_guard ctx(this);
     413            2 :     return do_one(-1);
     414            2 : }
     415              : 
     416              : std::size_t
     417           10 : epoll_scheduler::
     418              : wait_one(long usec)
     419              : {
     420           10 :     if (stopped_.load(std::memory_order_acquire))
     421            0 :         return 0;
     422              : 
     423           20 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     424              :     {
     425            4 :         stop();
     426            4 :         return 0;
     427              :     }
     428              : 
     429            6 :     thread_context_guard ctx(this);
     430            6 :     return do_one(usec);
     431            6 : }
     432              : 
     433              : std::size_t
     434            2 : epoll_scheduler::
     435              : poll()
     436              : {
     437            2 :     if (stopped_.load(std::memory_order_acquire))
     438            0 :         return 0;
     439              : 
     440            4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     441              :     {
     442            1 :         stop();
     443            1 :         return 0;
     444              :     }
     445              : 
     446            1 :     thread_context_guard ctx(this);
     447              : 
     448            1 :     std::size_t n = 0;
     449            3 :     while (do_one(0))
     450            2 :         if (n != (std::numeric_limits<std::size_t>::max)())
     451            2 :             ++n;
     452            1 :     return n;
     453            1 : }
     454              : 
     455              : std::size_t
     456            4 : epoll_scheduler::
     457              : poll_one()
     458              : {
     459            4 :     if (stopped_.load(std::memory_order_acquire))
     460            0 :         return 0;
     461              : 
     462            8 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     463              :     {
     464            2 :         stop();
     465            2 :         return 0;
     466              :     }
     467              : 
     468            2 :     thread_context_guard ctx(this);
     469            2 :     return do_one(0);
     470            2 : }
     471              : 
     472              : void
     473         5293 : epoll_scheduler::
     474              : register_descriptor(int fd, descriptor_data* desc) const
     475              : {
     476         5293 :     epoll_event ev{};
     477         5293 :     ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
     478         5293 :     ev.data.ptr = desc;
     479              : 
     480         5293 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
     481            0 :         detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
     482              : 
     483         5293 :     desc->registered_events = ev.events;
     484         5293 :     desc->is_registered = true;
     485         5293 :     desc->fd = fd;
     486         5293 :     desc->read_ready.store(false, std::memory_order_relaxed);
     487         5293 :     desc->write_ready.store(false, std::memory_order_relaxed);
     488         5293 : }
     489              : 
// Intentional no-op re epoll: descriptors are registered once for all
// events (edge-triggered), so no EPOLL_CTL_MOD is needed here.
void
epoll_scheduler::
update_descriptor_events(int, descriptor_data*, std::uint32_t) const
{
    // Provides memory fence for operation pointer visibility across threads
    std::atomic_thread_fence(std::memory_order_seq_cst);
}
     497              : 
// Remove fd from the epoll set. The return value is deliberately ignored:
// deregistration is best-effort (presumably the fd may already be closed
// by the time this runs — NOTE(review): confirm with callers).
void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
     504              : 
// Internal work-count increment (relaxed); balanced by work_finished().
void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
     511              : 
// Internal work-count decrement; the thread retiring the final item wakes
// every blocked thread so run() can return in all of them.
void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // notify_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        wakeup_event_.notify_all();
        if (reactor_running_ && !reactor_interrupted_)
        {
            reactor_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}
     532              : 
// Flush a departing thread's private queue into the shared queue (called
// by thread_context_guard's destructor when unexecuted ops remain).
void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    std::lock_guard lock(mutex_);
    // Note: outstanding_work_ was already incremented when posting
    completed_ops_.splice(queue);
    if (count > 0)
        wakeup_event_.notify_all();
}
     543              : 
// Wake a reactor thread blocked in epoll_wait() by writing the eventfd.
// The armed flag is cleared by the reactor when it drains the eventfd.
void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}
     557              : 
// Wake exactly one thread to handle newly queued work, then release the
// lock. Preference: an idle condvar waiter; else the running reactor (via
// eventfd); else nobody — an active thread will find the work itself.
// Waking at most one thread per post avoids the thundering herd.
void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (idle_thread_count_ > 0)
    {
        wakeup_event_.notify_one();
        lock.unlock();
    }
    else if (reactor_running_ && !reactor_interrupted_)
    {
        reactor_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
     578              : 
// Scope guard: calls work_finished() on destruction, balancing a prior
// work_started() even on early return or exception.
struct work_guard
{
    epoll_scheduler const* self;
    ~work_guard() { self->work_finished(); }
};
     584              : 
// Re-arm (or disarm) the timerfd to fire at the nearest timer expiry.
// Registered as the timer service's on-earliest-changed callback (see
// constructor). Throws on timerfd_settime failure.
void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;  // relative timeout (no TFD_TIMER_ABSTIME)

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}
     621              : 
     622              : void
     623        81468 : epoll_scheduler::
     624              : run_reactor(std::unique_lock<std::mutex>& lock)
     625              : {
     626        81468 :     int timeout_ms = reactor_interrupted_ ? 0 : -1;
     627              : 
     628        81468 :     lock.unlock();
     629              : 
     630              :     // --- Event loop runs WITHOUT the mutex (like Asio) ---
     631              : 
     632              :     epoll_event events[128];
     633        81468 :     int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
     634        81468 :     int saved_errno = errno;
     635              : 
     636        81468 :     if (nfds < 0 && saved_errno != EINTR)
     637            0 :         detail::throw_system_error(make_err(saved_errno), "epoll_wait");
     638              : 
     639        81468 :     bool check_timers = false;
     640        81468 :     op_queue local_ops;
     641        81468 :     int completions_queued = 0;
     642              : 
     643              :     // Process events without holding the mutex
     644       161249 :     for (int i = 0; i < nfds; ++i)
     645              :     {
     646        79781 :         if (events[i].data.ptr == nullptr)
     647              :         {
     648              :             std::uint64_t val;
     649           32 :             [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
     650           32 :             eventfd_armed_.store(false, std::memory_order_relaxed);
     651           32 :             continue;
     652           32 :         }
     653              : 
     654        79749 :         if (events[i].data.ptr == &timer_fd_)
     655              :         {
     656              :             std::uint64_t expirations;
     657         2840 :             [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
     658         2840 :             check_timers = true;
     659         2840 :             continue;
     660         2840 :         }
     661              : 
     662        76909 :         auto* desc = static_cast<descriptor_data*>(events[i].data.ptr);
     663        76909 :         std::uint32_t ev = events[i].events;
     664        76909 :         int err = 0;
     665              : 
     666        76909 :         if (ev & (EPOLLERR | EPOLLHUP))
     667              :         {
     668           47 :             socklen_t len = sizeof(err);
     669           47 :             if (::getsockopt(desc->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     670            0 :                 err = errno;
     671           47 :             if (err == 0)
     672           46 :                 err = EIO;
     673              :         }
     674              : 
     675        76909 :         if (ev & EPOLLIN)
     676              :         {
     677        35745 :             auto* op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
     678        35745 :             if (op)
     679              :             {
     680         2658 :                 if (err)
     681              :                 {
     682            0 :                     op->complete(err, 0);
     683            0 :                     local_ops.push(op);
     684            0 :                     ++completions_queued;
     685              :                 }
     686              :                 else
     687              :                 {
     688         2658 :                     op->perform_io();
     689         2658 :                     if (op->errn == EAGAIN || op->errn == EWOULDBLOCK)
     690              :                     {
     691            0 :                         op->errn = 0;
     692            0 :                         desc->read_op.store(op, std::memory_order_release);
     693              :                     }
     694              :                     else
     695              :                     {
     696         2658 :                         local_ops.push(op);
     697         2658 :                         ++completions_queued;
     698              :                     }
     699              :                 }
     700              :             }
     701              :             else
     702              :             {
     703        33087 :                 desc->read_ready.store(true, std::memory_order_release);
     704              :             }
     705              :         }
     706              : 
     707        76909 :         if (ev & EPOLLOUT)
     708              :         {
     709        74302 :             auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
     710        74302 :             if (conn_op)
     711              :             {
     712         2568 :                 if (err)
     713              :                 {
     714            1 :                     conn_op->complete(err, 0);
     715            1 :                     local_ops.push(conn_op);
     716            1 :                     ++completions_queued;
     717              :                 }
     718              :                 else
     719              :                 {
     720         2567 :                     conn_op->perform_io();
     721         2567 :                     if (conn_op->errn == EAGAIN || conn_op->errn == EWOULDBLOCK)
     722              :                     {
     723            0 :                         conn_op->errn = 0;
     724            0 :                         desc->connect_op.store(conn_op, std::memory_order_release);
     725              :                     }
     726              :                     else
     727              :                     {
     728         2567 :                         local_ops.push(conn_op);
     729         2567 :                         ++completions_queued;
     730              :                     }
     731              :                 }
     732              :             }
     733              : 
     734        74302 :             auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
     735        74302 :             if (write_op)
     736              :             {
     737            0 :                 if (err)
     738              :                 {
     739            0 :                     write_op->complete(err, 0);
     740            0 :                     local_ops.push(write_op);
     741            0 :                     ++completions_queued;
     742              :                 }
     743              :                 else
     744              :                 {
     745            0 :                     write_op->perform_io();
     746            0 :                     if (write_op->errn == EAGAIN || write_op->errn == EWOULDBLOCK)
     747              :                     {
     748            0 :                         write_op->errn = 0;
     749            0 :                         desc->write_op.store(write_op, std::memory_order_release);
     750              :                     }
     751              :                     else
     752              :                     {
     753            0 :                         local_ops.push(write_op);
     754            0 :                         ++completions_queued;
     755              :                     }
     756              :                 }
     757              :             }
     758              : 
     759        74302 :             if (!conn_op && !write_op)
     760        71734 :                 desc->write_ready.store(true, std::memory_order_release);
     761              :         }
     762              : 
     763        76909 :         if (err && !(ev & (EPOLLIN | EPOLLOUT)))
     764              :         {
     765            0 :             auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
     766            0 :             if (read_op)
     767              :             {
     768            0 :                 read_op->complete(err, 0);
     769            0 :                 local_ops.push(read_op);
     770            0 :                 ++completions_queued;
     771              :             }
     772              : 
     773            0 :             auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
     774            0 :             if (write_op)
     775              :             {
     776            0 :                 write_op->complete(err, 0);
     777            0 :                 local_ops.push(write_op);
     778            0 :                 ++completions_queued;
     779              :             }
     780              : 
     781            0 :             auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
     782            0 :             if (conn_op)
     783              :             {
     784            0 :                 conn_op->complete(err, 0);
     785            0 :                 local_ops.push(conn_op);
     786            0 :                 ++completions_queued;
     787              :             }
     788              :         }
     789              :     }
     790              : 
     791              :     // Process timers only when timerfd fires (like Asio's check_timers pattern)
     792        81468 :     if (check_timers)
     793              :     {
     794         2840 :         timer_svc_->process_expired();
     795         2840 :         update_timerfd();
     796              :     }
     797              : 
     798              :     // --- Acquire mutex only for queue operations ---
     799        81468 :     lock.lock();
     800              : 
     801        81468 :     if (!local_ops.empty())
     802         2659 :         completed_ops_.splice(local_ops);
     803              : 
     804              :     // Drain private queue (outstanding_work_ was already incremented when posting)
     805        81468 :     if (auto* ctx = find_context(this))
     806              :     {
     807        81468 :         if (!ctx->private_queue.empty())
     808              :         {
     809        76056 :             completions_queued += ctx->private_outstanding_work;
     810        76056 :             ctx->private_outstanding_work = 0;
     811        76056 :             completed_ops_.splice(ctx->private_queue);
     812              :         }
     813              :     }
     814              : 
     815              :     // Only wake threads that are actually idle, and only as many as we have work
     816        81468 :     if (completions_queued > 0 && idle_thread_count_ > 0)
     817              :     {
     818            0 :         int threads_to_wake = (std::min)(completions_queued, idle_thread_count_);
     819            0 :         for (int i = 0; i < threads_to_wake; ++i)
     820            0 :             wakeup_event_.notify_one();
     821              :     }
     822        81468 : }
     823              : 
// Executes at most one completion handler, or runs the reactor.
//
// Exactly one thread at a time holds the task_op_ sentinel popped from
// completed_ops_; that thread becomes the reactor thread and calls
// run_reactor(), pushing the sentinel back afterwards so another thread
// can take over. Other threads either run a completed handler or block
// on wakeup_event_.
//
// @param timeout_us  <0 to block indefinitely, 0 to poll without
//                    blocking, >0 to wait at most that many microseconds.
// @return 1 if a handler was executed, 0 on stop/timeout/no work.
std::size_t
epoll_scheduler::
do_one(long timeout_us)
{
    std::unique_lock lock(mutex_);

    for (;;)
    {
        // A stopped scheduler executes nothing further.
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Popping the sentinel means this thread is now responsible for
        // running the reactor.
        if (op == &task_op_)
        {
            // Check both global queue and private queue for pending handlers
            auto* ctx = find_context(this);
            bool more_handlers = !completed_ops_.empty() ||
                (ctx && !ctx->private_queue.empty());

            if (!more_handlers)
            {
                // No queued handlers and no outstanding work: restore the
                // sentinel so a later call can run the reactor, and return.
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
                // Polling call with nothing ready: same restore-and-return.
                if (timeout_us == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
            }

            // If handlers are waiting (or this is a poll), the reactor must
            // not block inside epoll_wait.
            reactor_interrupted_ = more_handlers || timeout_us == 0;
            reactor_running_ = true;

            // Let an idle thread pick up the pending handlers while this
            // thread runs the reactor.
            if (more_handlers && idle_thread_count_ > 0)
                wakeup_event_.notify_one();

            // run_reactor unlocks `lock` during epoll_wait and re-locks it
            // before returning.
            run_reactor(lock);

            reactor_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        // A real completion handler: run it outside the lock. work_guard
        // balances the outstanding-work count for the handler's duration.
        if (op != nullptr)
        {
            lock.unlock();
            work_guard g{this};
            (*op)();
            return 1;
        }

        // Queue empty and nothing in flight: nothing will ever arrive.
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
            return 0;

        // Poll-mode call must not block waiting for work.
        if (timeout_us == 0)
            return 0;

        // Drain private queue before blocking (outstanding_work_ was already incremented)
        if (auto* ctx = find_context(this))
        {
            if (!ctx->private_queue.empty())
            {
                ctx->private_outstanding_work = 0;
                completed_ops_.splice(ctx->private_queue);
                continue;
            }
        }

        // Block until another thread queues work and notifies, or the
        // caller-supplied timeout elapses.
        ++idle_thread_count_;
        if (timeout_us < 0)
            wakeup_event_.wait(lock);
        else
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
        --idle_thread_count_;
    }
}
     904              : 
     905              : } // namespace boost::corosio::detail
     906              : 
     907              : #endif
        

Generated by: LCOV version 2.3