TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <netinet/in.h>
35 : #include <sys/select.h>
36 : #include <sys/socket.h>
37 : #include <unistd.h>
38 :
39 : namespace boost::corosio::detail {
40 :
41 : /// State for select acceptor service.
42 : using select_tcp_acceptor_state =
43 : reactor_service_state<select_scheduler, select_tcp_acceptor>;
44 :
45 : /** select acceptor service implementation.
46 :
47 : Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 : Uses key_type = tcp_acceptor_service for service lookup.
49 : */
50 : class BOOST_COROSIO_DECL select_tcp_acceptor_service final
51 : : public tcp_acceptor_service
52 : {
53 : public:
54 : explicit select_tcp_acceptor_service(
55 : capy::execution_context& ctx, select_tcp_service& tcp_svc);
56 : ~select_tcp_acceptor_service() override;
57 :
58 : select_tcp_acceptor_service(select_tcp_acceptor_service const&) = delete;
59 : select_tcp_acceptor_service&
60 : operator=(select_tcp_acceptor_service const&) = delete;
61 :
62 : void shutdown() override;
63 :
64 : io_object::implementation* construct() override;
65 : void destroy(io_object::implementation*) override;
66 : void close(io_object::handle&) override;
67 : std::error_code open_acceptor_socket(
68 : tcp_acceptor::implementation& impl,
69 : int family,
70 : int type,
71 : int protocol) override;
72 : std::error_code
73 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
74 : std::error_code
75 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
76 :
77 HIT 116 : select_scheduler& scheduler() const noexcept
78 : {
79 116 : return state_->sched_;
80 : }
81 : void post(scheduler_op* op);
82 : void work_started() noexcept;
83 : void work_finished() noexcept;
84 :
85 : /** Get the TCP service for creating peer sockets during accept. */
86 : select_tcp_service* tcp_service() const noexcept;
87 :
88 : private:
89 : capy::execution_context& ctx_;
90 : select_tcp_service* tcp_svc_;
91 : std::unique_ptr<select_tcp_acceptor_state> state_;
92 : };
93 :
94 : inline void
95 MIS 0 : select_accept_op::cancel() noexcept
96 : {
97 0 : if (acceptor_impl_)
98 0 : acceptor_impl_->cancel_single_op(*this);
99 : else
100 0 : request_cancel();
101 0 : }
102 :
103 : inline void
104 HIT 2844 : select_accept_op::operator()()
105 : {
106 2844 : complete_accept_op<select_tcp_socket>(*this);
107 2844 : }
108 :
109 61 : inline select_tcp_acceptor::select_tcp_acceptor(
110 61 : select_tcp_acceptor_service& svc) noexcept
111 61 : : reactor_acceptor(svc)
112 : {
113 61 : }
114 :
115 : inline std::coroutine_handle<>
116 2844 : select_tcp_acceptor::accept(
117 : std::coroutine_handle<> h,
118 : capy::executor_ref ex,
119 : std::stop_token token,
120 : std::error_code* ec,
121 : io_object::implementation** impl_out)
122 : {
123 2844 : auto& op = acc_;
124 2844 : op.reset();
125 2844 : op.h = h;
126 2844 : op.ex = ex;
127 2844 : op.ec_out = ec;
128 2844 : op.impl_out = impl_out;
129 2844 : op.fd = fd_;
130 2844 : op.start(token, this);
131 :
132 2844 : sockaddr_storage peer_storage{};
133 2844 : socklen_t addrlen = sizeof(peer_storage);
134 : int accepted;
135 : do
136 : {
137 : accepted =
138 2844 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
139 : }
140 2844 : while (accepted < 0 && errno == EINTR);
141 :
142 2844 : if (accepted >= 0)
143 : {
144 2 : if (accepted >= FD_SETSIZE)
145 : {
146 MIS 0 : ::close(accepted);
147 0 : op.complete(EINVAL, 0);
148 0 : op.impl_ptr = shared_from_this();
149 0 : svc_.post(&op);
150 0 : return std::noop_coroutine();
151 : }
152 :
153 HIT 2 : int flags = ::fcntl(accepted, F_GETFL, 0);
154 2 : if (flags == -1)
155 : {
156 MIS 0 : int err = errno;
157 0 : ::close(accepted);
158 0 : op.complete(err, 0);
159 0 : op.impl_ptr = shared_from_this();
160 0 : svc_.post(&op);
161 0 : return std::noop_coroutine();
162 : }
163 :
164 HIT 2 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
165 : {
166 MIS 0 : int err = errno;
167 0 : ::close(accepted);
168 0 : op.complete(err, 0);
169 0 : op.impl_ptr = shared_from_this();
170 0 : svc_.post(&op);
171 0 : return std::noop_coroutine();
172 : }
173 :
174 HIT 2 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
175 : {
176 MIS 0 : int err = errno;
177 0 : ::close(accepted);
178 0 : op.complete(err, 0);
179 0 : op.impl_ptr = shared_from_this();
180 0 : svc_.post(&op);
181 0 : return std::noop_coroutine();
182 : }
183 :
184 : {
185 HIT 2 : std::lock_guard lock(desc_state_.mutex);
186 2 : desc_state_.read_ready = false;
187 2 : }
188 :
189 2 : if (svc_.scheduler().try_consume_inline_budget())
190 : {
191 MIS 0 : auto* socket_svc = svc_.tcp_service();
192 0 : if (socket_svc)
193 : {
194 : auto& impl =
195 0 : static_cast<select_tcp_socket&>(*socket_svc->construct());
196 0 : impl.set_socket(accepted);
197 :
198 0 : impl.desc_state_.fd = accepted;
199 : {
200 0 : std::lock_guard lock(impl.desc_state_.mutex);
201 0 : impl.desc_state_.read_op = nullptr;
202 0 : impl.desc_state_.write_op = nullptr;
203 0 : impl.desc_state_.connect_op = nullptr;
204 0 : }
205 0 : socket_svc->scheduler().register_descriptor(
206 : accepted, &impl.desc_state_);
207 :
208 0 : impl.set_endpoints(
209 : local_endpoint_, from_sockaddr(peer_storage));
210 :
211 0 : *ec = {};
212 0 : if (impl_out)
213 0 : *impl_out = &impl;
214 : }
215 : else
216 : {
217 0 : ::close(accepted);
218 0 : *ec = make_err(ENOENT);
219 0 : if (impl_out)
220 0 : *impl_out = nullptr;
221 : }
222 0 : op.cont_op.cont.h = h;
223 0 : return dispatch_coro(ex, op.cont_op.cont);
224 : }
225 :
226 HIT 2 : op.accepted_fd = accepted;
227 2 : op.peer_storage = peer_storage;
228 2 : op.complete(0, 0);
229 2 : op.impl_ptr = shared_from_this();
230 2 : svc_.post(&op);
231 2 : return std::noop_coroutine();
232 : }
233 :
234 2842 : if (errno == EAGAIN || errno == EWOULDBLOCK)
235 : {
236 2842 : op.impl_ptr = shared_from_this();
237 2842 : svc_.work_started();
238 :
239 2842 : std::lock_guard lock(desc_state_.mutex);
240 2842 : bool io_done = false;
241 2842 : if (desc_state_.read_ready)
242 : {
243 MIS 0 : desc_state_.read_ready = false;
244 0 : op.perform_io();
245 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
246 0 : if (!io_done)
247 0 : op.errn = 0;
248 : }
249 :
250 HIT 2842 : if (io_done || op.cancelled.load(std::memory_order_acquire))
251 : {
252 MIS 0 : svc_.post(&op);
253 0 : svc_.work_finished();
254 : }
255 : else
256 : {
257 HIT 2842 : desc_state_.read_op = &op;
258 : }
259 2842 : return std::noop_coroutine();
260 2842 : }
261 :
262 MIS 0 : op.complete(errno, 0);
263 0 : op.impl_ptr = shared_from_this();
264 0 : svc_.post(&op);
265 0 : return std::noop_coroutine();
266 : }
267 :
268 : inline void
269 HIT 2 : select_tcp_acceptor::cancel() noexcept
270 : {
271 2 : do_cancel();
272 2 : }
273 :
274 : inline void
275 240 : select_tcp_acceptor::close_socket() noexcept
276 : {
277 240 : do_close_socket();
278 240 : }
279 :
280 195 : inline select_tcp_acceptor_service::select_tcp_acceptor_service(
281 195 : capy::execution_context& ctx, select_tcp_service& tcp_svc)
282 195 : : ctx_(ctx)
283 195 : , tcp_svc_(&tcp_svc)
284 195 : , state_(
285 : std::make_unique<select_tcp_acceptor_state>(
286 195 : ctx.use_service<select_scheduler>()))
287 : {
288 195 : }
289 :
290 390 : inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
291 :
292 : inline void
293 195 : select_tcp_acceptor_service::shutdown()
294 : {
295 195 : std::lock_guard lock(state_->mutex_);
296 :
297 195 : while (auto* impl = state_->impl_list_.pop_front())
298 MIS 0 : impl->close_socket();
299 :
300 : // Don't clear impl_ptrs_ here — same rationale as
301 : // select_tcp_service::shutdown(). Let ~state_ release ptrs
302 : // after scheduler shutdown has drained all queued ops.
303 HIT 195 : }
304 :
305 : inline io_object::implementation*
306 61 : select_tcp_acceptor_service::construct()
307 : {
308 61 : auto impl = std::make_shared<select_tcp_acceptor>(*this);
309 61 : auto* raw = impl.get();
310 :
311 61 : std::lock_guard lock(state_->mutex_);
312 61 : state_->impl_ptrs_.emplace(raw, std::move(impl));
313 61 : state_->impl_list_.push_back(raw);
314 :
315 61 : return raw;
316 61 : }
317 :
318 : inline void
319 61 : select_tcp_acceptor_service::destroy(io_object::implementation* impl)
320 : {
321 61 : auto* select_impl = static_cast<select_tcp_acceptor*>(impl);
322 61 : select_impl->close_socket();
323 61 : std::lock_guard lock(state_->mutex_);
324 61 : state_->impl_list_.remove(select_impl);
325 61 : state_->impl_ptrs_.erase(select_impl);
326 61 : }
327 :
328 : inline void
329 120 : select_tcp_acceptor_service::close(io_object::handle& h)
330 : {
331 120 : static_cast<select_tcp_acceptor*>(h.get())->close_socket();
332 120 : }
333 :
334 : inline std::error_code
335 59 : select_tcp_acceptor_service::open_acceptor_socket(
336 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
337 : {
338 59 : auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
339 59 : select_impl->close_socket();
340 :
341 59 : int fd = ::socket(family, type, protocol);
342 59 : if (fd < 0)
343 MIS 0 : return make_err(errno);
344 :
345 HIT 59 : int flags = ::fcntl(fd, F_GETFL, 0);
346 59 : if (flags == -1)
347 : {
348 MIS 0 : int errn = errno;
349 0 : ::close(fd);
350 0 : return make_err(errn);
351 : }
352 HIT 59 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
353 : {
354 MIS 0 : int errn = errno;
355 0 : ::close(fd);
356 0 : return make_err(errn);
357 : }
358 HIT 59 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
359 : {
360 MIS 0 : int errn = errno;
361 0 : ::close(fd);
362 0 : return make_err(errn);
363 : }
364 :
365 HIT 59 : if (fd >= FD_SETSIZE)
366 : {
367 MIS 0 : ::close(fd);
368 0 : return make_err(EMFILE);
369 : }
370 :
371 HIT 59 : if (family == AF_INET6)
372 : {
373 8 : int val = 0; // dual-stack default
374 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
375 : }
376 :
377 : #ifdef SO_NOSIGPIPE
378 : {
379 : int nosig = 1;
380 : ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
381 : }
382 : #endif
383 :
384 59 : select_impl->fd_ = fd;
385 :
386 : // Set up descriptor state but do NOT register with reactor yet
387 : // (registration happens in do_listen via reactor_acceptor base)
388 59 : select_impl->desc_state_.fd = fd;
389 : {
390 59 : std::lock_guard lock(select_impl->desc_state_.mutex);
391 59 : select_impl->desc_state_.read_op = nullptr;
392 59 : }
393 :
394 59 : return {};
395 : }
396 :
397 : inline std::error_code
398 58 : select_tcp_acceptor_service::bind_acceptor(
399 : tcp_acceptor::implementation& impl, endpoint ep)
400 : {
401 58 : return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
402 : }
403 :
404 : inline std::error_code
405 57 : select_tcp_acceptor_service::listen_acceptor(
406 : tcp_acceptor::implementation& impl, int backlog)
407 : {
408 57 : return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
409 : }
410 :
411 : inline void
412 5 : select_tcp_acceptor_service::post(scheduler_op* op)
413 : {
414 5 : state_->sched_.post(op);
415 5 : }
416 :
417 : inline void
418 2842 : select_tcp_acceptor_service::work_started() noexcept
419 : {
420 2842 : state_->sched_.work_started();
421 2842 : }
422 :
423 : inline void
424 3 : select_tcp_acceptor_service::work_finished() noexcept
425 : {
426 3 : state_->sched_.work_finished();
427 3 : }
428 :
429 : inline select_tcp_service*
430 2841 : select_tcp_acceptor_service::tcp_service() const noexcept
431 : {
432 2841 : return tcp_svc_;
433 : }
434 :
435 : } // namespace boost::corosio::detail
436 :
437 : #endif // BOOST_COROSIO_HAS_SELECT
438 :
439 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
|