include/boost/corosio/native/detail/epoll/epoll_tcp_acceptor_service.hpp

76.2% Lines (112/147) 100.0% List of functions (20/20)
epoll_tcp_acceptor_service.hpp
f(x) Functions (20)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_acceptor_service::scheduler() const :76 152x 100.0% 100.0% boost::corosio::detail::epoll_accept_op::cancel() :94 6x 80.0% 75.0% boost::corosio::detail::epoll_accept_op::operator()() :103 4529x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::epoll_tcp_acceptor(boost::corosio::detail::epoll_tcp_acceptor_service&) :108 80x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :115 4529x 51.5% 47.0% boost::corosio::detail::epoll_tcp_acceptor::cancel() :230 2x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::close_socket() :236 318x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(boost::capy::execution_context&, boost::corosio::detail::epoll_tcp_service&) :241 322x 100.0% 83.0% boost::corosio::detail::epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() :251 644x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::shutdown() :254 322x 80.0% 90.0% boost::corosio::detail::epoll_tcp_acceptor_service::construct() :267 80x 100.0% 78.0% boost::corosio::detail::epoll_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :280 80x 100.0% 83.0% boost::corosio::detail::epoll_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :290 159x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :296 79x 93.3% 93.0% boost::corosio::detail::epoll_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :325 78x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :332 75x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :339 11x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::work_started() :345 4527x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::work_finished() :351 9x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::tcp_service() const :357 4520x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <netinet/in.h>
34 #include <sys/epoll.h>
35 #include <sys/socket.h>
36 #include <unistd.h>
37
38 namespace boost::corosio::detail {
39
40 /// State for epoll acceptor service.
41 using epoll_tcp_acceptor_state =
42 reactor_service_state<epoll_scheduler, epoll_tcp_acceptor>;
43
44 /** epoll acceptor service implementation.
45
46 Inherits from tcp_acceptor_service to enable runtime polymorphism.
47 Uses key_type = tcp_acceptor_service for service lookup.
48 */
49 class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
50 : public tcp_acceptor_service
51 {
52 public:
53 explicit epoll_tcp_acceptor_service(
54 capy::execution_context& ctx, epoll_tcp_service& tcp_svc);
55 ~epoll_tcp_acceptor_service() override;
56
57 epoll_tcp_acceptor_service(epoll_tcp_acceptor_service const&) = delete;
58 epoll_tcp_acceptor_service&
59 operator=(epoll_tcp_acceptor_service const&) = delete;
60
61 void shutdown() override;
62
63 io_object::implementation* construct() override;
64 void destroy(io_object::implementation*) override;
65 void close(io_object::handle&) override;
66 std::error_code open_acceptor_socket(
67 tcp_acceptor::implementation& impl,
68 int family,
69 int type,
70 int protocol) override;
71 std::error_code
72 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 std::error_code
74 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75
76 152x epoll_scheduler& scheduler() const noexcept
77 {
78 152x return state_->sched_;
79 }
80 void post(scheduler_op* op);
81 void work_started() noexcept;
82 void work_finished() noexcept;
83
84 /** Get the TCP service for creating peer sockets during accept. */
85 epoll_tcp_service* tcp_service() const noexcept;
86
87 private:
88 capy::execution_context& ctx_;
89 epoll_tcp_service* tcp_svc_;
90 std::unique_ptr<epoll_tcp_acceptor_state> state_;
91 };
92
93 inline void
94 6x epoll_accept_op::cancel() noexcept
95 {
96 6x if (acceptor_impl_)
97 6x acceptor_impl_->cancel_single_op(*this);
98 else
99 request_cancel();
100 6x }
101
102 inline void
103 4529x epoll_accept_op::operator()()
104 {
105 4529x complete_accept_op<epoll_tcp_socket>(*this);
106 4529x }
107
108 80x inline epoll_tcp_acceptor::epoll_tcp_acceptor(
109 80x epoll_tcp_acceptor_service& svc) noexcept
110 80x : reactor_acceptor(svc)
111 {
112 80x }
113
114 inline std::coroutine_handle<>
115 4529x epoll_tcp_acceptor::accept(
116 std::coroutine_handle<> h,
117 capy::executor_ref ex,
118 std::stop_token token,
119 std::error_code* ec,
120 io_object::implementation** impl_out)
121 {
122 4529x auto& op = acc_;
123 4529x op.reset();
124 4529x op.h = h;
125 4529x op.ex = ex;
126 4529x op.ec_out = ec;
127 4529x op.impl_out = impl_out;
128 4529x op.fd = fd_;
129 4529x op.start(token, this);
130
131 4529x sockaddr_storage peer_storage{};
132 4529x socklen_t addrlen = sizeof(peer_storage);
133 int accepted;
134 do
135 {
136 4529x accepted = ::accept4(
137 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
138 SOCK_NONBLOCK | SOCK_CLOEXEC);
139 }
140 4529x while (accepted < 0 && errno == EINTR);
141
142 4529x if (accepted >= 0)
143 {
144 {
145 2x std::lock_guard lock(desc_state_.mutex);
146 2x desc_state_.read_ready = false;
147 2x }
148
149 2x if (svc_.scheduler().try_consume_inline_budget())
150 {
151 auto* socket_svc = svc_.tcp_service();
152 if (socket_svc)
153 {
154 auto& impl =
155 static_cast<epoll_tcp_socket&>(*socket_svc->construct());
156 impl.set_socket(accepted);
157
158 impl.desc_state_.fd = accepted;
159 {
160 std::lock_guard lock(impl.desc_state_.mutex);
161 impl.desc_state_.read_op = nullptr;
162 impl.desc_state_.write_op = nullptr;
163 impl.desc_state_.connect_op = nullptr;
164 }
165 socket_svc->scheduler().register_descriptor(
166 accepted, &impl.desc_state_);
167
168 impl.set_endpoints(
169 local_endpoint_, from_sockaddr(peer_storage));
170
171 *ec = {};
172 if (impl_out)
173 *impl_out = &impl;
174 }
175 else
176 {
177 ::close(accepted);
178 *ec = make_err(ENOENT);
179 if (impl_out)
180 *impl_out = nullptr;
181 }
182 op.cont_op.cont.h = h;
183 return dispatch_coro(ex, op.cont_op.cont);
184 }
185
186 2x op.accepted_fd = accepted;
187 2x op.peer_storage = peer_storage;
188 2x op.complete(0, 0);
189 2x op.impl_ptr = shared_from_this();
190 2x svc_.post(&op);
191 2x return std::noop_coroutine();
192 }
193
194 4527x if (errno == EAGAIN || errno == EWOULDBLOCK)
195 {
196 4527x op.impl_ptr = shared_from_this();
197 4527x svc_.work_started();
198
199 4527x std::lock_guard lock(desc_state_.mutex);
200 4527x bool io_done = false;
201 4527x if (desc_state_.read_ready)
202 {
203 desc_state_.read_ready = false;
204 op.perform_io();
205 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
206 if (!io_done)
207 op.errn = 0;
208 }
209
210 4527x if (io_done || op.cancelled.load(std::memory_order_acquire))
211 {
212 svc_.post(&op);
213 svc_.work_finished();
214 }
215 else
216 {
217 4527x desc_state_.read_op = &op;
218 }
219 4527x return std::noop_coroutine();
220 4527x }
221
222 op.complete(errno, 0);
223 op.impl_ptr = shared_from_this();
224 svc_.post(&op);
225 // completion is always posted to scheduler queue, never inline.
226 return std::noop_coroutine();
227 }
228
229 inline void
230 2x epoll_tcp_acceptor::cancel() noexcept
231 {
232 2x do_cancel();
233 2x }
234
235 inline void
236 318x epoll_tcp_acceptor::close_socket() noexcept
237 {
238 318x do_close_socket();
239 318x }
240
241 322x inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
242 322x capy::execution_context& ctx, epoll_tcp_service& tcp_svc)
243 322x : ctx_(ctx)
244 322x , tcp_svc_(&tcp_svc)
245 322x , state_(
246 std::make_unique<epoll_tcp_acceptor_state>(
247 322x ctx.use_service<epoll_scheduler>()))
248 {
249 322x }
250
251 644x inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
252
253 inline void
254 322x epoll_tcp_acceptor_service::shutdown()
255 {
256 322x std::lock_guard lock(state_->mutex_);
257
258 322x while (auto* impl = state_->impl_list_.pop_front())
259 impl->close_socket();
260
261 // Don't clear impl_ptrs_ here — same rationale as
262 // epoll_tcp_service::shutdown(). Let ~state_ release ptrs
263 // after scheduler shutdown has drained all queued ops.
264 322x }
265
266 inline io_object::implementation*
267 80x epoll_tcp_acceptor_service::construct()
268 {
269 80x auto impl = std::make_shared<epoll_tcp_acceptor>(*this);
270 80x auto* raw = impl.get();
271
272 80x std::lock_guard lock(state_->mutex_);
273 80x state_->impl_ptrs_.emplace(raw, std::move(impl));
274 80x state_->impl_list_.push_back(raw);
275
276 80x return raw;
277 80x }
278
279 inline void
280 80x epoll_tcp_acceptor_service::destroy(io_object::implementation* impl)
281 {
282 80x auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(impl);
283 80x epoll_impl->close_socket();
284 80x std::lock_guard lock(state_->mutex_);
285 80x state_->impl_list_.remove(epoll_impl);
286 80x state_->impl_ptrs_.erase(epoll_impl);
287 80x }
288
289 inline void
290 159x epoll_tcp_acceptor_service::close(io_object::handle& h)
291 {
292 159x static_cast<epoll_tcp_acceptor*>(h.get())->close_socket();
293 159x }
294
295 inline std::error_code
296 79x epoll_tcp_acceptor_service::open_acceptor_socket(
297 tcp_acceptor::implementation& impl, int family, int type, int protocol)
298 {
299 79x auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
300 79x epoll_impl->close_socket();
301
302 79x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
303 79x if (fd < 0)
304 return make_err(errno);
305
306 79x if (family == AF_INET6)
307 {
308 8x int val = 0; // dual-stack default
309 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
310 }
311
312 79x epoll_impl->fd_ = fd;
313
314 // Set up descriptor state but do NOT register with epoll yet
315 79x epoll_impl->desc_state_.fd = fd;
316 {
317 79x std::lock_guard lock(epoll_impl->desc_state_.mutex);
318 79x epoll_impl->desc_state_.read_op = nullptr;
319 79x }
320
321 79x return {};
322 }
323
324 inline std::error_code
325 78x epoll_tcp_acceptor_service::bind_acceptor(
326 tcp_acceptor::implementation& impl, endpoint ep)
327 {
328 78x return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
329 }
330
331 inline std::error_code
332 75x epoll_tcp_acceptor_service::listen_acceptor(
333 tcp_acceptor::implementation& impl, int backlog)
334 {
335 75x return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
336 }
337
338 inline void
339 11x epoll_tcp_acceptor_service::post(scheduler_op* op)
340 {
341 11x state_->sched_.post(op);
342 11x }
343
344 inline void
345 4527x epoll_tcp_acceptor_service::work_started() noexcept
346 {
347 4527x state_->sched_.work_started();
348 4527x }
349
350 inline void
351 9x epoll_tcp_acceptor_service::work_finished() noexcept
352 {
353 9x state_->sched_.work_finished();
354 9x }
355
356 inline epoll_tcp_service*
357 4520x epoll_tcp_acceptor_service::tcp_service() const noexcept
358 {
359 4520x return tcp_svc_;
360 }
361
362 } // namespace boost::corosio::detail
363
364 #endif // BOOST_COROSIO_HAS_EPOLL
365
366 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
367