libs/corosio/src/corosio/src/detail/epoll/scheduler.hpp

0.0% Lines (0/2) 0.0% Functions (0/2) -% Branches (0/0)
libs/corosio/src/corosio/src/detail/epoll/scheduler.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/scheduler.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include "src/detail/scheduler_op.hpp"
22 #include "src/detail/timer_service.hpp"
23
24 #include <atomic>
25 #include <condition_variable>
26 #include <cstddef>
27 #include <cstdint>
28 #include <mutex>
29
30 namespace boost::corosio::detail {
31
32 struct epoll_op;
33 struct descriptor_data;
34
35 /** Linux scheduler using epoll for I/O multiplexing.
36
37 This scheduler implements the scheduler interface using Linux epoll
38 for efficient I/O event notification. It uses a single reactor model
39 where one thread runs epoll_wait while other threads
40 wait on a condition variable for handler work. This design provides:
41
42 - Handler parallelism: N posted handlers can execute on N threads
43 - No thundering herd: condition_variable wakes exactly one thread
44 - IOCP parity: Behavior matches Windows I/O completion port semantics
45
46 When threads call run(), they first try to execute queued handlers.
47 If the queue is empty and no reactor is running, one thread becomes
48 the reactor and runs epoll_wait. Other threads wait on a condition
49 variable until handlers are available.
50
51 @par Thread Safety
52 All public member functions are thread-safe.
53 */
54 class epoll_scheduler
55 : public scheduler
56 , public capy::execution_context::service
57 {
58 public:
59 using key_type = scheduler;
60
61 /** Construct the scheduler.
62
63 Creates an epoll instance, eventfd for reactor interruption,
64 and timerfd for kernel-managed timer expiry.
65
66 @param ctx Reference to the owning execution_context.
67 @param concurrency_hint Hint for expected thread count (unused).
68 */
69 epoll_scheduler(
70 capy::execution_context& ctx,
71 int concurrency_hint = -1);
72
73 /// Destroy the scheduler.
74 ~epoll_scheduler();
75
76 epoll_scheduler(epoll_scheduler const&) = delete;
77 epoll_scheduler& operator=(epoll_scheduler const&) = delete;
78
79 void shutdown() override;
80 void post(capy::coro h) const override;
81 void post(scheduler_op* h) const override;
82 void on_work_started() noexcept override;
83 void on_work_finished() noexcept override;
84 bool running_in_this_thread() const noexcept override;
85 void stop() override;
86 bool stopped() const noexcept override;
87 void restart() override;
88 std::size_t run() override;
89 std::size_t run_one() override;
90 std::size_t wait_one(long usec) override;
91 std::size_t poll() override;
92 std::size_t poll_one() override;
93
94 /** Return the epoll file descriptor.
95
96 Used by socket services to register file descriptors
97 for I/O event notification.
98
99 @return The epoll file descriptor.
100 */
101 int epoll_fd() const noexcept { return epoll_fd_; }
102
103 /** Register a descriptor for persistent monitoring.
104
105 The fd is registered once and stays registered until explicitly
106 deregistered. Events are dispatched via descriptor_data which
107 tracks pending read/write/connect operations.
108
109 @param fd The file descriptor to register.
110 @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
111 */
112 void register_descriptor(int fd, descriptor_data* desc) const;
113
114 /** Update events for a persistently registered descriptor.
115
116 @param fd The file descriptor.
117 @param desc Pointer to descriptor data.
118 @param events The new events to monitor.
119 */
120 void update_descriptor_events(int fd, descriptor_data* desc, std::uint32_t events) const;
121
122 /** Deregister a persistently registered descriptor.
123
124 @param fd The file descriptor to deregister.
125 */
126 void deregister_descriptor(int fd) const;
127
128 /** For use by I/O operations to track pending work. */
129 void work_started() const noexcept override;
130
131 /** For use by I/O operations to track completed work. */
132 void work_finished() const noexcept override;
133
134 /** Drain work from thread context's private queue to global queue.
135
136 Called by thread_context_guard destructor when a thread exits run().
137 Transfers pending work to the global queue under mutex protection.
138
139 @param queue The private queue to drain.
140 @param count Item count for wakeup decisions (wakes other threads if positive).
141 */
142 void drain_thread_queue(op_queue& queue, long count) const;
143
144 private:
145 std::size_t do_one(long timeout_us);
146 void run_reactor(std::unique_lock<std::mutex>& lock);
147 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
148 void interrupt_reactor() const;
149 void update_timerfd() const;
150
151 int epoll_fd_;
152 int event_fd_; // for interrupting reactor
153 int timer_fd_; // timerfd for kernel-managed timer expiry
154 mutable std::mutex mutex_;
155 mutable std::condition_variable wakeup_event_;
156 mutable op_queue completed_ops_;
157 mutable std::atomic<long> outstanding_work_;
158 std::atomic<bool> stopped_;
159 bool shutdown_;
160 timer_service* timer_svc_ = nullptr;
161
162 // Single reactor thread coordination
163 mutable bool reactor_running_ = false;
164 mutable bool reactor_interrupted_ = false;
165 mutable int idle_thread_count_ = 0;
166
167 // Edge-triggered eventfd state
168 mutable std::atomic<bool> eventfd_armed_{false};
169
170
171 // Sentinel operation for interleaving reactor runs with handler execution.
172 // Ensures the reactor runs periodically even when handlers are continuously
173 // posted, preventing timer starvation.
174 struct task_op final : scheduler_op
175 {
176 void operator()() override {}
177 void destroy() override {}
178 };
179 task_op task_op_;
180 };
181
182 } // namespace boost::corosio::detail
183
184 #endif // BOOST_COROSIO_HAS_EPOLL
185
186 #endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
187