LCOV - code coverage report
Current view: top level - src/detail/epoll - sockets.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 78.4 % 402 315
Test Date: 2026-02-04 19:19:32 Functions: 92.9 % 42 39

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_EPOLL
      13              : 
      14              : #include "src/detail/epoll/sockets.hpp"
      15              : #include "src/detail/endpoint_convert.hpp"
      16              : #include "src/detail/make_err.hpp"
      17              : #include "src/detail/resume_coro.hpp"
      18              : 
      19              : #include <boost/corosio/detail/except.hpp>
      20              : #include <boost/capy/buffers.hpp>
      21              : 
      22              : #include <errno.h>
      23              : #include <netinet/in.h>
      24              : #include <netinet/tcp.h>
      25              : #include <sys/epoll.h>
      26              : #include <sys/socket.h>
      27              : #include <unistd.h>
      28              : 
      29              : namespace boost::corosio::detail {
      30              : 
      31              : void
      32          106 : epoll_op::canceller::
      33              : operator()() const noexcept
      34              : {
      35          106 :     op->cancel();
      36          106 : }
      37              : 
      38              : void
      39            0 : epoll_connect_op::
      40              : cancel() noexcept
      41              : {
      42            0 :     if (socket_impl_)
      43            0 :         socket_impl_->cancel_single_op(*this);
      44              :     else
      45            0 :         request_cancel();
      46            0 : }
      47              : 
      48              : void
      49           99 : epoll_read_op::
      50              : cancel() noexcept
      51              : {
      52           99 :     if (socket_impl_)
      53           99 :         socket_impl_->cancel_single_op(*this);
      54              :     else
      55            0 :         request_cancel();
      56           99 : }
      57              : 
      58              : void
      59            1 : epoll_write_op::
      60              : cancel() noexcept
      61              : {
      62            1 :     if (socket_impl_)
      63            1 :         socket_impl_->cancel_single_op(*this);
      64              :     else
      65            0 :         request_cancel();
      66            1 : }
      67              : 
      68              : void
      69         2611 : epoll_connect_op::
      70              : operator()()
      71              : {
      72         2611 :     stop_cb.reset();
      73              : 
      74         2611 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      75              : 
      76              :     // Cache endpoints on successful connect
      77         2611 :     if (success && socket_impl_)
      78              :     {
      79              :         // Query local endpoint via getsockname (may fail, but remote is always known)
      80         2609 :         endpoint local_ep;
      81         2609 :         sockaddr_in local_addr{};
      82         2609 :         socklen_t local_len = sizeof(local_addr);
      83         2609 :         if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      84         2609 :             local_ep = from_sockaddr_in(local_addr);
      85              :         // Always cache remote endpoint; local may be default if getsockname failed
      86         2609 :         static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
      87              :     }
      88              : 
      89         2611 :     if (ec_out)
      90              :     {
      91         2611 :         if (cancelled.load(std::memory_order_acquire))
      92            0 :             *ec_out = capy::error::canceled;
      93         2611 :         else if (errn != 0)
      94            2 :             *ec_out = make_err(errn);
      95              :         else
      96         2609 :             *ec_out = {};
      97              :     }
      98              : 
      99         2611 :     if (bytes_out)
     100            0 :         *bytes_out = bytes_transferred;
     101              : 
     102              :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     103         2611 :     capy::executor_ref saved_ex( std::move( ex ) );
     104         2611 :     capy::coro saved_h( std::move( h ) );
     105         2611 :     auto prevent_premature_destruction = std::move(impl_ptr);
     106         2611 :     resume_coro(saved_ex, saved_h);
     107         2611 : }
     108              : 
     109         5231 : epoll_socket_impl::
     110         5231 : epoll_socket_impl(epoll_socket_service& svc) noexcept
     111         5231 :     : svc_(svc)
     112              : {
     113         5231 : }
     114              : 
     115         5231 : epoll_socket_impl::
     116         5231 : ~epoll_socket_impl()
     117              : {
     118         5231 :     if (read_initiator_handle_)
     119           40 :         read_initiator_handle_.destroy();
     120         5231 :     if (write_initiator_handle_)
     121           38 :         write_initiator_handle_.destroy();
     122              : 
     123              :     // promise_type::operator delete is no-op, so free here
     124         5231 :     if (read_initiator_frame_)
     125           40 :         ::operator delete(read_initiator_frame_);
     126         5231 :     if (write_initiator_frame_)
     127           38 :         ::operator delete(write_initiator_frame_);
     128         5231 : }
     129              : 
     130              : void
     131            0 : epoll_socket_impl::
     132              : update_epoll_events() noexcept
     133              : {
     134              :     // With EPOLLET, update_descriptor_events just provides a memory fence
     135            0 :     svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
     136            0 : }
     137              : 
     138              : void
     139         5231 : epoll_socket_impl::
     140              : release()
     141              : {
     142         5231 :     close_socket();
     143         5231 :     svc_.destroy_impl(*this);
     144         5231 : }
     145              : 
     146              : void
     147         2611 : epoll_socket_impl::
     148              : connect(
     149              :     std::coroutine_handle<> h,
     150              :     capy::executor_ref ex,
     151              :     endpoint ep,
     152              :     std::stop_token token,
     153              :     std::error_code* ec)
     154              : {
     155         2611 :     auto& op = conn_;
     156         2611 :     op.reset();
     157         2611 :     op.h = h;
     158         2611 :     op.ex = ex;
     159         2611 :     op.ec_out = ec;
     160         2611 :     op.fd = fd_;
     161         2611 :     op.target_endpoint = ep;  // Store target for endpoint caching
     162         2611 :     op.start(token, this);
     163              : 
     164         2611 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     165         2611 :     int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
     166              : 
     167         2611 :     if (result == 0)
     168              :     {
     169              :         // Sync success - cache endpoints immediately
     170              :         // Remote is always known; local may fail but we still cache remote
     171            0 :         sockaddr_in local_addr{};
     172            0 :         socklen_t local_len = sizeof(local_addr);
     173            0 :         if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     174            0 :             local_endpoint_ = detail::from_sockaddr_in(local_addr);
     175            0 :         remote_endpoint_ = ep;
     176              : 
     177            0 :         op.complete(0, 0);
     178            0 :         op.impl_ptr = shared_from_this();
     179            0 :         svc_.post(&op);
     180            0 :         return;
     181              :     }
     182              : 
     183         2611 :     if (errno == EINPROGRESS)
     184              :     {
     185         2611 :         svc_.work_started();
     186         2611 :         op.impl_ptr = shared_from_this();
     187              : 
     188         2611 :         desc_data_.connect_op.store(&op, std::memory_order_seq_cst);
     189              : 
     190         2611 :         if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst))
     191              :         {
     192           43 :             auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel);
     193           43 :             if (claimed)
     194              :             {
     195           43 :                 claimed->perform_io();
     196           43 :                 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
     197              :                 {
     198            0 :                     claimed->errn = 0;
     199            0 :                     desc_data_.connect_op.store(claimed, std::memory_order_release);
     200              :                 }
     201              :                 else
     202              :                 {
     203           43 :                     svc_.post(claimed);
     204           43 :                     svc_.work_finished();
     205              :                 }
     206           43 :                 return;
     207              :             }
     208              :         }
     209              : 
     210         2568 :         if (op.cancelled.load(std::memory_order_acquire))
     211              :         {
     212            0 :             auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel);
     213            0 :             if (claimed)
     214              :             {
     215            0 :                 svc_.post(claimed);
     216            0 :                 svc_.work_finished();
     217              :             }
     218              :         }
     219         2568 :         return;
     220              :     }
     221              : 
     222            0 :     op.complete(errno, 0);
     223            0 :     op.impl_ptr = shared_from_this();
     224            0 :     svc_.post(&op);
     225              : }
     226              : 
     227              : read_initiator
     228        71728 : make_read_initiator(void*& cached, epoll_socket_impl* impl)
     229              : {
     230              :     impl->do_read_io();
     231              :     co_return;
     232       143456 : }
     233              : 
     234              : write_initiator
     235        71606 : make_write_initiator(void*& cached, epoll_socket_impl* impl)
     236              : {
     237              :     impl->do_write_io();
     238              :     co_return;
     239       143212 : }
     240              : 
     241              : void
     242        71728 : epoll_socket_impl::
     243              : do_read_io()
     244              : {
     245        71728 :     auto& op = rd_;
     246              : 
     247        71728 :     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
     248              : 
     249        71728 :     if (n > 0)
     250              :     {
     251        71554 :         desc_data_.read_ready.store(false, std::memory_order_relaxed);
     252        71554 :         op.complete(0, static_cast<std::size_t>(n));
     253        71554 :         svc_.post(&op);
     254        71554 :         return;
     255              :     }
     256              : 
     257          174 :     if (n == 0)
     258              :     {
     259            5 :         desc_data_.read_ready.store(false, std::memory_order_relaxed);
     260            5 :         op.complete(0, 0);
     261            5 :         svc_.post(&op);
     262            5 :         return;
     263              :     }
     264              : 
     265          169 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     266              :     {
     267          169 :         svc_.work_started();
     268              : 
     269          169 :         desc_data_.read_op.store(&op, std::memory_order_seq_cst);
     270              : 
     271          169 :         if (desc_data_.read_ready.exchange(false, std::memory_order_seq_cst))
     272              :         {
     273            0 :             auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     274            0 :             if (claimed)
     275              :             {
     276            0 :                 claimed->perform_io();
     277            0 :                 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
     278              :                 {
     279            0 :                     claimed->errn = 0;
     280            0 :                     desc_data_.read_op.store(claimed, std::memory_order_release);
     281              :                 }
     282              :                 else
     283              :                 {
     284            0 :                     svc_.post(claimed);
     285            0 :                     svc_.work_finished();
     286              :                 }
     287            0 :                 return;
     288              :             }
     289              :         }
     290              : 
     291          169 :         if (op.cancelled.load(std::memory_order_acquire))
     292              :         {
     293            0 :             auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     294            0 :             if (claimed)
     295              :             {
     296            0 :                 svc_.post(claimed);
     297            0 :                 svc_.work_finished();
     298              :             }
     299              :         }
     300          169 :         return;
     301              :     }
     302              : 
     303            0 :     op.complete(errno, 0);
     304            0 :     svc_.post(&op);
     305              : }
     306              : 
     307              : void
     308        71606 : epoll_socket_impl::
     309              : do_write_io()
     310              : {
     311        71606 :     auto& op = wr_;
     312              : 
     313        71606 :     msghdr msg{};
     314        71606 :     msg.msg_iov = op.iovecs;
     315        71606 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     316              : 
     317        71606 :     ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     318              : 
     319        71606 :     if (n > 0)
     320              :     {
     321        71605 :         desc_data_.write_ready.store(false, std::memory_order_relaxed);
     322        71605 :         op.complete(0, static_cast<std::size_t>(n));
     323        71605 :         svc_.post(&op);
     324        71605 :         return;
     325              :     }
     326              : 
     327            1 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     328              :     {
     329            0 :         svc_.work_started();
     330              : 
     331            0 :         desc_data_.write_op.store(&op, std::memory_order_seq_cst);
     332              : 
     333            0 :         if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst))
     334              :         {
     335            0 :             auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel);
     336            0 :             if (claimed)
     337              :             {
     338            0 :                 claimed->perform_io();
     339            0 :                 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
     340              :                 {
     341            0 :                     claimed->errn = 0;
     342            0 :                     desc_data_.write_op.store(claimed, std::memory_order_release);
     343              :                 }
     344              :                 else
     345              :                 {
     346            0 :                     svc_.post(claimed);
     347            0 :                     svc_.work_finished();
     348              :                 }
     349            0 :                 return;
     350              :             }
     351              :         }
     352              : 
     353            0 :         if (op.cancelled.load(std::memory_order_acquire))
     354              :         {
     355            0 :             auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel);
     356            0 :             if (claimed)
     357              :             {
     358            0 :                 svc_.post(claimed);
     359            0 :                 svc_.work_finished();
     360              :             }
     361              :         }
     362            0 :         return;
     363              :     }
     364              : 
     365            1 :     op.complete(errno ? errno : EIO, 0);
     366            1 :     svc_.post(&op);
     367              : }
     368              : 
     369              : std::coroutine_handle<>
     370        71729 : epoll_socket_impl::
     371              : read_some(
     372              :     std::coroutine_handle<> h,
     373              :     capy::executor_ref ex,
     374              :     io_buffer_param param,
     375              :     std::stop_token token,
     376              :     std::error_code* ec,
     377              :     std::size_t* bytes_out)
     378              : {
     379        71729 :     auto& op = rd_;
     380        71729 :     op.reset();
     381        71729 :     op.h = h;
     382        71729 :     op.ex = ex;
     383        71729 :     op.ec_out = ec;
     384        71729 :     op.bytes_out = bytes_out;
     385        71729 :     op.fd = fd_;
     386        71729 :     op.start(token, this);
     387        71729 :     op.impl_ptr = shared_from_this();
     388              : 
     389              :     // Must prepare buffers before initiator runs
     390        71729 :     capy::mutable_buffer bufs[epoll_read_op::max_buffers];
     391        71729 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
     392              : 
     393        71729 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     394              :     {
     395            1 :         op.empty_buffer_read = true;
     396            1 :         op.complete(0, 0);
     397            1 :         svc_.post(&op);
     398            1 :         return std::noop_coroutine();
     399              :     }
     400              : 
     401       143456 :     for (int i = 0; i < op.iovec_count; ++i)
     402              :     {
     403        71728 :         op.iovecs[i].iov_base = bufs[i].data();
     404        71728 :         op.iovecs[i].iov_len = bufs[i].size();
     405              :     }
     406              : 
     407        71728 :     if (read_initiator_handle_)
     408        71688 :         read_initiator_handle_.destroy();
     409              : 
     410        71728 :     auto initiator = make_read_initiator(read_initiator_frame_, this);
     411        71728 :     read_initiator_handle_ = initiator.h;
     412              : 
     413              :     // Symmetric transfer ensures caller is suspended before I/O starts
     414        71728 :     return initiator.h;
     415              : }
     416              : 
     417              : std::coroutine_handle<>
     418        71607 : epoll_socket_impl::
     419              : write_some(
     420              :     std::coroutine_handle<> h,
     421              :     capy::executor_ref ex,
     422              :     io_buffer_param param,
     423              :     std::stop_token token,
     424              :     std::error_code* ec,
     425              :     std::size_t* bytes_out)
     426              : {
     427        71607 :     auto& op = wr_;
     428        71607 :     op.reset();
     429        71607 :     op.h = h;
     430        71607 :     op.ex = ex;
     431        71607 :     op.ec_out = ec;
     432        71607 :     op.bytes_out = bytes_out;
     433        71607 :     op.fd = fd_;
     434        71607 :     op.start(token, this);
     435        71607 :     op.impl_ptr = shared_from_this();
     436              : 
     437              :     // Must prepare buffers before initiator runs
     438        71607 :     capy::mutable_buffer bufs[epoll_write_op::max_buffers];
     439        71607 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
     440              : 
     441        71607 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     442              :     {
     443            1 :         op.complete(0, 0);
     444            1 :         svc_.post(&op);
     445            1 :         return std::noop_coroutine();
     446              :     }
     447              : 
     448       143212 :     for (int i = 0; i < op.iovec_count; ++i)
     449              :     {
     450        71606 :         op.iovecs[i].iov_base = bufs[i].data();
     451        71606 :         op.iovecs[i].iov_len = bufs[i].size();
     452              :     }
     453              : 
     454        71606 :     if (write_initiator_handle_)
     455        71568 :         write_initiator_handle_.destroy();
     456              : 
     457        71606 :     auto initiator = make_write_initiator(write_initiator_frame_, this);
     458        71606 :     write_initiator_handle_ = initiator.h;
     459              : 
     460              :     // Symmetric transfer ensures caller is suspended before I/O starts
     461        71606 :     return initiator.h;
     462              : }
     463              : 
     464              : std::error_code
     465            3 : epoll_socket_impl::
     466              : shutdown(tcp_socket::shutdown_type what) noexcept
     467              : {
     468              :     int how;
     469            3 :     switch (what)
     470              :     {
     471            1 :     case tcp_socket::shutdown_receive: how = SHUT_RD;   break;
     472            1 :     case tcp_socket::shutdown_send:    how = SHUT_WR;   break;
     473            1 :     case tcp_socket::shutdown_both:    how = SHUT_RDWR; break;
     474            0 :     default:
     475            0 :         return make_err(EINVAL);
     476              :     }
     477            3 :     if (::shutdown(fd_, how) != 0)
     478            0 :         return make_err(errno);
     479            3 :     return {};
     480              : }
     481              : 
     482              : std::error_code
     483            5 : epoll_socket_impl::
     484              : set_no_delay(bool value) noexcept
     485              : {
     486            5 :     int flag = value ? 1 : 0;
     487            5 :     if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
     488            0 :         return make_err(errno);
     489            5 :     return {};
     490              : }
     491              : 
     492              : bool
     493            5 : epoll_socket_impl::
     494              : no_delay(std::error_code& ec) const noexcept
     495              : {
     496            5 :     int flag = 0;
     497            5 :     socklen_t len = sizeof(flag);
     498            5 :     if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
     499              :     {
     500            0 :         ec = make_err(errno);
     501            0 :         return false;
     502              :     }
     503            5 :     ec = {};
     504            5 :     return flag != 0;
     505              : }
     506              : 
     507              : std::error_code
     508            4 : epoll_socket_impl::
     509              : set_keep_alive(bool value) noexcept
     510              : {
     511            4 :     int flag = value ? 1 : 0;
     512            4 :     if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
     513            0 :         return make_err(errno);
     514            4 :     return {};
     515              : }
     516              : 
     517              : bool
     518            4 : epoll_socket_impl::
     519              : keep_alive(std::error_code& ec) const noexcept
     520              : {
     521            4 :     int flag = 0;
     522            4 :     socklen_t len = sizeof(flag);
     523            4 :     if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
     524              :     {
     525            0 :         ec = make_err(errno);
     526            0 :         return false;
     527              :     }
     528            4 :     ec = {};
     529            4 :     return flag != 0;
     530              : }
     531              : 
     532              : std::error_code
     533            1 : epoll_socket_impl::
     534              : set_receive_buffer_size(int size) noexcept
     535              : {
     536            1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
     537            0 :         return make_err(errno);
     538            1 :     return {};
     539              : }
     540              : 
     541              : int
     542            3 : epoll_socket_impl::
     543              : receive_buffer_size(std::error_code& ec) const noexcept
     544              : {
     545            3 :     int size = 0;
     546            3 :     socklen_t len = sizeof(size);
     547            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
     548              :     {
     549            0 :         ec = make_err(errno);
     550            0 :         return 0;
     551              :     }
     552            3 :     ec = {};
     553            3 :     return size;
     554              : }
     555              : 
     556              : std::error_code
     557            1 : epoll_socket_impl::
     558              : set_send_buffer_size(int size) noexcept
     559              : {
     560            1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
     561            0 :         return make_err(errno);
     562            1 :     return {};
     563              : }
     564              : 
     565              : int
     566            3 : epoll_socket_impl::
     567              : send_buffer_size(std::error_code& ec) const noexcept
     568              : {
     569            3 :     int size = 0;
     570            3 :     socklen_t len = sizeof(size);
     571            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
     572              :     {
     573            0 :         ec = make_err(errno);
     574            0 :         return 0;
     575              :     }
     576            3 :     ec = {};
     577            3 :     return size;
     578              : }
     579              : 
     580              : std::error_code
     581            4 : epoll_socket_impl::
     582              : set_linger(bool enabled, int timeout) noexcept
     583              : {
     584            4 :     if (timeout < 0)
     585            1 :         return make_err(EINVAL);
     586              :     struct ::linger lg;
     587            3 :     lg.l_onoff = enabled ? 1 : 0;
     588            3 :     lg.l_linger = timeout;
     589            3 :     if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
     590            0 :         return make_err(errno);
     591            3 :     return {};
     592              : }
     593              : 
     594              : tcp_socket::linger_options
     595            3 : epoll_socket_impl::
     596              : linger(std::error_code& ec) const noexcept
     597              : {
     598            3 :     struct ::linger lg{};
     599            3 :     socklen_t len = sizeof(lg);
     600            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
     601              :     {
     602            0 :         ec = make_err(errno);
     603            0 :         return {};
     604              :     }
     605            3 :     ec = {};
     606            3 :     return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
     607              : }
     608              : 
     609              : void
     610         7949 : epoll_socket_impl::
     611              : cancel() noexcept
     612              : {
     613         7949 :     std::shared_ptr<epoll_socket_impl> self;
     614              :     try {
     615         7949 :         self = shared_from_this();
     616            0 :     } catch (const std::bad_weak_ptr&) {
     617            0 :         return;
     618            0 :     }
     619              : 
     620              :     // Use atomic exchange to claim operations - only one of cancellation
     621              :     // or reactor will succeed
     622        23847 :     auto cancel_atomic_op = [this, &self](epoll_op& op, std::atomic<epoll_op*>& desc_op_ptr) {
     623        23847 :         op.request_cancel();
     624        23847 :         auto* claimed = desc_op_ptr.exchange(nullptr, std::memory_order_acq_rel);
     625        23847 :         if (claimed == &op)
     626              :         {
     627           51 :             op.impl_ptr = self;
     628           51 :             svc_.post(&op);
     629           51 :             svc_.work_finished();
     630              :         }
     631        31796 :     };
     632              : 
     633         7949 :     cancel_atomic_op(conn_, desc_data_.connect_op);
     634         7949 :     cancel_atomic_op(rd_, desc_data_.read_op);
     635         7949 :     cancel_atomic_op(wr_, desc_data_.write_op);
     636         7949 : }
     637              : 
     638              : void
     639          100 : epoll_socket_impl::
     640              : cancel_single_op(epoll_op& op) noexcept
     641              : {
     642          100 :     op.request_cancel();
     643              : 
     644          100 :     std::atomic<epoll_op*>* desc_op_ptr = nullptr;
     645          100 :     if (&op == &conn_) desc_op_ptr = &desc_data_.connect_op;
     646          100 :     else if (&op == &rd_) desc_op_ptr = &desc_data_.read_op;
     647            1 :     else if (&op == &wr_) desc_op_ptr = &desc_data_.write_op;
     648              : 
     649          100 :     if (desc_op_ptr)
     650              :     {
     651              :         // Use atomic exchange - only one of cancellation or reactor will succeed
     652          100 :         auto* claimed = desc_op_ptr->exchange(nullptr, std::memory_order_acq_rel);
     653          100 :         if (claimed == &op)
     654              :         {
     655              :             try {
     656           67 :                 op.impl_ptr = shared_from_this();
     657            0 :             } catch (const std::bad_weak_ptr&) {}
     658           67 :             svc_.post(&op);
     659           67 :             svc_.work_finished();
     660              :         }
     661              :     }
     662          100 : }
     663              : 
     664              : void
     665         7853 : epoll_socket_impl::
     666              : close_socket() noexcept
     667              : {
     668         7853 :     cancel();
     669              : 
     670         7853 :     if (fd_ >= 0)
     671              :     {
     672         5231 :         if (desc_data_.registered_events != 0)
     673         5231 :             svc_.scheduler().deregister_descriptor(fd_);
     674         5231 :         ::close(fd_);
     675         5231 :         fd_ = -1;
     676              :     }
     677              : 
     678         7853 :     desc_data_.fd = -1;
     679         7853 :     desc_data_.is_registered = false;
     680         7853 :     desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
     681         7853 :     desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
     682         7853 :     desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
     683         7853 :     desc_data_.read_ready.store(false, std::memory_order_relaxed);
     684         7853 :     desc_data_.write_ready.store(false, std::memory_order_relaxed);
     685         7853 :     desc_data_.registered_events = 0;
     686              : 
     687         7853 :     local_endpoint_ = endpoint{};
     688         7853 :     remote_endpoint_ = endpoint{};
     689         7853 : }
     690              : 
     691          189 : epoll_socket_service::
     692          189 : epoll_socket_service(capy::execution_context& ctx)
     693          189 :     : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
     694              : {
     695          189 : }
     696              : 
     697          378 : epoll_socket_service::
     698          189 : ~epoll_socket_service()
     699              : {
     700          378 : }
     701              : 
     702              : void
     703          189 : epoll_socket_service::
     704              : shutdown()
     705              : {
     706          189 :     std::lock_guard lock(state_->mutex_);
     707              : 
     708          189 :     while (auto* impl = state_->socket_list_.pop_front())
     709            0 :         impl->close_socket();
     710              : 
     711          189 :     state_->socket_ptrs_.clear();
     712          189 : }
     713              : 
     714              : tcp_socket::socket_impl&
     715         5231 : epoll_socket_service::
     716              : create_impl()
     717              : {
     718         5231 :     auto impl = std::make_shared<epoll_socket_impl>(*this);
     719         5231 :     auto* raw = impl.get();
     720              : 
     721              :     {
     722         5231 :         std::lock_guard lock(state_->mutex_);
     723         5231 :         state_->socket_list_.push_back(raw);
     724         5231 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     725         5231 :     }
     726              : 
     727         5231 :     return *raw;
     728         5231 : }
     729              : 
     730              : void
     731         5231 : epoll_socket_service::
     732              : destroy_impl(tcp_socket::socket_impl& impl)
     733              : {
     734         5231 :     auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
     735         5231 :     std::lock_guard lock(state_->mutex_);
     736         5231 :     state_->socket_list_.remove(epoll_impl);
     737         5231 :     state_->socket_ptrs_.erase(epoll_impl);
     738         5231 : }
     739              : 
     740              : std::error_code
     741         2622 : epoll_socket_service::
     742              : open_socket(tcp_socket::socket_impl& impl)
     743              : {
     744         2622 :     auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
     745         2622 :     epoll_impl->close_socket();
     746              : 
     747         2622 :     int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
     748         2622 :     if (fd < 0)
     749            0 :         return make_err(errno);
     750              : 
     751         2622 :     epoll_impl->fd_ = fd;
     752              : 
     753              :     // Register fd with epoll (edge-triggered mode)
     754         2622 :     epoll_impl->desc_data_.fd = fd;
     755         2622 :     epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
     756         2622 :     epoll_impl->desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
     757         2622 :     epoll_impl->desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
     758         2622 :     scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
     759              : 
     760         2622 :     return {};
     761              : }
     762              : 
     763              : void
     764       143328 : epoll_socket_service::
     765              : post(epoll_op* op)
     766              : {
     767       143328 :     state_->sched_.post(op);
     768       143328 : }
     769              : 
     770              : void
     771         2780 : epoll_socket_service::
     772              : work_started() noexcept
     773              : {
     774         2780 :     state_->sched_.work_started();
     775         2780 : }
     776              : 
     777              : void
     778          161 : epoll_socket_service::
     779              : work_finished() noexcept
     780              : {
     781          161 :     state_->sched_.work_finished();
     782          161 : }
     783              : 
     784              : } // namespace boost::corosio::detail
     785              : 
     786              : #endif // BOOST_COROSIO_HAS_EPOLL
        

Generated by: LCOV version 2.3