TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/capy/ex/execution_context.hpp>
15 :
16 : #include <boost/corosio/detail/scheduler.hpp>
17 : #include <boost/corosio/detail/scheduler_op.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 :
20 : #include <atomic>
21 : #include <chrono>
22 : #include <coroutine>
23 : #include <cstddef>
24 : #include <cstdint>
25 : #include <limits>
26 : #include <memory>
27 : #include <stdexcept>
28 :
29 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 : #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : // Forward declarations
35 : class reactor_scheduler;
36 : class timer_service;
37 :
/** Per-thread state for a reactor scheduler.

    Each thread running a scheduler's event loop has one of these
    on a thread-local stack. It holds a private work queue and
    inline completion budget for speculative I/O fast paths.

    Frames are linked through `next` so one thread may run nested
    event loops for several schedulers; `reactor_find_context`
    walks this list keyed by `key`.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to (lookup key on the stack).
    reactor_scheduler const* key;

    /// Next context frame on this thread's stack.
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention.
    op_queue private_queue;

    /// Unflushed work count for the private queue.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive, 2-16).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler const* k,
        reactor_scheduler_context* n);
};
72 :
/// Thread-local context stack for reactor schedulers. Each thread
/// pushes one frame per nested event loop it runs (see
/// reactor_thread_context_guard).
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75 :
76 : /// Find the context frame for a scheduler on this thread.
77 : inline reactor_scheduler_context*
78 HIT 1238821 : reactor_find_context(reactor_scheduler const* self) noexcept
79 : {
80 1238821 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 : {
82 1235849 : if (c->key == self)
83 1235849 : return c;
84 : }
85 2972 : return nullptr;
86 : }
87 :
88 : /// Flush private work count to global counter.
89 : inline void
90 MIS 0 : reactor_flush_private_work(
91 : reactor_scheduler_context* ctx,
92 : std::atomic<std::int64_t>& outstanding_work) noexcept
93 : {
94 0 : if (ctx && ctx->private_outstanding_work > 0)
95 : {
96 0 : outstanding_work.fetch_add(
97 : ctx->private_outstanding_work, std::memory_order_relaxed);
98 0 : ctx->private_outstanding_work = 0;
99 : }
100 0 : }
101 :
102 : /** Drain private queue to global queue, flushing work count first.
103 :
104 : @return True if any ops were drained.
105 : */
106 : inline bool
107 HIT 2 : reactor_drain_private_queue(
108 : reactor_scheduler_context* ctx,
109 : std::atomic<std::int64_t>& outstanding_work,
110 : op_queue& completed_ops) noexcept
111 : {
112 2 : if (!ctx || ctx->private_queue.empty())
113 2 : return false;
114 :
115 MIS 0 : reactor_flush_private_work(ctx, outstanding_work);
116 0 : completed_ops.splice(ctx->private_queue);
117 0 : return true;
118 : }
119 :
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;
    using mutex_type = conditionally_enabled_mutex;
    using lock_type = mutex_type::scoped_lock;
    using event_type = conditionally_enabled_event;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

    /** Apply runtime configuration to the scheduler.

        Called by `io_context` after construction. Values that do
        not apply to this backend are silently ignored.

        @param max_events Event buffer size for epoll/kqueue.
        @param budget_init Starting inline completion budget.
        @param budget_max Hard ceiling on adaptive budget ramp-up.
        @param unassisted Budget when single-threaded.
    */
    virtual void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted);

    /// Return the configured initial inline budget.
    unsigned inline_budget_initial() const noexcept
    {
        return inline_budget_initial_;
    }

    /// Return true if single-threaded (lockless) mode is active.
    bool is_single_threaded() const noexcept
    {
        return single_threaded_;
    }

    /** Enable or disable single-threaded (lockless) mode.

        When enabled, all scheduler mutex and condition variable
        operations become no-ops. Cross-thread post() is
        undefined behavior.
    */
    void configure_single_threaded(bool v) noexcept
    {
        single_threaded_ = v;
        mutex_.set_enabled(!v);
        cond_.set_enabled(!v);
    }

