TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : // Copyright (c) 2026 Michael Vandeberg
5 : //
6 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
7 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 : //
9 : // Official repository: https://github.com/cppalliance/corosio
10 : //
11 :
12 : #ifndef BOOST_COROSIO_IO_CONTEXT_HPP
13 : #define BOOST_COROSIO_IO_CONTEXT_HPP
14 :
15 : #include <boost/corosio/detail/config.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/corosio/detail/platform.hpp>
18 : #include <boost/corosio/detail/scheduler.hpp>
19 : #include <boost/capy/continuation.hpp>
20 : #include <boost/capy/ex/execution_context.hpp>
21 :
22 : #include <chrono>
23 : #include <coroutine>
24 : #include <cstddef>
25 : #include <limits>
26 : #include <thread>
27 :
28 : namespace boost::corosio {
29 :
30 : /** Runtime tuning options for @ref io_context.
31 :
32 : All fields have defaults that match the library's built-in
33 : values, so constructing a default `io_context_options` produces
34 : identical behavior to an unconfigured context.
35 :
36 : Options that apply only to a specific backend family are
37 : silently ignored when the active backend does not support them.
38 :
39 : @par Example
40 : @code
41 : io_context_options opts;
42 : opts.max_events_per_poll = 256; // larger batch per syscall
43 : opts.inline_budget_max = 32; // higher ceiling for the inline budget
44 : opts.thread_pool_size = 4; // more blocking-I/O workers (file I/O, DNS)
45 :
46 : io_context ioc(opts);
47 : @endcode
48 :
49 : @see io_context, native_io_context
50 : */
51 : struct io_context_options
52 : {
53 : /** Maximum events fetched per reactor poll call (default 128).
54 :
55 : Controls the buffer size passed to `epoll_wait()` or
56 : `kevent()`. Larger values reduce syscall frequency under
57 : high load; smaller values improve fairness between
58 : connections. Ignored on IOCP and select backends.
59 : */
60 : unsigned max_events_per_poll = 128;
61 :
62 : /** Starting inline completion budget per handler chain (default 2).
63 :
64 : After a posted handler executes, the reactor grants this
65 : many speculative inline completions before forcing a
66 : re-queue. Applies to reactor backends only.
67 : */
68 : unsigned inline_budget_initial = 2;
69 :
70 : /** Hard ceiling on adaptive inline budget ramp-up (default 16).
71 :
72 : The budget doubles each cycle it is fully consumed, up to
73 : this limit. Applies to reactor backends only.
74 : */
75 : unsigned inline_budget_max = 16;
76 :
77 : /** Inline budget when no other thread assists the reactor (default 4).
78 :
79 : When only one thread is running the event loop, this
80 : value caps the inline budget to preserve fairness.
81 : Applies to reactor backends only.
82 : */
83 : unsigned unassisted_budget = 4;
84 :
85 : /** Maximum `GetQueuedCompletionStatus` timeout in milliseconds (default 500).
86 :
87 : Bounds how long the IOCP scheduler blocks between timer
88 : rechecks. Lower values improve timer responsiveness at the
89 : cost of more syscalls. Applies to IOCP only.
90 : */
91 : unsigned gqcs_timeout_ms = 500;
92 :
93 : /** Thread pool size for blocking I/O (file I/O, DNS resolution); default 1.
94 :
95 : Sets the number of worker threads in the shared thread pool
96 : used by POSIX file services and DNS resolution. Must be at
97 : least 1. Applies to POSIX backends only; ignored on IOCP
98 : where file I/O uses native overlapped I/O.
99 : */
100 : unsigned thread_pool_size = 1;
101 :
102 : /** Enable single-threaded mode (disable scheduler locking); default off.
103 :
104 : When true, the scheduler skips all mutex lock/unlock and
105 : condition variable operations on the hot path. This
106 : eliminates synchronization overhead when only one thread
107 : calls `run()`.
108 :
109 : @par Restrictions
110 : - Only one thread may call `run()` (or any run variant).
111 : - Posting work from another thread is undefined behavior.
112 : - DNS resolution returns `operation_not_supported`.
113 : - POSIX file I/O returns `operation_not_supported`.
114 : - Signal sets should not be shared across contexts.
115 : */
116 : bool single_threaded = false;
117 : };
118 :
119 : namespace detail {
120 : class timer_service; // forward declaration; io_context stores a pointer to it (timer_svc_)
121 : struct timer_service_access; // forward declaration; declared a friend of io_context
122 : } // namespace detail
123 :
124 : /** An I/O context for running asynchronous operations.
125 :
126 : The io_context provides an execution environment for async
127 : operations. It maintains a queue of pending work items and
128 : processes them when `run()` or one of its variants is called.
129 :
130 : The default, concurrency-hint, and options constructors select
131 : the platform's native backend:
132 : - Windows: IOCP
133 : - Linux: epoll
134 : - BSD/macOS: kqueue
135 : - Other POSIX: select
136 :
137 : The template constructors accept a backend tag value to
138 : choose a specific backend at compile time:
139 :
140 : @par Example
141 : @code
142 : io_context ioc; // platform default
143 : io_context ioc2(corosio::epoll); // explicit backend
144 : @endcode
145 :
146 : @par Thread Safety
147 : Distinct objects: Safe.@n
148 : Shared objects: Safe, if using a concurrency hint greater
149 : than 1.
150 :
151 : @see epoll_t, select_t, kqueue_t, iocp_t
152 : */
153 : class BOOST_COROSIO_DECL io_context : public capy::execution_context
154 : {
155 : friend struct detail::timer_service_access;
156 :
157 : /// Pre-create services that depend on options (before construct).
158 : void apply_options_pre_(io_context_options const& opts);
159 :
160 : /// Apply runtime tuning to the scheduler (after construct).
161 : void apply_options_post_(io_context_options const& opts);
162 :
163 : protected:
164 : detail::timer_service* timer_svc_ = nullptr; // never assigned in this header; presumably set through timer_service_access (TODO confirm)
165 : detail::scheduler* sched_; // target of every run/stop/poll call below; the template constructors set it from Backend::construct()
166 :
167 : public:
168 : /** The executor type for this context. */
169 : class executor_type;
170 :
171 : /** Construct with default concurrency and platform backend. */
172 : io_context();
173 :
174 : /** Construct with a concurrency hint and platform backend.
175 :
176 : @param concurrency_hint Hint for the number of threads
177 : that will call `run()`.
178 : */
179 : explicit io_context(unsigned concurrency_hint);
180 :
181 : /** Construct with runtime tuning options and platform backend.
182 :
183 : @param opts Runtime options controlling scheduler and
184 : service behavior.
185 : @param concurrency_hint Hint for the number of threads
186 : that will call `run()`.
187 : */
188 : explicit io_context(
189 : io_context_options const& opts,
190 : unsigned concurrency_hint = std::thread::hardware_concurrency());
191 :
192 : /** Construct with an explicit backend tag.
193 :
194 : @param backend The backend tag value selecting the I/O
195 : multiplexer (e.g. `corosio::epoll`); only its type is used.
196 : @param concurrency_hint Hint for the number of threads
197 : that will call `run()`.
198 : */
199 : template<class Backend>
200 : requires requires { Backend::construct; }
201 HIT 390 : explicit io_context(
202 : Backend backend,
203 : unsigned concurrency_hint = std::thread::hardware_concurrency())
204 : : capy::execution_context(this)
205 390 : , sched_(nullptr)
206 : {
207 : (void)backend; // the tag object itself is unused; Backend's type does the selection
208 390 : sched_ = &Backend::construct(*this, concurrency_hint);
209 390 : }
210 :
211 : /** Construct with an explicit backend tag and runtime options.
212 :
213 : @param backend The backend tag value selecting the I/O
214 : multiplexer (e.g. `corosio::epoll`); only its type is used.
215 : @param opts Runtime options; applied partly before and partly
216 : after the backend is constructed.
217 : @param concurrency_hint Hint for the number of threads
218 : that will call `run()`.
219 : */
220 : template<class Backend>
221 : requires requires { Backend::construct; }
222 : explicit io_context(
223 : Backend backend,
224 : io_context_options const& opts,
225 : unsigned concurrency_hint = std::thread::hardware_concurrency())
226 : : capy::execution_context(this)
227 : , sched_(nullptr)
228 : {
229 : (void)backend; // the tag object itself is unused; Backend's type does the selection
230 : apply_options_pre_(opts); // runs while sched_ is still null
231 : sched_ = &Backend::construct(*this, concurrency_hint);
232 : apply_options_post_(opts); // runs once sched_ points at the constructed scheduler
233 : }
234 :
235 : ~io_context();
236 :
237 : io_context(io_context const&) = delete; // non-copyable
238 : io_context& operator=(io_context const&) = delete;
239 :
240 : /** Return an executor for this context.
241 :
242 : The returned executor can be used to dispatch coroutines
243 : and post work items to this context.
244 :
245 : @return An executor associated with this context.
246 : */
247 : executor_type get_executor() const noexcept;
248 :
249 : /** Signal the context to stop processing.
250 :
251 : This causes `run()` to return as soon as possible. Any pending
252 : work items remain queued.
253 : */
254 5 : void stop()
255 : {
256 5 : sched_->stop();
257 5 : }
258 :
259 : /** Return whether the context has been stopped.
260 :
261 : @return `true` if the context is stopped (by `stop()` or, as the run
262 : functions document, by running out of work) and not yet restarted.
263 : */
264 62 : bool stopped() const noexcept
265 : {
266 62 : return sched_->stopped();
267 : }
268 :
269 : /** Restart the context after being stopped.
270 :
271 : This function must be called before `run()` can be called
272 : again after `stop()` has been called.
273 : */
274 91 : void restart()
275 : {
276 91 : sched_->restart();
277 91 : }
278 :
279 : /** Process all pending work items.
280 :
281 : This function blocks until all pending work items have been
282 : executed or `stop()` is called. The context is stopped
283 : when there is no more outstanding work.
284 :
285 : @note The context must be restarted with `restart()` before
286 : calling this function again after it returns.
287 :
288 : @return The number of handlers executed.
289 : */
290 388 : std::size_t run()
291 : {
292 388 : return sched_->run();
293 : }
294 :
295 : /** Process at most one pending work item.
296 :
297 : This function blocks until one work item has been executed
298 : or `stop()` is called. The context is stopped when there
299 : is no more outstanding work.
300 :
301 : @note The context must be restarted with `restart()` before
302 : calling this function again after it returns.
303 :
304 : @return The number of handlers executed (0 or 1).
305 : */
306 2 : std::size_t run_one()
307 : {
308 2 : return sched_->run_one();
309 : }
310 :
311 : /** Process work items for the specified duration.
312 :
313 : Processes work items until `rel_time` has elapsed, measured on
314 : `std::chrono::steady_clock`, or `stop()` is called. The context
315 : is stopped when there is no more outstanding work.
316 :
317 : @note The context must be restarted with `restart()` before
318 : calling this function again after it returns.
319 :
320 : @param rel_time The duration for which to process work.
321 :
322 : @return The number of handlers executed.
323 : */
324 : template<class Rep, class Period>
325 9 : std::size_t run_for(std::chrono::duration<Rep, Period> const& rel_time)
326 : {
327 9 : return run_until(std::chrono::steady_clock::now() + rel_time);
328 : }
329 :
330 : /** Process work items until the specified time.
331 :
332 : Repeatedly calls `run_one_until(abs_time)` until it reports no
333 : handler executed: the deadline passed or the context stopped.
334 : The context is stopped when there is no more outstanding work.
335 :
336 : @note The context must be restarted with `restart()` before
337 : calling this function again after it returns.
338 :
339 : @param abs_time The time point until which to process work.
340 :
341 : @return The number of handlers executed (saturates at the maximum `std::size_t`).
342 : */
343 : template<class Clock, class Duration>
344 : std::size_t
345 9 : run_until(std::chrono::time_point<Clock, Duration> const& abs_time)
346 : {
347 9 : std::size_t n = 0;
348 58 : while (run_one_until(abs_time))
349 49 : if (n != (std::numeric_limits<std::size_t>::max)()) // saturating count; (max)() presumably sidesteps a `max` macro
350 49 : ++n;
351 9 : return n;
352 : }
353 :
354 : /** Process at most one work item for the specified duration.
355 :
356 : This function blocks until one work item has been executed,
357 : `rel_time` has elapsed on `std::chrono::steady_clock`, or `stop()` is called.
358 : The context is stopped when there is no more outstanding work.
359 :
360 : @note The context must be restarted with `restart()` before
361 : calling this function again after it returns.
362 :
363 : @param rel_time The duration for which the call may block.
364 :
365 : @return The number of handlers executed (0 or 1).
366 : */
367 : template<class Rep, class Period>
368 3 : std::size_t run_one_for(std::chrono::duration<Rep, Period> const& rel_time)
369 : {
370 3 : return run_one_until(std::chrono::steady_clock::now() + rel_time);
371 : }
372 :
373 : /** Process at most one work item until the specified time.
374 :
375 : This function blocks until one work item has been executed,
376 : the specified time is reached, or `stop()` is called.
377 : The context is stopped when there is no more outstanding work.
378 :
379 : @note The context must be restarted with `restart()` before
380 : calling this function again after it returns.
381 :
382 : @param abs_time The time point until which the call may block.
383 :
384 : @return The number of handlers executed (0 or 1).
385 : */
386 : template<class Clock, class Duration>
387 : std::size_t
388 63 : run_one_until(std::chrono::time_point<Clock, Duration> const& abs_time)
389 : {
390 63 : typename Clock::time_point now = Clock::now();
391 104 : while (now < abs_time)
392 : {
393 102 : auto rel_time = abs_time - now;
394 102 : if (rel_time > std::chrono::seconds(1)) // wait at most one second per iteration
395 MIS 0 : rel_time = std::chrono::seconds(1);
396 :
397 HIT 102 : std::size_t s = sched_->wait_one( // timeout argument is a microsecond count
398 : static_cast<long>(
399 102 : std::chrono::duration_cast<std::chrono::microseconds>(
400 : rel_time)
401 102 : .count()));
402 :
403 102 : if (s || stopped()) // a handler ran, or the context is stopped
404 61 : return s;
405 :
406 41 : now = Clock::now(); // re-read the clock before waiting again
407 : }
408 2 : return 0; // deadline reached without running a handler
409 : }
410 :
411 : /** Process all ready work items without blocking.
412 :
413 : This function executes all work items that are ready to run
414 : without blocking for more work. The context is stopped
415 : when there is no more outstanding work.
416 :
417 : @note The context must be restarted with `restart()` before
418 : calling this function again after it returns.
419 :
420 : @return The number of handlers executed.
421 : */
422 6 : std::size_t poll()
423 : {
424 6 : return sched_->poll();
425 : }
426 :
427 : /** Process at most one ready work item without blocking.
428 :
429 : This function executes at most one work item that is ready
430 : to run without blocking for more work. The context is
431 : stopped when there is no more outstanding work.
432 :
433 : @note The context must be restarted with `restart()` before
434 : calling this function again after it returns.
435 :
436 : @return The number of handlers executed (0 or 1).
437 : */
438 4 : std::size_t poll_one()
439 : {
440 4 : return sched_->poll_one();
441 : }
442 : };
443 :
444 : /** An executor for dispatching work to an I/O context.
445 :
446 : The executor provides the interface for posting work items and
447 : dispatching coroutines to the associated context. It satisfies
448 : the `capy::Executor` concept.
449 :
450 : Executors are lightweight handles that can be copied and compared
451 : for equality. Two executors compare equal if they refer to the
452 : same context.
453 :
454 : @par Thread Safety
455 : Distinct objects: Safe.@n
456 : Shared objects: Safe.
457 : */
458 : class io_context::executor_type
459 : {
460 : io_context* ctx_ = nullptr;
461 :
462 : public:
463 : /** Default constructor.
464 :
465 : Constructs an executor not associated with any context. Every member except the comparison operators dereferences the context.
466 : */
467 : executor_type() = default;
468 :
469 : /** Construct an executor from a context.
470 :
471 : @param ctx The context to associate with this executor.
472 : */
473 608 : explicit executor_type(io_context& ctx) noexcept : ctx_(&ctx) {}
474 :
475 : /** Return a reference to the associated execution context.
476 :
477 : @return Reference to the context.
478 : */
479 1296 : io_context& context() const noexcept
480 : {
481 1296 : return *ctx_;
482 : }
483 :
484 : /** Check if the current thread is running this executor's context.
485 :
486 : @return `true` if `run()` is being called on this thread.
487 : */
488 1308 : bool running_in_this_thread() const noexcept
489 : {
490 1308 : return ctx_->sched_->running_in_this_thread();
491 : }
492 :
493 : /** Informs the executor that work is beginning.
494 :
495 : Must be paired with `on_work_finished()`.
496 : */
497 1453 : void on_work_started() const noexcept
498 : {
499 1453 : ctx_->sched_->work_started();
500 1453 : }
501 :
502 : /** Informs the executor that work has completed.
503 :
504 : @par Preconditions
505 : A preceding call to `on_work_started()` on an equal executor.
506 : */
507 1427 : void on_work_finished() const noexcept
508 : {
509 1427 : ctx_->sched_->work_finished();
510 1427 : }
511 :
512 : /** Dispatch a continuation.
513 :
514 : Returns a handle for symmetric transfer. If
515 : `running_in_this_thread()` is true, returns `c.h` unchanged.
516 : Otherwise calls `post(c)` to queue the continuation for later
517 : execution and returns `std::noop_coroutine()`.
518 :
519 : @param c The continuation to dispatch. Must be the `cont`
520 : member of a `detail::continuation_op`.
521 :
522 : @return A handle for symmetric transfer or `std::noop_coroutine()`.
523 : */
524 1306 : std::coroutine_handle<> dispatch(capy::continuation& c) const
525 : {
526 1306 : if (running_in_this_thread())
527 618 : return c.h;
528 688 : post(c);
529 688 : return std::noop_coroutine();
530 : }
531 :
532 : /** Post a continuation for deferred execution.
533 :
534 : If the continuation is backed by a continuation_op
535 : (tagged), posts it directly as a scheduler_op — zero
536 : heap allocation. Otherwise falls back to the
537 : heap-allocating post(coroutine_handle<>) path.
538 : */
539 8786 : void post(capy::continuation& c) const
540 : {
541 8786 : auto* op = detail::continuation_op::try_from_continuation(c); // null selects the fallback branch below
542 8786 : if (op)
543 8095 : ctx_->sched_->post(op);
544 : else
545 691 : ctx_->sched_->post(c.h); // same scheduler call that post(std::coroutine_handle<>) makes
546 8786 : }
547 :
548 : /** Post a bare coroutine handle for deferred execution.
549 :
550 : Heap-allocates a scheduler_op to wrap the handle. Prefer
551 : posting through a continuation_op-backed continuation when
552 : the continuation has suitable lifetime.
553 :
554 : @param h The coroutine handle to post.
555 : */
556 1426 : void post(std::coroutine_handle<> h) const
557 : {
558 1426 : ctx_->sched_->post(h);
559 1426 : }
560 :
561 : /** Compare two executors for equality.
562 :
563 : @return `true` if both executors refer to the same context.
564 : */
565 1 : bool operator==(executor_type const& other) const noexcept
566 : {
567 1 : return ctx_ == other.ctx_;
568 : }
569 :
570 : /** Compare two executors for inequality.
571 :
572 : @return `true` if the executors refer to different contexts.
573 : */
574 : bool operator!=(executor_type const& other) const noexcept
575 : {
576 : return ctx_ != other.ctx_;
577 : }
578 : };
579 :
580 : inline io_context::executor_type // defined out of class: executor_type is only forward-declared inside io_context
581 608 : io_context::get_executor() const noexcept
582 : {
583 608 : return executor_type(const_cast<io_context&>(*this)); // executor_type stores a non-const io_context*, so constness is cast away
584 : }
585 :
586 : } // namespace boost::corosio
587 :
588 : #endif // BOOST_COROSIO_IO_CONTEXT_HPP
|