libs/corosio/src/corosio/src/detail/select/scheduler.hpp

0.0% Lines (0/2) 0.0% Functions (0/2) -% Branches (0/0)
libs/corosio/src/corosio/src/detail/select/scheduler.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/scheduler.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include "src/detail/scheduler_op.hpp"
22 #include "src/detail/timer_service.hpp"
23
24 #include <sys/select.h>
25
26 #include <atomic>
27 #include <condition_variable>
28 #include <cstddef>
29 #include <mutex>
30 #include <unordered_map>
31
32 namespace boost::corosio::detail {
33
34 struct select_op;
35
36 /** POSIX scheduler using select() for I/O multiplexing.
37
38 This scheduler implements the scheduler interface using the POSIX select()
39 call for I/O event notification. It uses a single reactor model
40 where one thread runs select() while other threads wait on a condition
41 variable for handler work. This design provides:
42
43 - Handler parallelism: N posted handlers can execute on N threads
44 - No thundering herd: condition_variable wakes exactly one thread
45 - Portability: Works on all POSIX systems
46
47 The design mirrors epoll_scheduler for behavioral consistency:
48 - Same single-reactor thread coordination model
49 - Same work counting semantics
50 - Same timer integration pattern
51
52 Known Limitations:
53 - FD_SETSIZE (~1024) limits maximum concurrent connections
54 - O(n) scanning: rebuilds fd_sets each iteration
55 - Level-triggered only (no edge-triggered mode)
56
57 @par Thread Safety
58 All public member functions are thread-safe.
59 */
60 class select_scheduler
61 : public scheduler
62 , public capy::execution_context::service
63 {
64 public:
65 using key_type = scheduler;
66
67 /** Construct the scheduler.
68
69 Creates a self-pipe for reactor interruption.
70
71 @param ctx Reference to the owning execution_context.
72 @param concurrency_hint Hint for expected thread count (unused).
73 */
74 select_scheduler(
75 capy::execution_context& ctx,
76 int concurrency_hint = -1);
77
78 ~select_scheduler();
79
80 select_scheduler(select_scheduler const&) = delete;
81 select_scheduler& operator=(select_scheduler const&) = delete;
82
83 void shutdown() override;
84 void post(capy::coro h) const override;
85 void post(scheduler_op* h) const override;
86 void on_work_started() noexcept override;
87 void on_work_finished() noexcept override;
88 bool running_in_this_thread() const noexcept override;
89 void stop() override;
90 bool stopped() const noexcept override;
91 void restart() override;
92 std::size_t run() override;
93 std::size_t run_one() override;
94 std::size_t wait_one(long usec) override;
95 std::size_t poll() override;
96 std::size_t poll_one() override;
97
98 /** Return the maximum file descriptor value supported.
99
100 Returns FD_SETSIZE - 1, the maximum fd value that can be
101 monitored by select(). Operations with fd >= FD_SETSIZE
102 will fail with EINVAL.
103
104 @return The maximum supported file descriptor value.
105 */
106 static constexpr int max_fd() noexcept { return FD_SETSIZE - 1; }
107
108 /** Register a file descriptor for monitoring.
109
110 @param fd The file descriptor to register.
111 @param op The operation associated with this fd.
112 @param events Event mask: 1 = read, 2 = write, 3 = both.
113 */
114 void register_fd(int fd, select_op* op, int events) const;
115
116 /** Unregister a file descriptor from monitoring.
117
118 @param fd The file descriptor to unregister.
119 @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
120 */
121 void deregister_fd(int fd, int events) const;
122
123 /** For use by I/O operations to track pending work. */
124 void work_started() const noexcept override;
125
126 /** For use by I/O operations to track completed work. */
127 void work_finished() const noexcept override;
128
129 // Event flags for register_fd/deregister_fd
130 static constexpr int event_read = 1;
131 static constexpr int event_write = 2;
132
133 private:
134 std::size_t do_one(long timeout_us);
135 void run_reactor(std::unique_lock<std::mutex>& lock);
136 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
137 void interrupt_reactor() const;
138 long calculate_timeout(long requested_timeout_us) const;
139
140 // Self-pipe for interrupting select()
141 int pipe_fds_[2]; // [0]=read, [1]=write
142
143 mutable std::mutex mutex_;
144 mutable std::condition_variable wakeup_event_;
145 mutable op_queue completed_ops_;
146 mutable std::atomic<long> outstanding_work_;
147 std::atomic<bool> stopped_;
148 bool shutdown_;
149 timer_service* timer_svc_ = nullptr;
150
151 // Per-fd state for tracking registered operations
152 struct fd_state
153 {
154 select_op* read_op = nullptr;
155 select_op* write_op = nullptr;
156 };
157 mutable std::unordered_map<int, fd_state> registered_fds_;
158 mutable int max_fd_ = -1;
159
160 // Single reactor thread coordination
161 mutable bool reactor_running_ = false;
162 mutable bool reactor_interrupted_ = false;
163 mutable int idle_thread_count_ = 0;
164
165 // Sentinel operation for interleaving reactor runs with handler execution.
166 // Ensures the reactor runs periodically even when handlers are continuously
167 // posted, preventing timer starvation.
168 struct task_op final : scheduler_op
169 {
170 void operator()() override {}
171 void destroy() override {}
172 };
173 task_op task_op_;
174 };
175
176 } // namespace boost::corosio::detail
177
178 #endif // BOOST_COROSIO_HAS_SELECT
179
180 #endif // BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
181