Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/detail/scheduler.hpp>
19 : #include <boost/capy/ex/execution_context.hpp>
20 :
21 : #include "src/detail/scheduler_op.hpp"
22 : #include "src/detail/timer_service.hpp"
23 :
24 : #include <atomic>
25 : #include <condition_variable>
26 : #include <cstddef>
27 : #include <cstdint>
28 : #include <mutex>
29 :
30 : namespace boost::corosio::detail {
31 :
32 : struct epoll_op;
33 : struct descriptor_data;
34 :
35 : /** Linux scheduler using epoll for I/O multiplexing.
36 :
37 : This scheduler implements the scheduler interface using Linux epoll
38 : for efficient I/O event notification. It uses a single reactor model
39 : where one thread runs epoll_wait while other threads
40 : wait on a condition variable for handler work. This design provides:
41 :
42 : - Handler parallelism: N posted handlers can execute on N threads
43 : - No thundering herd: condition_variable wakes exactly one thread
44 : - IOCP parity: Behavior matches Windows I/O completion port semantics
45 :
46 : When threads call run(), they first try to execute queued handlers.
47 : If the queue is empty and no reactor is running, one thread becomes
48 : the reactor and runs epoll_wait. Other threads wait on a condition
49 : variable until handlers are available.
50 :
51 : @par Thread Safety
52 : All public member functions are thread-safe.
53 : */
54 : class epoll_scheduler
55 : : public scheduler
56 : , public capy::execution_context::service
57 : {
58 : public:
59 : using key_type = scheduler;
60 :
61 : /** Construct the scheduler.
62 :
63 : Creates an epoll instance, eventfd for reactor interruption,
64 : and timerfd for kernel-managed timer expiry.
65 :
66 : @param ctx Reference to the owning execution_context.
67 : @param concurrency_hint Hint for expected thread count (unused).
68 : */
69 : epoll_scheduler(
70 : capy::execution_context& ctx,
71 : int concurrency_hint = -1);
72 :
73 : /// Destroy the scheduler.
74 : ~epoll_scheduler();
75 :
76 : epoll_scheduler(epoll_scheduler const&) = delete;
77 : epoll_scheduler& operator=(epoll_scheduler const&) = delete;
78 :
79 : void shutdown() override;
80 : void post(capy::coro h) const override;
81 : void post(scheduler_op* h) const override;
82 : void on_work_started() noexcept override;
83 : void on_work_finished() noexcept override;
84 : bool running_in_this_thread() const noexcept override;
85 : void stop() override;
86 : bool stopped() const noexcept override;
87 : void restart() override;
88 : std::size_t run() override;
89 : std::size_t run_one() override;
90 : std::size_t wait_one(long usec) override;
91 : std::size_t poll() override;
92 : std::size_t poll_one() override;
93 :
94 : /** Return the epoll file descriptor.
95 :
96 : Used by socket services to register file descriptors
97 : for I/O event notification.
98 :
99 : @return The epoll file descriptor.
100 : */
101 : int epoll_fd() const noexcept { return epoll_fd_; }
102 :
103 : /** Register a descriptor for persistent monitoring.
104 :
105 : The fd is registered once and stays registered until explicitly
106 : deregistered. Events are dispatched via descriptor_data which
107 : tracks pending read/write/connect operations.
108 :
109 : @param fd The file descriptor to register.
110 : @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
111 : */
112 : void register_descriptor(int fd, descriptor_data* desc) const;
113 :
114 : /** Update events for a persistently registered descriptor.
115 :
116 : @param fd The file descriptor.
117 : @param desc Pointer to descriptor data.
118 : @param events The new events to monitor.
119 : */
120 : void update_descriptor_events(int fd, descriptor_data* desc, std::uint32_t events) const;
121 :
122 : /** Deregister a persistently registered descriptor.
123 :
124 : @param fd The file descriptor to deregister.
125 : */
126 : void deregister_descriptor(int fd) const;
127 :
128 : /** For use by I/O operations to track pending work. */
129 : void work_started() const noexcept override;
130 :
131 : /** For use by I/O operations to track completed work. */
132 : void work_finished() const noexcept override;
133 :
134 : /** Drain work from thread context's private queue to global queue.
135 :
136 : Called by thread_context_guard destructor when a thread exits run().
137 : Transfers pending work to the global queue under mutex protection.
138 :
139 : @param queue The private queue to drain.
140 : @param count Item count for wakeup decisions (wakes other threads if positive).
141 : */
142 : void drain_thread_queue(op_queue& queue, long count) const;
143 :
144 : private:
145 : std::size_t do_one(long timeout_us);
146 : void run_reactor(std::unique_lock<std::mutex>& lock);
147 : void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
148 : void interrupt_reactor() const;
149 : void update_timerfd() const;
150 :
151 : int epoll_fd_;
152 : int event_fd_; // for interrupting reactor
153 : int timer_fd_; // timerfd for kernel-managed timer expiry
154 : mutable std::mutex mutex_;
155 : mutable std::condition_variable wakeup_event_;
156 : mutable op_queue completed_ops_;
157 : mutable std::atomic<long> outstanding_work_;
158 : std::atomic<bool> stopped_;
159 : bool shutdown_;
160 : timer_service* timer_svc_ = nullptr;
161 :
162 : // Single reactor thread coordination
163 : mutable bool reactor_running_ = false;
164 : mutable bool reactor_interrupted_ = false;
165 : mutable int idle_thread_count_ = 0;
166 :
167 : // Edge-triggered eventfd state
168 : mutable std::atomic<bool> eventfd_armed_{false};
169 :
170 :
171 : // Sentinel operation for interleaving reactor runs with handler execution.
172 : // Ensures the reactor runs periodically even when handlers are continuously
173 : // posted, preventing timer starvation.
174 : struct task_op final : scheduler_op
175 : {
176 0 : void operator()() override {}
177 0 : void destroy() override {}
178 : };
179 : task_op task_op_;
180 : };
181 :
182 : } // namespace boost::corosio::detail
183 :
184 : #endif // BOOST_COROSIO_HAS_EPOLL
185 :
186 : #endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
|