include/boost/corosio/native/detail/select/select_tcp_acceptor_service.hpp

61.6% Lines (117/190) 95.0% List of functions (19/20)
select_tcp_acceptor_service.hpp
f(x) Functions (20)
Function Calls Lines Blocks
boost::corosio::detail::select_tcp_acceptor_service::scheduler() const :77 116x 100.0% 100.0% boost::corosio::detail::select_accept_op::cancel() :94 0 0.0% 0.0% boost::corosio::detail::select_accept_op::operator()() :103 3276x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::select_tcp_acceptor(boost::corosio::detail::select_tcp_acceptor_service&) :108 61x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :115 3276x 41.5% 35.0% boost::corosio::detail::select_tcp_acceptor::cancel() :268 2x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::close_socket() :274 240x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::select_tcp_acceptor_service(boost::capy::execution_context&, boost::corosio::detail::select_tcp_service&) :279 195x 100.0% 83.0% boost::corosio::detail::select_tcp_acceptor_service::~select_tcp_acceptor_service() :288 390x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::shutdown() :291 195x 80.0% 90.0% boost::corosio::detail::select_tcp_acceptor_service::construct() :304 61x 100.0% 78.0% boost::corosio::detail::select_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :317 61x 100.0% 83.0% boost::corosio::detail::select_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :327 120x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :333 59x 61.3% 69.0% boost::corosio::detail::select_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :396 58x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :403 57x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :410 5x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::work_started() :416 3274x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::work_finished() :422 3x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::tcp_service() const :428 3273x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <sys/select.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 namespace boost::corosio::detail {
40
41 /// State for select acceptor service.
42 using select_tcp_acceptor_state =
43 reactor_service_state<select_scheduler, select_tcp_acceptor>;
44
45 /** select acceptor service implementation.
46
47 Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 Uses key_type = tcp_acceptor_service for service lookup.
49 */
50 class BOOST_COROSIO_DECL select_tcp_acceptor_service final
51 : public tcp_acceptor_service
52 {
53 public:
54 explicit select_tcp_acceptor_service(
55 capy::execution_context& ctx, select_tcp_service& tcp_svc);
56 ~select_tcp_acceptor_service() override;
57
58 select_tcp_acceptor_service(select_tcp_acceptor_service const&) = delete;
59 select_tcp_acceptor_service&
60 operator=(select_tcp_acceptor_service const&) = delete;
61
62 void shutdown() override;
63
64 io_object::implementation* construct() override;
65 void destroy(io_object::implementation*) override;
66 void close(io_object::handle&) override;
67 std::error_code open_acceptor_socket(
68 tcp_acceptor::implementation& impl,
69 int family,
70 int type,
71 int protocol) override;
72 std::error_code
73 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
74 std::error_code
75 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
76
77 116x select_scheduler& scheduler() const noexcept
78 {
79 116x return state_->sched_;
80 }
81 void post(scheduler_op* op);
82 void work_started() noexcept;
83 void work_finished() noexcept;
84
85 /** Get the TCP service for creating peer sockets during accept. */
86 select_tcp_service* tcp_service() const noexcept;
87
88 private:
89 select_tcp_service* tcp_svc_;
90 std::unique_ptr<select_tcp_acceptor_state> state_;
91 };
92
93 inline void
94 select_accept_op::cancel() noexcept
95 {
96 if (acceptor_impl_)
97 acceptor_impl_->cancel_single_op(*this);
98 else
99 request_cancel();
100 }
101
102 inline void
103 3276x select_accept_op::operator()()
104 {
105 3276x complete_accept_op<select_tcp_socket>(*this);
106 3276x }
107
108 61x inline select_tcp_acceptor::select_tcp_acceptor(
109 61x select_tcp_acceptor_service& svc) noexcept
110 61x : reactor_acceptor(svc)
111 {
112 61x }
113
114 inline std::coroutine_handle<>
115 3276x select_tcp_acceptor::accept(
116 std::coroutine_handle<> h,
117 capy::executor_ref ex,
118 std::stop_token token,
119 std::error_code* ec,
120 io_object::implementation** impl_out)
121 {
122 3276x auto& op = acc_;
123 3276x op.reset();
124 3276x op.h = h;
125 3276x op.ex = ex;
126 3276x op.ec_out = ec;
127 3276x op.impl_out = impl_out;
128 3276x op.fd = fd_;
129 3276x op.start(token, this);
130
131 3276x sockaddr_storage peer_storage{};
132 3276x socklen_t addrlen = sizeof(peer_storage);
133 int accepted;
134 do
135 {
136 accepted =
137 3276x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
138 }
139 3276x while (accepted < 0 && errno == EINTR);
140
141 3276x if (accepted >= 0)
142 {
143 2x if (accepted >= FD_SETSIZE)
144 {
145 ::close(accepted);
146 op.complete(EINVAL, 0);
147 op.impl_ptr = shared_from_this();
148 svc_.post(&op);
149 return std::noop_coroutine();
150 }
151
152 2x int flags = ::fcntl(accepted, F_GETFL, 0);
153 2x if (flags == -1)
154 {
155 int err = errno;
156 ::close(accepted);
157 op.complete(err, 0);
158 op.impl_ptr = shared_from_this();
159 svc_.post(&op);
160 return std::noop_coroutine();
161 }
162
163 2x if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
164 {
165 int err = errno;
166 ::close(accepted);
167 op.complete(err, 0);
168 op.impl_ptr = shared_from_this();
169 svc_.post(&op);
170 return std::noop_coroutine();
171 }
172
173 2x if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
174 {
175 int err = errno;
176 ::close(accepted);
177 op.complete(err, 0);
178 op.impl_ptr = shared_from_this();
179 svc_.post(&op);
180 return std::noop_coroutine();
181 }
182
183 {
184 2x std::lock_guard lock(desc_state_.mutex);
185 2x desc_state_.read_ready = false;
186 2x }
187
188 2x if (svc_.scheduler().try_consume_inline_budget())
189 {
190 auto* socket_svc = svc_.tcp_service();
191 if (socket_svc)
192 {
193 auto& impl =
194 static_cast<select_tcp_socket&>(*socket_svc->construct());
195 impl.set_socket(accepted);
196
197 impl.desc_state_.fd = accepted;
198 {
199 std::lock_guard lock(impl.desc_state_.mutex);
200 impl.desc_state_.read_op = nullptr;
201 impl.desc_state_.write_op = nullptr;
202 impl.desc_state_.connect_op = nullptr;
203 }
204 socket_svc->scheduler().register_descriptor(
205 accepted, &impl.desc_state_);
206
207 impl.set_endpoints(
208 local_endpoint_, from_sockaddr(peer_storage));
209
210 *ec = {};
211 if (impl_out)
212 *impl_out = &impl;
213 }
214 else
215 {
216 ::close(accepted);
217 *ec = make_err(ENOENT);
218 if (impl_out)
219 *impl_out = nullptr;
220 }
221 op.cont_op.cont.h = h;
222 return dispatch_coro(ex, op.cont_op.cont);
223 }
224
225 2x op.accepted_fd = accepted;
226 2x op.peer_storage = peer_storage;
227 2x op.complete(0, 0);
228 2x op.impl_ptr = shared_from_this();
229 2x svc_.post(&op);
230 2x return std::noop_coroutine();
231 }
232
233 3274x if (errno == EAGAIN || errno == EWOULDBLOCK)
234 {
235 3274x op.impl_ptr = shared_from_this();
236 3274x svc_.work_started();
237
238 3274x std::lock_guard lock(desc_state_.mutex);
239 3274x bool io_done = false;
240 3274x if (desc_state_.read_ready)
241 {
242 desc_state_.read_ready = false;
243 op.perform_io();
244 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
245 if (!io_done)
246 op.errn = 0;
247 }
248
249 3274x if (io_done || op.cancelled.load(std::memory_order_acquire))
250 {
251 svc_.post(&op);
252 svc_.work_finished();
253 }
254 else
255 {
256 3274x desc_state_.read_op = &op;
257 }
258 3274x return std::noop_coroutine();
259 3274x }
260
261 op.complete(errno, 0);
262 op.impl_ptr = shared_from_this();
263 svc_.post(&op);
264 return std::noop_coroutine();
265 }
266
267 inline void
268 2x select_tcp_acceptor::cancel() noexcept
269 {
270 2x do_cancel();
271 2x }
272
273 inline void
274 240x select_tcp_acceptor::close_socket() noexcept
275 {
276 240x do_close_socket();
277 240x }
278
279 195x inline select_tcp_acceptor_service::select_tcp_acceptor_service(
280 195x capy::execution_context& ctx, select_tcp_service& tcp_svc)
281 195x : tcp_svc_(&tcp_svc)
282 195x , state_(
283 std::make_unique<select_tcp_acceptor_state>(
284 195x ctx.use_service<select_scheduler>()))
285 {
286 195x }
287
288 390x inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
289
290 inline void
291 195x select_tcp_acceptor_service::shutdown()
292 {
293 195x std::lock_guard lock(state_->mutex_);
294
295 195x while (auto* impl = state_->impl_list_.pop_front())
296 impl->close_socket();
297
298 // Don't clear impl_ptrs_ here — same rationale as
299 // select_tcp_service::shutdown(). Let ~state_ release ptrs
300 // after scheduler shutdown has drained all queued ops.
301 195x }
302
303 inline io_object::implementation*
304 61x select_tcp_acceptor_service::construct()
305 {
306 61x auto impl = std::make_shared<select_tcp_acceptor>(*this);
307 61x auto* raw = impl.get();
308
309 61x std::lock_guard lock(state_->mutex_);
310 61x state_->impl_ptrs_.emplace(raw, std::move(impl));
311 61x state_->impl_list_.push_back(raw);
312
313 61x return raw;
314 61x }
315
316 inline void
317 61x select_tcp_acceptor_service::destroy(io_object::implementation* impl)
318 {
319 61x auto* select_impl = static_cast<select_tcp_acceptor*>(impl);
320 61x select_impl->close_socket();
321 61x std::lock_guard lock(state_->mutex_);
322 61x state_->impl_list_.remove(select_impl);
323 61x state_->impl_ptrs_.erase(select_impl);
324 61x }
325
326 inline void
327 120x select_tcp_acceptor_service::close(io_object::handle& h)
328 {
329 120x static_cast<select_tcp_acceptor*>(h.get())->close_socket();
330 120x }
331
332 inline std::error_code
333 59x select_tcp_acceptor_service::open_acceptor_socket(
334 tcp_acceptor::implementation& impl, int family, int type, int protocol)
335 {
336 59x auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
337 59x select_impl->close_socket();
338
339 59x int fd = ::socket(family, type, protocol);
340 59x if (fd < 0)
341 return make_err(errno);
342
343 59x int flags = ::fcntl(fd, F_GETFL, 0);
344 59x if (flags == -1)
345 {
346 int errn = errno;
347 ::close(fd);
348 return make_err(errn);
349 }
350 59x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
351 {
352 int errn = errno;
353 ::close(fd);
354 return make_err(errn);
355 }
356 59x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
357 {
358 int errn = errno;
359 ::close(fd);
360 return make_err(errn);
361 }
362
363 59x if (fd >= FD_SETSIZE)
364 {
365 ::close(fd);
366 return make_err(EMFILE);
367 }
368
369 59x if (family == AF_INET6)
370 {
371 8x int val = 0; // dual-stack default
372 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
373 }
374
375 #ifdef SO_NOSIGPIPE
376 {
377 int nosig = 1;
378 ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
379 }
380 #endif
381
382 59x select_impl->fd_ = fd;
383
384 // Set up descriptor state but do NOT register with reactor yet
385 // (registration happens in do_listen via reactor_acceptor base)
386 59x select_impl->desc_state_.fd = fd;
387 {
388 59x std::lock_guard lock(select_impl->desc_state_.mutex);
389 59x select_impl->desc_state_.read_op = nullptr;
390 59x }
391
392 59x return {};
393 }
394
395 inline std::error_code
396 58x select_tcp_acceptor_service::bind_acceptor(
397 tcp_acceptor::implementation& impl, endpoint ep)
398 {
399 58x return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
400 }
401
402 inline std::error_code
403 57x select_tcp_acceptor_service::listen_acceptor(
404 tcp_acceptor::implementation& impl, int backlog)
405 {
406 57x return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
407 }
408
409 inline void
410 5x select_tcp_acceptor_service::post(scheduler_op* op)
411 {
412 5x state_->sched_.post(op);
413 5x }
414
415 inline void
416 3274x select_tcp_acceptor_service::work_started() noexcept
417 {
418 3274x state_->sched_.work_started();
419 3274x }
420
421 inline void
422 3x select_tcp_acceptor_service::work_finished() noexcept
423 {
424 3x state_->sched_.work_finished();
425 3x }
426
427 inline select_tcp_service*
428 3273x select_tcp_acceptor_service::tcp_service() const noexcept
429 {
430 3273x return tcp_svc_;
431 }
432
433 } // namespace boost::corosio::detail
434
435 #endif // BOOST_COROSIO_HAS_SELECT
436
437 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
438