TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <netinet/in.h>
34 : #include <sys/epoll.h>
35 : #include <sys/socket.h>
36 : #include <unistd.h>
37 :
38 : namespace boost::corosio::detail {
39 :
40 : /// State for epoll acceptor service.
41 : using epoll_tcp_acceptor_state =
42 : reactor_service_state<epoll_scheduler, epoll_tcp_acceptor>;
43 :
44 : /** epoll acceptor service implementation.
45 :
46 : Inherits from tcp_acceptor_service to enable runtime polymorphism.
47 : Uses key_type = tcp_acceptor_service for service lookup.
48 : */
49 : class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
50 : : public tcp_acceptor_service
51 : {
52 : public:
53 : explicit epoll_tcp_acceptor_service(
54 : capy::execution_context& ctx, epoll_tcp_service& tcp_svc);
55 : ~epoll_tcp_acceptor_service() override;
56 :
57 : epoll_tcp_acceptor_service(epoll_tcp_acceptor_service const&) = delete;
58 : epoll_tcp_acceptor_service&
59 : operator=(epoll_tcp_acceptor_service const&) = delete;
60 :
61 : void shutdown() override;
62 :
63 : io_object::implementation* construct() override;
64 : void destroy(io_object::implementation*) override;
65 : void close(io_object::handle&) override;
66 : std::error_code open_acceptor_socket(
67 : tcp_acceptor::implementation& impl,
68 : int family,
69 : int type,
70 : int protocol) override;
71 : std::error_code
72 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 : std::error_code
74 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75 :
76 HIT 152 : epoll_scheduler& scheduler() const noexcept
77 : {
78 152 : return state_->sched_;
79 : }
80 : void post(scheduler_op* op);
81 : void work_started() noexcept;
82 : void work_finished() noexcept;
83 :
84 : /** Get the TCP service for creating peer sockets during accept. */
85 : epoll_tcp_service* tcp_service() const noexcept;
86 :
87 : private:
88 : capy::execution_context& ctx_;
89 : epoll_tcp_service* tcp_svc_;
90 : std::unique_ptr<epoll_tcp_acceptor_state> state_;
91 : };
92 :
93 : inline void
94 6 : epoll_accept_op::cancel() noexcept
95 : {
96 6 : if (acceptor_impl_)
97 6 : acceptor_impl_->cancel_single_op(*this);
98 : else
99 MIS 0 : request_cancel();
100 HIT 6 : }
101 :
102 : inline void
103 4529 : epoll_accept_op::operator()()
104 : {
105 4529 : complete_accept_op<epoll_tcp_socket>(*this);
106 4529 : }
107 :
108 80 : inline epoll_tcp_acceptor::epoll_tcp_acceptor(
109 80 : epoll_tcp_acceptor_service& svc) noexcept
110 80 : : reactor_acceptor(svc)
111 : {
112 80 : }
113 :
114 : inline std::coroutine_handle<>
115 4529 : epoll_tcp_acceptor::accept(
116 : std::coroutine_handle<> h,
117 : capy::executor_ref ex,
118 : std::stop_token token,
119 : std::error_code* ec,
120 : io_object::implementation** impl_out)
121 : {
122 4529 : auto& op = acc_;
123 4529 : op.reset();
124 4529 : op.h = h;
125 4529 : op.ex = ex;
126 4529 : op.ec_out = ec;
127 4529 : op.impl_out = impl_out;
128 4529 : op.fd = fd_;
129 4529 : op.start(token, this);
130 :
131 4529 : sockaddr_storage peer_storage{};
132 4529 : socklen_t addrlen = sizeof(peer_storage);
133 : int accepted;
134 : do
135 : {
136 4529 : accepted = ::accept4(
137 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
138 : SOCK_NONBLOCK | SOCK_CLOEXEC);
139 : }
140 4529 : while (accepted < 0 && errno == EINTR);
141 :
142 4529 : if (accepted >= 0)
143 : {
144 : {
145 2 : std::lock_guard lock(desc_state_.mutex);
146 2 : desc_state_.read_ready = false;
147 2 : }
148 :
149 2 : if (svc_.scheduler().try_consume_inline_budget())
150 : {
151 MIS 0 : auto* socket_svc = svc_.tcp_service();
152 0 : if (socket_svc)
153 : {
154 : auto& impl =
155 0 : static_cast<epoll_tcp_socket&>(*socket_svc->construct());
156 0 : impl.set_socket(accepted);
157 :
158 0 : impl.desc_state_.fd = accepted;
159 : {
160 0 : std::lock_guard lock(impl.desc_state_.mutex);
161 0 : impl.desc_state_.read_op = nullptr;
162 0 : impl.desc_state_.write_op = nullptr;
163 0 : impl.desc_state_.connect_op = nullptr;
164 0 : }
165 0 : socket_svc->scheduler().register_descriptor(
166 : accepted, &impl.desc_state_);
167 :
168 0 : impl.set_endpoints(
169 : local_endpoint_, from_sockaddr(peer_storage));
170 :
171 0 : *ec = {};
172 0 : if (impl_out)
173 0 : *impl_out = &impl;
174 : }
175 : else
176 : {
177 0 : ::close(accepted);
178 0 : *ec = make_err(ENOENT);
179 0 : if (impl_out)
180 0 : *impl_out = nullptr;
181 : }
182 0 : op.cont_op.cont.h = h;
183 0 : return dispatch_coro(ex, op.cont_op.cont);
184 : }
185 :
186 HIT 2 : op.accepted_fd = accepted;
187 2 : op.peer_storage = peer_storage;
188 2 : op.complete(0, 0);
189 2 : op.impl_ptr = shared_from_this();
190 2 : svc_.post(&op);
191 2 : return std::noop_coroutine();
192 : }
193 :
194 4527 : if (errno == EAGAIN || errno == EWOULDBLOCK)
195 : {
196 4527 : op.impl_ptr = shared_from_this();
197 4527 : svc_.work_started();
198 :
199 4527 : std::lock_guard lock(desc_state_.mutex);
200 4527 : bool io_done = false;
201 4527 : if (desc_state_.read_ready)
202 : {
203 MIS 0 : desc_state_.read_ready = false;
204 0 : op.perform_io();
205 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
206 0 : if (!io_done)
207 0 : op.errn = 0;
208 : }
209 :
210 HIT 4527 : if (io_done || op.cancelled.load(std::memory_order_acquire))
211 : {
212 MIS 0 : svc_.post(&op);
213 0 : svc_.work_finished();
214 : }
215 : else
216 : {
217 HIT 4527 : desc_state_.read_op = &op;
218 : }
219 4527 : return std::noop_coroutine();
220 4527 : }
221 :
222 MIS 0 : op.complete(errno, 0);
223 0 : op.impl_ptr = shared_from_this();
224 0 : svc_.post(&op);
225 : // completion is always posted to scheduler queue, never inline.
226 0 : return std::noop_coroutine();
227 : }
228 :
229 : inline void
230 HIT 2 : epoll_tcp_acceptor::cancel() noexcept
231 : {
232 2 : do_cancel();
233 2 : }
234 :
235 : inline void
236 318 : epoll_tcp_acceptor::close_socket() noexcept
237 : {
238 318 : do_close_socket();
239 318 : }
240 :
241 322 : inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
242 322 : capy::execution_context& ctx, epoll_tcp_service& tcp_svc)
243 322 : : ctx_(ctx)
244 322 : , tcp_svc_(&tcp_svc)
245 322 : , state_(
246 : std::make_unique<epoll_tcp_acceptor_state>(
247 322 : ctx.use_service<epoll_scheduler>()))
248 : {
249 322 : }
250 :
251 644 : inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
252 :
253 : inline void
254 322 : epoll_tcp_acceptor_service::shutdown()
255 : {
256 322 : std::lock_guard lock(state_->mutex_);
257 :
258 322 : while (auto* impl = state_->impl_list_.pop_front())
259 MIS 0 : impl->close_socket();
260 :
261 : // Don't clear impl_ptrs_ here — same rationale as
262 : // epoll_tcp_service::shutdown(). Let ~state_ release ptrs
263 : // after scheduler shutdown has drained all queued ops.
264 HIT 322 : }
265 :
266 : inline io_object::implementation*
267 80 : epoll_tcp_acceptor_service::construct()
268 : {
269 80 : auto impl = std::make_shared<epoll_tcp_acceptor>(*this);
270 80 : auto* raw = impl.get();
271 :
272 80 : std::lock_guard lock(state_->mutex_);
273 80 : state_->impl_ptrs_.emplace(raw, std::move(impl));
274 80 : state_->impl_list_.push_back(raw);
275 :
276 80 : return raw;
277 80 : }
278 :
279 : inline void
280 80 : epoll_tcp_acceptor_service::destroy(io_object::implementation* impl)
281 : {
282 80 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(impl);
283 80 : epoll_impl->close_socket();
284 80 : std::lock_guard lock(state_->mutex_);
285 80 : state_->impl_list_.remove(epoll_impl);
286 80 : state_->impl_ptrs_.erase(epoll_impl);
287 80 : }
288 :
289 : inline void
290 159 : epoll_tcp_acceptor_service::close(io_object::handle& h)
291 : {
292 159 : static_cast<epoll_tcp_acceptor*>(h.get())->close_socket();
293 159 : }
294 :
295 : inline std::error_code
296 79 : epoll_tcp_acceptor_service::open_acceptor_socket(
297 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
298 : {
299 79 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
300 79 : epoll_impl->close_socket();
301 :
302 79 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
303 79 : if (fd < 0)
304 MIS 0 : return make_err(errno);
305 :
306 HIT 79 : if (family == AF_INET6)
307 : {
308 8 : int val = 0; // dual-stack default
309 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
310 : }
311 :
312 79 : epoll_impl->fd_ = fd;
313 :
314 : // Set up descriptor state but do NOT register with epoll yet
315 79 : epoll_impl->desc_state_.fd = fd;
316 : {
317 79 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
318 79 : epoll_impl->desc_state_.read_op = nullptr;
319 79 : }
320 :
321 79 : return {};
322 : }
323 :
324 : inline std::error_code
325 78 : epoll_tcp_acceptor_service::bind_acceptor(
326 : tcp_acceptor::implementation& impl, endpoint ep)
327 : {
328 78 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
329 : }
330 :
331 : inline std::error_code
332 75 : epoll_tcp_acceptor_service::listen_acceptor(
333 : tcp_acceptor::implementation& impl, int backlog)
334 : {
335 75 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
336 : }
337 :
338 : inline void
339 11 : epoll_tcp_acceptor_service::post(scheduler_op* op)
340 : {
341 11 : state_->sched_.post(op);
342 11 : }
343 :
344 : inline void
345 4527 : epoll_tcp_acceptor_service::work_started() noexcept
346 : {
347 4527 : state_->sched_.work_started();
348 4527 : }
349 :
350 : inline void
351 9 : epoll_tcp_acceptor_service::work_finished() noexcept
352 : {
353 9 : state_->sched_.work_finished();
354 9 : }
355 :
356 : inline epoll_tcp_service*
357 4520 : epoll_tcp_acceptor_service::tcp_service() const noexcept
358 : {
359 4520 : return tcp_svc_;
360 : }
361 :
362 : } // namespace boost::corosio::detail
363 :
364 : #endif // BOOST_COROSIO_HAS_EPOLL
365 :
366 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
|