protected:
    /// Timer service hook for the derived reactor (set by backend).
    timer_service* timer_svc_ = nullptr;

    /// True when lockless single-threaded mode is active.
    bool single_threaded_ = false;

    reactor_scheduler() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler const* sched;
        lock_type* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    /// Protects completed_ops_, state_, and task_interrupted_.
    mutable mutex_type mutex_{true};

    /// Condition variable used with mutex_ to park idle threads.
    mutable event_type cond_{true};

    /// Global queue of ready-to-run operations (guarded by mutex_).
    mutable op_queue completed_ops_;

    /// Count of operations that may still produce completions.
    mutable std::atomic<std::int64_t> outstanding_work_{0};

    /// Set by stop(); cleared by restart().
    std::atomic<bool> stopped_{false};

    /// True while a thread is inside run_task().
    mutable std::atomic<bool> task_running_{false};

    /// True once the running reactor has been told to wake (guarded by mutex_).
    mutable bool task_interrupted_ = false;

    // Runtime-configurable reactor tuning parameters.
    // Defaults match the library's built-in values.
    unsigned max_events_per_poll_ = 128;
    unsigned inline_budget_initial_ = 2;
    unsigned inline_budget_max_ = 16;
    unsigned unassisted_budget_ = 4;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;

    /// Packed signal state: bit 0 = signaled, upper bits = waiter count.
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        // Never executed or destroyed; do_one recognizes it by address.
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each handler: reconciles work counts and
    /// drains the thread's private queue back to the global queue.
    struct work_cleanup
    {
        reactor_scheduler* sched;
        lock_type* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        lock_type& lock, long timeout_us, context_type* ctx);

    void signal_all(lock_type& lock) const;
    bool maybe_unlock_and_signal_one(lock_type& lock) const;
    bool unlock_and_signal_one(lock_type& lock) const;
    void clear_signal() const;
    void wait_for_signal(lock_type& lock) const;
    void wait_for_signal_for(
        lock_type& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(lock_type& lock) const;
};
359 :
/** RAII guard that pushes/pops a scheduler context frame.

    On construction, pushes a new context frame onto the
    thread-local stack. On destruction, drains any remaining
    private queue items to the global queue and pops the frame.
*/
struct reactor_thread_context_guard
{
    /// The context frame managed by this guard.
    reactor_scheduler_context frame_;

    /// Construct the guard, pushing a frame for @a sched.
    explicit reactor_thread_context_guard(
        reactor_scheduler const* sched) noexcept
        : frame_(sched, reactor_context_stack.get())
    {
        reactor_context_stack.set(&frame_);
    }

