TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/corosio
9 : //
10 :
11 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13 :
14 : #include <boost/corosio/timer.hpp>
15 : #include <boost/corosio/io_context.hpp>
16 : #include <boost/corosio/detail/scheduler_op.hpp>
17 : #include <boost/corosio/detail/intrusive.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 : #include <boost/capy/error.hpp>
20 : #include <boost/capy/ex/execution_context.hpp>
21 : #include <boost/capy/ex/executor_ref.hpp>
22 : #include <system_error>
23 :
24 : #include <atomic>
25 : #include <chrono>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <limits>
29 : #include <mutex>
30 : #include <optional>
31 : #include <stop_token>
32 : #include <utility>
33 : #include <vector>
34 :
35 : namespace boost::corosio::detail {
36 :
37 : struct scheduler;
38 :
39 : /*
40 : Timer Service
41 : =============
42 :
43 : Data Structures
44 : ---------------
45 : waiter_node holds per-waiter state: coroutine handle, executor,
46 : error output, stop_token, embedded completion_op. Each concurrent
47 : co_await t.wait() allocates one waiter_node.
48 :
49 : timer_service::implementation holds per-timer state: expiry,
50 : heap index, and an intrusive_list of waiter_nodes. Multiple
51 : coroutines can wait on the same timer simultaneously.
52 :
53 : timer_service owns a min-heap of active timers, a free list
54 : of recycled impls, and a free list of recycled waiter_nodes. The
55 : heap is ordered by expiry time; the scheduler queries
56 : nearest_expiry() to set the epoll/timerfd timeout.
57 :
58 : Optimization Strategy
59 : ---------------------
60 : 1. Deferred heap insertion — expires_after() stores the expiry
61 : but does not insert into the heap. Insertion happens in wait().
62 : 2. Thread-local impl cache — single-slot per-thread cache.
63 : 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64 : 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65 : 5. might_have_pending_waits_ flag — skips lock when no wait issued.
66 : 6. Thread-local waiter cache — single-slot per-thread cache.
67 :
68 : Concurrency
69 : -----------
70 : stop_token callbacks can fire from any thread. The impl_
71 : pointer on waiter_node is used as a "still in list" marker.
72 : */
73 :
74 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75 :
76 : inline void timer_service_invalidate_cache() noexcept;
77 :
78 : // timer_service class body — member function definitions are
79 : // out-of-class (after implementation and waiter_node are complete)
80 : class BOOST_COROSIO_DECL timer_service final
81 : : public capy::execution_context::service
82 : , public io_object::io_service
83 : {
84 : public:
85 : using clock_type = std::chrono::steady_clock;
86 : using time_point = clock_type::time_point;
87 :
88 : /// Type-erased callback for earliest-expiry-changed notifications.
89 : class callback
90 : {
91 : void* ctx_ = nullptr;
92 : void (*fn_)(void*) = nullptr;
93 :
94 : public:
95 : /// Construct an empty callback.
96 HIT 517 : callback() = default;
97 :
98 : /// Construct a callback with the given context and function.
99 517 : callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100 :
101 : /// Return true if the callback is non-empty.
102 : explicit operator bool() const noexcept
103 : {
104 : return fn_ != nullptr;
105 : }
106 :
107 : /// Invoke the callback.
108 7827 : void operator()() const
109 : {
110 7827 : if (fn_)
111 7827 : fn_(ctx_);
112 7827 : }
113 : };
114 :
115 : struct implementation;
116 :
117 : private:
118 : struct heap_entry
119 : {
120 : time_point time_;
121 : implementation* timer_;
122 : };
123 :
124 : scheduler* sched_ = nullptr;
125 : mutable std::mutex mutex_;
126 : std::vector<heap_entry> heap_;
127 : implementation* free_list_ = nullptr;
128 : waiter_node* waiter_free_list_ = nullptr;
129 : callback on_earliest_changed_;
130 : bool shutting_down_ = false;
131 : // Avoids mutex in nearest_expiry() and empty()
132 : mutable std::atomic<std::int64_t> cached_nearest_ns_{
133 : (std::numeric_limits<std::int64_t>::max)()};
134 :
135 : public:
136 : /// Construct the timer service bound to a scheduler.
137 517 : inline timer_service(capy::execution_context&, scheduler& sched)
138 517 : : sched_(&sched)
139 : {
140 517 : }
141 :
142 : /// Return the associated scheduler.
143 15740 : inline scheduler& get_scheduler() noexcept
144 : {
145 15740 : return *sched_;
146 : }
147 :
148 : /// Destroy the timer service.
149 1034 : ~timer_service() override = default;
150 :
151 : timer_service(timer_service const&) = delete;
152 : timer_service& operator=(timer_service const&) = delete;
153 :
154 : /// Register a callback invoked when the earliest expiry changes.
155 517 : inline void set_on_earliest_changed(callback cb)
156 : {
157 517 : on_earliest_changed_ = cb;
158 517 : }
159 :
160 : /// Return true if no timers are in the heap.
161 : inline bool empty() const noexcept
162 : {
163 : return cached_nearest_ns_.load(std::memory_order_acquire) ==
164 : (std::numeric_limits<std::int64_t>::max)();
165 : }
166 :
167 : /// Return the nearest timer expiry without acquiring the mutex.
168 199099 : inline time_point nearest_expiry() const noexcept
169 : {
170 199099 : auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
171 199099 : return time_point(time_point::duration(ns));
172 : }
173 :
174 : /// Cancel all pending timers and free cached resources.
175 : inline void shutdown() override;
176 :
177 : /// Construct a new timer implementation.
178 : inline io_object::implementation* construct() override;
179 :
180 : /// Destroy a timer implementation, cancelling pending waiters.
181 : inline void destroy(io_object::implementation* p) override;
182 :
183 : /// Cancel and recycle a timer implementation.
184 : inline void destroy_impl(implementation& impl);
185 :
186 : /// Create or recycle a waiter node.
187 : inline waiter_node* create_waiter();
188 :
189 : /// Return a waiter node to the cache or free list.
190 : inline void destroy_waiter(waiter_node* w);
191 :
192 : /// Update the timer expiry, cancelling existing waiters.
193 : inline std::size_t update_timer(implementation& impl, time_point new_time);
194 :
195 : /// Insert a waiter into the timer's waiter list and the heap.
196 : inline void insert_waiter(implementation& impl, waiter_node* w);
197 :
198 : /// Cancel all waiters on a timer.
199 : inline std::size_t cancel_timer(implementation& impl);
200 :
201 : /// Cancel a single waiter ( stop_token callback path ).
202 : inline void cancel_waiter(waiter_node* w);
203 :
204 : /// Cancel one waiter on a timer.
205 : inline std::size_t cancel_one_waiter(implementation& impl);
206 :
207 : /// Complete all waiters whose timers have expired.
208 : inline std::size_t process_expired();
209 :
210 : private:
211 237072 : inline void refresh_cached_nearest() noexcept
212 : {
213 237072 : auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
214 236584 : : heap_[0].time_.time_since_epoch().count();
215 237072 : cached_nearest_ns_.store(ns, std::memory_order_release);
216 237072 : }
217 :
218 : inline void remove_timer_impl(implementation& impl);
219 : inline void up_heap(std::size_t index);
220 : inline void down_heap(std::size_t index);
221 : inline void swap_heap(std::size_t i1, std::size_t i2);
222 : };
223 :
224 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225 : : intrusive_list<waiter_node>::node
226 : {
227 : // Embedded completion op — avoids heap allocation per fire/cancel
228 : struct completion_op final : scheduler_op
229 : {
230 : waiter_node* waiter_ = nullptr;
231 :
232 : static void do_complete(
233 : void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234 :
235 210 : completion_op() noexcept : scheduler_op(&do_complete) {}
236 :
237 : void operator()() override;
238 : void destroy() override;
239 : };
240 :
241 : // Per-waiter stop_token cancellation
242 : struct canceller
243 : {
244 : waiter_node* waiter_;
245 : void operator()() const;
246 : };
247 :
248 : // nullptr once removed from timer's waiter list (concurrency marker)
249 : timer_service::implementation* impl_ = nullptr;
250 : timer_service* svc_ = nullptr;
251 : std::coroutine_handle<> h_;
252 : capy::continuation* cont_ = nullptr;
253 : capy::executor_ref d_;
254 : std::error_code* ec_out_ = nullptr;
255 : std::stop_token token_;
256 : std::optional<std::stop_callback<canceller>> stop_cb_;
257 : completion_op op_;
258 : std::error_code ec_value_;
259 : waiter_node* next_free_ = nullptr;
260 :
261 210 : waiter_node() noexcept
262 210 : {
263 210 : op_.waiter_ = this;
264 210 : }
265 : };
266 :
267 : struct timer_service::implementation final : timer::implementation
268 : {
269 : using clock_type = std::chrono::steady_clock;
270 : using time_point = clock_type::time_point;
271 : using duration = clock_type::duration;
272 :
273 : timer_service* svc_ = nullptr;
274 : intrusive_list<waiter_node> waiters_;
275 :
276 : // Free list linkage (reused when impl is on free_list)
277 : implementation* next_free_ = nullptr;
278 :
279 : inline explicit implementation(timer_service& svc) noexcept;
280 :
281 : inline std::coroutine_handle<> wait(
282 : std::coroutine_handle<>,
283 : capy::executor_ref,
284 : std::stop_token,
285 : std::error_code*,
286 : capy::continuation*) override;
287 : };
288 :
289 : // Thread-local caches avoid hot-path mutex acquisitions:
290 : // 1. Impl cache — single-slot, validated by comparing svc_
291 : // 2. Waiter cache — single-slot, no service affinity
292 : // timer_service_invalidate_cache() empties the calling thread's slots during shutdown.
293 :
294 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
295 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
296 :
297 : inline timer_service::implementation*
298 8088 : try_pop_tl_cache(timer_service* svc) noexcept
299 : {
300 8088 : auto* impl = tl_cached_impl.get();
301 8088 : if (impl)
302 : {
303 7839 : tl_cached_impl.set(nullptr);
304 7839 : if (impl->svc_ == svc)
305 7839 : return impl;
306 : // Stale impl from a destroyed service
307 MIS 0 : delete impl;
308 : }
309 HIT 249 : return nullptr;
310 : }
311 :
312 : inline bool
313 8080 : try_push_tl_cache(timer_service::implementation* impl) noexcept
314 : {
315 8080 : if (!tl_cached_impl.get())
316 : {
317 8000 : tl_cached_impl.set(impl);
318 8000 : return true;
319 : }
320 80 : return false;
321 : }
322 :
323 : inline waiter_node*
324 7874 : try_pop_waiter_tl_cache() noexcept
325 : {
326 7874 : auto* w = tl_cached_waiter.get();
327 7874 : if (w)
328 : {
329 7662 : tl_cached_waiter.set(nullptr);
330 7662 : return w;
331 : }
332 212 : return nullptr;
333 : }
334 :
335 : inline bool
336 7858 : try_push_waiter_tl_cache(waiter_node* w) noexcept
337 : {
338 7858 : if (!tl_cached_waiter.get())
339 : {
340 7778 : tl_cached_waiter.set(w);
341 7778 : return true;
342 : }
343 80 : return false;
344 : }
345 :
346 : inline void
347 517 : timer_service_invalidate_cache() noexcept
348 : {
349 517 : delete tl_cached_impl.get();
350 517 : tl_cached_impl.set(nullptr);
351 :
352 517 : delete tl_cached_waiter.get();
353 517 : tl_cached_waiter.set(nullptr);
354 517 : }
355 :
356 : // timer_service out-of-class member function definitions
357 :
358 249 : inline timer_service::implementation::implementation(
359 249 : timer_service& svc) noexcept
360 249 : : svc_(&svc)
361 : {
362 249 : }
363 :
364 : inline void
365 517 : timer_service::shutdown()
366 : {
367 517 : timer_service_invalidate_cache();
368 517 : shutting_down_ = true;
369 :
370 : // Snapshot impls and detach them from the heap so that
371 : // coroutine-owned timer destructors (triggered by h.destroy()
372 : // below) cannot re-enter remove_timer_impl() and mutate the
373 : // vector during iteration.
374 517 : std::vector<implementation*> impls;
375 517 : impls.reserve(heap_.size());
376 525 : for (auto& entry : heap_)
377 : {
378 8 : entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
379 8 : impls.push_back(entry.timer_);
380 : }
381 517 : heap_.clear();
382 517 : cached_nearest_ns_.store(
383 : (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
384 :
385 : // Cancel waiting timers. Each waiter called work_started()
386 : // in implementation::wait(). On IOCP the scheduler shutdown
387 : // loop exits when outstanding_work_ reaches zero, so we must
388 : // call work_finished() here to balance it. On other backends
389 : // this is harmless.
390 525 : for (auto* impl : impls)
391 : {
392 16 : while (auto* w = impl->waiters_.pop_front())
393 : {
394 8 : w->stop_cb_.reset();
395 8 : auto h = std::exchange(w->h_, {});
396 8 : sched_->work_finished();
397 8 : if (h)
398 8 : h.destroy();
399 8 : delete w;
400 8 : }
401 8 : delete impl;
402 : }
403 :
404 : // Delete free-listed impls
405 597 : while (free_list_)
406 : {
407 80 : auto* next = free_list_->next_free_;
408 80 : delete free_list_;
409 80 : free_list_ = next;
410 : }
411 :
412 : // Delete free-listed waiters
413 595 : while (waiter_free_list_)
414 : {
415 78 : auto* next = waiter_free_list_->next_free_;
416 78 : delete waiter_free_list_;
417 78 : waiter_free_list_ = next;
418 : }
419 517 : }
420 :
421 : inline io_object::implementation*
422 8088 : timer_service::construct()
423 : {
424 8088 : implementation* impl = try_pop_tl_cache(this);
425 8088 : if (impl)
426 : {
427 7839 : impl->svc_ = this;
428 7839 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
429 7839 : impl->might_have_pending_waits_ = false;
430 7839 : return impl;
431 : }
432 :
433 249 : std::lock_guard lock(mutex_);
434 249 : if (free_list_)
435 : {
436 MIS 0 : impl = free_list_;
437 0 : free_list_ = impl->next_free_;
438 0 : impl->next_free_ = nullptr;
439 0 : impl->svc_ = this;
440 0 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
441 0 : impl->might_have_pending_waits_ = false;
442 : }
443 : else
444 : {
445 HIT 249 : impl = new implementation(*this);
446 : }
447 249 : return impl;
448 249 : }
449 :
450 : inline void
451 8086 : timer_service::destroy(io_object::implementation* p)
452 : {
453 8086 : destroy_impl(static_cast<implementation&>(*p));
454 8086 : }
455 :
456 : inline void
457 8086 : timer_service::destroy_impl(implementation& impl)
458 : {
459 : // During shutdown the impl is owned by the shutdown loop.
460 : // Re-entering here (from a coroutine-owned timer destructor
461 : // triggered by h.destroy()) must not modify the heap or
462 : // recycle the impl — shutdown deletes it directly.
463 8086 : if (shutting_down_)
464 8006 : return;
465 :
466 8080 : cancel_timer(impl);
467 :
468 8080 : if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
469 : {
470 MIS 0 : std::lock_guard lock(mutex_);
471 0 : remove_timer_impl(impl);
472 0 : refresh_cached_nearest();
473 0 : }
474 :
475 HIT 8080 : if (try_push_tl_cache(&impl))
476 8000 : return;
477 :
478 80 : std::lock_guard lock(mutex_);
479 80 : impl.next_free_ = free_list_;
480 80 : free_list_ = &impl;
481 80 : }
482 :
483 : inline waiter_node*
484 7874 : timer_service::create_waiter()
485 : {
486 7874 : if (auto* w = try_pop_waiter_tl_cache())
487 7662 : return w;
488 :
489 212 : std::lock_guard lock(mutex_);
490 212 : if (waiter_free_list_)
491 : {
492 2 : auto* w = waiter_free_list_;
493 2 : waiter_free_list_ = w->next_free_;
494 2 : w->next_free_ = nullptr;
495 2 : return w;
496 : }
497 :
498 210 : return new waiter_node();
499 212 : }
500 :
501 : inline void
502 7858 : timer_service::destroy_waiter(waiter_node* w)
503 : {
504 7858 : if (try_push_waiter_tl_cache(w))
505 7778 : return;
506 :
507 80 : std::lock_guard lock(mutex_);
508 80 : w->next_free_ = waiter_free_list_;
509 80 : waiter_free_list_ = w;
510 80 : }
511 :
512 : inline std::size_t
513 6 : timer_service::update_timer(implementation& impl, time_point new_time)
514 : {
515 : bool in_heap =
516 6 : (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
517 6 : if (!in_heap && impl.waiters_.empty())
518 MIS 0 : return 0;
519 :
520 HIT 6 : bool notify = false;
521 6 : intrusive_list<waiter_node> canceled;
522 :
523 : {
524 6 : std::lock_guard lock(mutex_);
525 :
526 16 : while (auto* w = impl.waiters_.pop_front())
527 : {
528 10 : w->impl_ = nullptr;
529 10 : canceled.push_back(w);
530 10 : }
531 :
532 6 : if (impl.heap_index_ < heap_.size())
533 : {
534 6 : time_point old_time = heap_[impl.heap_index_].time_;
535 6 : heap_[impl.heap_index_].time_ = new_time;
536 :
537 6 : if (new_time < old_time)
538 6 : up_heap(impl.heap_index_);
539 : else
540 MIS 0 : down_heap(impl.heap_index_);
541 :
542 HIT 6 : notify = (impl.heap_index_ == 0);
543 : }
544 :
545 6 : refresh_cached_nearest();
546 6 : }
547 :
548 6 : std::size_t count = 0;
549 16 : while (auto* w = canceled.pop_front())
550 : {
551 10 : w->ec_value_ = make_error_code(capy::error::canceled);
552 10 : sched_->post(&w->op_);
553 10 : ++count;
554 10 : }
555 :
556 6 : if (notify)
557 6 : on_earliest_changed_();
558 :
559 6 : return count;
560 : }
561 :
562 : inline void
563 7874 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
564 : {
565 7874 : bool notify = false;
566 : {
567 7874 : std::lock_guard lock(mutex_);
568 7874 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
569 : {
570 7852 : impl.heap_index_ = heap_.size();
571 7852 : heap_.push_back({impl.expiry_, &impl});
572 7852 : up_heap(heap_.size() - 1);
573 7852 : notify = (impl.heap_index_ == 0);
574 7852 : refresh_cached_nearest();
575 : }
576 7874 : impl.waiters_.push_back(w);
577 7874 : }
578 7874 : if (notify)
579 7821 : on_earliest_changed_();
580 7874 : }
581 :
582 : inline std::size_t
583 8088 : timer_service::cancel_timer(implementation& impl)
584 : {
585 8088 : if (!impl.might_have_pending_waits_)
586 8064 : return 0;
587 :
588 : // Not in heap and no waiters — just clear the flag
589 24 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
590 MIS 0 : impl.waiters_.empty())
591 : {
592 0 : impl.might_have_pending_waits_ = false;
593 0 : return 0;
594 : }
595 :
596 HIT 24 : intrusive_list<waiter_node> canceled;
597 :
598 : {
599 24 : std::lock_guard lock(mutex_);
600 24 : remove_timer_impl(impl);
601 52 : while (auto* w = impl.waiters_.pop_front())
602 : {
603 28 : w->impl_ = nullptr;
604 28 : canceled.push_back(w);
605 28 : }
606 24 : refresh_cached_nearest();
607 24 : }
608 :
609 24 : impl.might_have_pending_waits_ = false;
610 :
611 24 : std::size_t count = 0;
612 52 : while (auto* w = canceled.pop_front())
613 : {
614 28 : w->ec_value_ = make_error_code(capy::error::canceled);
615 28 : sched_->post(&w->op_);
616 28 : ++count;
617 28 : }
618 :
619 24 : return count;
620 : }
621 :
622 : inline void
623 30 : timer_service::cancel_waiter(waiter_node* w)
624 : {
625 : {
626 30 : std::lock_guard lock(mutex_);
627 : // Already removed by cancel_timer or process_expired
628 30 : if (!w->impl_)
629 MIS 0 : return;
630 HIT 30 : auto* impl = w->impl_;
631 30 : w->impl_ = nullptr;
632 30 : impl->waiters_.remove(w);
633 30 : if (impl->waiters_.empty())
634 : {
635 28 : remove_timer_impl(*impl);
636 28 : impl->might_have_pending_waits_ = false;
637 : }
638 30 : refresh_cached_nearest();
639 30 : }
640 :
641 30 : w->ec_value_ = make_error_code(capy::error::canceled);
642 30 : sched_->post(&w->op_);
643 : }
644 :
645 : inline std::size_t
646 2 : timer_service::cancel_one_waiter(implementation& impl)
647 : {
648 2 : if (!impl.might_have_pending_waits_)
649 MIS 0 : return 0;
650 :
651 HIT 2 : waiter_node* w = nullptr;
652 :
653 : {
654 2 : std::lock_guard lock(mutex_);
655 2 : w = impl.waiters_.pop_front();
656 2 : if (!w)
657 MIS 0 : return 0;
658 HIT 2 : w->impl_ = nullptr;
659 2 : if (impl.waiters_.empty())
660 : {
661 MIS 0 : remove_timer_impl(impl);
662 0 : impl.might_have_pending_waits_ = false;
663 : }
664 HIT 2 : refresh_cached_nearest();
665 2 : }
666 :
667 2 : w->ec_value_ = make_error_code(capy::error::canceled);
668 2 : sched_->post(&w->op_);
669 2 : return 1;
670 : }
671 :
672 : inline std::size_t
673 229158 : timer_service::process_expired()
674 : {
675 229158 : intrusive_list<waiter_node> expired;
676 :
677 : {
678 229158 : std::lock_guard lock(mutex_);
679 229158 : auto now = clock_type::now();
680 :
681 236950 : while (!heap_.empty() && heap_[0].time_ <= now)
682 : {
683 7792 : implementation* t = heap_[0].timer_;
684 7792 : remove_timer_impl(*t);
685 15588 : while (auto* w = t->waiters_.pop_front())
686 : {
687 7796 : w->impl_ = nullptr;
688 7796 : w->ec_value_ = {};
689 7796 : expired.push_back(w);
690 7796 : }
691 7792 : t->might_have_pending_waits_ = false;
692 : }
693 :
694 229158 : refresh_cached_nearest();
695 229158 : }
696 :
697 229158 : std::size_t count = 0;
698 236954 : while (auto* w = expired.pop_front())
699 : {
700 7796 : sched_->post(&w->op_);
701 7796 : ++count;
702 7796 : }
703 :
704 229158 : return count;
705 : }
706 :
707 : inline void
708 7844 : timer_service::remove_timer_impl(implementation& impl)
709 : {
710 7844 : std::size_t index = impl.heap_index_;
711 7844 : if (index >= heap_.size())
712 MIS 0 : return; // Not in heap
713 :
714 HIT 7844 : if (index == heap_.size() - 1)
715 : {
716 : // Last element, just pop
717 152 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
718 152 : heap_.pop_back();
719 : }
720 : else
721 : {
722 : // Swap with last and reheapify
723 7692 : swap_heap(index, heap_.size() - 1);
724 7692 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
725 7692 : heap_.pop_back();
726 :
727 7692 : if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
728 MIS 0 : up_heap(index);
729 : else
730 HIT 7692 : down_heap(index);
731 : }
732 : }
733 :
734 : inline void
735 7858 : timer_service::up_heap(std::size_t index)
736 : {
737 15529 : while (index > 0)
738 : {
739 7702 : std::size_t parent = (index - 1) / 2;
740 7702 : if (!(heap_[index].time_ < heap_[parent].time_))
741 31 : break;
742 7671 : swap_heap(index, parent);
743 7671 : index = parent;
744 : }
745 7858 : }
746 :
747 : inline void
748 7692 : timer_service::down_heap(std::size_t index)
749 : {
750 7692 : std::size_t child = index * 2 + 1;
751 7692 : while (child < heap_.size())
752 : {
753 6 : std::size_t min_child = (child + 1 == heap_.size() ||
754 MIS 0 : heap_[child].time_ < heap_[child + 1].time_)
755 HIT 6 : ? child
756 6 : : child + 1;
757 :
758 6 : if (heap_[index].time_ < heap_[min_child].time_)
759 6 : break;
760 :
761 MIS 0 : swap_heap(index, min_child);
762 0 : index = min_child;
763 0 : child = index * 2 + 1;
764 : }
765 HIT 7692 : }
766 :
767 : inline void
768 15363 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
769 : {
770 15363 : heap_entry tmp = heap_[i1];
771 15363 : heap_[i1] = heap_[i2];
772 15363 : heap_[i2] = tmp;
773 15363 : heap_[i1].timer_->heap_index_ = i1;
774 15363 : heap_[i2].timer_->heap_index_ = i2;
775 15363 : }
776 :
777 : // waiter_node out-of-class member function definitions
778 :
779 : inline void
780 30 : waiter_node::canceller::operator()() const
781 : {
782 30 : waiter_->svc_->cancel_waiter(waiter_);
783 30 : }
784 :
785 : inline void
786 MIS 0 : waiter_node::completion_op::do_complete(
787 : [[maybe_unused]] void* owner,
788 : scheduler_op* base,
789 : std::uint32_t,
790 : std::uint32_t)
791 : {
792 : // owner is always non-null here. The destroy path (owner == nullptr)
793 : // is unreachable because completion_op overrides destroy() directly,
794 : // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
795 0 : BOOST_COROSIO_ASSERT(owner);
796 0 : static_cast<completion_op*>(base)->operator()();
797 0 : }
798 :
799 : inline void
800 HIT 7858 : waiter_node::completion_op::operator()()
801 : {
802 7858 : auto* w = waiter_;
803 7858 : w->stop_cb_.reset();
804 7858 : if (w->ec_out_)
805 7858 : *w->ec_out_ = w->ec_value_;
806 :
807 7858 : auto* cont = w->cont_;
808 7858 : auto d = w->d_;
809 7858 : auto* svc = w->svc_;
810 7858 : auto& sched = svc->get_scheduler();
811 :
812 7858 : svc->destroy_waiter(w);
813 :
814 7858 : d.post(*cont);
815 7858 : sched.work_finished();
816 7858 : }
817 :
818 : // GCC 14 false-positive: inlining ~optional<stop_callback> through
819 : // delete loses track that stop_cb_ was already .reset() above.
820 : #if defined(__GNUC__) && !defined(__clang__)
821 : #pragma GCC diagnostic push
822 : #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
823 : #endif
824 : inline void
825 8 : waiter_node::completion_op::destroy()
826 : {
827 : // Called during scheduler shutdown drain when this completion_op is
828 : // in the scheduler's ready queue (posted by cancel_timer() or
829 : // process_expired()). Balances the work_started() from
830 : // implementation::wait(). The scheduler drain loop separately
831 : // balances the work_started() from post(). On IOCP both decrements
832 : // are required for outstanding_work_ to reach zero; on other
833 : // backends this is harmless.
834 : //
835 : // This override also prevents scheduler_op::destroy() from calling
836 : // do_complete(nullptr, ...). See also: timer_service::shutdown()
837 : // which drains waiters still in the timer heap (the other path).
838 8 : auto* w = waiter_;
839 8 : w->stop_cb_.reset();
840 8 : auto h = std::exchange(w->h_, {});
841 8 : auto& sched = w->svc_->get_scheduler();
842 8 : delete w;
843 8 : sched.work_finished();
844 8 : if (h)
845 8 : h.destroy();
846 8 : }
847 : #if defined(__GNUC__) && !defined(__clang__)
848 : #pragma GCC diagnostic pop
849 : #endif
850 :
851 : inline std::coroutine_handle<>
852 7875 : timer_service::implementation::wait(
853 : std::coroutine_handle<> h,
854 : capy::executor_ref d,
855 : std::stop_token token,
856 : std::error_code* ec,
857 : capy::continuation* cont)
858 : {
859 : // Already-expired fast path — no waiter_node, no mutex.
860 : // Post instead of dispatch so the coroutine yields to the
861 : // scheduler, allowing other queued work to run.
862 7875 : if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
863 : {
864 7853 : if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
865 : {
866 1 : if (ec)
867 1 : *ec = {};
868 1 : d.post(*cont);
869 1 : return std::noop_coroutine();
870 : }
871 : }
872 :
873 7874 : auto* w = svc_->create_waiter();
874 7874 : w->impl_ = this;
875 7874 : w->svc_ = svc_;
876 7874 : w->h_ = h;
877 7874 : w->cont_ = cont;
878 7874 : w->d_ = d;
879 7874 : w->token_ = std::move(token);
880 7874 : w->ec_out_ = ec;
881 :
882 7874 : svc_->insert_waiter(*this, w);
883 7874 : might_have_pending_waits_ = true;
884 7874 : svc_->get_scheduler().work_started();
885 :
886 7874 : if (w->token_.stop_possible())
887 48 : w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
888 :
889 7874 : return std::noop_coroutine();
890 : }
891 :
892 : // Free functions
893 :
894 : struct timer_service_access
895 : {
896 8088 : static timer_service& get_timer(io_context& ctx) noexcept
897 : {
898 8088 : return *ctx.timer_svc_;
899 : }
900 :
901 517 : static void set_timer(io_context& ctx, timer_service& svc) noexcept
902 : {
903 517 : ctx.timer_svc_ = &svc;
904 517 : }
905 : };
906 :
907 : // Bypass find_service() mutex by reading io_context's cached pointer
908 : inline io_object::io_service&
909 8088 : timer_service_direct(capy::execution_context& ctx) noexcept
910 : {
911 8088 : return timer_service_access::get_timer(static_cast<io_context&>(ctx));
912 : }
913 :
914 : inline std::size_t
915 6 : timer_service_update_expiry(timer::implementation& base)
916 : {
917 6 : auto& impl = static_cast<timer_service::implementation&>(base);
918 6 : return impl.svc_->update_timer(impl, impl.expiry_);
919 : }
920 :
921 : inline std::size_t
922 8 : timer_service_cancel(timer::implementation& base) noexcept
923 : {
924 8 : auto& impl = static_cast<timer_service::implementation&>(base);
925 8 : return impl.svc_->cancel_timer(impl);
926 : }
927 :
928 : inline std::size_t
929 2 : timer_service_cancel_one(timer::implementation& base) noexcept
930 : {
931 2 : auto& impl = static_cast<timer_service::implementation&>(base);
932 2 : return impl.svc_->cancel_one_waiter(impl);
933 : }
934 :
935 : inline timer_service&
936 517 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
937 : {
938 517 : auto& svc = ctx.make_service<timer_service>(sched);
939 517 : timer_service_access::set_timer(static_cast<io_context&>(ctx), svc);
940 517 : return svc;
941 : }
942 :
943 : } // namespace boost::corosio::detail
944 :
945 : #endif
|