include/boost/corosio/native/detail/select/select_tcp_acceptor_service.hpp

61.8% Lines (118/191) 95.0% List of functions (19/20)
select_tcp_acceptor_service.hpp
f(x) Functions (20)
Function Calls Lines Blocks
boost::corosio::detail::select_tcp_acceptor_service::scheduler() const :77 116x 100.0% 100.0% boost::corosio::detail::select_accept_op::cancel() :95 0 0.0% 0.0% boost::corosio::detail::select_accept_op::operator()() :104 2844x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::select_tcp_acceptor(boost::corosio::detail::select_tcp_acceptor_service&) :109 61x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :116 2844x 41.5% 35.0% boost::corosio::detail::select_tcp_acceptor::cancel() :269 2x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::close_socket() :275 240x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::select_tcp_acceptor_service(boost::capy::execution_context&, boost::corosio::detail::select_tcp_service&) :280 195x 100.0% 83.0% boost::corosio::detail::select_tcp_acceptor_service::~select_tcp_acceptor_service() :290 390x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::shutdown() :293 195x 80.0% 90.0% boost::corosio::detail::select_tcp_acceptor_service::construct() :306 61x 100.0% 78.0% boost::corosio::detail::select_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :319 61x 100.0% 83.0% boost::corosio::detail::select_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :329 120x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :335 59x 61.3% 69.0% boost::corosio::detail::select_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :398 58x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :405 57x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :412 5x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::work_started() :418 2842x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::work_finished() :424 3x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::tcp_service() const :430 2841x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <sys/select.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 namespace boost::corosio::detail {
40
41 /// State for select acceptor service.
42 using select_tcp_acceptor_state =
43 reactor_service_state<select_scheduler, select_tcp_acceptor>;
44
45 /** select acceptor service implementation.
46
47 Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 Uses key_type = tcp_acceptor_service for service lookup.
49 */
50 class BOOST_COROSIO_DECL select_tcp_acceptor_service final
51 : public tcp_acceptor_service
52 {
53 public:
54 explicit select_tcp_acceptor_service(
55 capy::execution_context& ctx, select_tcp_service& tcp_svc);
56 ~select_tcp_acceptor_service() override;
57
58 select_tcp_acceptor_service(select_tcp_acceptor_service const&) = delete;
59 select_tcp_acceptor_service&
60 operator=(select_tcp_acceptor_service const&) = delete;
61
62 void shutdown() override;
63
64 io_object::implementation* construct() override;
65 void destroy(io_object::implementation*) override;
66 void close(io_object::handle&) override;
67 std::error_code open_acceptor_socket(
68 tcp_acceptor::implementation& impl,
69 int family,
70 int type,
71 int protocol) override;
72 std::error_code
73 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
74 std::error_code
75 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
76
77 116x select_scheduler& scheduler() const noexcept
78 {
79 116x return state_->sched_;
80 }
81 void post(scheduler_op* op);
82 void work_started() noexcept;
83 void work_finished() noexcept;
84
85 /** Get the TCP service for creating peer sockets during accept. */
86 select_tcp_service* tcp_service() const noexcept;
87
88 private:
89 capy::execution_context& ctx_;
90 select_tcp_service* tcp_svc_;
91 std::unique_ptr<select_tcp_acceptor_state> state_;
92 };
93
94 inline void
95 select_accept_op::cancel() noexcept
96 {
97 if (acceptor_impl_)
98 acceptor_impl_->cancel_single_op(*this);
99 else
100 request_cancel();
101 }
102
103 inline void
104 2844x select_accept_op::operator()()
105 {
106 2844x complete_accept_op<select_tcp_socket>(*this);
107 2844x }
108
109 61x inline select_tcp_acceptor::select_tcp_acceptor(
110 61x select_tcp_acceptor_service& svc) noexcept
111 61x : reactor_acceptor(svc)
112 {
113 61x }
114
115 inline std::coroutine_handle<>
116 2844x select_tcp_acceptor::accept(
117 std::coroutine_handle<> h,
118 capy::executor_ref ex,
119 std::stop_token token,
120 std::error_code* ec,
121 io_object::implementation** impl_out)
122 {
123 2844x auto& op = acc_;
124 2844x op.reset();
125 2844x op.h = h;
126 2844x op.ex = ex;
127 2844x op.ec_out = ec;
128 2844x op.impl_out = impl_out;
129 2844x op.fd = fd_;
130 2844x op.start(token, this);
131
132 2844x sockaddr_storage peer_storage{};
133 2844x socklen_t addrlen = sizeof(peer_storage);
134 int accepted;
135 do
136 {
137 accepted =
138 2844x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
139 }
140 2844x while (accepted < 0 && errno == EINTR);
141
142 2844x if (accepted >= 0)
143 {
144 2x if (accepted >= FD_SETSIZE)
145 {
146 ::close(accepted);
147 op.complete(EINVAL, 0);
148 op.impl_ptr = shared_from_this();
149 svc_.post(&op);
150 return std::noop_coroutine();
151 }
152
153 2x int flags = ::fcntl(accepted, F_GETFL, 0);
154 2x if (flags == -1)
155 {
156 int err = errno;
157 ::close(accepted);
158 op.complete(err, 0);
159 op.impl_ptr = shared_from_this();
160 svc_.post(&op);
161 return std::noop_coroutine();
162 }
163
164 2x if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
165 {
166 int err = errno;
167 ::close(accepted);
168 op.complete(err, 0);
169 op.impl_ptr = shared_from_this();
170 svc_.post(&op);
171 return std::noop_coroutine();
172 }
173
174 2x if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
175 {
176 int err = errno;
177 ::close(accepted);
178 op.complete(err, 0);
179 op.impl_ptr = shared_from_this();
180 svc_.post(&op);
181 return std::noop_coroutine();
182 }
183
184 {
185 2x std::lock_guard lock(desc_state_.mutex);
186 2x desc_state_.read_ready = false;
187 2x }
188
189 2x if (svc_.scheduler().try_consume_inline_budget())
190 {
191 auto* socket_svc = svc_.tcp_service();
192 if (socket_svc)
193 {
194 auto& impl =
195 static_cast<select_tcp_socket&>(*socket_svc->construct());
196 impl.set_socket(accepted);
197
198 impl.desc_state_.fd = accepted;
199 {
200 std::lock_guard lock(impl.desc_state_.mutex);
201 impl.desc_state_.read_op = nullptr;
202 impl.desc_state_.write_op = nullptr;
203 impl.desc_state_.connect_op = nullptr;
204 }
205 socket_svc->scheduler().register_descriptor(
206 accepted, &impl.desc_state_);
207
208 impl.set_endpoints(
209 local_endpoint_, from_sockaddr(peer_storage));
210
211 *ec = {};
212 if (impl_out)
213 *impl_out = &impl;
214 }
215 else
216 {
217 ::close(accepted);
218 *ec = make_err(ENOENT);
219 if (impl_out)
220 *impl_out = nullptr;
221 }
222 op.cont_op.cont.h = h;
223 return dispatch_coro(ex, op.cont_op.cont);
224 }
225
226 2x op.accepted_fd = accepted;
227 2x op.peer_storage = peer_storage;
228 2x op.complete(0, 0);
229 2x op.impl_ptr = shared_from_this();
230 2x svc_.post(&op);
231 2x return std::noop_coroutine();
232 }
233
234 2842x if (errno == EAGAIN || errno == EWOULDBLOCK)
235 {
236 2842x op.impl_ptr = shared_from_this();
237 2842x svc_.work_started();
238
239 2842x std::lock_guard lock(desc_state_.mutex);
240 2842x bool io_done = false;
241 2842x if (desc_state_.read_ready)
242 {
243 desc_state_.read_ready = false;
244 op.perform_io();
245 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
246 if (!io_done)
247 op.errn = 0;
248 }
249
250 2842x if (io_done || op.cancelled.load(std::memory_order_acquire))
251 {
252 svc_.post(&op);
253 svc_.work_finished();
254 }
255 else
256 {
257 2842x desc_state_.read_op = &op;
258 }
259 2842x return std::noop_coroutine();
260 2842x }
261
262 op.complete(errno, 0);
263 op.impl_ptr = shared_from_this();
264 svc_.post(&op);
265 return std::noop_coroutine();
266 }
267
268 inline void
269 2x select_tcp_acceptor::cancel() noexcept
270 {
271 2x do_cancel();
272 2x }
273
274 inline void
275 240x select_tcp_acceptor::close_socket() noexcept
276 {
277 240x do_close_socket();
278 240x }
279
280 195x inline select_tcp_acceptor_service::select_tcp_acceptor_service(
281 195x capy::execution_context& ctx, select_tcp_service& tcp_svc)
282 195x : ctx_(ctx)
283 195x , tcp_svc_(&tcp_svc)
284 195x , state_(
285 std::make_unique<select_tcp_acceptor_state>(
286 195x ctx.use_service<select_scheduler>()))
287 {
288 195x }
289
290 390x inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
291
292 inline void
293 195x select_tcp_acceptor_service::shutdown()
294 {
295 195x std::lock_guard lock(state_->mutex_);
296
297 195x while (auto* impl = state_->impl_list_.pop_front())
298 impl->close_socket();
299
300 // Don't clear impl_ptrs_ here — same rationale as
301 // select_tcp_service::shutdown(). Let ~state_ release ptrs
302 // after scheduler shutdown has drained all queued ops.
303 195x }
304
305 inline io_object::implementation*
306 61x select_tcp_acceptor_service::construct()
307 {
308 61x auto impl = std::make_shared<select_tcp_acceptor>(*this);
309 61x auto* raw = impl.get();
310
311 61x std::lock_guard lock(state_->mutex_);
312 61x state_->impl_ptrs_.emplace(raw, std::move(impl));
313 61x state_->impl_list_.push_back(raw);
314
315 61x return raw;
316 61x }
317
318 inline void
319 61x select_tcp_acceptor_service::destroy(io_object::implementation* impl)
320 {
321 61x auto* select_impl = static_cast<select_tcp_acceptor*>(impl);
322 61x select_impl->close_socket();
323 61x std::lock_guard lock(state_->mutex_);
324 61x state_->impl_list_.remove(select_impl);
325 61x state_->impl_ptrs_.erase(select_impl);
326 61x }
327
328 inline void
329 120x select_tcp_acceptor_service::close(io_object::handle& h)
330 {
331 120x static_cast<select_tcp_acceptor*>(h.get())->close_socket();
332 120x }
333
334 inline std::error_code
335 59x select_tcp_acceptor_service::open_acceptor_socket(
336 tcp_acceptor::implementation& impl, int family, int type, int protocol)
337 {
338 59x auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
339 59x select_impl->close_socket();
340
341 59x int fd = ::socket(family, type, protocol);
342 59x if (fd < 0)
343 return make_err(errno);
344
345 59x int flags = ::fcntl(fd, F_GETFL, 0);
346 59x if (flags == -1)
347 {
348 int errn = errno;
349 ::close(fd);
350 return make_err(errn);
351 }
352 59x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
353 {
354 int errn = errno;
355 ::close(fd);
356 return make_err(errn);
357 }
358 59x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
359 {
360 int errn = errno;
361 ::close(fd);
362 return make_err(errn);
363 }
364
365 59x if (fd >= FD_SETSIZE)
366 {
367 ::close(fd);
368 return make_err(EMFILE);
369 }
370
371 59x if (family == AF_INET6)
372 {
373 8x int val = 0; // dual-stack default
374 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
375 }
376
377 #ifdef SO_NOSIGPIPE
378 {
379 int nosig = 1;
380 ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
381 }
382 #endif
383
384 59x select_impl->fd_ = fd;
385
386 // Set up descriptor state but do NOT register with reactor yet
387 // (registration happens in do_listen via reactor_acceptor base)
388 59x select_impl->desc_state_.fd = fd;
389 {
390 59x std::lock_guard lock(select_impl->desc_state_.mutex);
391 59x select_impl->desc_state_.read_op = nullptr;
392 59x }
393
394 59x return {};
395 }
396
397 inline std::error_code
398 58x select_tcp_acceptor_service::bind_acceptor(
399 tcp_acceptor::implementation& impl, endpoint ep)
400 {
401 58x return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
402 }
403
404 inline std::error_code
405 57x select_tcp_acceptor_service::listen_acceptor(
406 tcp_acceptor::implementation& impl, int backlog)
407 {
408 57x return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
409 }
410
411 inline void
412 5x select_tcp_acceptor_service::post(scheduler_op* op)
413 {
414 5x state_->sched_.post(op);
415 5x }
416
417 inline void
418 2842x select_tcp_acceptor_service::work_started() noexcept
419 {
420 2842x state_->sched_.work_started();
421 2842x }
422
423 inline void
424 3x select_tcp_acceptor_service::work_finished() noexcept
425 {
426 3x state_->sched_.work_finished();
427 3x }
428
429 inline select_tcp_service*
430 2841x select_tcp_acceptor_service::tcp_service() const noexcept
431 {
432 2841x return tcp_svc_;
433 }
434
435 } // namespace boost::corosio::detail
436
437 #endif // BOOST_COROSIO_HAS_SELECT
438
439 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
440