libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

76.1% Lines (325/427) 89.2% Functions (33/37) 64.6% Branches (170/263)
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <atomic>
24 #include <chrono>
25 #include <limits>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/epoll.h>
30 #include <sys/eventfd.h>
31 #include <sys/socket.h>
32 #include <sys/timerfd.h>
33 #include <unistd.h>
34
35 /*
36 epoll Scheduler - Single Reactor Model
37 ======================================
38
39 This scheduler uses a single-reactor thread coordination strategy to
40 provide handler parallelism while avoiding the thundering herd problem.
41 Instead of every thread blocking in epoll_wait(), one thread becomes the
42 "reactor" while the others wait on a condition variable for handler work.
43
44 Thread Model
45 ------------
46 - ONE thread runs epoll_wait() at a time (the reactor thread)
47 - OTHER threads wait on wakeup_event_ (condition variable) for handlers
48 - When work is posted, exactly one waiting thread wakes via notify_one()
49 - This matches Windows IOCP semantics where N posted items wake N threads
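
For illustration, the worker-side wait (see do_one() at the bottom of
this file) reduces to:

    ++idle_thread_count_;
    wakeup_event_.wait(lock);   // one notify_one() per posted item
    --idle_thread_count_;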
50
51 Event Loop Structure (do_one)
52 -----------------------------
53 1. Lock mutex, try to pop handler from queue
54 2. If got handler: execute it (unlocked), return
55 3. If queue empty and no reactor running: become reactor
56 - Run epoll_wait (unlocked), queue I/O completions, loop back
57 4. If queue empty and reactor running: wait on condvar for work
58
59 The reactor_running_ flag ensures only one thread owns epoll_wait().
60 After the reactor queues I/O completions, it loops back to try getting
61 a handler, giving priority to handler execution over more I/O polling.
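
A minimal sketch of that loop, omitting stop/timeout/private-queue
handling (the full do_one() is at the bottom of this file):

    std::unique_lock lock(mutex_);
    for (;;)
    {
        scheduler_op* op = completed_ops_.pop();
        if (op == &task_op_)                 // sentinel: become the reactor
        {
            run_reactor(lock);               // epoll_wait, mutex released
            completed_ops_.push(&task_op_);  // re-arm the sentinel
            continue;                        // prefer handlers over more I/O
        }
        if (op)
        {
            lock.unlock();
            (*op)();                         // execute handler unlocked
            return 1;
        }
        wakeup_event_.wait(lock);            // idle: wait for posted work
    }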
62
63 Wake Coordination (wake_one_thread_and_unlock)
64 ----------------------------------------------
65 When posting work:
66 - If idle threads exist: notify_one() wakes exactly one worker
67 - Else if reactor running: interrupt via eventfd write
68 - Else: no-op (thread will find work when it checks queue)
69
70 This is critical for matching IOCP behavior. Under a broadcast scheme
71 (notify_all on every post), posting N handlers wakes every thread (a
72 thundering herd). Here each post() wakes at most one thread, and that thread handles exactly one item.
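
In code, the decision is (simplified from wake_one_thread_and_unlock()
below):

    if (idle_thread_count_ > 0)
        wakeup_event_.notify_one();          // wake exactly one worker
    else if (reactor_running_ && !reactor_interrupted_)
    {
        reactor_interrupted_ = true;
        interrupt_reactor();                 // eventfd write unblocks epoll_wait
    }
    // else: no-op; an active thread will find the item on its next pass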
73
74 Work Counting
75 -------------
76 outstanding_work_ tracks pending operations. When it hits zero, run()
77 returns. Each operation increments on start, decrements on completion.
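
The pattern, as used by work_started() and on_work_finished() below:

    outstanding_work_.fetch_add(1, std::memory_order_relaxed);       // start
    ...
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();                              // last item: let run() return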
78
79 Timer Integration
80 -----------------
81 Timers are handled by timer_service. Rather than shortening the epoll_wait
82 timeout, the scheduler arms a timerfd (registered with the epoll set) for
83 the nearest expiry, so the reactor wakes when a timer fires. When a new
84 timer is scheduled earlier than the current nearest, the on_earliest_changed callback re-arms the timerfd via update_timerfd().
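
Arming the timerfd for a future expiry, simplified from update_timerfd()
below (an all-zero itimerspec would disarm it, hence the 1ns floor):

    itimerspec ts{};
    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
        nearest - std::chrono::steady_clock::now()).count();
    ts.it_value.tv_sec  = ns / 1000000000;
    ts.it_value.tv_nsec = ns % 1000000000;
    if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
        ts.it_value.tv_nsec = 1;             // 0/0 disarms instead of firing
    ::timerfd_settime(timer_fd_, 0, &ts, nullptr);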
85 */
86
87 namespace boost::corosio::detail {
88
89 namespace {
90
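// Per-thread frame linking a thread into this scheduler's context stack.
// find_context() walks the stack so post() can detect same-thread calls
// and use the lock-free private queue instead of taking the mutex.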
91 struct scheduler_context
92 {
93 epoll_scheduler const* key;
94 scheduler_context* next;
95 op_queue private_queue;
96 long private_outstanding_work;
97
98 152 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
99 152 : key(k)
100 152 , next(n)
101 152 , private_outstanding_work(0)
102 {
103 152 }
104 };
105
106 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
107
108 struct thread_context_guard
109 {
110 scheduler_context frame_;
111
112 152 explicit thread_context_guard(
113 epoll_scheduler const* ctx) noexcept
114 152 : frame_(ctx, context_stack.get())
115 {
116 152 context_stack.set(&frame_);
117 152 }
118
119 152 ~thread_context_guard() noexcept
120 {
121
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 152 times.
152 if (!frame_.private_queue.empty())
122 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
123 152 context_stack.set(frame_.next);
124 152 }
125 };
126
127 scheduler_context*
128 308076 find_context(epoll_scheduler const* self) noexcept
129 {
130
2/2
✓ Branch 1 taken 306427 times.
✓ Branch 2 taken 1649 times.
308076 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
131
1/2
✓ Branch 0 taken 306427 times.
✗ Branch 1 not taken.
306427 if (c->key == self)
132 306427 return c;
133 1649 return nullptr;
134 }
135
136 } // namespace
137
138 189 epoll_scheduler::
139 epoll_scheduler(
140 capy::execution_context& ctx,
141 189 int)
142 189 : epoll_fd_(-1)
143 189 , event_fd_(-1)
144 189 , timer_fd_(-1)
145 189 , outstanding_work_(0)
146 189 , stopped_(false)
147 189 , shutdown_(false)
148 189 , reactor_running_(false)
149 189 , reactor_interrupted_(false)
150 378 , idle_thread_count_(0)
151 {
152 189 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
153
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (epoll_fd_ < 0)
154 detail::throw_system_error(make_err(errno), "epoll_create1");
155
156 189 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
157
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (event_fd_ < 0)
158 {
159 int errn = errno;
160 ::close(epoll_fd_);
161 detail::throw_system_error(make_err(errn), "eventfd");
162 }
163
164 189 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (timer_fd_ < 0)
166 {
167 int errn = errno;
168 ::close(event_fd_);
169 ::close(epoll_fd_);
170 detail::throw_system_error(make_err(errn), "timerfd_create");
171 }
172
173 189 epoll_event ev{};
174 189 ev.events = EPOLLIN | EPOLLET;
175 189 ev.data.ptr = nullptr;
176
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
177 {
178 int errn = errno;
179 ::close(timer_fd_);
180 ::close(event_fd_);
181 ::close(epoll_fd_);
182 detail::throw_system_error(make_err(errn), "epoll_ctl");
183 }
184
185 189 epoll_event timer_ev{};
186 189 timer_ev.events = EPOLLIN | EPOLLERR;
187 189 timer_ev.data.ptr = &timer_fd_;
188
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
189 {
190 int errn = errno;
191 ::close(timer_fd_);
192 ::close(event_fd_);
193 ::close(epoll_fd_);
194 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
195 }
196
197
1/1
✓ Branch 1 taken 189 times.
189 timer_svc_ = &get_timer_service(ctx, *this);
198
1/1
✓ Branch 3 taken 189 times.
189 timer_svc_->set_on_earliest_changed(
199 timer_service::callback(
200 this,
201 2853 [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));
202
203 // Initialize resolver service
204
1/1
✓ Branch 1 taken 189 times.
189 get_resolver_service(ctx, *this);
205
206 // Initialize signal service
207
1/1
✓ Branch 1 taken 189 times.
189 get_signal_service(ctx, *this);
208
209 // Push task sentinel to interleave reactor runs with handler execution
210 189 completed_ops_.push(&task_op_);
211 189 }
212
213 378 epoll_scheduler::
214 189 ~epoll_scheduler()
215 {
216
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
217 189 ::close(timer_fd_);
218
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
219 189 ::close(event_fd_);
220
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
221 189 ::close(epoll_fd_);
222 378 }
223
224 void
225 189 epoll_scheduler::
226 shutdown()
227 {
228 {
229
1/1
✓ Branch 1 taken 189 times.
189 std::unique_lock lock(mutex_);
230 189 shutdown_ = true;
231
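// Destroy (never run) any handlers still queued; resuming them during
// shutdown could touch services that are already being torn down.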
232
2/2
✓ Branch 1 taken 189 times.
✓ Branch 2 taken 189 times.
378 while (auto* h = completed_ops_.pop())
233 {
234
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (h == &task_op_)
235 189 continue;
236 lock.unlock();
237 h->destroy();
238 lock.lock();
239 189 }
240 189 }
241
242 189 outstanding_work_.store(0, std::memory_order_release);
243
244
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
245 189 interrupt_reactor();
246
247 189 wakeup_event_.notify_all();
248 189 }
249
250 void
251 1639 epoll_scheduler::
252 post(capy::coro h) const
253 {
254 struct post_handler final
255 : scheduler_op
256 {
257 capy::coro h_;
258
259 explicit
260 1639 post_handler(capy::coro h)
261 1639 : h_(h)
262 {
263 1639 }
264
265 3278 ~post_handler() = default;
266
267 1639 void operator()() override
268 {
269 1639 auto h = h_;
270
1/2
✓ Branch 0 taken 1639 times.
✗ Branch 1 not taken.
1639 delete this;
271 std::atomic_thread_fence(std::memory_order_acquire);
272
1/1
✓ Branch 1 taken 1639 times.
1639 h.resume();
273 1639 }
274
275 void destroy() override
276 {
277 delete this;
278 }
279 };
280
281
1/1
✓ Branch 1 taken 1639 times.
1639 auto ph = std::make_unique<post_handler>(h);
282
283 // Fast path: same thread posts to private queue without locking
284
2/2
✓ Branch 1 taken 16 times.
✓ Branch 2 taken 1623 times.
1639 if (auto* ctx = find_context(this))
285 {
286 16 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
287 16 ++ctx->private_outstanding_work;
288 16 ctx->private_queue.push(ph.release());
289 16 return;
290 }
291
292 // Slow path: cross-thread post requires mutex
293 1623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
294
295
1/1
✓ Branch 1 taken 1623 times.
1623 std::unique_lock lock(mutex_);
296 1623 completed_ops_.push(ph.release());
297
1/1
✓ Branch 1 taken 1623 times.
1623 wake_one_thread_and_unlock(lock);
298 1639 }
299
300 void
301 143376 epoll_scheduler::
302 post(scheduler_op* h) const
303 {
304 // Fast path: same thread posts to private queue without locking
305
2/2
✓ Branch 1 taken 143350 times.
✓ Branch 2 taken 26 times.
143376 if (auto* ctx = find_context(this))
306 {
307 143350 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
308 143350 ++ctx->private_outstanding_work;
309 143350 ctx->private_queue.push(h);
310 143350 return;
311 }
312
313 // Slow path: cross-thread post requires mutex
314 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
315
316
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
317 26 completed_ops_.push(h);
318
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
319 26 }
320
321 void
322 2876 epoll_scheduler::
323 on_work_started() noexcept
324 {
325 2876 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
326 2876 }
327
328 void
329 2844 epoll_scheduler::
330 on_work_finished() noexcept
331 {
332
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 2826 times.
5688 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
333 18 stop();
334 2844 }
335
336 bool
337 3089 epoll_scheduler::
338 running_in_this_thread() const noexcept
339 {
340
2/2
✓ Branch 1 taken 2879 times.
✓ Branch 2 taken 210 times.
3089 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
341
1/2
✓ Branch 0 taken 2879 times.
✗ Branch 1 not taken.
2879 if (c->key == this)
342 2879 return true;
343 210 return false;
344 }
345
346 void
347 37 epoll_scheduler::
348 stop()
349 {
350 37 bool expected = false;
351
1/2
✓ Branch 1 taken 37 times.
✗ Branch 2 not taken.
37 if (stopped_.compare_exchange_strong(expected, true,
352 std::memory_order_release, std::memory_order_relaxed))
353 {
354 // Wake all threads so they notice stopped_ and exit
355 {
356
1/1
✓ Branch 1 taken 37 times.
37 std::lock_guard lock(mutex_);
357 37 wakeup_event_.notify_all();
358 37 }
359
1/1
✓ Branch 1 taken 37 times.
37 interrupt_reactor();
360 }
361 37 }
362
363 bool
364 16 epoll_scheduler::
365 stopped() const noexcept
366 {
367 16 return stopped_.load(std::memory_order_acquire);
368 }
369
370 void
371 49 epoll_scheduler::
372 restart()
373 {
374 49 stopped_.store(false, std::memory_order_release);
375 49 }
376
377 std::size_t
378 175 epoll_scheduler::
379 run()
380 {
381
2/2
✓ Branch 1 taken 23 times.
✓ Branch 2 taken 152 times.
175 if (stopped_.load(std::memory_order_acquire))
382 23 return 0;
383
384
2/2
✓ Branch 1 taken 11 times.
✓ Branch 2 taken 141 times.
304 if (outstanding_work_.load(std::memory_order_acquire) == 0)
385 {
386
1/1
✓ Branch 1 taken 11 times.
11 stop();
387 11 return 0;
388 }
389
390 141 thread_context_guard ctx(this);
391
392 141 std::size_t n = 0;
393
3/3
✓ Branch 1 taken 150371 times.
✓ Branch 3 taken 150230 times.
✓ Branch 4 taken 141 times.
150371 while (do_one(-1))
394
1/2
✓ Branch 1 taken 150230 times.
✗ Branch 2 not taken.
150230 if (n != (std::numeric_limits<std::size_t>::max)())
395 150230 ++n;
396 141 return n;
397 141 }
398
399 std::size_t
400 2 epoll_scheduler::
401 run_one()
402 {
403
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (stopped_.load(std::memory_order_acquire))
404 return 0;
405
406
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
407 {
408 stop();
409 return 0;
410 }
411
412 2 thread_context_guard ctx(this);
413
1/1
✓ Branch 1 taken 2 times.
2 return do_one(-1);
414 2 }
415
416 std::size_t
417 10 epoll_scheduler::
418 wait_one(long usec)
419 {
420
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
10 if (stopped_.load(std::memory_order_acquire))
421 return 0;
422
423
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 6 times.
20 if (outstanding_work_.load(std::memory_order_acquire) == 0)
424 {
425
1/1
✓ Branch 1 taken 4 times.
4 stop();
426 4 return 0;
427 }
428
429 6 thread_context_guard ctx(this);
430
1/1
✓ Branch 1 taken 6 times.
6 return do_one(usec);
431 6 }
432
433 std::size_t
434 2 epoll_scheduler::
435 poll()
436 {
437
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (stopped_.load(std::memory_order_acquire))
438 return 0;
439
440
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
441 {
442
1/1
✓ Branch 1 taken 1 time.
1 stop();
443 1 return 0;
444 }
445
446 1 thread_context_guard ctx(this);
447
448 1 std::size_t n = 0;
449
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 1 time.
3 while (do_one(0))
450
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
451 2 ++n;
452 1 return n;
453 1 }
454
455 std::size_t
456 4 epoll_scheduler::
457 poll_one()
458 {
459
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (stopped_.load(std::memory_order_acquire))
460 return 0;
461
462
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
463 {
464
1/1
✓ Branch 1 taken 2 times.
2 stop();
465 2 return 0;
466 }
467
468 2 thread_context_guard ctx(this);
469
1/1
✓ Branch 1 taken 2 times.
2 return do_one(0);
470 2 }
471
472 void
473 5293 epoll_scheduler::
474 register_descriptor(int fd, descriptor_data* desc) const
475 {
476 5293 epoll_event ev{};
477 5293 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
478 5293 ev.data.ptr = desc;
479
480
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5293 times.
5293 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
481 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
482
483 5293 desc->registered_events = ev.events;
484 5293 desc->is_registered = true;
485 5293 desc->fd = fd;
486 5293 desc->read_ready.store(false, std::memory_order_relaxed);
487 5293 desc->write_ready.store(false, std::memory_order_relaxed);
488 5293 }
489
490 void
491 epoll_scheduler::
492 update_descriptor_events(int, descriptor_data*, std::uint32_t) const
493 {
494 // Provides memory fence for operation pointer visibility across threads
495 std::atomic_thread_fence(std::memory_order_seq_cst);
496 }
497
498 void
499 5293 epoll_scheduler::
500 deregister_descriptor(int fd) const
501 {
502 5293 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
503 5293 }
504
505 void
506 5396 epoll_scheduler::
507 work_started() const noexcept
508 {
509 5396 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
510 5396 }
511
512 void
513 150443 epoll_scheduler::
514 work_finished() const noexcept
515 {
516
2/2
✓ Branch 0 taken 130 times.
✓ Branch 1 taken 150313 times.
300886 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
517 {
518 // Last work item completed - wake all threads so they can exit.
519 // notify_all() wakes threads waiting on the condvar.
520 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
521 // Both are needed because they target different blocking mechanisms.
522 130 std::unique_lock lock(mutex_);
523 130 wakeup_event_.notify_all();
524
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
130 if (reactor_running_ && !reactor_interrupted_)
525 {
526 reactor_interrupted_ = true;
527 lock.unlock();
528 interrupt_reactor();
529 }
530 130 }
531 150443 }
532
533 void
534 epoll_scheduler::
535 drain_thread_queue(op_queue& queue, long count) const
536 {
537 std::lock_guard lock(mutex_);
538 // Note: outstanding_work_ was already incremented when posting
539 completed_ops_.splice(queue);
540 if (count > 0)
541 wakeup_event_.notify_all();
542 }
543
544 void
545 252 epoll_scheduler::
546 interrupt_reactor() const
547 {
548 // Only write if not already armed to avoid redundant writes
549 252 bool expected = false;
550
2/2
✓ Branch 1 taken 221 times.
✓ Branch 2 taken 31 times.
252 if (eventfd_armed_.compare_exchange_strong(expected, true,
551 std::memory_order_release, std::memory_order_relaxed))
552 {
553 221 std::uint64_t val = 1;
554
1/1
✓ Branch 1 taken 221 times.
221 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
555 }
556 252 }
557
558 void
559 1649 epoll_scheduler::
560 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
561 {
562
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1649 times.
1649 if (idle_thread_count_ > 0)
563 {
564 wakeup_event_.notify_one();
565 lock.unlock();
566 }
567
3/4
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 1623 times.
✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
1649 else if (reactor_running_ && !reactor_interrupted_)
568 {
569 26 reactor_interrupted_ = true;
570 26 lock.unlock();
571 26 interrupt_reactor();
572 }
573 else
574 {
575 1623 lock.unlock();
576 }
577 1649 }
578
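// RAII helper: marks one unit of work finished when a handler returns,
// including on exception, keeping outstanding_work_ balanced.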
579 struct work_guard
580 {
581 epoll_scheduler const* self;
582 ~work_guard() { self->work_finished(); }
583 };
584
585 void
586 5693 epoll_scheduler::
587 update_timerfd() const
588 {
589 5693 auto nearest = timer_svc_->nearest_expiry();
590
591 5693 itimerspec ts{};
592 5693 int flags = 0;
593
594
3/3
✓ Branch 2 taken 5693 times.
✓ Branch 4 taken 5653 times.
✓ Branch 5 taken 40 times.
5693 if (nearest == timer_service::time_point::max())
595 {
596 // No timers - disarm by setting to 0 (relative)
597 }
598 else
599 {
600 5653 auto now = std::chrono::steady_clock::now();
601
3/3
✓ Branch 1 taken 5653 times.
✓ Branch 4 taken 71 times.
✓ Branch 5 taken 5582 times.
5653 if (nearest <= now)
602 {
603 // Use 1ns instead of 0 - zero disarms the timerfd
604 71 ts.it_value.tv_nsec = 1;
605 }
606 else
607 {
608 5582 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
609
1/1
✓ Branch 1 taken 5582 times.
11164 nearest - now).count();
610 5582 ts.it_value.tv_sec = nsec / 1000000000;
611 5582 ts.it_value.tv_nsec = nsec % 1000000000;
612 // Ensure non-zero to avoid disarming if duration rounds to 0
613
3/4
✓ Branch 0 taken 5572 times.
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5572 times.
5582 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
614 ts.it_value.tv_nsec = 1;
615 }
616 }
617
618
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5693 times.
5693 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
619 detail::throw_system_error(make_err(errno), "timerfd_settime");
620 5693 }
621
622 void
623 81468 epoll_scheduler::
624 run_reactor(std::unique_lock<std::mutex>& lock)
625 {
626
2/2
✓ Branch 0 taken 75989 times.
✓ Branch 1 taken 5479 times.
81468 int timeout_ms = reactor_interrupted_ ? 0 : -1;
627
628
1/1
✓ Branch 1 taken 81468 times.
81468 lock.unlock();
629
630 // --- Event loop runs WITHOUT the mutex (like Asio) ---
631
632 epoll_event events[128];
633
1/1
✓ Branch 1 taken 81468 times.
81468 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
634 81468 int saved_errno = errno;
635
636
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 81468 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
81468 if (nfds < 0 && saved_errno != EINTR)
637 detail::throw_system_error(make_err(saved_errno), "epoll_wait");
638
639 81468 bool check_timers = false;
640 81468 op_queue local_ops;
641 81468 int completions_queued = 0;
642
643 // Process events without holding the mutex
644
2/2
✓ Branch 0 taken 79781 times.
✓ Branch 1 taken 81468 times.
161249 for (int i = 0; i < nfds; ++i)
645 {
646
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 79749 times.
79781 if (events[i].data.ptr == nullptr)
647 {
648 std::uint64_t val;
649
1/1
✓ Branch 1 taken 32 times.
32 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
650 32 eventfd_armed_.store(false, std::memory_order_relaxed);
651 32 continue;
652 32 }
653
654
2/2
✓ Branch 0 taken 2840 times.
✓ Branch 1 taken 76909 times.
79749 if (events[i].data.ptr == &timer_fd_)
655 {
656 std::uint64_t expirations;
657
1/1
✓ Branch 1 taken 2840 times.
2840 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
658 2840 check_timers = true;
659 2840 continue;
660 2840 }
661
662 76909 auto* desc = static_cast<descriptor_data*>(events[i].data.ptr);
663 76909 std::uint32_t ev = events[i].events;
664 76909 int err = 0;
665
666
2/2
✓ Branch 0 taken 47 times.
✓ Branch 1 taken 76862 times.
76909 if (ev & (EPOLLERR | EPOLLHUP))
667 {
668 47 socklen_t len = sizeof(err);
669
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 47 times.
47 if (::getsockopt(desc->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
670 err = errno;
671
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 1 time.
47 if (err == 0)
672 46 err = EIO;
673 }
674
675
2/2
✓ Branch 0 taken 35745 times.
✓ Branch 1 taken 41164 times.
76909 if (ev & EPOLLIN)
676 {
677 35745 auto* op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
678
2/2
✓ Branch 0 taken 2658 times.
✓ Branch 1 taken 33087 times.
35745 if (op)
679 {
680
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2658 times.
2658 if (err)
681 {
682 op->complete(err, 0);
683 local_ops.push(op);
684 ++completions_queued;
685 }
686 else
687 {
688 2658 op->perform_io();
689
2/4
✓ Branch 0 taken 2658 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2658 times.
2658 if (op->errn == EAGAIN || op->errn == EWOULDBLOCK)
690 {
691 op->errn = 0;
692 desc->read_op.store(op, std::memory_order_release);
693 }
694 else
695 {
696 2658 local_ops.push(op);
697 2658 ++completions_queued;
698 }
699 }
700 }
701 else
702 {
703 33087 desc->read_ready.store(true, std::memory_order_release);
704 }
705 }
706
707
2/2
✓ Branch 0 taken 74302 times.
✓ Branch 1 taken 2607 times.
76909 if (ev & EPOLLOUT)
708 {
709 74302 auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
710
2/2
✓ Branch 0 taken 2568 times.
✓ Branch 1 taken 71734 times.
74302 if (conn_op)
711 {
712
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2567 times.
2568 if (err)
713 {
714 1 conn_op->complete(err, 0);
715 1 local_ops.push(conn_op);
716 1 ++completions_queued;
717 }
718 else
719 {
720 2567 conn_op->perform_io();
721
2/4
✓ Branch 0 taken 2567 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2567 times.
2567 if (conn_op->errn == EAGAIN || conn_op->errn == EWOULDBLOCK)
722 {
723 conn_op->errn = 0;
724 desc->connect_op.store(conn_op, std::memory_order_release);
725 }
726 else
727 {
728 2567 local_ops.push(conn_op);
729 2567 ++completions_queued;
730 }
731 }
732 }
733
734 74302 auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
735
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 74302 times.
74302 if (write_op)
736 {
737 if (err)
738 {
739 write_op->complete(err, 0);
740 local_ops.push(write_op);
741 ++completions_queued;
742 }
743 else
744 {
745 write_op->perform_io();
746 if (write_op->errn == EAGAIN || write_op->errn == EWOULDBLOCK)
747 {
748 write_op->errn = 0;
749 desc->write_op.store(write_op, std::memory_order_release);
750 }
751 else
752 {
753 local_ops.push(write_op);
754 ++completions_queued;
755 }
756 }
757 }
758
759
3/4
✓ Branch 0 taken 71734 times.
✓ Branch 1 taken 2568 times.
✓ Branch 2 taken 71734 times.
✗ Branch 3 not taken.
74302 if (!conn_op && !write_op)
760 71734 desc->write_ready.store(true, std::memory_order_release);
761 }
762
763
3/4
✓ Branch 0 taken 47 times.
✓ Branch 1 taken 76862 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 47 times.
76909 if (err && !(ev & (EPOLLIN | EPOLLOUT)))
764 {
765 auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
766 if (read_op)
767 {
768 read_op->complete(err, 0);
769 local_ops.push(read_op);
770 ++completions_queued;
771 }
772
773 auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
774 if (write_op)
775 {
776 write_op->complete(err, 0);
777 local_ops.push(write_op);
778 ++completions_queued;
779 }
780
781 auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
782 if (conn_op)
783 {
784 conn_op->complete(err, 0);
785 local_ops.push(conn_op);
786 ++completions_queued;
787 }
788 }
789 }
790
791 // Process timers only when timerfd fires (like Asio's check_timers pattern)
792
2/2
✓ Branch 0 taken 2840 times.
✓ Branch 1 taken 78628 times.
81468 if (check_timers)
793 {
794
1/1
✓ Branch 1 taken 2840 times.
2840 timer_svc_->process_expired();
795
1/1
✓ Branch 1 taken 2840 times.
2840 update_timerfd();
796 }
797
798 // --- Acquire mutex only for queue operations ---
799
1/1
✓ Branch 1 taken 81468 times.
81468 lock.lock();
800
801
2/2
✓ Branch 1 taken 2659 times.
✓ Branch 2 taken 78809 times.
81468 if (!local_ops.empty())
802 2659 completed_ops_.splice(local_ops);
803
804 // Drain private queue (outstanding_work_ was already incremented when posting)
805
1/2
✓ Branch 1 taken 81468 times.
✗ Branch 2 not taken.
81468 if (auto* ctx = find_context(this))
806 {
807
2/2
✓ Branch 1 taken 76056 times.
✓ Branch 2 taken 5412 times.
81468 if (!ctx->private_queue.empty())
808 {
809 76056 completions_queued += ctx->private_outstanding_work;
810 76056 ctx->private_outstanding_work = 0;
811 76056 completed_ops_.splice(ctx->private_queue);
812 }
813 }
814
815 // Only wake threads that are actually idle, and only as many as we have work
816
3/4
✓ Branch 0 taken 78672 times.
✓ Branch 1 taken 2796 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 78672 times.
81468 if (completions_queued > 0 && idle_thread_count_ > 0)
817 {
818 int threads_to_wake = (std::min)(completions_queued, idle_thread_count_);
819 for (int i = 0; i < threads_to_wake; ++i)
820 wakeup_event_.notify_one();
821 }
822 81468 }
823
824 std::size_t
825 150384 epoll_scheduler::
826 do_one(long timeout_us)
827 {
828
1/1
✓ Branch 1 taken 150384 times.
150384 std::unique_lock lock(mutex_);
829
830 for (;;)
831 {
832
2/2
✓ Branch 1 taken 18 times.
✓ Branch 2 taken 231834 times.
231852 if (stopped_.load(std::memory_order_acquire))
833 18 return 0;
834
835 231834 scheduler_op* op = completed_ops_.pop();
836
837
2/2
✓ Branch 0 taken 81593 times.
✓ Branch 1 taken 150241 times.
231834 if (op == &task_op_)
838 {
839 // Check both global queue and private queue for pending handlers
840 81593 auto* ctx = find_context(this);
841
3/4
✓ Branch 1 taken 81445 times.
✓ Branch 2 taken 148 times.
✓ Branch 3 taken 81445 times.
✗ Branch 4 not taken.
163038 bool more_handlers = !completed_ops_.empty() ||
842
2/2
✓ Branch 1 taken 75841 times.
✓ Branch 2 taken 5604 times.
81445 (ctx && !ctx->private_queue.empty());
843
844
2/2
✓ Branch 0 taken 5604 times.
✓ Branch 1 taken 75989 times.
81593 if (!more_handlers)
845 {
846
2/2
✓ Branch 1 taken 125 times.
✓ Branch 2 taken 5479 times.
11208 if (outstanding_work_.load(std::memory_order_acquire) == 0)
847 {
848 125 completed_ops_.push(&task_op_);
849 125 return 0;
850 }
851
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5479 times.
5479 if (timeout_us == 0)
852 {
853 completed_ops_.push(&task_op_);
854 return 0;
855 }
856 }
857
858
3/4
✓ Branch 0 taken 5479 times.
✓ Branch 1 taken 75989 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5479 times.
81468 reactor_interrupted_ = more_handlers || timeout_us == 0;
859 81468 reactor_running_ = true;
860
861
3/4
✓ Branch 0 taken 75989 times.
✓ Branch 1 taken 5479 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 75989 times.
81468 if (more_handlers && idle_thread_count_ > 0)
862 wakeup_event_.notify_one();
863
864
1/1
✓ Branch 1 taken 81468 times.
81468 run_reactor(lock);
865
866 81468 reactor_running_ = false;
867 81468 completed_ops_.push(&task_op_);
868 81468 continue;
869 81468 }
870
871
1/2
✓ Branch 0 taken 150241 times.
✗ Branch 1 not taken.
150241 if (op != nullptr)
872 {
873
1/1
✓ Branch 1 taken 150241 times.
150241 lock.unlock();
874 150241 work_guard g{this};
875
1/1
✓ Branch 1 taken 150241 times.
150241 (*op)();
876 150241 return 1;
877 150241 }
878
879 if (outstanding_work_.load(std::memory_order_acquire) == 0)
880 return 0;
881
882 if (timeout_us == 0)
883 return 0;
884
885 // Drain private queue before blocking (outstanding_work_ was already incremented)
886 if (auto* ctx = find_context(this))
887 {
888 if (!ctx->private_queue.empty())
889 {
890 ctx->private_outstanding_work = 0;
891 completed_ops_.splice(ctx->private_queue);
892 continue;
893 }
894 }
895
896 ++idle_thread_count_;
897 if (timeout_us < 0)
898 wakeup_event_.wait(lock);
899 else
900 wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
901 --idle_thread_count_;
902 81468 }
903 150384 }
904
905 } // namespace boost::corosio::detail
906
907 #endif
908