include/boost/corosio/native/detail/posix/posix_random_access_file_service.hpp

86.4% Lines (121/140) 100.0% List of functions (15/15)
posix_random_access_file_service.hpp
f(x) Functions (15)
Function Calls Lines Blocks
boost::corosio::detail::posix_random_access_file_service::posix_random_access_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :33 517x 100.0% 88.0% boost::corosio::detail::posix_random_access_file_service::~posix_random_access_file_service() :40 1034x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::construct() :47 24x 100.0% 71.0% boost::corosio::detail::posix_random_access_file_service::destroy(boost::corosio::io_object::implementation*) :61 24x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::close(boost::corosio::io_object::handle&) :69 42x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::open_file(boost::corosio::random_access_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :79 19x 80.0% 83.0% boost::corosio::detail::posix_random_access_file_service::shutdown() :90 517x 62.5% 70.0% boost::corosio::detail::posix_random_access_file_service::destroy_impl(boost::corosio::detail::posix_random_access_file&) :102 24x 100.0% 67.0% boost::corosio::detail::posix_random_access_file_service::post(boost::corosio::detail::scheduler_op*) :109 126x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::pool() :124 126x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::get_or_create_pool(boost::capy::execution_context&) :130 517x 80.0% 67.0% boost::corosio::detail::get_random_access_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :150 517x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::read_some_at(unsigned long, std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :160 116x 83.3% 79.0% boost::corosio::detail::posix_random_access_file::write_some_at(unsigned long, std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :215 10x 83.3% 79.0% boost::corosio::detail::posix_random_access_file::raf_op::do_work(boost::corosio::detail::pool_work_item*) :272 126x 83.3% 76.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
18 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
19 #include <boost/corosio/detail/random_access_file_service.hpp>
20 #include <boost/corosio/detail/thread_pool.hpp>
21
22 #include <limits>
23 #include <mutex>
24 #include <unordered_map>
25
26 namespace boost::corosio::detail {
27
28 /** Random-access file service for POSIX backends. */
29 class BOOST_COROSIO_DECL posix_random_access_file_service final
30 : public random_access_file_service
31 {
32 public:
33 517x posix_random_access_file_service(
34 capy::execution_context& ctx, scheduler& sched)
35 1034x : sched_(&sched)
36 517x , pool_(get_or_create_pool(ctx))
37 {
38 517x }
39
40 1034x ~posix_random_access_file_service() override = default;
41
42 posix_random_access_file_service(
43 posix_random_access_file_service const&) = delete;
44 posix_random_access_file_service& operator=(
45 posix_random_access_file_service const&) = delete;
46
47 24x io_object::implementation* construct() override
48 {
49 24x auto ptr = std::make_shared<posix_random_access_file>(*this);
50 24x auto* impl = ptr.get();
51
52 {
53 24x std::lock_guard<std::mutex> lock(mutex_);
54 24x file_list_.push_back(impl);
55 24x file_ptrs_[impl] = std::move(ptr);
56 24x }
57
58 24x return impl;
59 24x }
60
61 24x void destroy(io_object::implementation* p) override
62 {
63 24x auto& impl = static_cast<posix_random_access_file&>(*p);
64 24x impl.cancel();
65 24x impl.close_file();
66 24x destroy_impl(impl);
67 24x }
68
69 42x void close(io_object::handle& h) override
70 {
71 42x if (h.get())
72 {
73 42x auto& impl = static_cast<posix_random_access_file&>(*h.get());
74 42x impl.cancel();
75 42x impl.close_file();
76 }
77 42x }
78
79 19x std::error_code open_file(
80 random_access_file::implementation& impl,
81 std::filesystem::path const& path,
82 file_base::flags mode) override
83 {
84 19x if (static_cast<reactor_scheduler const*>(sched_)->is_single_threaded())
85 return std::make_error_code(std::errc::operation_not_supported);
86 19x return static_cast<posix_random_access_file&>(impl).open_file(
87 19x path, mode);
88 }
89
90 517x void shutdown() override
91 {
92 517x std::lock_guard<std::mutex> lock(mutex_);
93 517x for (auto* impl = file_list_.pop_front(); impl != nullptr;
94 impl = file_list_.pop_front())
95 {
96 impl->cancel();
97 impl->close_file();
98 }
99 517x file_ptrs_.clear();
100 517x }
101
102 24x void destroy_impl(posix_random_access_file& impl)
103 {
104 24x std::lock_guard<std::mutex> lock(mutex_);
105 24x file_list_.remove(&impl);
106 24x file_ptrs_.erase(&impl);
107 24x }
108
109 126x void post(scheduler_op* op)
110 {
111 126x sched_->post(op);
112 126x }
113
114 void work_started() noexcept
115 {
116 sched_->work_started();
117 }
118
119 void work_finished() noexcept
120 {
121 sched_->work_finished();
122 }
123
124 126x thread_pool& pool() noexcept
125 {
126 126x return pool_;
127 }
128
129 private:
130 517x static thread_pool& get_or_create_pool(capy::execution_context& ctx)
131 {
132 517x auto* p = ctx.find_service<thread_pool>();
133 517x if (p)
134 517x return *p;
135 return ctx.make_service<thread_pool>();
136 }
137
138 scheduler* sched_;
139 thread_pool& pool_;
140 std::mutex mutex_;
141 intrusive_list<posix_random_access_file> file_list_;
142 std::unordered_map<
143 posix_random_access_file*,
144 std::shared_ptr<posix_random_access_file>>
145 file_ptrs_;
146 };
147
148 /** Get or create the random-access file service for the given context. */
149 inline posix_random_access_file_service&
150 517x get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
151 {
152 517x return ctx.make_service<posix_random_access_file_service>(sched);
153 }
154
155 // ---------------------------------------------------------------------------
156 // posix_random_access_file inline implementations (require complete service)
157 // ---------------------------------------------------------------------------
158
159 inline std::coroutine_handle<>
160 116x posix_random_access_file::read_some_at(
161 std::uint64_t offset,
162 std::coroutine_handle<> h,
163 capy::executor_ref ex,
164 buffer_param param,
165 std::stop_token token,
166 std::error_code* ec,
167 std::size_t* bytes_out)
168 {
169 116x capy::mutable_buffer bufs[max_buffers];
170 116x auto count = param.copy_to(bufs, max_buffers);
171
172 116x if (count == 0)
173 {
174 *ec = {};
175 *bytes_out = 0;
176 return h;
177 }
178
179 116x auto* op = new raf_op();
180 116x op->is_read = true;
181 116x op->offset = offset;
182
183 116x op->iovec_count = static_cast<int>(count);
184 232x for (int i = 0; i < op->iovec_count; ++i)
185 {
186 116x op->iovecs[i].iov_base = bufs[i].data();
187 116x op->iovecs[i].iov_len = bufs[i].size();
188 }
189
190 116x op->h = h;
191 116x op->ex = ex;
192 116x op->ec_out = ec;
193 116x op->bytes_out = bytes_out;
194 116x op->file_ = this;
195 116x op->file_ref = this->shared_from_this();
196 116x op->start(token);
197
198 116x op->ex.on_work_started();
199
200 {
201 116x std::lock_guard<std::mutex> lock(ops_mutex_);
202 116x outstanding_ops_.push_back(op);
203 116x }
204
205 116x static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
206 116x if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
207 {
208 op->cancelled.store(true, std::memory_order_release);
209 svc_.post(static_cast<scheduler_op*>(op));
210 }
211 116x return std::noop_coroutine();
212 }
213
214 inline std::coroutine_handle<>
215 10x posix_random_access_file::write_some_at(
216 std::uint64_t offset,
217 std::coroutine_handle<> h,
218 capy::executor_ref ex,
219 buffer_param param,
220 std::stop_token token,
221 std::error_code* ec,
222 std::size_t* bytes_out)
223 {
224 10x capy::mutable_buffer bufs[max_buffers];
225 10x auto count = param.copy_to(bufs, max_buffers);
226
227 10x if (count == 0)
228 {
229 *ec = {};
230 *bytes_out = 0;
231 return h;
232 }
233
234 10x auto* op = new raf_op();
235 10x op->is_read = false;
236 10x op->offset = offset;
237
238 10x op->iovec_count = static_cast<int>(count);
239 20x for (int i = 0; i < op->iovec_count; ++i)
240 {
241 10x op->iovecs[i].iov_base = bufs[i].data();
242 10x op->iovecs[i].iov_len = bufs[i].size();
243 }
244
245 10x op->h = h;
246 10x op->ex = ex;
247 10x op->ec_out = ec;
248 10x op->bytes_out = bytes_out;
249 10x op->file_ = this;
250 10x op->file_ref = this->shared_from_this();
251 10x op->start(token);
252
253 10x op->ex.on_work_started();
254
255 {
256 10x std::lock_guard<std::mutex> lock(ops_mutex_);
257 10x outstanding_ops_.push_back(op);
258 10x }
259
260 10x static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
261 10x if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
262 {
263 op->cancelled.store(true, std::memory_order_release);
264 svc_.post(static_cast<scheduler_op*>(op));
265 }
266 10x return std::noop_coroutine();
267 }
268
269 // -- raf_op thread-pool work function --
270
271 inline void
272 126x posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
273 {
274 126x auto* op = static_cast<raf_op*>(w);
275 126x auto* self = op->file_;
276
277 126x if (op->cancelled.load(std::memory_order_acquire))
278 {
279 1x op->errn = ECANCELED;
280 1x op->bytes_transferred = 0;
281 }
282 250x else if (op->offset >
283 125x static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
284 {
285 op->errn = EOVERFLOW;
286 op->bytes_transferred = 0;
287 }
288 else
289 {
290 ssize_t n;
291 125x if (op->is_read)
292 {
293 do
294 {
295 230x n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
296 115x static_cast<off_t>(op->offset));
297 }
298 115x while (n < 0 && errno == EINTR);
299 }
300 else
301 {
302 do
303 {
304 20x n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
305 10x static_cast<off_t>(op->offset));
306 }
307 10x while (n < 0 && errno == EINTR);
308 }
309
310 125x if (n >= 0)
311 {
312 125x op->errn = 0;
313 125x op->bytes_transferred = static_cast<std::size_t>(n);
314 }
315 else
316 {
317 op->errn = errno;
318 op->bytes_transferred = 0;
319 }
320 }
321
322 126x self->svc_.post(static_cast<scheduler_op*>(op));
323 126x }
324
325 } // namespace boost::corosio::detail
326
327 #endif // BOOST_COROSIO_POSIX
328
329 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
330