LCOV - code coverage report
Current view: top level - src/detail/epoll - acceptors.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 79.9 % 219 175
Test Date: 2026-02-04 19:19:32 Functions: 95.0 % 20 19

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_EPOLL
      13              : 
      14              : #include "src/detail/epoll/acceptors.hpp"
      15              : #include "src/detail/epoll/sockets.hpp"
      16              : #include "src/detail/endpoint_convert.hpp"
      17              : #include "src/detail/make_err.hpp"
      18              : 
      19              : #include <errno.h>
      20              : #include <netinet/in.h>
      21              : #include <sys/epoll.h>
      22              : #include <sys/socket.h>
      23              : #include <unistd.h>
      24              : 
      25              : namespace boost::corosio::detail {
      26              : 
      27              : void
      28            6 : epoll_accept_op::
      29              : cancel() noexcept
      30              : {
      31            6 :     if (acceptor_impl_)
      32            6 :         acceptor_impl_->cancel_single_op(*this);
      33              :     else
      34            0 :         request_cancel();
      35            6 : }
      36              : 
      37              : void
      38         2618 : epoll_accept_op::
      39              : operator()()
      40              : {
      41         2618 :     stop_cb.reset();
      42              : 
      43         2618 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      44              : 
      45         2618 :     if (ec_out)
      46              :     {
      47         2618 :         if (cancelled.load(std::memory_order_acquire))
      48            9 :             *ec_out = capy::error::canceled;
      49         2609 :         else if (errn != 0)
      50            0 :             *ec_out = make_err(errn);
      51              :         else
      52         2609 :             *ec_out = {};
      53              :     }
      54              : 
      55         2618 :     if (success && accepted_fd >= 0)
      56              :     {
      57         2609 :         if (acceptor_impl_)
      58              :         {
      59         2609 :             auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
      60         2609 :                 ->service().socket_service();
      61         2609 :             if (socket_svc)
      62              :             {
      63         2609 :                 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
      64         2609 :                 impl.set_socket(accepted_fd);
      65              : 
      66              :                 // Register accepted socket with epoll (edge-triggered mode)
      67         2609 :                 impl.desc_data_.fd = accepted_fd;
      68         2609 :                 impl.desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
      69         2609 :                 impl.desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
      70         2609 :                 impl.desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
      71         2609 :                 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_data_);
      72              : 
      73         2609 :                 sockaddr_in local_addr{};
      74         2609 :                 socklen_t local_len = sizeof(local_addr);
      75         2609 :                 sockaddr_in remote_addr{};
      76         2609 :                 socklen_t remote_len = sizeof(remote_addr);
      77              : 
      78         2609 :                 endpoint local_ep, remote_ep;
      79         2609 :                 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      80         2609 :                     local_ep = from_sockaddr_in(local_addr);
      81         2609 :                 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
      82         2609 :                     remote_ep = from_sockaddr_in(remote_addr);
      83              : 
      84         2609 :                 impl.set_endpoints(local_ep, remote_ep);
      85              : 
      86         2609 :                 if (impl_out)
      87         2609 :                     *impl_out = &impl;
      88              : 
      89         2609 :                 accepted_fd = -1;
      90              :             }
      91              :             else
      92              :             {
      93            0 :                 if (ec_out && !*ec_out)
      94            0 :                     *ec_out = make_err(ENOENT);
      95            0 :                 ::close(accepted_fd);
      96            0 :                 accepted_fd = -1;
      97            0 :                 if (impl_out)
      98            0 :                     *impl_out = nullptr;
      99              :             }
     100              :         }
     101              :         else
     102              :         {
     103            0 :             ::close(accepted_fd);
     104            0 :             accepted_fd = -1;
     105            0 :             if (impl_out)
     106            0 :                 *impl_out = nullptr;
     107              :         }
     108         2609 :     }
     109              :     else
     110              :     {
     111            9 :         if (accepted_fd >= 0)
     112              :         {
     113            0 :             ::close(accepted_fd);
     114            0 :             accepted_fd = -1;
     115              :         }
     116              : 
     117            9 :         if (peer_impl)
     118              :         {
     119            0 :             peer_impl->release();
     120            0 :             peer_impl = nullptr;
     121              :         }
     122              : 
     123            9 :         if (impl_out)
     124            9 :             *impl_out = nullptr;
     125              :     }
     126              : 
     127              :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     128         2618 :     capy::executor_ref saved_ex( std::move( ex ) );
     129         2618 :     capy::coro saved_h( std::move( h ) );
     130         2618 :     auto prevent_premature_destruction = std::move(impl_ptr);
     131         2618 :     saved_ex.dispatch( saved_h );
     132         2618 : }
     133              : 
     134           68 : epoll_acceptor_impl::
     135           68 : epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
     136           68 :     : svc_(svc)
     137              : {
     138           68 : }
     139              : 
     140              : void
     141            0 : epoll_acceptor_impl::
     142              : update_epoll_events() noexcept
     143              : {
     144            0 :     svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
     145            0 : }
     146              : 
     147              : void
     148           68 : epoll_acceptor_impl::
     149              : release()
     150              : {
     151           68 :     close_socket();
     152           68 :     svc_.destroy_acceptor_impl(*this);
     153           68 : }
     154              : 
     155              : void
     156         2618 : epoll_acceptor_impl::
     157              : accept(
     158              :     std::coroutine_handle<> h,
     159              :     capy::executor_ref ex,
     160              :     std::stop_token token,
     161              :     std::error_code* ec,
     162              :     io_object::io_object_impl** impl_out)
     163              : {
     164         2618 :     auto& op = acc_;
     165         2618 :     op.reset();
     166         2618 :     op.h = h;
     167         2618 :     op.ex = ex;
     168         2618 :     op.ec_out = ec;
     169         2618 :     op.impl_out = impl_out;
     170         2618 :     op.fd = fd_;
     171         2618 :     op.start(token, this);
     172              : 
     173         2618 :     sockaddr_in addr{};
     174         2618 :     socklen_t addrlen = sizeof(addr);
     175         2618 :     int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
     176              :                              &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
     177              : 
     178         2618 :     if (accepted >= 0)
     179              :     {
     180            2 :         desc_data_.read_ready.store(false, std::memory_order_relaxed);
     181            2 :         op.accepted_fd = accepted;
     182            2 :         op.complete(0, 0);
     183            2 :         op.impl_ptr = shared_from_this();
     184            2 :         svc_.post(&op);
     185         2618 :         return;
     186              :     }
     187              : 
     188         2616 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     189              :     {
     190         2616 :         svc_.work_started();
     191         2616 :         op.impl_ptr = shared_from_this();
     192              : 
     193         2616 :         desc_data_.read_op.store(&op, std::memory_order_release);
     194              :         std::atomic_thread_fence(std::memory_order_seq_cst);
     195              : 
     196         2616 :         if (desc_data_.read_ready.exchange(false, std::memory_order_acquire))
     197              :         {
     198            0 :             auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     199            0 :             if (claimed)
     200              :             {
     201            0 :                 claimed->perform_io();
     202            0 :                 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
     203              :                 {
     204            0 :                     claimed->errn = 0;
     205            0 :                     desc_data_.read_op.store(claimed, std::memory_order_release);
     206              :                 }
     207              :                 else
     208              :                 {
     209            0 :                     svc_.post(claimed);
     210            0 :                     svc_.work_finished();
     211              :                 }
     212            0 :                 return;
     213              :             }
     214              :         }
     215              : 
     216         2616 :         if (op.cancelled.load(std::memory_order_acquire))
     217              :         {
     218            0 :             auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     219            0 :             if (claimed)
     220              :             {
     221            0 :                 svc_.post(claimed);
     222            0 :                 svc_.work_finished();
     223              :             }
     224              :         }
     225         2616 :         return;
     226              :     }
     227              : 
     228            0 :     op.complete(errno, 0);
     229            0 :     op.impl_ptr = shared_from_this();
     230            0 :     svc_.post(&op);
     231              : }
     232              : 
     233              : void
     234          137 : epoll_acceptor_impl::
     235              : cancel() noexcept
     236              : {
     237          137 :     std::shared_ptr<epoll_acceptor_impl> self;
     238              :     try {
     239          137 :         self = shared_from_this();
     240            0 :     } catch (const std::bad_weak_ptr&) {
     241            0 :         return;
     242            0 :     }
     243              : 
     244          137 :     acc_.request_cancel();
     245              :     // Use atomic exchange - only one of cancellation or reactor will succeed
     246          137 :     auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     247          137 :     if (claimed == &acc_)
     248              :     {
     249            3 :         acc_.impl_ptr = self;
     250            3 :         svc_.post(&acc_);
     251            3 :         svc_.work_finished();
     252              :     }
     253          137 : }
     254              : 
     255              : void
     256            6 : epoll_acceptor_impl::
     257              : cancel_single_op(epoll_op& op) noexcept
     258              : {
     259            6 :     op.request_cancel();
     260              : 
     261              :     // Use atomic exchange - only one of cancellation or reactor will succeed
     262            6 :     auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     263            6 :     if (claimed == &op)
     264              :     {
     265              :         try {
     266            6 :             op.impl_ptr = shared_from_this();
     267            0 :         } catch (const std::bad_weak_ptr&) {}
     268            6 :         svc_.post(&op);
     269            6 :         svc_.work_finished();
     270              :     }
     271            6 : }
     272              : 
     273              : void
     274          136 : epoll_acceptor_impl::
     275              : close_socket() noexcept
     276              : {
     277          136 :     cancel();
     278              : 
     279          136 :     if (fd_ >= 0)
     280              :     {
     281           62 :         if (desc_data_.registered_events != 0)
     282           62 :             svc_.scheduler().deregister_descriptor(fd_);
     283           62 :         ::close(fd_);
     284           62 :         fd_ = -1;
     285              :     }
     286              : 
     287          136 :     desc_data_.fd = -1;
     288          136 :     desc_data_.is_registered = false;
     289          136 :     desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
     290          136 :     desc_data_.read_ready.store(false, std::memory_order_relaxed);
     291          136 :     desc_data_.write_ready.store(false, std::memory_order_relaxed);
     292          136 :     desc_data_.registered_events = 0;
     293              : 
     294              :     // Clear cached endpoint
     295          136 :     local_endpoint_ = endpoint{};
     296          136 : }
     297              : 
     298          189 : epoll_acceptor_service::
     299          189 : epoll_acceptor_service(capy::execution_context& ctx)
     300          189 :     : ctx_(ctx)
     301          189 :     , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
     302              : {
     303          189 : }
     304              : 
     305          378 : epoll_acceptor_service::
     306          189 : ~epoll_acceptor_service()
     307              : {
     308          378 : }
     309              : 
     310              : void
     311          189 : epoll_acceptor_service::
     312              : shutdown()
     313              : {
     314          189 :     std::lock_guard lock(state_->mutex_);
     315              : 
     316          189 :     while (auto* impl = state_->acceptor_list_.pop_front())
     317            0 :         impl->close_socket();
     318              : 
     319          189 :     state_->acceptor_ptrs_.clear();
     320          189 : }
     321              : 
     322              : tcp_acceptor::acceptor_impl&
     323           68 : epoll_acceptor_service::
     324              : create_acceptor_impl()
     325              : {
     326           68 :     auto impl = std::make_shared<epoll_acceptor_impl>(*this);
     327           68 :     auto* raw = impl.get();
     328              : 
     329           68 :     std::lock_guard lock(state_->mutex_);
     330           68 :     state_->acceptor_list_.push_back(raw);
     331           68 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     332              : 
     333           68 :     return *raw;
     334           68 : }
     335              : 
     336              : void
     337           68 : epoll_acceptor_service::
     338              : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
     339              : {
     340           68 :     auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
     341           68 :     std::lock_guard lock(state_->mutex_);
     342           68 :     state_->acceptor_list_.remove(epoll_impl);
     343           68 :     state_->acceptor_ptrs_.erase(epoll_impl);
     344           68 : }
     345              : 
     346              : std::error_code
     347           68 : epoll_acceptor_service::
     348              : open_acceptor(
     349              :     tcp_acceptor::acceptor_impl& impl,
     350              :     endpoint ep,
     351              :     int backlog)
     352              : {
     353           68 :     auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
     354           68 :     epoll_impl->close_socket();
     355              : 
     356           68 :     int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
     357           68 :     if (fd < 0)
     358            0 :         return make_err(errno);
     359              : 
     360           68 :     int reuse = 1;
     361           68 :     ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
     362              : 
     363           68 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     364           68 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
     365              :     {
     366            6 :         int errn = errno;
     367            6 :         ::close(fd);
     368            6 :         return make_err(errn);
     369              :     }
     370              : 
     371           62 :     if (::listen(fd, backlog) < 0)
     372              :     {
     373            0 :         int errn = errno;
     374            0 :         ::close(fd);
     375            0 :         return make_err(errn);
     376              :     }
     377              : 
     378           62 :     epoll_impl->fd_ = fd;
     379              : 
     380              :     // Register fd with epoll (edge-triggered mode)
     381           62 :     epoll_impl->desc_data_.fd = fd;
     382           62 :     epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
     383           62 :     scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
     384              : 
     385              :     // Cache the local endpoint (queries OS for ephemeral port if port was 0)
     386           62 :     sockaddr_in local_addr{};
     387           62 :     socklen_t local_len = sizeof(local_addr);
     388           62 :     if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     389           62 :         epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
     390              : 
     391           62 :     return {};
     392              : }
     393              : 
     394              : void
     395           11 : epoll_acceptor_service::
     396              : post(epoll_op* op)
     397              : {
     398           11 :     state_->sched_.post(op);
     399           11 : }
     400              : 
     401              : void
     402         2616 : epoll_acceptor_service::
     403              : work_started() noexcept
     404              : {
     405         2616 :     state_->sched_.work_started();
     406         2616 : }
     407              : 
     408              : void
     409            9 : epoll_acceptor_service::
     410              : work_finished() noexcept
     411              : {
     412            9 :     state_->sched_.work_finished();
     413            9 : }
     414              : 
     415              : epoll_socket_service*
     416         2609 : epoll_acceptor_service::
     417              : socket_service() const noexcept
     418              : {
     419         2609 :     auto* svc = ctx_.find_service<detail::socket_service>();
     420         2609 :     return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
     421              : }
     422              : 
     423              : } // namespace boost::corosio::detail
     424              : 
     425              : #endif // BOOST_COROSIO_HAS_EPOLL
        

Generated by: LCOV version 2.3