libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

78.4% Lines (315/402) 95.0% Functions (38/40) 60.7% Branches (125/206)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/resume_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <errno.h>
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <sys/epoll.h>
26 #include <sys/socket.h>
27 #include <unistd.h>
28
29 namespace boost::corosio::detail {
30
31 void
32 106 epoll_op::canceller::
33 operator()() const noexcept
34 {
35 106 op->cancel();
36 106 }
37
38 void
39 epoll_connect_op::
40 cancel() noexcept
41 {
42 if (socket_impl_)
43 socket_impl_->cancel_single_op(*this);
44 else
45 request_cancel();
46 }
47
48 void
49 99 epoll_read_op::
50 cancel() noexcept
51 {
52
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 if (socket_impl_)
53 99 socket_impl_->cancel_single_op(*this);
54 else
55 request_cancel();
56 99 }
57
58 void
59 1 epoll_write_op::
60 cancel() noexcept
61 {
62
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (socket_impl_)
63 1 socket_impl_->cancel_single_op(*this);
64 else
65 request_cancel();
66 1 }
67
68 void
69 2611 epoll_connect_op::
70 operator()()
71 {
72 2611 stop_cb.reset();
73
74
3/4
✓ Branch 0 taken 2609 times.
✓ Branch 1 taken 2 times.
✓ Branch 3 taken 2609 times.
✗ Branch 4 not taken.
2611 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
75
76 // Cache endpoints on successful connect
77
3/4
✓ Branch 0 taken 2609 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2609 times.
✗ Branch 3 not taken.
2611 if (success && socket_impl_)
78 {
79 // Query local endpoint via getsockname (may fail, but remote is always known)
80 2609 endpoint local_ep;
81 2609 sockaddr_in local_addr{};
82 2609 socklen_t local_len = sizeof(local_addr);
83
1/2
✓ Branch 1 taken 2609 times.
✗ Branch 2 not taken.
2609 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
84 2609 local_ep = from_sockaddr_in(local_addr);
85 // Always cache remote endpoint; local may be default if getsockname failed
86 2609 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
87 }
88
89
1/2
✓ Branch 0 taken 2611 times.
✗ Branch 1 not taken.
2611 if (ec_out)
90 {
91
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2611 times.
2611 if (cancelled.load(std::memory_order_acquire))
92 *ec_out = capy::error::canceled;
93
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2609 times.
2611 else if (errn != 0)
94 2 *ec_out = make_err(errn);
95 else
96 2609 *ec_out = {};
97 }
98
99
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2611 times.
2611 if (bytes_out)
100 *bytes_out = bytes_transferred;
101
102 // Move to stack before resuming. See epoll_op::operator()() for rationale.
103 2611 capy::executor_ref saved_ex( std::move( ex ) );
104 2611 capy::coro saved_h( std::move( h ) );
105 2611 auto prevent_premature_destruction = std::move(impl_ptr);
106
1/1
✓ Branch 1 taken 2611 times.
2611 resume_coro(saved_ex, saved_h);
107 2611 }
108
109 5231 epoll_socket_impl::
110 5231 epoll_socket_impl(epoll_socket_service& svc) noexcept
111 5231 : svc_(svc)
112 {
113 5231 }
114
115 5231 epoll_socket_impl::
116 5231 ~epoll_socket_impl()
117 {
118
2/2
✓ Branch 1 taken 40 times.
✓ Branch 2 taken 5191 times.
5231 if (read_initiator_handle_)
119 40 read_initiator_handle_.destroy();
120
2/2
✓ Branch 1 taken 38 times.
✓ Branch 2 taken 5193 times.
5231 if (write_initiator_handle_)
121 38 write_initiator_handle_.destroy();
122
123 // promise_type::operator delete is no-op, so free here
124
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 5191 times.
5231 if (read_initiator_frame_)
125 40 ::operator delete(read_initiator_frame_);
126
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 5193 times.
5231 if (write_initiator_frame_)
127 38 ::operator delete(write_initiator_frame_);
128 5231 }
129
130 void
131 epoll_socket_impl::
132 update_epoll_events() noexcept
133 {
134 // With EPOLLET, update_descriptor_events just provides a memory fence
135 svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
136 }
137
138 void
139 5231 epoll_socket_impl::
140 release()
141 {
142 5231 close_socket();
143 5231 svc_.destroy_impl(*this);
144 5231 }
145
146 void
147 2611 epoll_socket_impl::
148 connect(
149 std::coroutine_handle<> h,
150 capy::executor_ref ex,
151 endpoint ep,
152 std::stop_token token,
153 std::error_code* ec)
154 {
155 2611 auto& op = conn_;
156 2611 op.reset();
157 2611 op.h = h;
158 2611 op.ex = ex;
159 2611 op.ec_out = ec;
160 2611 op.fd = fd_;
161 2611 op.target_endpoint = ep; // Store target for endpoint caching
162 2611 op.start(token, this);
163
164 2611 sockaddr_in addr = detail::to_sockaddr_in(ep);
165
1/1
✓ Branch 1 taken 2611 times.
2611 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
166
167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2611 times.
2611 if (result == 0)
168 {
169 // Sync success - cache endpoints immediately
170 // Remote is always known; local may fail but we still cache remote
171 sockaddr_in local_addr{};
172 socklen_t local_len = sizeof(local_addr);
173 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
174 local_endpoint_ = detail::from_sockaddr_in(local_addr);
175 remote_endpoint_ = ep;
176
177 op.complete(0, 0);
178 op.impl_ptr = shared_from_this();
179 svc_.post(&op);
180 return;
181 }
182
183
1/2
✓ Branch 0 taken 2611 times.
✗ Branch 1 not taken.
2611 if (errno == EINPROGRESS)
184 {
185 2611 svc_.work_started();
186
1/1
✓ Branch 1 taken 2611 times.
2611 op.impl_ptr = shared_from_this();
187
188 2611 desc_data_.connect_op.store(&op, std::memory_order_seq_cst);
189
190
2/2
✓ Branch 1 taken 43 times.
✓ Branch 2 taken 2568 times.
2611 if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst))
191 {
192 43 auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel);
193
1/2
✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
43 if (claimed)
194 {
195 43 claimed->perform_io();
196
2/4
✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
43 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
197 {
198 claimed->errn = 0;
199 desc_data_.connect_op.store(claimed, std::memory_order_release);
200 }
201 else
202 {
203
1/1
✓ Branch 1 taken 43 times.
43 svc_.post(claimed);
204 43 svc_.work_finished();
205 }
206 43 return;
207 }
208 }
209
210
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2568 times.
2568 if (op.cancelled.load(std::memory_order_acquire))
211 {
212 auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel);
213 if (claimed)
214 {
215 svc_.post(claimed);
216 svc_.work_finished();
217 }
218 }
219 2568 return;
220 }
221
222 op.complete(errno, 0);
223 op.impl_ptr = shared_from_this();
224 svc_.post(&op);
225 }
226
227 read_initiator
228
1/1
✓ Branch 1 taken 71728 times.
71728 make_read_initiator(void*& cached, epoll_socket_impl* impl)
229 {
230 impl->do_read_io();
231 co_return;
232 143456 }
233
234 write_initiator
235
1/1
✓ Branch 1 taken 71606 times.
71606 make_write_initiator(void*& cached, epoll_socket_impl* impl)
236 {
237 impl->do_write_io();
238 co_return;
239 143212 }
240
241 void
242 71728 epoll_socket_impl::
243 do_read_io()
244 {
245 71728 auto& op = rd_;
246
247 71728 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
248
249
2/2
✓ Branch 0 taken 71554 times.
✓ Branch 1 taken 174 times.
71728 if (n > 0)
250 {
251 71554 desc_data_.read_ready.store(false, std::memory_order_relaxed);
252 71554 op.complete(0, static_cast<std::size_t>(n));
253 71554 svc_.post(&op);
254 71554 return;
255 }
256
257
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 169 times.
174 if (n == 0)
258 {
259 5 desc_data_.read_ready.store(false, std::memory_order_relaxed);
260 5 op.complete(0, 0);
261 5 svc_.post(&op);
262 5 return;
263 }
264
265
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 169 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
169 if (errno == EAGAIN || errno == EWOULDBLOCK)
266 {
267 169 svc_.work_started();
268
269 169 desc_data_.read_op.store(&op, std::memory_order_seq_cst);
270
271
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 169 times.
169 if (desc_data_.read_ready.exchange(false, std::memory_order_seq_cst))
272 {
273 auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
274 if (claimed)
275 {
276 claimed->perform_io();
277 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
278 {
279 claimed->errn = 0;
280 desc_data_.read_op.store(claimed, std::memory_order_release);
281 }
282 else
283 {
284 svc_.post(claimed);
285 svc_.work_finished();
286 }
287 return;
288 }
289 }
290
291
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 169 times.
169 if (op.cancelled.load(std::memory_order_acquire))
292 {
293 auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
294 if (claimed)
295 {
296 svc_.post(claimed);
297 svc_.work_finished();
298 }
299 }
300 169 return;
301 }
302
303 op.complete(errno, 0);
304 svc_.post(&op);
305 }
306
307 void
308 71606 epoll_socket_impl::
309 do_write_io()
310 {
311 71606 auto& op = wr_;
312
313 71606 msghdr msg{};
314 71606 msg.msg_iov = op.iovecs;
315 71606 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
316
317
1/1
✓ Branch 1 taken 71606 times.
71606 ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
318
319
2/2
✓ Branch 0 taken 71605 times.
✓ Branch 1 taken 1 time.
71606 if (n > 0)
320 {
321 71605 desc_data_.write_ready.store(false, std::memory_order_relaxed);
322 71605 op.complete(0, static_cast<std::size_t>(n));
323
1/1
✓ Branch 1 taken 71605 times.
71605 svc_.post(&op);
324 71605 return;
325 }
326
327
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (errno == EAGAIN || errno == EWOULDBLOCK)
328 {
329 svc_.work_started();
330
331 desc_data_.write_op.store(&op, std::memory_order_seq_cst);
332
333 if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst))
334 {
335 auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel);
336 if (claimed)
337 {
338 claimed->perform_io();
339 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
340 {
341 claimed->errn = 0;
342 desc_data_.write_op.store(claimed, std::memory_order_release);
343 }
344 else
345 {
346 svc_.post(claimed);
347 svc_.work_finished();
348 }
349 return;
350 }
351 }
352
353 if (op.cancelled.load(std::memory_order_acquire))
354 {
355 auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel);
356 if (claimed)
357 {
358 svc_.post(claimed);
359 svc_.work_finished();
360 }
361 }
362 return;
363 }
364
365
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 op.complete(errno ? errno : EIO, 0);
366
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
367 }
368
369 std::coroutine_handle<>
370 71729 epoll_socket_impl::
371 read_some(
372 std::coroutine_handle<> h,
373 capy::executor_ref ex,
374 io_buffer_param param,
375 std::stop_token token,
376 std::error_code* ec,
377 std::size_t* bytes_out)
378 {
379 71729 auto& op = rd_;
380 71729 op.reset();
381 71729 op.h = h;
382 71729 op.ex = ex;
383 71729 op.ec_out = ec;
384 71729 op.bytes_out = bytes_out;
385 71729 op.fd = fd_;
386 71729 op.start(token, this);
387
1/1
✓ Branch 1 taken 71729 times.
71729 op.impl_ptr = shared_from_this();
388
389 // Must prepare buffers before initiator runs
390 71729 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
391 71729 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
392
393
6/8
✓ Branch 0 taken 71728 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 71728 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 71728 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 71728 times.
71729 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
394 {
395 1 op.empty_buffer_read = true;
396 1 op.complete(0, 0);
397
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
398 1 return std::noop_coroutine();
399 }
400
401
2/2
✓ Branch 0 taken 71728 times.
✓ Branch 1 taken 71728 times.
143456 for (int i = 0; i < op.iovec_count; ++i)
402 {
403 71728 op.iovecs[i].iov_base = bufs[i].data();
404 71728 op.iovecs[i].iov_len = bufs[i].size();
405 }
406
407
2/2
✓ Branch 1 taken 71688 times.
✓ Branch 2 taken 40 times.
71728 if (read_initiator_handle_)
408
1/1
✓ Branch 1 taken 71688 times.
71688 read_initiator_handle_.destroy();
409
410
1/1
✓ Branch 1 taken 71728 times.
71728 auto initiator = make_read_initiator(read_initiator_frame_, this);
411 71728 read_initiator_handle_ = initiator.h;
412
413 // Symmetric transfer ensures caller is suspended before I/O starts
414 71728 return initiator.h;
415 }
416
417 std::coroutine_handle<>
418 71607 epoll_socket_impl::
419 write_some(
420 std::coroutine_handle<> h,
421 capy::executor_ref ex,
422 io_buffer_param param,
423 std::stop_token token,
424 std::error_code* ec,
425 std::size_t* bytes_out)
426 {
427 71607 auto& op = wr_;
428 71607 op.reset();
429 71607 op.h = h;
430 71607 op.ex = ex;
431 71607 op.ec_out = ec;
432 71607 op.bytes_out = bytes_out;
433 71607 op.fd = fd_;
434 71607 op.start(token, this);
435
1/1
✓ Branch 1 taken 71607 times.
71607 op.impl_ptr = shared_from_this();
436
437 // Must prepare buffers before initiator runs
438 71607 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
439 71607 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
440
441
6/8
✓ Branch 0 taken 71606 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 71606 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 71606 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 71606 times.
71607 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
442 {
443 1 op.complete(0, 0);
444
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
445 1 return std::noop_coroutine();
446 }
447
448
2/2
✓ Branch 0 taken 71606 times.
✓ Branch 1 taken 71606 times.
143212 for (int i = 0; i < op.iovec_count; ++i)
449 {
450 71606 op.iovecs[i].iov_base = bufs[i].data();
451 71606 op.iovecs[i].iov_len = bufs[i].size();
452 }
453
454
2/2
✓ Branch 1 taken 71568 times.
✓ Branch 2 taken 38 times.
71606 if (write_initiator_handle_)
455
1/1
✓ Branch 1 taken 71568 times.
71568 write_initiator_handle_.destroy();
456
457
1/1
✓ Branch 1 taken 71606 times.
71606 auto initiator = make_write_initiator(write_initiator_frame_, this);
458 71606 write_initiator_handle_ = initiator.h;
459
460 // Symmetric transfer ensures caller is suspended before I/O starts
461 71606 return initiator.h;
462 }
463
464 std::error_code
465 3 epoll_socket_impl::
466 shutdown(tcp_socket::shutdown_type what) noexcept
467 {
468 int how;
469
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
470 {
471 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
472 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
473 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
474 default:
475 return make_err(EINVAL);
476 }
477
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
478 return make_err(errno);
479 3 return {};
480 }
481
482 std::error_code
483 5 epoll_socket_impl::
484 set_no_delay(bool value) noexcept
485 {
486
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
487
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
488 return make_err(errno);
489 5 return {};
490 }
491
492 bool
493 5 epoll_socket_impl::
494 no_delay(std::error_code& ec) const noexcept
495 {
496 5 int flag = 0;
497 5 socklen_t len = sizeof(flag);
498
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
499 {
500 ec = make_err(errno);
501 return false;
502 }
503 5 ec = {};
504 5 return flag != 0;
505 }
506
507 std::error_code
508 4 epoll_socket_impl::
509 set_keep_alive(bool value) noexcept
510 {
511
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
512
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
513 return make_err(errno);
514 4 return {};
515 }
516
517 bool
518 4 epoll_socket_impl::
519 keep_alive(std::error_code& ec) const noexcept
520 {
521 4 int flag = 0;
522 4 socklen_t len = sizeof(flag);
523
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
524 {
525 ec = make_err(errno);
526 return false;
527 }
528 4 ec = {};
529 4 return flag != 0;
530 }
531
532 std::error_code
533 1 epoll_socket_impl::
534 set_receive_buffer_size(int size) noexcept
535 {
536
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
537 return make_err(errno);
538 1 return {};
539 }
540
541 int
542 3 epoll_socket_impl::
543 receive_buffer_size(std::error_code& ec) const noexcept
544 {
545 3 int size = 0;
546 3 socklen_t len = sizeof(size);
547
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
548 {
549 ec = make_err(errno);
550 return 0;
551 }
552 3 ec = {};
553 3 return size;
554 }
555
556 std::error_code
557 1 epoll_socket_impl::
558 set_send_buffer_size(int size) noexcept
559 {
560
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
561 return make_err(errno);
562 1 return {};
563 }
564
565 int
566 3 epoll_socket_impl::
567 send_buffer_size(std::error_code& ec) const noexcept
568 {
569 3 int size = 0;
570 3 socklen_t len = sizeof(size);
571
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
572 {
573 ec = make_err(errno);
574 return 0;
575 }
576 3 ec = {};
577 3 return size;
578 }
579
580 std::error_code
581 4 epoll_socket_impl::
582 set_linger(bool enabled, int timeout) noexcept
583 {
584
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
585 1 return make_err(EINVAL);
586 struct ::linger lg;
587
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
588 3 lg.l_linger = timeout;
589
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
590 return make_err(errno);
591 3 return {};
592 }
593
594 tcp_socket::linger_options
595 3 epoll_socket_impl::
596 linger(std::error_code& ec) const noexcept
597 {
598 3 struct ::linger lg{};
599 3 socklen_t len = sizeof(lg);
600
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
601 {
602 ec = make_err(errno);
603 return {};
604 }
605 3 ec = {};
606 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
607 }
608
609 void
610 7949 epoll_socket_impl::
611 cancel() noexcept
612 {
613 7949 std::shared_ptr<epoll_socket_impl> self;
614 try {
615
1/1
✓ Branch 1 taken 7949 times.
7949 self = shared_from_this();
616 } catch (const std::bad_weak_ptr&) {
617 return;
618 }
619
620 // Use atomic exchange to claim operations - only one of cancellation
621 // or reactor will succeed
622 23847 auto cancel_atomic_op = [this, &self](epoll_op& op, std::atomic<epoll_op*>& desc_op_ptr) {
623 23847 op.request_cancel();
624 23847 auto* claimed = desc_op_ptr.exchange(nullptr, std::memory_order_acq_rel);
625
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 23796 times.
23847 if (claimed == &op)
626 {
627 51 op.impl_ptr = self;
628 51 svc_.post(&op);
629 51 svc_.work_finished();
630 }
631 31796 };
632
633 7949 cancel_atomic_op(conn_, desc_data_.connect_op);
634 7949 cancel_atomic_op(rd_, desc_data_.read_op);
635 7949 cancel_atomic_op(wr_, desc_data_.write_op);
636 7949 }
637
638 void
639 100 epoll_socket_impl::
640 cancel_single_op(epoll_op& op) noexcept
641 {
642 100 op.request_cancel();
643
644 100 std::atomic<epoll_op*>* desc_op_ptr = nullptr;
645
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 100 times.
100 if (&op == &conn_) desc_op_ptr = &desc_data_.connect_op;
646
2/2
✓ Branch 0 taken 99 times.
✓ Branch 1 taken 1 time.
100 else if (&op == &rd_) desc_op_ptr = &desc_data_.read_op;
647
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 else if (&op == &wr_) desc_op_ptr = &desc_data_.write_op;
648
649
1/2
✓ Branch 0 taken 100 times.
✗ Branch 1 not taken.
100 if (desc_op_ptr)
650 {
651 // Use atomic exchange - only one of cancellation or reactor will succeed
652 100 auto* claimed = desc_op_ptr->exchange(nullptr, std::memory_order_acq_rel);
653
2/2
✓ Branch 0 taken 67 times.
✓ Branch 1 taken 33 times.
100 if (claimed == &op)
654 {
655 try {
656
1/1
✓ Branch 1 taken 67 times.
67 op.impl_ptr = shared_from_this();
657 } catch (const std::bad_weak_ptr&) {}
658 67 svc_.post(&op);
659 67 svc_.work_finished();
660 }
661 }
662 100 }
663
664 void
665 7853 epoll_socket_impl::
666 close_socket() noexcept
667 {
668 7853 cancel();
669
670
2/2
✓ Branch 0 taken 5231 times.
✓ Branch 1 taken 2622 times.
7853 if (fd_ >= 0)
671 {
672
1/2
✓ Branch 0 taken 5231 times.
✗ Branch 1 not taken.
5231 if (desc_data_.registered_events != 0)
673 5231 svc_.scheduler().deregister_descriptor(fd_);
674 5231 ::close(fd_);
675 5231 fd_ = -1;
676 }
677
678 7853 desc_data_.fd = -1;
679 7853 desc_data_.is_registered = false;
680 7853 desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
681 7853 desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
682 7853 desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
683 7853 desc_data_.read_ready.store(false, std::memory_order_relaxed);
684 7853 desc_data_.write_ready.store(false, std::memory_order_relaxed);
685 7853 desc_data_.registered_events = 0;
686
687 7853 local_endpoint_ = endpoint{};
688 7853 remote_endpoint_ = endpoint{};
689 7853 }
690
691 189 epoll_socket_service::
692 189 epoll_socket_service(capy::execution_context& ctx)
693
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
694 {
695 189 }
696
697 378 epoll_socket_service::
698 189 ~epoll_socket_service()
699 {
700 378 }
701
702 void
703 189 epoll_socket_service::
704 shutdown()
705 {
706
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
707
708
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->socket_list_.pop_front())
709 impl->close_socket();
710
711 189 state_->socket_ptrs_.clear();
712 189 }
713
714 tcp_socket::socket_impl&
715 5231 epoll_socket_service::
716 create_impl()
717 {
718
1/1
✓ Branch 1 taken 5231 times.
5231 auto impl = std::make_shared<epoll_socket_impl>(*this);
719 5231 auto* raw = impl.get();
720
721 {
722
1/1
✓ Branch 2 taken 5231 times.
5231 std::lock_guard lock(state_->mutex_);
723 5231 state_->socket_list_.push_back(raw);
724
1/1
✓ Branch 3 taken 5231 times.
5231 state_->socket_ptrs_.emplace(raw, std::move(impl));
725 5231 }
726
727 5231 return *raw;
728 5231 }
729
730 void
731 5231 epoll_socket_service::
732 destroy_impl(tcp_socket::socket_impl& impl)
733 {
734 5231 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
735
1/1
✓ Branch 2 taken 5231 times.
5231 std::lock_guard lock(state_->mutex_);
736 5231 state_->socket_list_.remove(epoll_impl);
737
1/1
✓ Branch 2 taken 5231 times.
5231 state_->socket_ptrs_.erase(epoll_impl);
738 5231 }
739
740 std::error_code
741 2622 epoll_socket_service::
742 open_socket(tcp_socket::socket_impl& impl)
743 {
744 2622 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
745 2622 epoll_impl->close_socket();
746
747 2622 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2622 times.
2622 if (fd < 0)
749 return make_err(errno);
750
751 2622 epoll_impl->fd_ = fd;
752
753 // Register fd with epoll (edge-triggered mode)
754 2622 epoll_impl->desc_data_.fd = fd;
755 2622 epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
756 2622 epoll_impl->desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
757 2622 epoll_impl->desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
758 2622 scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
759
760 2622 return {};
761 }
762
763 void
764 143328 epoll_socket_service::
765 post(epoll_op* op)
766 {
767 143328 state_->sched_.post(op);
768 143328 }
769
770 void
771 2780 epoll_socket_service::
772 work_started() noexcept
773 {
774 2780 state_->sched_.work_started();
775 2780 }
776
777 void
778 161 epoll_socket_service::
779 work_finished() noexcept
780 {
781 161 state_->sched_.work_finished();
782 161 }
783
784 } // namespace boost::corosio::detail
785
786 #endif // BOOST_COROSIO_HAS_EPOLL
787