LCOV - code coverage report
Current view: top level - corosio/detail - timer_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 91.8 % 376 345 31
Test Date: 2026-04-03 17:01:16 Functions: 97.8 % 46 45 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/corosio
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      12                 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      13                 : 
      14                 : #include <boost/corosio/timer.hpp>
      15                 : #include <boost/corosio/io_context.hpp>
      16                 : #include <boost/corosio/detail/scheduler_op.hpp>
      17                 : #include <boost/corosio/detail/intrusive.hpp>
      18                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      19                 : #include <boost/capy/error.hpp>
      20                 : #include <boost/capy/ex/execution_context.hpp>
      21                 : #include <boost/capy/ex/executor_ref.hpp>
      22                 : #include <system_error>
      23                 : 
      24                 : #include <atomic>
      25                 : #include <chrono>
      26                 : #include <coroutine>
      27                 : #include <cstddef>
      28                 : #include <limits>
      29                 : #include <mutex>
      30                 : #include <optional>
      31                 : #include <stop_token>
      32                 : #include <utility>
      33                 : #include <vector>
      34                 : 
      35                 : namespace boost::corosio::detail {
      36                 : 
      37                 : struct scheduler;
      38                 : 
      39                 : /*
      40                 :     Timer Service
      41                 :     =============
      42                 : 
      43                 :     Data Structures
      44                 :     ---------------
      45                 :     waiter_node holds per-waiter state: coroutine handle, executor,
      46                 :     error output, stop_token, embedded completion_op. Each concurrent
      47                 :     co_await t.wait() allocates one waiter_node.
      48                 : 
      49                 :     timer_service::implementation holds per-timer state: expiry,
      50                 :     heap index, and an intrusive_list of waiter_nodes. Multiple
      51                 :     coroutines can wait on the same timer simultaneously.
      52                 : 
      53                 :     timer_service owns a min-heap of active timers, a free list
      54                 :     of recycled impls, and a free list of recycled waiter_nodes. The
      55                 :     heap is ordered by expiry time; the scheduler queries
      56                 :     nearest_expiry() to set the epoll/timerfd timeout.
      57                 : 
      58                 :     Optimization Strategy
      59                 :     ---------------------
      60                 :     1. Deferred heap insertion — expires_after() stores the expiry
      61                 :        but does not insert into the heap. Insertion happens in wait().
      62                 :     2. Thread-local impl cache — single-slot per-thread cache.
      63                 :     3. Embedded completion_op — eliminates heap allocation per fire/cancel.
      64                 :     4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
      65                 :     5. might_have_pending_waits_ flag — skips lock when no wait issued.
      66                 :     6. Thread-local waiter cache — single-slot per-thread cache.
      67                 : 
      68                 :     Concurrency
      69                 :     -----------
      70                 :     stop_token callbacks can fire from any thread. The impl_
      71                 :     pointer on waiter_node is used as a "still in list" marker.
      72                 : */
      73                 : 
      74                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
      75                 : 
      76                 : inline void timer_service_invalidate_cache() noexcept;
      77                 : 
      78                 : // timer_service class body — member function definitions are
      79                 : // out-of-class (after implementation and waiter_node are complete)
      80                 : class BOOST_COROSIO_DECL timer_service final
      81                 :     : public capy::execution_context::service
      82                 :     , public io_object::io_service
      83                 : {
      84                 : public:
      85                 :     using clock_type = std::chrono::steady_clock;
      86                 :     using time_point = clock_type::time_point;
      87                 : 
      88                 :     /// Type-erased callback for earliest-expiry-changed notifications.
      89                 :     class callback
      90                 :     {
      91                 :         void* ctx_         = nullptr;
      92                 :         void (*fn_)(void*) = nullptr;
      93                 : 
      94                 :     public:
      95                 :         /// Construct an empty callback.
      96 HIT         517 :         callback() = default;
      97                 : 
      98                 :         /// Construct a callback with the given context and function.
      99             517 :         callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
     100                 : 
     101                 :         /// Return true if the callback is non-empty.
     102                 :         explicit operator bool() const noexcept
     103                 :         {
     104                 :             return fn_ != nullptr;
     105                 :         }
     106                 : 
     107                 :         /// Invoke the callback.
     108            7827 :         void operator()() const
     109                 :         {
     110            7827 :             if (fn_)
     111            7827 :                 fn_(ctx_);
     112            7827 :         }
     113                 :     };
     114                 : 
     115                 :     struct implementation;
     116                 : 
     117                 : private:
     118                 :     struct heap_entry
     119                 :     {
     120                 :         time_point time_;
     121                 :         implementation* timer_;
     122                 :     };
     123                 : 
     124                 :     scheduler* sched_ = nullptr;
     125                 :     mutable std::mutex mutex_;
     126                 :     std::vector<heap_entry> heap_;
     127                 :     implementation* free_list_     = nullptr;
     128                 :     waiter_node* waiter_free_list_ = nullptr;
     129                 :     callback on_earliest_changed_;
     130                 :     bool shutting_down_ = false;
     131                 :     // Avoids mutex in nearest_expiry() and empty()
     132                 :     mutable std::atomic<std::int64_t> cached_nearest_ns_{
     133                 :         (std::numeric_limits<std::int64_t>::max)()};
     134                 : 
     135                 : public:
     136                 :     /// Construct the timer service bound to a scheduler.
     137             517 :     inline timer_service(capy::execution_context&, scheduler& sched)
     138             517 :         : sched_(&sched)
     139                 :     {
     140             517 :     }
     141                 : 
     142                 :     /// Return the associated scheduler.
     143           15740 :     inline scheduler& get_scheduler() noexcept
     144                 :     {
     145           15740 :         return *sched_;
     146                 :     }
     147                 : 
     148                 :     /// Destroy the timer service.
     149            1034 :     ~timer_service() override = default;
     150                 : 
     151                 :     timer_service(timer_service const&)            = delete;
     152                 :     timer_service& operator=(timer_service const&) = delete;
     153                 : 
     154                 :     /// Register a callback invoked when the earliest expiry changes.
     155             517 :     inline void set_on_earliest_changed(callback cb)
     156                 :     {
     157             517 :         on_earliest_changed_ = cb;
     158             517 :     }
     159                 : 
     160                 :     /// Return true if no timers are in the heap.
     161                 :     inline bool empty() const noexcept
     162                 :     {
     163                 :         return cached_nearest_ns_.load(std::memory_order_acquire) ==
     164                 :             (std::numeric_limits<std::int64_t>::max)();
     165                 :     }
     166                 : 
     167                 :     /// Return the nearest timer expiry without acquiring the mutex.
     168          199099 :     inline time_point nearest_expiry() const noexcept
     169                 :     {
     170          199099 :         auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
     171          199099 :         return time_point(time_point::duration(ns));
     172                 :     }
     173                 : 
     174                 :     /// Cancel all pending timers and free cached resources.
     175                 :     inline void shutdown() override;
     176                 : 
     177                 :     /// Construct a new timer implementation.
     178                 :     inline io_object::implementation* construct() override;
     179                 : 
     180                 :     /// Destroy a timer implementation, cancelling pending waiters.
     181                 :     inline void destroy(io_object::implementation* p) override;
     182                 : 
     183                 :     /// Cancel and recycle a timer implementation.
     184                 :     inline void destroy_impl(implementation& impl);
     185                 : 
     186                 :     /// Create or recycle a waiter node.
     187                 :     inline waiter_node* create_waiter();
     188                 : 
     189                 :     /// Return a waiter node to the cache or free list.
     190                 :     inline void destroy_waiter(waiter_node* w);
     191                 : 
     192                 :     /// Update the timer expiry, cancelling existing waiters.
     193                 :     inline std::size_t update_timer(implementation& impl, time_point new_time);
     194                 : 
     195                 :     /// Insert a waiter into the timer's waiter list and the heap.
     196                 :     inline void insert_waiter(implementation& impl, waiter_node* w);
     197                 : 
     198                 :     /// Cancel all waiters on a timer.
     199                 :     inline std::size_t cancel_timer(implementation& impl);
     200                 : 
     201                 :     /// Cancel a single waiter (stop_token callback path).
     202                 :     inline void cancel_waiter(waiter_node* w);
     203                 : 
     204                 :     /// Cancel one waiter on a timer.
     205                 :     inline std::size_t cancel_one_waiter(implementation& impl);
     206                 : 
     207                 :     /// Complete all waiters whose timers have expired.
     208                 :     inline std::size_t process_expired();
     209                 : 
     210                 : private:
     211          237072 :     inline void refresh_cached_nearest() noexcept
     212                 :     {
     213          237072 :         auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
     214          236584 :                                 : heap_[0].time_.time_since_epoch().count();
     215          237072 :         cached_nearest_ns_.store(ns, std::memory_order_release);
     216          237072 :     }
     217                 : 
     218                 :     inline void remove_timer_impl(implementation& impl);
     219                 :     inline void up_heap(std::size_t index);
     220                 :     inline void down_heap(std::size_t index);
     221                 :     inline void swap_heap(std::size_t i1, std::size_t i2);
     222                 : };
     223                 : 
     224                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
     225                 :     : intrusive_list<waiter_node>::node
     226                 : {
     227                 :     // Embedded completion op — avoids heap allocation per fire/cancel
     228                 :     struct completion_op final : scheduler_op
     229                 :     {
     230                 :         waiter_node* waiter_ = nullptr;
     231                 : 
     232                 :         static void do_complete(
     233                 :             void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
     234                 : 
     235             210 :         completion_op() noexcept : scheduler_op(&do_complete) {}
     236                 : 
     237                 :         void operator()() override;
     238                 :         void destroy() override;
     239                 :     };
     240                 : 
     241                 :     // Per-waiter stop_token cancellation
     242                 :     struct canceller
     243                 :     {
     244                 :         waiter_node* waiter_;
     245                 :         void operator()() const;
     246                 :     };
     247                 : 
     248                 :     // nullptr once removed from timer's waiter list (concurrency marker)
     249                 :     timer_service::implementation* impl_ = nullptr;
     250                 :     timer_service* svc_                  = nullptr;
     251                 :     std::coroutine_handle<> h_;
     252                 :     capy::continuation* cont_            = nullptr;
     253                 :     capy::executor_ref d_;
     254                 :     std::error_code* ec_out_ = nullptr;
     255                 :     std::stop_token token_;
     256                 :     std::optional<std::stop_callback<canceller>> stop_cb_;
     257                 :     completion_op op_;
     258                 :     std::error_code ec_value_;
     259                 :     waiter_node* next_free_ = nullptr;
     260                 : 
     261             210 :     waiter_node() noexcept
     262             210 :     {
     263             210 :         op_.waiter_ = this;
     264             210 :     }
     265                 : };
     266                 : 
     267                 : struct timer_service::implementation final : timer::implementation
     268                 : {
     269                 :     using clock_type = std::chrono::steady_clock;
     270                 :     using time_point = clock_type::time_point;
     271                 :     using duration   = clock_type::duration;
     272                 : 
     273                 :     timer_service* svc_ = nullptr;
     274                 :     intrusive_list<waiter_node> waiters_;
     275                 : 
     276                 :     // Free-list linkage (used while the impl sits on free_list_)
     277                 :     implementation* next_free_ = nullptr;
     278                 : 
     279                 :     inline explicit implementation(timer_service& svc) noexcept;
     280                 : 
     281                 :     inline std::coroutine_handle<> wait(
     282                 :         std::coroutine_handle<>,
     283                 :         capy::executor_ref,
     284                 :         std::stop_token,
     285                 :         std::error_code*,
     286                 :         capy::continuation*) override;
     287                 : };
     288                 : 
     289                 : // Thread-local caches avoid hot-path mutex acquisitions:
     290                 : // 1. Impl cache — single-slot, validated by comparing svc_
     291                 : // 2. Waiter cache — single-slot, no service affinity
     292                 : // timer_service_invalidate_cache() clears the calling thread's slots during shutdown.
     293                 : 
     294                 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
     295                 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
     296                 : 
     297                 : inline timer_service::implementation*
     298            8088 : try_pop_tl_cache(timer_service* svc) noexcept
     299                 : {
     300            8088 :     auto* impl = tl_cached_impl.get();
     301            8088 :     if (impl)
     302                 :     {
     303            7839 :         tl_cached_impl.set(nullptr);
     304            7839 :         if (impl->svc_ == svc)
     305            7839 :             return impl;
     306                 :         // Stale impl from a destroyed service
     307 MIS           0 :         delete impl;
     308                 :     }
     309 HIT         249 :     return nullptr;
     310                 : }
     311                 : 
     312                 : inline bool
     313            8080 : try_push_tl_cache(timer_service::implementation* impl) noexcept
     314                 : {
     315            8080 :     if (!tl_cached_impl.get())
     316                 :     {
     317            8000 :         tl_cached_impl.set(impl);
     318            8000 :         return true;
     319                 :     }
     320              80 :     return false;
     321                 : }
     322                 : 
     323                 : inline waiter_node*
     324            7874 : try_pop_waiter_tl_cache() noexcept
     325                 : {
     326            7874 :     auto* w = tl_cached_waiter.get();
     327            7874 :     if (w)
     328                 :     {
     329            7662 :         tl_cached_waiter.set(nullptr);
     330            7662 :         return w;
     331                 :     }
     332             212 :     return nullptr;
     333                 : }
     334                 : 
     335                 : inline bool
     336            7858 : try_push_waiter_tl_cache(waiter_node* w) noexcept
     337                 : {
     338            7858 :     if (!tl_cached_waiter.get())
     339                 :     {
     340            7778 :         tl_cached_waiter.set(w);
     341            7778 :         return true;
     342                 :     }
     343              80 :     return false;
     344                 : }
     345                 : 
     346                 : inline void
     347             517 : timer_service_invalidate_cache() noexcept
     348                 : {
     349             517 :     delete tl_cached_impl.get();
     350             517 :     tl_cached_impl.set(nullptr);
     351                 : 
     352             517 :     delete tl_cached_waiter.get();
     353             517 :     tl_cached_waiter.set(nullptr);
     354             517 : }
     355                 : 
     356                 : // timer_service out-of-class member function definitions
     357                 : 
     358             249 : inline timer_service::implementation::implementation(
     359             249 :     timer_service& svc) noexcept
     360             249 :     : svc_(&svc)
     361                 : {
     362             249 : }
     363                 : 
     364                 : inline void
     365             517 : timer_service::shutdown()
     366                 : {
     367             517 :     timer_service_invalidate_cache();
     368             517 :     shutting_down_ = true;
     369                 : 
     370                 :     // Snapshot impls and detach them from the heap so that
     371                 :     // coroutine-owned timer destructors (triggered by h.destroy()
     372                 :     // below) cannot re-enter remove_timer_impl() and mutate the
     373                 :     // vector during iteration.
     374             517 :     std::vector<implementation*> impls;
     375             517 :     impls.reserve(heap_.size());
     376             525 :     for (auto& entry : heap_)
     377                 :     {
     378               8 :         entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     379               8 :         impls.push_back(entry.timer_);
     380                 :     }
     381             517 :     heap_.clear();
     382             517 :     cached_nearest_ns_.store(
     383                 :         (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
     384                 : 
     385                 :     // Cancel waiting timers. Each waiter called work_started()
     386                 :     // in implementation::wait(). On IOCP the scheduler shutdown
     387                 :     // loop exits when outstanding_work_ reaches zero, so we must
     388                 :     // call work_finished() here to balance it. On other backends
     389                 :     // this is harmless.
     390             525 :     for (auto* impl : impls)
     391                 :     {
     392              16 :         while (auto* w = impl->waiters_.pop_front())
     393                 :         {
     394               8 :             w->stop_cb_.reset();
     395               8 :             auto h = std::exchange(w->h_, {});
     396               8 :             sched_->work_finished();
     397               8 :             if (h)
     398               8 :                 h.destroy();
     399               8 :             delete w;
     400               8 :         }
     401               8 :         delete impl;
     402                 :     }
     403                 : 
     404                 :     // Delete free-listed impls
     405             597 :     while (free_list_)
     406                 :     {
     407              80 :         auto* next = free_list_->next_free_;
     408              80 :         delete free_list_;
     409              80 :         free_list_ = next;
     410                 :     }
     411                 : 
     412                 :     // Delete free-listed waiters
     413             595 :     while (waiter_free_list_)
     414                 :     {
     415              78 :         auto* next = waiter_free_list_->next_free_;
     416              78 :         delete waiter_free_list_;
     417              78 :         waiter_free_list_ = next;
     418                 :     }
     419             517 : }
     420                 : 
     421                 : inline io_object::implementation*
     422            8088 : timer_service::construct()
     423                 : {
     424            8088 :     implementation* impl = try_pop_tl_cache(this);
     425            8088 :     if (impl)
     426                 :     {
     427            7839 :         impl->svc_        = this;
     428            7839 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     429            7839 :         impl->might_have_pending_waits_ = false;
     430            7839 :         return impl;
     431                 :     }
     432                 : 
     433             249 :     std::lock_guard lock(mutex_);
     434             249 :     if (free_list_)
     435                 :     {
     436 MIS           0 :         impl              = free_list_;
     437               0 :         free_list_        = impl->next_free_;
     438               0 :         impl->next_free_  = nullptr;
     439               0 :         impl->svc_        = this;
     440               0 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     441               0 :         impl->might_have_pending_waits_ = false;
     442                 :     }
     443                 :     else
     444                 :     {
     445 HIT         249 :         impl = new implementation(*this);
     446                 :     }
     447             249 :     return impl;
     448             249 : }
     449                 : 
     450                 : inline void
     451            8086 : timer_service::destroy(io_object::implementation* p)
     452                 : {
     453            8086 :     destroy_impl(static_cast<implementation&>(*p));
     454            8086 : }
     455                 : 
     456                 : inline void
     457            8086 : timer_service::destroy_impl(implementation& impl)
     458                 : {
     459                 :     // During shutdown the impl is owned by the shutdown loop.
     460                 :     // Re-entering here (from a coroutine-owned timer destructor
     461                 :     // triggered by h.destroy()) must not modify the heap or
     462                 :     // recycle the impl — shutdown deletes it directly.
     463            8086 :     if (shutting_down_)
     464            8006 :         return;
     465                 : 
     466            8080 :     cancel_timer(impl);
     467                 : 
     468            8080 :     if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
     469                 :     {
     470 MIS           0 :         std::lock_guard lock(mutex_);
     471               0 :         remove_timer_impl(impl);
     472               0 :         refresh_cached_nearest();
     473               0 :     }
     474                 : 
     475 HIT        8080 :     if (try_push_tl_cache(&impl))
     476            8000 :         return;
     477                 : 
     478              80 :     std::lock_guard lock(mutex_);
     479              80 :     impl.next_free_ = free_list_;
     480              80 :     free_list_      = &impl;
     481              80 : }
     482                 : 
     483                 : inline waiter_node*
     484            7874 : timer_service::create_waiter()
     485                 : {
     486            7874 :     if (auto* w = try_pop_waiter_tl_cache())
     487            7662 :         return w;
     488                 : 
     489             212 :     std::lock_guard lock(mutex_);
     490             212 :     if (waiter_free_list_)
     491                 :     {
     492               2 :         auto* w           = waiter_free_list_;
     493               2 :         waiter_free_list_ = w->next_free_;
     494               2 :         w->next_free_     = nullptr;
     495               2 :         return w;
     496                 :     }
     497                 : 
     498             210 :     return new waiter_node();
     499             212 : }
     500                 : 
     501                 : inline void
     502            7858 : timer_service::destroy_waiter(waiter_node* w)
     503                 : {
     504            7858 :     if (try_push_waiter_tl_cache(w))
     505            7778 :         return;
     506                 : 
     507              80 :     std::lock_guard lock(mutex_);
     508              80 :     w->next_free_     = waiter_free_list_;
     509              80 :     waiter_free_list_ = w;
     510              80 : }
     511                 : 
     512                 : inline std::size_t
     513               6 : timer_service::update_timer(implementation& impl, time_point new_time)
     514                 : {
     515                 :     bool in_heap =
     516               6 :         (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
     517               6 :     if (!in_heap && impl.waiters_.empty())
     518 MIS           0 :         return 0;
     519                 : 
     520 HIT           6 :     bool notify = false;
     521               6 :     intrusive_list<waiter_node> canceled;
     522                 : 
     523                 :     {
     524               6 :         std::lock_guard lock(mutex_);
     525                 : 
     526              16 :         while (auto* w = impl.waiters_.pop_front())
     527                 :         {
     528              10 :             w->impl_ = nullptr;
     529              10 :             canceled.push_back(w);
     530              10 :         }
     531                 : 
     532               6 :         if (impl.heap_index_ < heap_.size())
     533                 :         {
     534               6 :             time_point old_time           = heap_[impl.heap_index_].time_;
     535               6 :             heap_[impl.heap_index_].time_ = new_time;
     536                 : 
     537               6 :             if (new_time < old_time)
     538               6 :                 up_heap(impl.heap_index_);
     539                 :             else
     540 MIS           0 :                 down_heap(impl.heap_index_);
     541                 : 
     542 HIT           6 :             notify = (impl.heap_index_ == 0);
     543                 :         }
     544                 : 
     545               6 :         refresh_cached_nearest();
     546               6 :     }
     547                 : 
     548               6 :     std::size_t count = 0;
     549              16 :     while (auto* w = canceled.pop_front())
     550                 :     {
     551              10 :         w->ec_value_ = make_error_code(capy::error::canceled);
     552              10 :         sched_->post(&w->op_);
     553              10 :         ++count;
     554              10 :     }
     555                 : 
     556               6 :     if (notify)
     557               6 :         on_earliest_changed_();
     558                 : 
     559               6 :     return count;
     560                 : }
     561                 : 
     562                 : inline void
     563            7874 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
     564                 : {
     565            7874 :     bool notify = false;
     566                 :     {
     567            7874 :         std::lock_guard lock(mutex_);
     568            7874 :         if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
     569                 :         {
     570            7852 :             impl.heap_index_ = heap_.size();
     571            7852 :             heap_.push_back({impl.expiry_, &impl});
     572            7852 :             up_heap(heap_.size() - 1);
     573            7852 :             notify = (impl.heap_index_ == 0);
     574            7852 :             refresh_cached_nearest();
     575                 :         }
     576            7874 :         impl.waiters_.push_back(w);
     577            7874 :     }
     578            7874 :     if (notify)
     579            7821 :         on_earliest_changed_();
     580            7874 : }
     581                 : 
     582                 : inline std::size_t
     583            8088 : timer_service::cancel_timer(implementation& impl)
     584                 : {
     585            8088 :     if (!impl.might_have_pending_waits_)
     586            8064 :         return 0;
     587                 : 
     588                 :     // Not in heap and no waiters — just clear the flag
     589              24 :     if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
     590 MIS           0 :         impl.waiters_.empty())
     591                 :     {
     592               0 :         impl.might_have_pending_waits_ = false;
     593               0 :         return 0;
     594                 :     }
     595                 : 
     596 HIT          24 :     intrusive_list<waiter_node> canceled;
     597                 : 
     598                 :     {
     599              24 :         std::lock_guard lock(mutex_);
     600              24 :         remove_timer_impl(impl);
     601              52 :         while (auto* w = impl.waiters_.pop_front())
     602                 :         {
     603              28 :             w->impl_ = nullptr;
     604              28 :             canceled.push_back(w);
     605              28 :         }
     606              24 :         refresh_cached_nearest();
     607              24 :     }
     608                 : 
     609              24 :     impl.might_have_pending_waits_ = false;
     610                 : 
     611              24 :     std::size_t count = 0;
     612              52 :     while (auto* w = canceled.pop_front())
     613                 :     {
     614              28 :         w->ec_value_ = make_error_code(capy::error::canceled);
     615              28 :         sched_->post(&w->op_);
     616              28 :         ++count;
     617              28 :     }
     618                 : 
     619              24 :     return count;
     620                 : }
     621                 : 
     622                 : inline void
     623              30 : timer_service::cancel_waiter(waiter_node* w)
     624                 : {
     625                 :     {
     626              30 :         std::lock_guard lock(mutex_);
     627                 :         // Already removed by cancel_timer or process_expired
     628              30 :         if (!w->impl_)
     629 MIS           0 :             return;
     630 HIT          30 :         auto* impl = w->impl_;
     631              30 :         w->impl_   = nullptr;
     632              30 :         impl->waiters_.remove(w);
     633              30 :         if (impl->waiters_.empty())
     634                 :         {
     635              28 :             remove_timer_impl(*impl);
     636              28 :             impl->might_have_pending_waits_ = false;
     637                 :         }
     638              30 :         refresh_cached_nearest();
     639              30 :     }
     640                 : 
     641              30 :     w->ec_value_ = make_error_code(capy::error::canceled);
     642              30 :     sched_->post(&w->op_);
     643                 : }
     644                 : 
     645                 : inline std::size_t
     646               2 : timer_service::cancel_one_waiter(implementation& impl)
     647                 : {
     648               2 :     if (!impl.might_have_pending_waits_)
     649 MIS           0 :         return 0;
     650                 : 
     651 HIT           2 :     waiter_node* w = nullptr;
     652                 : 
     653                 :     {
     654               2 :         std::lock_guard lock(mutex_);
     655               2 :         w = impl.waiters_.pop_front();
     656               2 :         if (!w)
     657 MIS           0 :             return 0;
     658 HIT           2 :         w->impl_ = nullptr;
     659               2 :         if (impl.waiters_.empty())
     660                 :         {
     661 MIS           0 :             remove_timer_impl(impl);
     662               0 :             impl.might_have_pending_waits_ = false;
     663                 :         }
     664 HIT           2 :         refresh_cached_nearest();
     665               2 :     }
     666                 : 
     667               2 :     w->ec_value_ = make_error_code(capy::error::canceled);
     668               2 :     sched_->post(&w->op_);
     669               2 :     return 1;
     670                 : }
     671                 : 
     672                 : inline std::size_t
     673          229158 : timer_service::process_expired()
     674                 : {
     675          229158 :     intrusive_list<waiter_node> expired;
     676                 : 
     677                 :     {
     678          229158 :         std::lock_guard lock(mutex_);
     679          229158 :         auto now = clock_type::now();
     680                 : 
     681          236950 :         while (!heap_.empty() && heap_[0].time_ <= now)
     682                 :         {
     683            7792 :             implementation* t = heap_[0].timer_;
     684            7792 :             remove_timer_impl(*t);
     685           15588 :             while (auto* w = t->waiters_.pop_front())
     686                 :             {
     687            7796 :                 w->impl_     = nullptr;
     688            7796 :                 w->ec_value_ = {};
     689            7796 :                 expired.push_back(w);
     690            7796 :             }
     691            7792 :             t->might_have_pending_waits_ = false;
     692                 :         }
     693                 : 
     694          229158 :         refresh_cached_nearest();
     695          229158 :     }
     696                 : 
     697          229158 :     std::size_t count = 0;
     698          236954 :     while (auto* w = expired.pop_front())
     699                 :     {
     700            7796 :         sched_->post(&w->op_);
     701            7796 :         ++count;
     702            7796 :     }
     703                 : 
     704          229158 :     return count;
     705                 : }
     706                 : 
     707                 : inline void
     708            7844 : timer_service::remove_timer_impl(implementation& impl)
     709                 : {
     710            7844 :     std::size_t index = impl.heap_index_;
     711            7844 :     if (index >= heap_.size())
     712 MIS           0 :         return; // Not in heap
     713                 : 
     714 HIT        7844 :     if (index == heap_.size() - 1)
     715                 :     {
     716                 :         // Last element, just pop
     717             152 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     718             152 :         heap_.pop_back();
     719                 :     }
     720                 :     else
     721                 :     {
     722                 :         // Swap with last and reheapify
     723            7692 :         swap_heap(index, heap_.size() - 1);
     724            7692 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     725            7692 :         heap_.pop_back();
     726                 : 
     727            7692 :         if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
     728 MIS           0 :             up_heap(index);
     729                 :         else
     730 HIT        7692 :             down_heap(index);
     731                 :     }
     732                 : }
     733                 : 
     734                 : inline void
     735            7858 : timer_service::up_heap(std::size_t index)
     736                 : {
     737           15529 :     while (index > 0)
     738                 :     {
     739            7702 :         std::size_t parent = (index - 1) / 2;
     740            7702 :         if (!(heap_[index].time_ < heap_[parent].time_))
     741              31 :             break;
     742            7671 :         swap_heap(index, parent);
     743            7671 :         index = parent;
     744                 :     }
     745            7858 : }
     746                 : 
     747                 : inline void
     748            7692 : timer_service::down_heap(std::size_t index)
     749                 : {
     750            7692 :     std::size_t child = index * 2 + 1;
     751            7692 :     while (child < heap_.size())
     752                 :     {
     753               6 :         std::size_t min_child = (child + 1 == heap_.size() ||
     754 MIS           0 :                                  heap_[child].time_ < heap_[child + 1].time_)
     755 HIT           6 :             ? child
     756               6 :             : child + 1;
     757                 : 
     758               6 :         if (heap_[index].time_ < heap_[min_child].time_)
     759               6 :             break;
     760                 : 
     761 MIS           0 :         swap_heap(index, min_child);
     762               0 :         index = min_child;
     763               0 :         child = index * 2 + 1;
     764                 :     }
     765 HIT        7692 : }
     766                 : 
     767                 : inline void
     768           15363 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
     769                 : {
     770           15363 :     heap_entry tmp                = heap_[i1];
     771           15363 :     heap_[i1]                     = heap_[i2];
     772           15363 :     heap_[i2]                     = tmp;
     773           15363 :     heap_[i1].timer_->heap_index_ = i1;
     774           15363 :     heap_[i2].timer_->heap_index_ = i2;
     775           15363 : }
     776                 : 
     777                 : // waiter_node out-of-class member function definitions
     778                 : 
     779                 : inline void
     780              30 : waiter_node::canceller::operator()() const
     781                 : {
     782              30 :     waiter_->svc_->cancel_waiter(waiter_);
     783              30 : }
     784                 : 
     785                 : inline void
     786 MIS           0 : waiter_node::completion_op::do_complete(
     787                 :     [[maybe_unused]] void* owner,
     788                 :     scheduler_op* base,
     789                 :     std::uint32_t,
     790                 :     std::uint32_t)
     791                 : {
     792                 :     // owner is always non-null here. The destroy path (owner == nullptr)
     793                 :     // is unreachable because completion_op overrides destroy() directly,
     794                 :     // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
     795               0 :     BOOST_COROSIO_ASSERT(owner);
     796               0 :     static_cast<completion_op*>(base)->operator()();
     797               0 : }
     798                 : 
     799                 : inline void
     800 HIT        7858 : waiter_node::completion_op::operator()()
     801                 : {
     802            7858 :     auto* w = waiter_;
     803            7858 :     w->stop_cb_.reset();
     804            7858 :     if (w->ec_out_)
     805            7858 :         *w->ec_out_ = w->ec_value_;
     806                 : 
     807            7858 :     auto* cont  = w->cont_;
     808            7858 :     auto d      = w->d_;
     809            7858 :     auto* svc   = w->svc_;
     810            7858 :     auto& sched = svc->get_scheduler();
     811                 : 
     812            7858 :     svc->destroy_waiter(w);
     813                 : 
     814            7858 :     d.post(*cont);
     815            7858 :     sched.work_finished();
     816            7858 : }
     817                 : 
     818                 : // GCC 14 false-positive: inlining ~optional<stop_callback> through
     819                 : // delete loses track that stop_cb_ was already .reset() above.
     820                 : #if defined(__GNUC__) && !defined(__clang__)
     821                 : #pragma GCC diagnostic push
     822                 : #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
     823                 : #endif
     824                 : inline void
     825               8 : waiter_node::completion_op::destroy()
     826                 : {
     827                 :     // Called during scheduler shutdown drain when this completion_op is
     828                 :     // in the scheduler's ready queue (posted by cancel_timer() or
     829                 :     // process_expired()). Balances the work_started() from
     830                 :     // implementation::wait(). The scheduler drain loop separately
     831                 :     // balances the work_started() from post(). On IOCP both decrements
     832                 :     // are required for outstanding_work_ to reach zero; on other
     833                 :     // backends this is harmless.
     834                 :     //
     835                 :     // This override also prevents scheduler_op::destroy() from calling
     836                 :     // do_complete(nullptr, ...). See also: timer_service::shutdown()
     837                 :     // which drains waiters still in the timer heap (the other path).
     838               8 :     auto* w = waiter_;
     839               8 :     w->stop_cb_.reset();
     840               8 :     auto h      = std::exchange(w->h_, {});
     841               8 :     auto& sched = w->svc_->get_scheduler();
     842               8 :     delete w;
     843               8 :     sched.work_finished();
     844               8 :     if (h)
     845               8 :         h.destroy();
     846               8 : }
     847                 : #if defined(__GNUC__) && !defined(__clang__)
     848                 : #pragma GCC diagnostic pop
     849                 : #endif
     850                 : 
     851                 : inline std::coroutine_handle<>
     852            7875 : timer_service::implementation::wait(
     853                 :     std::coroutine_handle<> h,
     854                 :     capy::executor_ref d,
     855                 :     std::stop_token token,
     856                 :     std::error_code* ec,
     857                 :     capy::continuation* cont)
     858                 : {
     859                 :     // Already-expired fast path — no waiter_node, no mutex.
     860                 :     // Post instead of dispatch so the coroutine yields to the
     861                 :     // scheduler, allowing other queued work to run.
     862            7875 :     if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
     863                 :     {
     864            7853 :         if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
     865                 :         {
     866               1 :             if (ec)
     867               1 :                 *ec = {};
     868               1 :             d.post(*cont);
     869               1 :             return std::noop_coroutine();
     870                 :         }
     871                 :     }
     872                 : 
     873            7874 :     auto* w    = svc_->create_waiter();
     874            7874 :     w->impl_   = this;
     875            7874 :     w->svc_    = svc_;
     876            7874 :     w->h_      = h;
     877            7874 :     w->cont_   = cont;
     878            7874 :     w->d_      = d;
     879            7874 :     w->token_  = std::move(token);
     880            7874 :     w->ec_out_ = ec;
     881                 : 
     882            7874 :     svc_->insert_waiter(*this, w);
     883            7874 :     might_have_pending_waits_ = true;
     884            7874 :     svc_->get_scheduler().work_started();
     885                 : 
     886            7874 :     if (w->token_.stop_possible())
     887              48 :         w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
     888                 : 
     889            7874 :     return std::noop_coroutine();
     890                 : }
     891                 : 
     892                 : // Free functions
     893                 : 
     894                 : struct timer_service_access
     895                 : {
     896            8088 :     static timer_service& get_timer(io_context& ctx) noexcept
     897                 :     {
     898            8088 :         return *ctx.timer_svc_;
     899                 :     }
     900                 : 
     901             517 :     static void set_timer(io_context& ctx, timer_service& svc) noexcept
     902                 :     {
     903             517 :         ctx.timer_svc_ = &svc;
     904             517 :     }
     905                 : };
     906                 : 
     907                 : // Bypass find_service() mutex by reading io_context's cached pointer
     908                 : inline io_object::io_service&
     909            8088 : timer_service_direct(capy::execution_context& ctx) noexcept
     910                 : {
     911            8088 :     return timer_service_access::get_timer(static_cast<io_context&>(ctx));
     912                 : }
     913                 : 
     914                 : inline std::size_t
     915               6 : timer_service_update_expiry(timer::implementation& base)
     916                 : {
     917               6 :     auto& impl = static_cast<timer_service::implementation&>(base);
     918               6 :     return impl.svc_->update_timer(impl, impl.expiry_);
     919                 : }
     920                 : 
     921                 : inline std::size_t
     922               8 : timer_service_cancel(timer::implementation& base) noexcept
     923                 : {
     924               8 :     auto& impl = static_cast<timer_service::implementation&>(base);
     925               8 :     return impl.svc_->cancel_timer(impl);
     926                 : }
     927                 : 
     928                 : inline std::size_t
     929               2 : timer_service_cancel_one(timer::implementation& base) noexcept
     930                 : {
     931               2 :     auto& impl = static_cast<timer_service::implementation&>(base);
     932               2 :     return impl.svc_->cancel_one_waiter(impl);
     933                 : }
     934                 : 
     935                 : inline timer_service&
     936             517 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
     937                 : {
     938             517 :     auto& svc = ctx.make_service<timer_service>(sched);
     939             517 :     timer_service_access::set_timer(static_cast<io_context&>(ctx), svc);
     940             517 :     return svc;
     941                 : }
     942                 : 
     943                 : } // namespace boost::corosio::detail
     944                 : 
     945                 : #endif
        

Generated by: LCOV version 2.3