Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
11 : #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include "src/detail/make_err.hpp"
26 : #include "src/detail/scheduler_op.hpp"
27 : #include "src/detail/endpoint_convert.hpp"
28 :
29 : #include <unistd.h>
30 : #include <errno.h>
31 : #include <fcntl.h>
32 :
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stop_token>
38 :
39 : #include <netinet/in.h>
40 : #include <sys/select.h>
41 : #include <sys/socket.h>
42 : #include <sys/uio.h>
43 :
44 : /*
45 : select Operation State
46 : ======================
47 :
48 : Each async I/O operation has a corresponding select_op-derived struct that
49 : holds the operation's state while it's in flight. The socket impl owns
50 : fixed slots for each operation type (conn_, rd_, wr_), so only one
51 : operation of each type can be pending per socket at a time.
52 :
53 : This mirrors the epoll_op design for consistency across backends.
54 :
55 : Completion vs Cancellation Race
56 : -------------------------------
57 : The `registered` atomic uses a tri-state (unregistered, registering,
58 : registered) to handle two races: (1) between register_fd() and the
59 : reactor seeing an event, and (2) between reactor completion and cancel().
60 :
61 : The registering state closes the window where an event could arrive
62 : after register_fd() but before the state was marked registered. The reactor and
63 : cancel() both treat registering the same as registered when claiming.
64 :
65 : Whoever atomically exchanges to unregistered "claims" the operation
66 : and is responsible for completing it. The loser sees unregistered and
67 : does nothing. The initiating thread uses compare_exchange to transition
68 : from registering to registered; if this fails, the reactor or cancel
69 : already claimed the op.
70 :
71 : Impl Lifetime Management
72 : ------------------------
73 : When cancel() posts an op to the scheduler's ready queue, the socket impl
74 : might be destroyed before the scheduler processes the op. The `impl_ptr`
75 : member holds a shared_ptr to the impl, keeping it alive until the op
76 : completes.
77 :
78 : EOF Detection
79 : -------------
80 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
81 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
82 :
83 : SIGPIPE Prevention
84 : ------------------
85 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
86 : SIGPIPE when the peer has closed.
87 : */
88 :
89 : namespace boost::corosio::detail {
90 :
// Only pointers to the socket/acceptor implementations are stored by the
// operation types below, so forward declarations are sufficient here.
class select_socket_impl;
class select_acceptor_impl;

/** Reactor registration state of an in-flight operation.

    Three states are used instead of a flag so that an event seen by
    run_reactor() while register_fd() is still in progress is not
    dropped: the operation is moved to `registering` before
    register_fd() is called.
*/
enum class select_registration_state : std::uint8_t
{
    unregistered = 0, ///< Not registered with reactor
    registering  = 1, ///< register_fd() called, not yet confirmed
    registered   = 2  ///< Fully registered, ready for events
};
108 :
109 : struct select_op : scheduler_op
110 : {
111 : struct canceller
112 : {
113 : select_op* op;
114 : void operator()() const noexcept;
115 : };
116 :
117 : capy::coro h;
118 : capy::executor_ref ex;
119 : std::error_code* ec_out = nullptr;
120 : std::size_t* bytes_out = nullptr;
121 :
122 : int fd = -1;
123 : int errn = 0;
124 : std::size_t bytes_transferred = 0;
125 :
126 : std::atomic<bool> cancelled{false};
127 : std::atomic<select_registration_state> registered{select_registration_state::unregistered};
128 : std::optional<std::stop_callback<canceller>> stop_cb;
129 :
130 : // Prevents use-after-free when socket is closed with pending ops.
131 : std::shared_ptr<void> impl_ptr;
132 :
133 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
134 : select_socket_impl* socket_impl_ = nullptr;
135 : select_acceptor_impl* acceptor_impl_ = nullptr;
136 :
137 12708 : select_op()
138 12708 : {
139 12708 : data_ = this;
140 12708 : }
141 :
142 166110 : void reset() noexcept
143 : {
144 166110 : fd = -1;
145 166110 : errn = 0;
146 166110 : bytes_transferred = 0;
147 166110 : cancelled.store(false, std::memory_order_relaxed);
148 166110 : registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
149 166110 : impl_ptr.reset();
150 166110 : socket_impl_ = nullptr;
151 166110 : acceptor_impl_ = nullptr;
152 166110 : }
153 :
154 161897 : void operator()() override
155 : {
156 161897 : stop_cb.reset();
157 :
158 161897 : if (ec_out)
159 : {
160 161897 : if (cancelled.load(std::memory_order_acquire))
161 200 : *ec_out = capy::error::canceled;
162 161697 : else if (errn != 0)
163 1 : *ec_out = make_err(errn);
164 161696 : else if (is_read_operation() && bytes_transferred == 0)
165 5 : *ec_out = capy::error::eof;
166 : else
167 161691 : *ec_out = {};
168 : }
169 :
170 161897 : if (bytes_out)
171 161897 : *bytes_out = bytes_transferred;
172 :
173 : // Move to stack before destroying the frame
174 161897 : capy::executor_ref saved_ex( std::move( ex ) );
175 161897 : capy::coro saved_h( std::move( h ) );
176 161897 : impl_ptr.reset();
177 161897 : saved_ex.dispatch( saved_h );
178 161897 : }
179 :
180 80884 : virtual bool is_read_operation() const noexcept { return false; }
181 : virtual void cancel() noexcept = 0;
182 :
183 0 : void destroy() override
184 : {
185 0 : stop_cb.reset();
186 0 : impl_ptr.reset();
187 0 : }
188 :
189 19484 : void request_cancel() noexcept
190 : {
191 19484 : cancelled.store(true, std::memory_order_release);
192 19484 : }
193 :
194 : void start(std::stop_token token)
195 : {
196 : cancelled.store(false, std::memory_order_release);
197 : stop_cb.reset();
198 : socket_impl_ = nullptr;
199 : acceptor_impl_ = nullptr;
200 :
201 : if (token.stop_possible())
202 : stop_cb.emplace(token, canceller{this});
203 : }
204 :
205 164003 : void start(std::stop_token token, select_socket_impl* impl)
206 : {
207 164003 : cancelled.store(false, std::memory_order_release);
208 164003 : stop_cb.reset();
209 164003 : socket_impl_ = impl;
210 164003 : acceptor_impl_ = nullptr;
211 :
212 164003 : if (token.stop_possible())
213 98 : stop_cb.emplace(token, canceller{this});
214 164003 : }
215 :
216 2107 : void start(std::stop_token token, select_acceptor_impl* impl)
217 : {
218 2107 : cancelled.store(false, std::memory_order_release);
219 2107 : stop_cb.reset();
220 2107 : socket_impl_ = nullptr;
221 2107 : acceptor_impl_ = impl;
222 :
223 2107 : if (token.stop_possible())
224 0 : stop_cb.emplace(token, canceller{this});
225 2107 : }
226 :
227 165991 : void complete(int err, std::size_t bytes) noexcept
228 : {
229 165991 : errn = err;
230 165991 : bytes_transferred = bytes;
231 165991 : }
232 :
233 0 : virtual void perform_io() noexcept {}
234 : };
235 :
236 :
237 : struct select_connect_op : select_op
238 : {
239 : endpoint target_endpoint;
240 :
241 2106 : void reset() noexcept
242 : {
243 2106 : select_op::reset();
244 2106 : target_endpoint = endpoint{};
245 2106 : }
246 :
247 2106 : void perform_io() noexcept override
248 : {
249 : // connect() completion status is retrieved via SO_ERROR, not return value
250 2106 : int err = 0;
251 2106 : socklen_t len = sizeof(err);
252 2106 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
253 0 : err = errno;
254 2106 : complete(err, 0);
255 2106 : }
256 :
257 : // Defined in sockets.cpp where select_socket_impl is complete
258 : void operator()() override;
259 : void cancel() noexcept override;
260 : };
261 :
262 :
263 : struct select_read_op : select_op
264 : {
265 : static constexpr std::size_t max_buffers = 16;
266 : iovec iovecs[max_buffers];
267 : int iovec_count = 0;
268 : bool empty_buffer_read = false;
269 :
270 80812 : bool is_read_operation() const noexcept override
271 : {
272 80812 : return !empty_buffer_read;
273 : }
274 :
275 81008 : void reset() noexcept
276 : {
277 81008 : select_op::reset();
278 81008 : iovec_count = 0;
279 81008 : empty_buffer_read = false;
280 81008 : }
281 :
282 48 : void perform_io() noexcept override
283 : {
284 48 : ssize_t n = ::readv(fd, iovecs, iovec_count);
285 48 : if (n >= 0)
286 48 : complete(0, static_cast<std::size_t>(n));
287 : else
288 0 : complete(errno, 0);
289 48 : }
290 :
291 : void cancel() noexcept override;
292 : };
293 :
294 :
295 : struct select_write_op : select_op
296 : {
297 : static constexpr std::size_t max_buffers = 16;
298 : iovec iovecs[max_buffers];
299 : int iovec_count = 0;
300 :
301 80889 : void reset() noexcept
302 : {
303 80889 : select_op::reset();
304 80889 : iovec_count = 0;
305 80889 : }
306 :
307 0 : void perform_io() noexcept override
308 : {
309 0 : msghdr msg{};
310 0 : msg.msg_iov = iovecs;
311 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
312 :
313 0 : ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
314 0 : if (n >= 0)
315 0 : complete(0, static_cast<std::size_t>(n));
316 : else
317 0 : complete(errno, 0);
318 0 : }
319 :
320 : void cancel() noexcept override;
321 : };
322 :
323 :
324 : struct select_accept_op : select_op
325 : {
326 : int accepted_fd = -1;
327 : io_object::io_object_impl* peer_impl = nullptr;
328 : io_object::io_object_impl** impl_out = nullptr;
329 :
330 2107 : void reset() noexcept
331 : {
332 2107 : select_op::reset();
333 2107 : accepted_fd = -1;
334 2107 : peer_impl = nullptr;
335 2107 : impl_out = nullptr;
336 2107 : }
337 :
338 2102 : void perform_io() noexcept override
339 : {
340 2102 : sockaddr_in addr{};
341 2102 : socklen_t addrlen = sizeof(addr);
342 :
343 : // Note: select backend uses accept() + fcntl instead of accept4()
344 : // for broader POSIX compatibility
345 2102 : int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
346 :
347 2102 : if (new_fd >= 0)
348 : {
349 : // Reject fds that exceed select()'s FD_SETSIZE limit.
350 : // Better to fail now than during later async operations.
351 2102 : if (new_fd >= FD_SETSIZE)
352 : {
353 0 : ::close(new_fd);
354 0 : complete(EINVAL, 0);
355 0 : return;
356 : }
357 :
358 : // Set non-blocking and close-on-exec flags.
359 : // A non-blocking socket is essential for the async reactor;
360 : // if we can't configure it, fail rather than risk blocking.
361 2102 : int flags = ::fcntl(new_fd, F_GETFL, 0);
362 2102 : if (flags == -1)
363 : {
364 0 : int err = errno;
365 0 : ::close(new_fd);
366 0 : complete(err, 0);
367 0 : return;
368 : }
369 :
370 2102 : if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
371 : {
372 0 : int err = errno;
373 0 : ::close(new_fd);
374 0 : complete(err, 0);
375 0 : return;
376 : }
377 :
378 2102 : if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
379 : {
380 0 : int err = errno;
381 0 : ::close(new_fd);
382 0 : complete(err, 0);
383 0 : return;
384 : }
385 :
386 2102 : accepted_fd = new_fd;
387 2102 : complete(0, 0);
388 : }
389 : else
390 : {
391 0 : complete(errno, 0);
392 : }
393 : }
394 :
395 : // Defined in acceptors.cpp where select_acceptor_impl is complete
396 : void operator()() override;
397 : void cancel() noexcept override;
398 : };
399 :
400 : } // namespace boost::corosio::detail
401 :
402 : #endif // BOOST_COROSIO_HAS_SELECT
403 :
404 : #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
|