include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp

84.1% Lines (122/145) 100.0% List of functions (16/16)
posix_stream_file_service.hpp
f(x) Functions (16)
Function Calls Lines Blocks
boost::corosio::detail::posix_stream_file_service::posix_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :36 517x 100.0% 88.0% boost::corosio::detail::posix_stream_file_service::~posix_stream_file_service() :43 1034x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::construct() :48 26x 100.0% 71.0% boost::corosio::detail::posix_stream_file_service::destroy(boost::corosio::io_object::implementation*) :62 26x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::close(boost::corosio::io_object::handle&) :70 43x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::open_file(boost::corosio::stream_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :80 19x 75.0% 80.0% boost::corosio::detail::posix_stream_file_service::shutdown() :90 517x 62.5% 70.0% boost::corosio::detail::posix_stream_file_service::destroy_impl(boost::corosio::detail::posix_stream_file&) :102 26x 100.0% 67.0% boost::corosio::detail::posix_stream_file_service::post(boost::corosio::detail::scheduler_op*) :109 12x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::pool() :124 12x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::get_or_create_pool(boost::capy::execution_context&) :130 517x 80.0% 67.0% boost::corosio::detail::get_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :148 517x 100.0% 100.0% boost::corosio::detail::posix_stream_file::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :158 6x 75.0% 73.0% boost::corosio::detail::posix_stream_file::do_read_work(boost::corosio::detail::pool_work_item*) :208 6x 88.2% 83.0% boost::corosio::detail::posix_stream_file::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :242 6x 75.0% 73.0% boost::corosio::detail::posix_stream_file::do_write_work(boost::corosio::detail::pool_work_item*) :292 6x 88.2% 83.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
18 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
19 #include <boost/corosio/detail/file_service.hpp>
20 #include <boost/corosio/detail/thread_pool.hpp>
21
22 #include <mutex>
23 #include <unordered_map>
24
25 namespace boost::corosio::detail {
26
27 /** Stream file service for POSIX backends.
28
29 Owns all posix_stream_file instances. Thread lifecycle is
30 managed by the thread_pool service (shared with resolver).
31 */
32 class BOOST_COROSIO_DECL posix_stream_file_service final
33 : public file_service
34 {
35 public:
36 517x posix_stream_file_service(
37 capy::execution_context& ctx, scheduler& sched)
38 1034x : sched_(&sched)
39 517x , pool_(get_or_create_pool(ctx))
40 {
41 517x }
42
43 1034x ~posix_stream_file_service() override = default;
44
45 posix_stream_file_service(posix_stream_file_service const&) = delete;
46 posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
47
48 26x io_object::implementation* construct() override
49 {
50 26x auto ptr = std::make_shared<posix_stream_file>(*this);
51 26x auto* impl = ptr.get();
52
53 {
54 26x std::lock_guard<std::mutex> lock(mutex_);
55 26x file_list_.push_back(impl);
56 26x file_ptrs_[impl] = std::move(ptr);
57 26x }
58
59 26x return impl;
60 26x }
61
62 26x void destroy(io_object::implementation* p) override
63 {
64 26x auto& impl = static_cast<posix_stream_file&>(*p);
65 26x impl.cancel();
66 26x impl.close_file();
67 26x destroy_impl(impl);
68 26x }
69
70 43x void close(io_object::handle& h) override
71 {
72 43x if (h.get())
73 {
74 43x auto& impl = static_cast<posix_stream_file&>(*h.get());
75 43x impl.cancel();
76 43x impl.close_file();
77 }
78 43x }
79
80 19x std::error_code open_file(
81 stream_file::implementation& impl,
82 std::filesystem::path const& path,
83 file_base::flags mode) override
84 {
85 19x if (static_cast<reactor_scheduler const*>(sched_)->is_single_threaded())
86 return std::make_error_code(std::errc::operation_not_supported);
87 19x return static_cast<posix_stream_file&>(impl).open_file(path, mode);
88 }
89
90 517x void shutdown() override
91 {
92 517x std::lock_guard<std::mutex> lock(mutex_);
93 517x for (auto* impl = file_list_.pop_front(); impl != nullptr;
94 impl = file_list_.pop_front())
95 {
96 impl->cancel();
97 impl->close_file();
98 }
99 517x file_ptrs_.clear();
100 517x }
101
102 26x void destroy_impl(posix_stream_file& impl)
103 {
104 26x std::lock_guard<std::mutex> lock(mutex_);
105 26x file_list_.remove(&impl);
106 26x file_ptrs_.erase(&impl);
107 26x }
108
109 12x void post(scheduler_op* op)
110 {
111 12x sched_->post(op);
112 12x }
113
114 void work_started() noexcept
115 {
116 sched_->work_started();
117 }
118
119 void work_finished() noexcept
120 {
121 sched_->work_finished();
122 }
123
124 12x thread_pool& pool() noexcept
125 {
126 12x return pool_;
127 }
128
129 private:
130 517x static thread_pool& get_or_create_pool(capy::execution_context& ctx)
131 {
132 517x auto* p = ctx.find_service<thread_pool>();
133 517x if (p)
134 517x return *p;
135 return ctx.make_service<thread_pool>();
136 }
137
138 scheduler* sched_;
139 thread_pool& pool_;
140 std::mutex mutex_;
141 intrusive_list<posix_stream_file> file_list_;
142 std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
143 file_ptrs_;
144 };
145
146 /** Get or create the stream file service for the given context. */
147 inline posix_stream_file_service&
148 517x get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
149 {
150 517x return ctx.make_service<posix_stream_file_service>(sched);
151 }
152
153 // ---------------------------------------------------------------------------
154 // posix_stream_file inline implementations (require complete service type)
155 // ---------------------------------------------------------------------------
156
157 inline std::coroutine_handle<>
158 6x posix_stream_file::read_some(
159 std::coroutine_handle<> h,
160 capy::executor_ref ex,
161 buffer_param param,
162 std::stop_token token,
163 std::error_code* ec,
164 std::size_t* bytes_out)
165 {
166 6x auto& op = read_op_;
167 6x op.reset();
168 6x op.is_read = true;
169
170 6x capy::mutable_buffer bufs[max_buffers];
171 6x op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
172
173 6x if (op.iovec_count == 0)
174 {
175 *ec = {};
176 *bytes_out = 0;
177 op.cont_op.cont.h = h;
178 return dispatch_coro(ex, op.cont_op.cont);
179 }
180
181 12x for (int i = 0; i < op.iovec_count; ++i)
182 {
183 6x op.iovecs[i].iov_base = bufs[i].data();
184 6x op.iovecs[i].iov_len = bufs[i].size();
185 }
186
187 6x op.h = h;
188 6x op.ex = ex;
189 6x op.ec_out = ec;
190 6x op.bytes_out = bytes_out;
191 6x op.start(token);
192
193 6x op.ex.on_work_started();
194
195 6x read_pool_op_.file_ = this;
196 6x read_pool_op_.ref_ = this->shared_from_this();
197 6x read_pool_op_.func_ = &posix_stream_file::do_read_work;
198 6x if (!svc_.pool().post(&read_pool_op_))
199 {
200 op.impl_ref = std::move(read_pool_op_.ref_);
201 op.cancelled.store(true, std::memory_order_release);
202 svc_.post(&read_op_);
203 }
204 6x return std::noop_coroutine();
205 }
206
207 inline void
208 6x posix_stream_file::do_read_work(pool_work_item* w) noexcept
209 {
210 6x auto* pw = static_cast<pool_op*>(w);
211 6x auto* self = pw->file_;
212 6x auto& op = self->read_op_;
213
214 6x if (!op.cancelled.load(std::memory_order_acquire))
215 {
216 ssize_t n;
217 do
218 {
219 10x n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
220 5x static_cast<off_t>(self->offset_));
221 }
222 5x while (n < 0 && errno == EINTR);
223
224 5x if (n >= 0)
225 {
226 5x op.errn = 0;
227 5x op.bytes_transferred = static_cast<std::size_t>(n);
228 5x self->offset_ += static_cast<std::uint64_t>(n);
229 }
230 else
231 {
232 op.errn = errno;
233 op.bytes_transferred = 0;
234 }
235 }
236
237 6x op.impl_ref = std::move(pw->ref_);
238 6x self->svc_.post(&op);
239 6x }
240
241 inline std::coroutine_handle<>
242 6x posix_stream_file::write_some(
243 std::coroutine_handle<> h,
244 capy::executor_ref ex,
245 buffer_param param,
246 std::stop_token token,
247 std::error_code* ec,
248 std::size_t* bytes_out)
249 {
250 6x auto& op = write_op_;
251 6x op.reset();
252 6x op.is_read = false;
253
254 6x capy::mutable_buffer bufs[max_buffers];
255 6x op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
256
257 6x if (op.iovec_count == 0)
258 {
259 *ec = {};
260 *bytes_out = 0;
261 op.cont_op.cont.h = h;
262 return dispatch_coro(ex, op.cont_op.cont);
263 }
264
265 12x for (int i = 0; i < op.iovec_count; ++i)
266 {
267 6x op.iovecs[i].iov_base = bufs[i].data();
268 6x op.iovecs[i].iov_len = bufs[i].size();
269 }
270
271 6x op.h = h;
272 6x op.ex = ex;
273 6x op.ec_out = ec;
274 6x op.bytes_out = bytes_out;
275 6x op.start(token);
276
277 6x op.ex.on_work_started();
278
279 6x write_pool_op_.file_ = this;
280 6x write_pool_op_.ref_ = this->shared_from_this();
281 6x write_pool_op_.func_ = &posix_stream_file::do_write_work;
282 6x if (!svc_.pool().post(&write_pool_op_))
283 {
284 op.impl_ref = std::move(write_pool_op_.ref_);
285 op.cancelled.store(true, std::memory_order_release);
286 svc_.post(&write_op_);
287 }
288 6x return std::noop_coroutine();
289 }
290
291 inline void
292 6x posix_stream_file::do_write_work(pool_work_item* w) noexcept
293 {
294 6x auto* pw = static_cast<pool_op*>(w);
295 6x auto* self = pw->file_;
296 6x auto& op = self->write_op_;
297
298 6x if (!op.cancelled.load(std::memory_order_acquire))
299 {
300 ssize_t n;
301 do
302 {
303 12x n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
304 6x static_cast<off_t>(self->offset_));
305 }
306 6x while (n < 0 && errno == EINTR);
307
308 6x if (n >= 0)
309 {
310 6x op.errn = 0;
311 6x op.bytes_transferred = static_cast<std::size_t>(n);
312 6x self->offset_ += static_cast<std::uint64_t>(n);
313 }
314 else
315 {
316 op.errn = errno;
317 op.bytes_transferred = 0;
318 }
319 }
320
321 6x op.impl_ref = std::move(pw->ref_);
322 6x self->svc_.post(&op);
323 6x }
324
325 } // namespace boost::corosio::detail
326
327 #endif // BOOST_COROSIO_POSIX
328
329 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
330