    /// Destroy the guard, draining private work and popping the frame.
    ~reactor_thread_context_guard() noexcept
    {
        // Ops left in the private queue must not be lost; hand them
        // (and their unflushed work count) back to the scheduler.
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        reactor_context_stack.set(frame_.next);
    }
};
388 :
389 : // ---- Inline implementations ------------------------------------------------
390 :
inline
reactor_scheduler_context::reactor_scheduler_context(
    reactor_scheduler const* k,
    reactor_scheduler_context* n)
    : key(k)
    , next(n)
    , private_outstanding_work(0)
    // inline_budget starts at 0; the first reset_inline_budget()
    // call establishes the working budget for the cycle.
    , inline_budget(0)
    , inline_budget_max(
        static_cast<int>(k->inline_budget_initial()))
    , unassisted(false)
{
}
404 :
405 : inline void
406 MIS 0 : reactor_scheduler::configure_reactor(
407 : unsigned max_events,
408 : unsigned budget_init,
409 : unsigned budget_max,
410 : unsigned unassisted)
411 : {
412 0 : if (max_events < 1 ||
413 0 : max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414 : throw std::out_of_range(
415 0 : "max_events_per_poll must be in [1, INT_MAX]");
416 0 : if (budget_max < 1 ||
417 0 : budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
418 : throw std::out_of_range(
419 0 : "inline_budget_max must be in [1, INT_MAX]");
420 :
421 : // Clamp initial and unassisted to budget_max.
422 0 : if (budget_init > budget_max)
423 0 : budget_init = budget_max;
424 0 : if (unassisted > budget_max)
425 0 : unassisted = budget_max;
426 :
427 0 : max_events_per_poll_ = max_events;
428 0 : inline_budget_initial_ = budget_init;
429 0 : inline_budget_max_ = budget_max;
430 0 : unassisted_budget_ = unassisted;
431 0 : }
432 :
433 : inline void
434 HIT 152148 : reactor_scheduler::reset_inline_budget() const noexcept
435 : {
436 152148 : if (auto* ctx = reactor_find_context(this))
437 : {
438 : // Cap when no other thread absorbed queued work
439 152148 : if (ctx->unassisted)
440 : {
441 152148 : ctx->inline_budget_max =
442 152148 : static_cast<int>(unassisted_budget_);
443 152148 : ctx->inline_budget =
444 152148 : static_cast<int>(unassisted_budget_);
445 152148 : return;
446 : }
447 : // Ramp up when previous cycle fully consumed budget
448 MIS 0 : if (ctx->inline_budget == 0)
449 0 : ctx->inline_budget_max = (std::min)(
450 0 : ctx->inline_budget_max * 2,
451 0 : static_cast<int>(inline_budget_max_));
452 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
453 0 : ctx->inline_budget_max =
454 0 : static_cast<int>(inline_budget_initial_);
455 0 : ctx->inline_budget = ctx->inline_budget_max;
456 : }
457 : }
458 :
459 : inline bool
460 HIT 685148 : reactor_scheduler::try_consume_inline_budget() const noexcept
461 : {
462 685148 : if (auto* ctx = reactor_find_context(this))
463 : {
464 685148 : if (ctx->inline_budget > 0)
465 : {
466 548151 : --ctx->inline_budget;
467 548151 : return true;
468 : }
469 : }
470 136997 : return false;
471 : }
472 :
inline void
reactor_scheduler::post(std::coroutine_handle<> h) const
{
    // Heap-allocated adapter that turns a coroutine handle into a
    // scheduler_op. It deletes itself before resuming/destroying the
    // coroutine, since resumption may destroy the scheduler's queues.
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
        ~post_handler() override = default;

        void operator()() override
        {
            auto saved = h_;
            delete this;
            // Ensure stores from the posting thread are visible
            std::atomic_thread_fence(std::memory_order_acquire);
            saved.resume();
        }

        void destroy() override
        {
            auto saved = h_;
            delete this;
            saved.destroy();
        }
    };

    // unique_ptr keeps the adapter exception-safe until it is
    // handed to a queue (release() transfers ownership).
    auto ph = std::make_unique<post_handler>(h);

    // Fast path: a scheduler thread queues privately, lock-free.
    if (auto* ctx = reactor_find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: count the work, enqueue globally, wake a waiter.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    lock_type lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
515 :
516 : inline void
517 153469 : reactor_scheduler::post(scheduler_op* h) const
518 : {
519 153469 : if (auto* ctx = reactor_find_context(this))
520 : {
521 153297 : ++ctx->private_outstanding_work;
522 153297 : ctx->private_queue.push(h);
523 153297 : return;
524 : }
525 :
526 172 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
527 :
528 172 : lock_type lock(mutex_);
529 172 : completed_ops_.push(h);
530 172 : wake_one_thread_and_unlock(lock);
531 172 : }
532 :
533 : inline bool
534 1308 : reactor_scheduler::running_in_this_thread() const noexcept
535 : {
536 1308 : return reactor_find_context(this) != nullptr;
537 : }
538 :
539 : inline void
540 392 : reactor_scheduler::stop()
541 : {
542 392 : lock_type lock(mutex_);
543 392 : if (!stopped_.load(std::memory_order_acquire))
544 : {
545 376 : stopped_.store(true, std::memory_order_release);
546 376 : signal_all(lock);
547 376 : interrupt_reactor();
548 : }
549 392 : }
550 :
551 : inline bool
552 62 : reactor_scheduler::stopped() const noexcept
553 : {
554 62 : return stopped_.load(std::memory_order_acquire);
555 : }
556 :
557 : inline void
558 91 : reactor_scheduler::restart()
559 : {
560 91 : stopped_.store(false, std::memory_order_release);
561 91 : }
562 :
563 : inline std::size_t
564 388 : reactor_scheduler::run()
565 : {
566 776 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
567 : {
568 6 : stop();
569 6 : return 0;
570 : }
571 :
572 382 : reactor_thread_context_guard ctx(this);
573 382 : lock_type lock(mutex_);
574 :
575 382 : std::size_t n = 0;
576 : for (;;)
577 : {
578 415347 : if (!do_one(lock, -1, &ctx.frame_))
579 382 : break;
580 414965 : if (n != (std::numeric_limits<std::size_t>::max)())
581 414965 : ++n;
582 414965 : if (!lock.owns_lock())
583 269499 : lock.lock();
584 : }
585 382 : return n;
586 382 : }
587 :
588 : inline std::size_t
589 2 : reactor_scheduler::run_one()
590 : {
591 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
592 : {
593 MIS 0 : stop();
594 0 : return 0;
595 : }
596 :
597 HIT 2 : reactor_thread_context_guard ctx(this);
598 2 : lock_type lock(mutex_);
599 2 : return do_one(lock, -1, &ctx.frame_);
600 2 : }
601 :
602 : inline std::size_t
603 102 : reactor_scheduler::wait_one(long usec)
604 : {
605 204 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
606 : {
607 10 : stop();
608 10 : return 0;
609 : }
610 :
611 92 : reactor_thread_context_guard ctx(this);
612 92 : lock_type lock(mutex_);
613 92 : return do_one(lock, usec, &ctx.frame_);
614 92 : }
615 :
616 : inline std::size_t
617 6 : reactor_scheduler::poll()
618 : {
619 12 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
620 : {
621 1 : stop();
622 1 : return 0;
623 : }
624 :
625 5 : reactor_thread_context_guard ctx(this);
626 5 : lock_type lock(mutex_);
627 :
628 5 : std::size_t n = 0;
629 : for (;;)
630 : {
631 11 : if (!do_one(lock, 0, &ctx.frame_))
632 5 : break;
633 6 : if (n != (std::numeric_limits<std::size_t>::max)())
634 6 : ++n;
635 6 : if (!lock.owns_lock())
636 6 : lock.lock();
637 : }
638 5 : return n;
639 5 : }
640 :
641 : inline std::size_t
642 4 : reactor_scheduler::poll_one()
643 : {
644 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
645 : {
646 2 : stop();
647 2 : return 0;
648 : }
649 :
650 2 : reactor_thread_context_guard ctx(this);
651 2 : lock_type lock(mutex_);
652 2 : return do_one(lock, 0, &ctx.frame_);
653 2 : }
654 :
655 : inline void
656 24488 : reactor_scheduler::work_started() noexcept
657 : {
658 24488 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
659 24488 : }
660 :
661 : inline void
662 34569 : reactor_scheduler::work_finished() noexcept
663 : {
664 69138 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
665 368 : stop();
666 34569 : }
667 :
668 : inline void
669 244631 : reactor_scheduler::compensating_work_started() const noexcept
670 : {
671 244631 : auto* ctx = reactor_find_context(this);
672 244631 : if (ctx)
673 244631 : ++ctx->private_outstanding_work;
674 244631 : }
675 :
676 : inline void
677 MIS 0 : reactor_scheduler::drain_thread_queue(
678 : op_queue& queue, std::int64_t count) const
679 : {
680 0 : if (count > 0)
681 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
682 :
683 0 : lock_type lock(mutex_);
684 0 : completed_ops_.splice(queue);
685 0 : if (count > 0)
686 0 : maybe_unlock_and_signal_one(lock);
687 0 : }
688 :
689 : inline void
690 HIT 14826 : reactor_scheduler::post_deferred_completions(op_queue& ops) const
691 : {
692 14826 : if (ops.empty())
693 14826 : return;
694 :
695 MIS 0 : if (auto* ctx = reactor_find_context(this))
696 : {
697 0 : ctx->private_queue.splice(ops);
698 0 : return;
699 : }
700 :
701 0 : lock_type lock(mutex_);
702 0 : completed_ops_.splice(ops);
703 0 : wake_one_thread_and_unlock(lock);
704 0 : }
705 :
706 : inline void
707 HIT 517 : reactor_scheduler::shutdown_drain()
708 : {
709 517 : lock_type lock(mutex_);
710 :
711 1120 : while (auto* h = completed_ops_.pop())
712 : {
713 603 : if (h == &task_op_)
714 517 : continue;
715 86 : lock.unlock();
716 86 : h->destroy();
717 86 : lock.lock();
718 603 : }
719 :
720 517 : signal_all(lock);
721 517 : }
722 :
inline void
reactor_scheduler::signal_all(lock_type&) const
{
    // Caller holds the scheduler mutex; the unused lock parameter
    // documents that requirement. Set the signaled bit so waiters
    // that wake (or arrive later) see the signal, then notify all.
    state_ |= signaled_bit;
    cond_.notify_all();
}
729 :
730 : inline bool
731 2283 : reactor_scheduler::maybe_unlock_and_signal_one(
732 : lock_type& lock) const
733 : {
734 2283 : state_ |= signaled_bit;
735 2283 : if (state_ > signaled_bit)
736 : {
737 MIS 0 : lock.unlock();
738 0 : cond_.notify_one();
739 0 : return true;
740 : }
741 HIT 2283 : return false;
742 : }
743 :
744 : inline bool
745 488867 : reactor_scheduler::unlock_and_signal_one(
746 : lock_type& lock) const
747 : {
748 488867 : state_ |= signaled_bit;
749 488867 : bool have_waiters = state_ > signaled_bit;
750 488867 : lock.unlock();
751 488867 : if (have_waiters)
752 MIS 0 : cond_.notify_one();
753 HIT 488867 : return have_waiters;
754 : }
755 :
inline void
reactor_scheduler::clear_signal() const
{
    // Caller holds the scheduler mutex. Clears the signaled bit so
    // the next wait actually blocks.
    state_ &= ~signaled_bit;
}
761 :
inline void
reactor_scheduler::wait_for_signal(
    lock_type& lock) const
{
    // Loop guards against spurious wakeups: only return once the
    // signaled bit is set. The waiter count is bumped around each
    // wait so signalers know someone is parked.
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
773 :
inline void
reactor_scheduler::wait_for_signal_for(
    lock_type& lock, long timeout_us) const
{
    // Single `if` (not a loop) is deliberate: with a deadline a
    // spurious wakeup is treated the same as a timeout and the
    // caller re-evaluates its state.
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
785 :
786 : inline void
787 HIT 2283 : reactor_scheduler::wake_one_thread_and_unlock(
788 : lock_type& lock) const
789 : {
790 2283 : if (maybe_unlock_and_signal_one(lock))
791 MIS 0 : return;
792 :
793 HIT 2283 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
794 : {
795 57 : task_interrupted_ = true;
796 57 : lock.unlock();
797 57 : interrupt_reactor();
798 : }
799 : else
800 : {
801 2226 : lock.unlock();
802 : }
803 : }
804 :
inline reactor_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        // One unit of work is implicitly retired for the handler that
        // just ran. Net it against the work produced privately during
        // the handler: produce (n-1) globally, or finish one if the
        // handler produced nothing.
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        // Hand privately queued ops back to the global queue; the
        // lock was released before the handler ran, so reacquire.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just retire the one implicit unit.
        sched->work_finished();
    }
}
828 :
inline reactor_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    // Publish work produced while the reactor ran (flush the count
    // before the ops become globally visible).
    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    // Move privately queued ops to the global queue under the lock.
    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
848 :
/** Core event loop step: run at most one handler.

    Precondition: @a lock holds the scheduler mutex. Returns with
    the lock released after executing a handler (return 1), or with
    it held when returning 0 (stopped, timed out, or out of work).
    timeout_us < 0 blocks indefinitely, 0 never blocks, > 0 bounds
    the wait.
*/
inline std::size_t
reactor_scheduler::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Idle and either out of work or non-blocking: put the
            // sentinel back so the next caller still polls.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // With handlers pending, poll without blocking; a
            // non-blocking poll needs no interrupt to wake it.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread run the pending handlers while
            // this one polls the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                // Keep task_running_ consistent even on throw.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel for the next reactor cycle.
            completed_ops_.push(&task_op_);
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // unassisted records whether another thread picked up
            // the remaining work (feeds the inline-budget heuristic).
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup reconciles work counts and drains the
            // private queue after the handler runs (even on throw).
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Park until more work is signaled (or the wait times out).
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
933 :
934 : } // namespace boost::corosio::detail
935 :
936 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|