libs/corosio/src/corosio/src/detail/select/op.hpp

74.8% Lines (98/131) 84.2% Functions (16/19) 65.7% Branches (23/35)
libs/corosio/src/corosio/src/detail/select/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/scheduler_op.hpp"
27 #include "src/detail/endpoint_convert.hpp"
28
29 #include <unistd.h>
30 #include <errno.h>
31 #include <fcntl.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/select.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 select Operation State
46 ======================
47
48 Each async I/O operation has a corresponding select_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 This mirrors the epoll_op design for consistency across backends.
54
55 Completion vs Cancellation Race
56 -------------------------------
57 The `registered` atomic uses a tri-state (unregistered, registering,
58 registered) to handle two races: (1) between register_fd() and the
59 reactor seeing an event, and (2) between reactor completion and cancel().
60
61 The registering state closes the window where an event could arrive
62 after register_fd() but before the state is set to registered. The reactor
63 and cancel() both treat registering the same as registered when claiming.
64
65 Whoever atomically exchanges to unregistered "claims" the operation
66 and is responsible for completing it. The loser sees unregistered and
67 does nothing. The initiating thread uses compare_exchange to transition
68 from registering to registered; if this fails, the reactor or cancel
69 already claimed the op.
70
71 Impl Lifetime Management
72 ------------------------
73 When cancel() posts an op to the scheduler's ready queue, the socket impl
74 might be destroyed before the scheduler processes the op. The `impl_ptr`
75 member holds a shared_ptr to the impl, keeping it alive until the op
76 completes.
77
78 EOF Detection
79 -------------
80 For reads, 0 bytes with no error means EOF. But an empty user buffer also
81 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
82
83 SIGPIPE Prevention
84 ------------------
85 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
86 SIGPIPE when the peer has closed.
87 */
88
89 namespace boost::corosio::detail {
90
91 // Forward declarations for cancellation support
92 class select_socket_impl;
93 class select_acceptor_impl;
94
95 /** Registration state for async operations.
96
97 Tri-state enum to handle the race between register_fd() and
98 run_reactor() seeing an event. Setting `registering` before
99 calling register_fd() ensures events delivered during the
100 registration window are not dropped.
101 */
// NOTE(review): std::uint8_t is declared in <cstdint>, which is not among the
// headers this file includes directly - presumably it arrives transitively;
// confirm or add the include.
102 enum class select_registration_state : std::uint8_t
103 {
104 unregistered, ///< Not registered with reactor
105 registering, ///< register_fd() called, not yet confirmed
106 registered ///< Fully registered, ready for events
107 };
108
// Base state shared by every select-backend async operation.
//
// Holds what is needed to finish an operation: the `capy::coro` handle `h`
// and executor `ex` that operator() dispatches on completion, the caller's
// output slots (`ec_out`, `bytes_out`), the raw outcome recorded by
// complete() (`errn`, `bytes_transferred`), and the cancellation and
// registration flags. Derived ops override perform_io() to do the actual
// system call and must implement cancel().
109 struct select_op : scheduler_op
110 {
// Callable stored in `stop_cb` (see start()); its operator() is only
// declared here and is defined out of line.
111 struct canceller
112 {
113 select_op* op;
114 void operator()() const noexcept;
115 };
116
// Resumption target and caller-owned output slots. operator() skips a slot
// whose pointer is null.
117 capy::coro h;
118 capy::executor_ref ex;
119 std::error_code* ec_out = nullptr;
120 std::size_t* bytes_out = nullptr;
121
// Descriptor the operation acts on, and the raw result written by complete().
122 int fd = -1;
123 int errn = 0;
124 std::size_t bytes_transferred = 0;
125
// `cancelled` is set by request_cancel() and read in operator().
// `registered` is the tri-state registration flag; it is only initialised
// and reset inside this struct.
// `stop_cb` holds the stop_token callback while one is installed.
126 std::atomic<bool> cancelled{false};
127 std::atomic<select_registration_state> registered{select_registration_state::unregistered};
128 std::optional<std::stop_callback<canceller>> stop_cb;
129
130 // Prevents use-after-free when socket is closed with pending ops.
131 std::shared_ptr<void> impl_ptr;
132
133 // For stop_token cancellation - pointer to owning socket/acceptor impl.
134 select_socket_impl* socket_impl_ = nullptr;
135 select_acceptor_impl* acceptor_impl_ = nullptr;
136
// Stores a pointer to this op in `data_`, which is not declared in this
// struct (presumably inherited from scheduler_op - confirm).
137 12708 select_op()
138 12708 {
139 12708 data_ = this;
140 12708 }
141
// Returns the per-operation state to its defaults so the op can be reused.
// Leaves `h`, `ex`, `ec_out`, `bytes_out` and `stop_cb` untouched; `stop_cb`
// is cleared by start(), operator() and destroy() instead.
142 166110 void reset() noexcept
143 {
144 166110 fd = -1;
145 166110 errn = 0;
146 166110 bytes_transferred = 0;
147 166110 cancelled.store(false, std::memory_order_relaxed);
148 166110 registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
149 166110 impl_ptr.reset();
150 166110 socket_impl_ = nullptr;
151 166110 acceptor_impl_ = nullptr;
152 166110 }
153
// Completion handler. Drops the stop callback, then reports the outcome:
//   *ec_out is chosen in priority order
//     1. cancelled flag set              -> capy::error::canceled
//     2. errn != 0                       -> make_err(errn)
//     3. read op that transferred 0 bytes -> capy::error::eof
//     4. otherwise                       -> default (success) error_code
//   *bytes_out receives bytes_transferred regardless of which case applied.
// Finally `ex` and `h` are moved to locals, impl_ptr is released, and the
// handle is dispatched on the saved executor.
154 161897 void operator()() override
155 {
156 161897 stop_cb.reset();
157
158

1/2
✓ Branch 0 taken 161897 times.
✗ Branch 1 not taken.
161897 if (ec_out)
159 {
160

2/2
✓ Branch 1 taken 200 times.
✓ Branch 2 taken 161697 times.
161897 if (cancelled.load(std::memory_order_acquire))
161 200 *ec_out = capy::error::canceled;
162

2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 161696 times.
161697 else if (errn != 0)
163 1 *ec_out = make_err(errn);
164

6/6
✓ Branch 1 taken 80811 times.
✓ Branch 2 taken 80885 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 80806 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 161691 times.
161696 else if (is_read_operation() && bytes_transferred == 0)
165 5 *ec_out = capy::error::eof;
166 else
167 161691 *ec_out = {};
168 }
169
170

1/2
✓ Branch 0 taken 161897 times.
✗ Branch 1 not taken.
161897 if (bytes_out)
171 161897 *bytes_out = bytes_transferred;
172
173 // Move to stack before destroying the frame
174 161897 capy::executor_ref saved_ex( std::move( ex ) );
175 161897 capy::coro saved_h( std::move( h ) );
176 161897 impl_ptr.reset();
177

1/1
✓ Branch 1 taken 161897 times.
161897 saved_ex.dispatch( saved_h );
178 161897 }
179
// is_read_operation(): the base answer is false; operator() uses it to decide
// whether a successful zero-byte result is reported as EOF.
// cancel(): pure virtual, every concrete op supplies its own.
180 80884 virtual bool is_read_operation() const noexcept { return false; }
181 virtual void cancel() noexcept = 0;
182
// Releases the stop callback and the impl reference without writing any
// output slot and without dispatching `h`.
183 void destroy() override
184 {
185 stop_cb.reset();
186 impl_ptr.reset();
187 }
188
// Marks the op as cancelled. The release store pairs with the acquire load
// in operator().
189 19484 void request_cancel() noexcept
190 {
191 19484 cancelled.store(true, std::memory_order_release);
192 19484 }
193
// start() overloads: arm the op for a new run. Each one clears the cancelled
// flag, drops any previous stop callback, records which impl (if any) owns
// the op, and installs a new stop callback only when the token can actually
// be stopped. This overload records no owning impl.
194 void start(std::stop_token token)
195 {
196 cancelled.store(false, std::memory_order_release);
197 stop_cb.reset();
198 socket_impl_ = nullptr;
199 acceptor_impl_ = nullptr;
200
201 if (token.stop_possible())
202 stop_cb.emplace(token, canceller{this});
203 }
204
// As above, recording `impl` as the owning socket impl.
205 164003 void start(std::stop_token token, select_socket_impl* impl)
206 {
207 164003 cancelled.store(false, std::memory_order_release);
208 164003 stop_cb.reset();
209 164003 socket_impl_ = impl;
210 164003 acceptor_impl_ = nullptr;
211
212

2/2
✓ Branch 1 taken 98 times.
✓ Branch 2 taken 163905 times.
164003 if (token.stop_possible())
213 98 stop_cb.emplace(token, canceller{this});
214 164003 }
215
// As above, recording `impl` as the owning acceptor impl.
216 2107 void start(std::stop_token token, select_acceptor_impl* impl)
217 {
218 2107 cancelled.store(false, std::memory_order_release);
219 2107 stop_cb.reset();
220 2107 socket_impl_ = nullptr;
221 2107 acceptor_impl_ = impl;
222
223

1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2107 times.
2107 if (token.stop_possible())
224 stop_cb.emplace(token, canceller{this});
225 2107 }
226
// Records the raw outcome of the I/O attempt (an errno-style code and a byte
// count). Only stores the values; reporting happens later in operator().
227 165991 void complete(int err, std::size_t bytes) noexcept
228 {
229 165991 errn = err;
230 165991 bytes_transferred = bytes;
231 165991 }
232
// Hook for the actual system call; the base version does nothing.
233 virtual void perform_io() noexcept {}
234 };
235
236
// Operation state for a connect. Adds the endpoint being connected to on top
// of the common select_op state.
237 struct select_connect_op : select_op
238 {
239 endpoint target_endpoint;
240
// Resets the base state and clears the target endpoint. Declared without
// `override`, so it hides select_op::reset() rather than overriding it.
241 2106 void reset() noexcept
242 {
243 2106 select_op::reset();
244 2106 target_endpoint = endpoint{};
245 2106 }
246
// Reads the socket's pending error with getsockopt(SO_ERROR) and records it
// via complete() with a byte count of 0. If getsockopt() itself fails, its
// errno is recorded instead.
247 2106 void perform_io() noexcept override
248 {
249 // connect() completion status is retrieved via SO_ERROR, not return value
250 2106 int err = 0;
251 2106 socklen_t len = sizeof(err);
252

1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2106 times.
2106 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
253 err = errno;
254 2106 complete(err, 0);
255 2106 }
256
257 // Defined in sockets.cpp where select_socket_impl is complete
258 void operator()() override;
259 void cancel() noexcept override;
260 };
261
262
// Operation state for a scatter read into at most `max_buffers` iovec
// entries.
263 struct select_read_op : select_op
264 {
265 static constexpr std::size_t max_buffers = 16;
266 iovec iovecs[max_buffers];
267 int iovec_count = 0;
// When true, is_read_operation() reports false, so select_op::operator()
// does not turn a zero-byte result into EOF.
268 bool empty_buffer_read = false;
269
270 80812 bool is_read_operation() const noexcept override
271 {
272 80812 return !empty_buffer_read;
273 }
274
// Resets the base state plus the read-specific fields. The contents of
// `iovecs` are left as they are; only the count is zeroed.
275 81008 void reset() noexcept
276 {
277 81008 select_op::reset();
278 81008 iovec_count = 0;
279 81008 empty_buffer_read = false;
280 81008 }
281
// Issues one readv() on `fd` over the first `iovec_count` entries. A
// non-negative return is recorded as success with that byte count; a
// negative return records errno with 0 bytes.
282 48 void perform_io() noexcept override
283 {
284 48 ssize_t n = ::readv(fd, iovecs, iovec_count);
285

1/2
✓ Branch 0 taken 48 times.
✗ Branch 1 not taken.
48 if (n >= 0)
286 48 complete(0, static_cast<std::size_t>(n));
287 else
288 complete(errno, 0);
289 48 }
290
291 void cancel() noexcept override;
292 };
293
294
// Operation state for a gather write from at most `max_buffers` iovec
// entries.
295 struct select_write_op : select_op
296 {
297 static constexpr std::size_t max_buffers = 16;
298 iovec iovecs[max_buffers];
299 int iovec_count = 0;
300
// Resets the base state and zeroes the buffer count. The contents of
// `iovecs` are left as they are.
301 80889 void reset() noexcept
302 {
303 80889 select_op::reset();
304 80889 iovec_count = 0;
305 80889 }
306
// Issues one sendmsg() on `fd` with the MSG_NOSIGNAL flag. The msghdr is
// value-initialised, so only the iovec pointer and count are set. A
// non-negative return is recorded as success with that byte count; a
// negative return records errno with 0 bytes.
307 void perform_io() noexcept override
308 {
309 msghdr msg{};
310 msg.msg_iov = iovecs;
311 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
312
313 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
314 if (n >= 0)
315 complete(0, static_cast<std::size_t>(n));
316 else
317 complete(errno, 0);
318 }
319
320 void cancel() noexcept override;
321 };
322
323
// Operation state for an accept on a listening descriptor.
324 struct select_accept_op : select_op
325 {
// Descriptor produced by a successful perform_io(); stays -1 otherwise.
326 int accepted_fd = -1;
// Not used inside this struct other than being reset; presumably filled in
// and consumed by the operator() defined in acceptors.cpp - confirm there.
327 io_object::io_object_impl* peer_impl = nullptr;
328 io_object::io_object_impl** impl_out = nullptr;
329
// Resets the base state plus the accept-specific fields.
330 2107 void reset() noexcept
331 {
332 2107 select_op::reset();
333 2107 accepted_fd = -1;
334 2107 peer_impl = nullptr;
335 2107 impl_out = nullptr;
336 2107 }
337
// Accepts one connection on `fd` and prepares the new descriptor.
//
// On success the new descriptor is stored in `accepted_fd` and the op
// completes with (0, 0). The descriptor is closed and an error recorded
// instead if it is >= FD_SETSIZE (recorded as EINVAL) or if any of the three
// fcntl() calls fails (recorded as that call's errno, captured before
// close() can overwrite it). If accept() itself fails, its errno is recorded.
//
// NOTE(review): the peer address is received into an IPv4-sized sockaddr_in
// and is never read in this function - confirm whether IPv6 listeners are
// expected and whether the address is needed.
338 2102 void perform_io() noexcept override
339 {
340 2102 sockaddr_in addr{};
341 2102 socklen_t addrlen = sizeof(addr);
342
343 // Note: select backend uses accept() + fcntl instead of accept4()
344 // for broader POSIX compatibility
345 2102 int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
346
347

1/2
✓ Branch 0 taken 2102 times.
✗ Branch 1 not taken.
2102 if (new_fd >= 0)
348 {
349 // Reject fds that exceed select()'s FD_SETSIZE limit.
350 // Better to fail now than during later async operations.
351

1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2102 times.
2102 if (new_fd >= FD_SETSIZE)
352 {
353 ::close(new_fd);
354 complete(EINVAL, 0);
355 return;
356 }
357
358 // Set non-blocking and close-on-exec flags.
359 // A non-blocking socket is essential for the async reactor;
360 // if we can't configure it, fail rather than risk blocking.
361 2102 int flags = ::fcntl(new_fd, F_GETFL, 0);
362

1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2102 times.
2102 if (flags == -1)
363 {
364 int err = errno;
365 ::close(new_fd);
366 complete(err, 0);
367 return;
368 }
369
370

1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2102 times.
2102 if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
371 {
372 int err = errno;
373 ::close(new_fd);
374 complete(err, 0);
375 return;
376 }
377
378

1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2102 times.
2102 if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
379 {
380 int err = errno;
381 ::close(new_fd);
382 complete(err, 0);
383 return;
384 }
385
386 2102 accepted_fd = new_fd;
387 2102 complete(0, 0);
388 }
389 else
390 {
391 complete(errno, 0);
392 }
393 }
394
395 // Defined in acceptors.cpp where select_acceptor_impl is complete
396 void operator()() override;
397 void cancel() noexcept override;
398 };
399
400 } // namespace boost::corosio::detail
401
402 #endif // BOOST_COROSIO_HAS_SELECT
403
404 #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
405