//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29  

29  

30  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
31  

31  

32  
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>

#include <errno.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <unistd.h>
43  

43  

44  
namespace boost::corosio::detail {
44  
namespace boost::corosio::detail {
45  

45  

46  
struct epoll_op;
46  
struct epoll_op;
47  
struct descriptor_state;
47  
struct descriptor_state;
48  

48  

49  
/** Linux scheduler using epoll for I/O multiplexing.
49  
/** Linux scheduler using epoll for I/O multiplexing.
50  

50  

51  
    This scheduler implements the scheduler interface using Linux epoll
51  
    This scheduler implements the scheduler interface using Linux epoll
52  
    for efficient I/O event notification. It uses a single reactor model
52  
    for efficient I/O event notification. It uses a single reactor model
53  
    where one thread runs epoll_wait while other threads
53  
    where one thread runs epoll_wait while other threads
54  
    wait on a condition variable for handler work. This design provides:
54  
    wait on a condition variable for handler work. This design provides:
55  

55  

56  
    - Handler parallelism: N posted handlers can execute on N threads
56  
    - Handler parallelism: N posted handlers can execute on N threads
57  
    - No thundering herd: condition_variable wakes exactly one thread
57  
    - No thundering herd: condition_variable wakes exactly one thread
58  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
58  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
59  

59  

60  
    When threads call run(), they first try to execute queued handlers.
60  
    When threads call run(), they first try to execute queued handlers.
61  
    If the queue is empty and no reactor is running, one thread becomes
61  
    If the queue is empty and no reactor is running, one thread becomes
62  
    the reactor and runs epoll_wait. Other threads wait on a condition
62  
    the reactor and runs epoll_wait. Other threads wait on a condition
63  
    variable until handlers are available.
63  
    variable until handlers are available.
64  

64  

65  
    @par Thread Safety
65  
    @par Thread Safety
66  
    All public member functions are thread-safe.
66  
    All public member functions are thread-safe.
67  
*/
67  
*/
68 -
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68 +
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler
69  
{
69  
{
70  
public:
70  
public:
71  
    /** Construct the scheduler.
71  
    /** Construct the scheduler.
72  

72  

73  
        Creates an epoll instance, eventfd for reactor interruption,
73  
        Creates an epoll instance, eventfd for reactor interruption,
74  
        and timerfd for kernel-managed timer expiry.
74  
        and timerfd for kernel-managed timer expiry.
75  

75  

76  
        @param ctx Reference to the owning execution_context.
76  
        @param ctx Reference to the owning execution_context.
77  
        @param concurrency_hint Hint for expected thread count (unused).
77  
        @param concurrency_hint Hint for expected thread count (unused).
78  
    */
78  
    */
79  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
80  

80  

81  
    /// Destroy the scheduler.
81  
    /// Destroy the scheduler.
82  
    ~epoll_scheduler() override;
82  
    ~epoll_scheduler() override;
83  

83  

84  
    epoll_scheduler(epoll_scheduler const&)            = delete;
84  
    epoll_scheduler(epoll_scheduler const&)            = delete;
85  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
86  

86  

87  
    /// Shut down the scheduler, draining pending operations.
87  
    /// Shut down the scheduler, draining pending operations.
88  
    void shutdown() override;
88  
    void shutdown() override;
89  

89  

90  
    /// Apply runtime configuration, resizing the event buffer.
90  
    /// Apply runtime configuration, resizing the event buffer.
91  
    void configure_reactor(
91  
    void configure_reactor(
92  
        unsigned max_events,
92  
        unsigned max_events,
93  
        unsigned budget_init,
93  
        unsigned budget_init,
94  
        unsigned budget_max,
94  
        unsigned budget_max,
95  
        unsigned unassisted) override;
95  
        unsigned unassisted) override;
96  

96  

97  
    /** Return the epoll file descriptor.
97  
    /** Return the epoll file descriptor.
98  

98  

99  
        Used by socket services to register file descriptors
99  
        Used by socket services to register file descriptors
100  
        for I/O event notification.
100  
        for I/O event notification.
101  

101  

102  
        @return The epoll file descriptor.
102  
        @return The epoll file descriptor.
103  
    */
103  
    */
104  
    int epoll_fd() const noexcept
104  
    int epoll_fd() const noexcept
105  
    {
105  
    {
106  
        return epoll_fd_;
106  
        return epoll_fd_;
107  
    }
107  
    }
108  

108  

109  
    /** Register a descriptor for persistent monitoring.
109  
    /** Register a descriptor for persistent monitoring.
110  

110  

111  
        The fd is registered once and stays registered until explicitly
111  
        The fd is registered once and stays registered until explicitly
112  
        deregistered. Events are dispatched via descriptor_state which
112  
        deregistered. Events are dispatched via descriptor_state which
113  
        tracks pending read/write/connect operations.
113  
        tracks pending read/write/connect operations.
114  

114  

115  
        @param fd The file descriptor to register.
115  
        @param fd The file descriptor to register.
116  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
116  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
117  
    */
117  
    */
118  
    void register_descriptor(int fd, descriptor_state* desc) const;
118  
    void register_descriptor(int fd, descriptor_state* desc) const;
119  

119  

120  
    /** Deregister a persistently registered descriptor.
120  
    /** Deregister a persistently registered descriptor.
121  

121  

122  
        @param fd The file descriptor to deregister.
122  
        @param fd The file descriptor to deregister.
123  
    */
123  
    */
124  
    void deregister_descriptor(int fd) const;
124  
    void deregister_descriptor(int fd) const;
125  

125  

126  
private:
126  
private:
127  
    void
127  
    void
128  
    run_task(lock_type& lock, context_type* ctx,
128  
    run_task(lock_type& lock, context_type* ctx,
129  
        long timeout_us) override;
129  
        long timeout_us) override;
130  
    void interrupt_reactor() const override;
130  
    void interrupt_reactor() const override;
131  
    void update_timerfd() const;
131  
    void update_timerfd() const;
132  

132  

133  
    int epoll_fd_;
133  
    int epoll_fd_;
134  
    int event_fd_;
134  
    int event_fd_;
135  
    int timer_fd_;
135  
    int timer_fd_;
136  

136  

137  
    // Edge-triggered eventfd state
137  
    // Edge-triggered eventfd state
138  
    mutable std::atomic<bool> eventfd_armed_{false};
138  
    mutable std::atomic<bool> eventfd_armed_{false};
139  

139  

140  
    // Set when the earliest timer changes; flushed before epoll_wait
140  
    // Set when the earliest timer changes; flushed before epoll_wait
141  
    mutable std::atomic<bool> timerfd_stale_{false};
141  
    mutable std::atomic<bool> timerfd_stale_{false};
142  

142  

143  
    // Event buffer sized from max_events_per_poll_ (set at construction,
143  
    // Event buffer sized from max_events_per_poll_ (set at construction,
144  
    // resized by configure_reactor via io_context_options).
144  
    // resized by configure_reactor via io_context_options).
145  
    std::vector<epoll_event> event_buffer_;
145  
    std::vector<epoll_event> event_buffer_;
146  
};
146  
};
147  

147  

148  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
148  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
149  
    : epoll_fd_(-1)
149  
    : epoll_fd_(-1)
150  
    , event_fd_(-1)
150  
    , event_fd_(-1)
151  
    , timer_fd_(-1)
151  
    , timer_fd_(-1)
152  
    , event_buffer_(max_events_per_poll_)
152  
    , event_buffer_(max_events_per_poll_)
153  
{
153  
{
154  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
154  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
155  
    if (epoll_fd_ < 0)
155  
    if (epoll_fd_ < 0)
156  
        detail::throw_system_error(make_err(errno), "epoll_create1");
156  
        detail::throw_system_error(make_err(errno), "epoll_create1");
157  

157  

158  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
158  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
159  
    if (event_fd_ < 0)
159  
    if (event_fd_ < 0)
160  
    {
160  
    {
161  
        int errn = errno;
161  
        int errn = errno;
162  
        ::close(epoll_fd_);
162  
        ::close(epoll_fd_);
163  
        detail::throw_system_error(make_err(errn), "eventfd");
163  
        detail::throw_system_error(make_err(errn), "eventfd");
164  
    }
164  
    }
165  

165  

166  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
166  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
167  
    if (timer_fd_ < 0)
167  
    if (timer_fd_ < 0)
168  
    {
168  
    {
169  
        int errn = errno;
169  
        int errn = errno;
170  
        ::close(event_fd_);
170  
        ::close(event_fd_);
171  
        ::close(epoll_fd_);
171  
        ::close(epoll_fd_);
172  
        detail::throw_system_error(make_err(errn), "timerfd_create");
172  
        detail::throw_system_error(make_err(errn), "timerfd_create");
173  
    }
173  
    }
174  

174  

175  
    epoll_event ev{};
175  
    epoll_event ev{};
176  
    ev.events   = EPOLLIN | EPOLLET;
176  
    ev.events   = EPOLLIN | EPOLLET;
177  
    ev.data.ptr = nullptr;
177  
    ev.data.ptr = nullptr;
178  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
178  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
179  
    {
179  
    {
180  
        int errn = errno;
180  
        int errn = errno;
181  
        ::close(timer_fd_);
181  
        ::close(timer_fd_);
182  
        ::close(event_fd_);
182  
        ::close(event_fd_);
183  
        ::close(epoll_fd_);
183  
        ::close(epoll_fd_);
184  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
184  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
185  
    }
185  
    }
186  

186  

187  
    epoll_event timer_ev{};
187  
    epoll_event timer_ev{};
188  
    timer_ev.events   = EPOLLIN | EPOLLERR;
188  
    timer_ev.events   = EPOLLIN | EPOLLERR;
189  
    timer_ev.data.ptr = &timer_fd_;
189  
    timer_ev.data.ptr = &timer_fd_;
190  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
190  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
191  
    {
191  
    {
192  
        int errn = errno;
192  
        int errn = errno;
193  
        ::close(timer_fd_);
193  
        ::close(timer_fd_);
194  
        ::close(event_fd_);
194  
        ::close(event_fd_);
195  
        ::close(epoll_fd_);
195  
        ::close(epoll_fd_);
196  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
196  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
197  
    }
197  
    }
198  

198  

199  
    timer_svc_ = &get_timer_service(ctx, *this);
199  
    timer_svc_ = &get_timer_service(ctx, *this);
200  
    timer_svc_->set_on_earliest_changed(
200  
    timer_svc_->set_on_earliest_changed(
201  
        timer_service::callback(this, [](void* p) {
201  
        timer_service::callback(this, [](void* p) {
202  
            auto* self = static_cast<epoll_scheduler*>(p);
202  
            auto* self = static_cast<epoll_scheduler*>(p);
203  
            self->timerfd_stale_.store(true, std::memory_order_release);
203  
            self->timerfd_stale_.store(true, std::memory_order_release);
204  
            self->interrupt_reactor();
204  
            self->interrupt_reactor();
205  
        }));
205  
        }));
206  

206  

207  
    get_resolver_service(ctx, *this);
207  
    get_resolver_service(ctx, *this);
208  
    get_signal_service(ctx, *this);
208  
    get_signal_service(ctx, *this);
209  
    get_stream_file_service(ctx, *this);
209  
    get_stream_file_service(ctx, *this);
210  
    get_random_access_file_service(ctx, *this);
210  
    get_random_access_file_service(ctx, *this);
211  

211  

212  
    completed_ops_.push(&task_op_);
212  
    completed_ops_.push(&task_op_);
213  
}
213  
}
214  

214  

215  
// Closes the timerfd, the eventfd and finally the epoll instance.
inline epoll_scheduler::~epoll_scheduler()
{
    // Listed in release order; descriptors still holding a negative
    // value are skipped
    int const owned[] = {timer_fd_, event_fd_, epoll_fd_};
    for (int const fd : owned)
    {
        if (fd >= 0)
            ::close(fd);
    }
}
224  

224  

225  
inline void
225  
inline void
226  
epoll_scheduler::shutdown()
226  
epoll_scheduler::shutdown()
227  
{
227  
{
228  
    shutdown_drain();
228  
    shutdown_drain();
229  

229  

230  
    if (event_fd_ >= 0)
230  
    if (event_fd_ >= 0)
231  
        interrupt_reactor();
231  
        interrupt_reactor();
232  
}
232  
}
233  

233  

234  
inline void
234  
inline void
235  
epoll_scheduler::configure_reactor(
235  
epoll_scheduler::configure_reactor(
236  
    unsigned max_events,
236  
    unsigned max_events,
237  
    unsigned budget_init,
237  
    unsigned budget_init,
238  
    unsigned budget_max,
238  
    unsigned budget_max,
239  
    unsigned unassisted)
239  
    unsigned unassisted)
240  
{
240  
{
241 -
    reactor_scheduler_base::configure_reactor(
241 +
    reactor_scheduler::configure_reactor(
242  
        max_events, budget_init, budget_max, unassisted);
242  
        max_events, budget_init, budget_max, unassisted);
243  
    event_buffer_.resize(max_events_per_poll_);
243  
    event_buffer_.resize(max_events_per_poll_);
244  
}
244  
}
245  

245  

246  
inline void
246  
inline void
247  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
247  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
248  
{
248  
{
249  
    epoll_event ev{};
249  
    epoll_event ev{};
250  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
250  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
251  
    ev.data.ptr = desc;
251  
    ev.data.ptr = desc;
252  

252  

253  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
253  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
254  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
254  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
255  

255  

256  
    desc->registered_events = ev.events;
256  
    desc->registered_events = ev.events;
257  
    desc->fd                = fd;
257  
    desc->fd                = fd;
258  
    desc->scheduler_        = this;
258  
    desc->scheduler_        = this;
259  
    desc->mutex.set_enabled(!single_threaded_);
259  
    desc->mutex.set_enabled(!single_threaded_);
260  
    desc->ready_events_.store(0, std::memory_order_relaxed);
260  
    desc->ready_events_.store(0, std::memory_order_relaxed);
261  

261  

262  
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
262  
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
263  
    desc->impl_ref_.reset();
263  
    desc->impl_ref_.reset();
264  
    desc->read_ready  = false;
264  
    desc->read_ready  = false;
265  
    desc->write_ready = false;
265  
    desc->write_ready = false;
266  
}
266  
}
267  

267  

268  
inline void
268  
inline void
269  
epoll_scheduler::deregister_descriptor(int fd) const
269  
epoll_scheduler::deregister_descriptor(int fd) const
270  
{
270  
{
271  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
271  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
272  
}
272  
}
273  

273  

274  
inline void
274  
inline void
275  
epoll_scheduler::interrupt_reactor() const
275  
epoll_scheduler::interrupt_reactor() const
276  
{
276  
{
277  
    bool expected = false;
277  
    bool expected = false;
278  
    if (eventfd_armed_.compare_exchange_strong(
278  
    if (eventfd_armed_.compare_exchange_strong(
279  
            expected, true, std::memory_order_release,
279  
            expected, true, std::memory_order_release,
280  
            std::memory_order_relaxed))
280  
            std::memory_order_relaxed))
281  
    {
281  
    {
282  
        std::uint64_t val       = 1;
282  
        std::uint64_t val       = 1;
283  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
283  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
284  
    }
284  
    }
285  
}
285  
}
286  

286  

287  
inline void
287  
inline void
288  
epoll_scheduler::update_timerfd() const
288  
epoll_scheduler::update_timerfd() const
289  
{
289  
{
290  
    auto nearest = timer_svc_->nearest_expiry();
290  
    auto nearest = timer_svc_->nearest_expiry();
291  

291  

292  
    itimerspec ts{};
292  
    itimerspec ts{};
293  
    int flags = 0;
293  
    int flags = 0;
294  

294  

295  
    if (nearest == timer_service::time_point::max())
295  
    if (nearest == timer_service::time_point::max())
296  
    {
296  
    {
297  
        // No timers — disarm by setting to 0 (relative)
297  
        // No timers — disarm by setting to 0 (relative)
298  
    }
298  
    }
299  
    else
299  
    else
300  
    {
300  
    {
301  
        auto now = std::chrono::steady_clock::now();
301  
        auto now = std::chrono::steady_clock::now();
302  
        if (nearest <= now)
302  
        if (nearest <= now)
303  
        {
303  
        {
304  
            // Use 1ns instead of 0 — zero disarms the timerfd
304  
            // Use 1ns instead of 0 — zero disarms the timerfd
305  
            ts.it_value.tv_nsec = 1;
305  
            ts.it_value.tv_nsec = 1;
306  
        }
306  
        }
307  
        else
307  
        else
308  
        {
308  
        {
309  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
309  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
310  
                            nearest - now)
310  
                            nearest - now)
311  
                            .count();
311  
                            .count();
312  
            ts.it_value.tv_sec  = nsec / 1000000000;
312  
            ts.it_value.tv_sec  = nsec / 1000000000;
313  
            ts.it_value.tv_nsec = nsec % 1000000000;
313  
            ts.it_value.tv_nsec = nsec % 1000000000;
314  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
314  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
315  
                ts.it_value.tv_nsec = 1;
315  
                ts.it_value.tv_nsec = 1;
316  
        }
316  
        }
317  
    }
317  
    }
318  

318  

319  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
319  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
320  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
320  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
321  
}
321  
}
322  

322  

323  
inline void
323  
inline void
324  
epoll_scheduler::run_task(
324  
epoll_scheduler::run_task(
325  
    lock_type& lock, context_type* ctx, long timeout_us)
325  
    lock_type& lock, context_type* ctx, long timeout_us)
326  
{
326  
{
327  
    int timeout_ms;
327  
    int timeout_ms;
328  
    if (task_interrupted_)
328  
    if (task_interrupted_)
329  
        timeout_ms = 0;
329  
        timeout_ms = 0;
330  
    else if (timeout_us < 0)
330  
    else if (timeout_us < 0)
331  
        timeout_ms = -1;
331  
        timeout_ms = -1;
332  
    else
332  
    else
333  
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
333  
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
334  

334  

335  
    if (lock.owns_lock())
335  
    if (lock.owns_lock())
336  
        lock.unlock();
336  
        lock.unlock();
337  

337  

338  
    task_cleanup on_exit{this, &lock, ctx};
338  
    task_cleanup on_exit{this, &lock, ctx};
339  

339  

340  
    // Flush deferred timerfd programming before blocking
340  
    // Flush deferred timerfd programming before blocking
341  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
341  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
342  
        update_timerfd();
342  
        update_timerfd();
343  

343  

344  
    int nfds = ::epoll_wait(
344  
    int nfds = ::epoll_wait(
345  
        epoll_fd_, event_buffer_.data(),
345  
        epoll_fd_, event_buffer_.data(),
346  
        static_cast<int>(event_buffer_.size()), timeout_ms);
346  
        static_cast<int>(event_buffer_.size()), timeout_ms);
347  

347  

348  
    if (nfds < 0 && errno != EINTR)
348  
    if (nfds < 0 && errno != EINTR)
349  
        detail::throw_system_error(make_err(errno), "epoll_wait");
349  
        detail::throw_system_error(make_err(errno), "epoll_wait");
350  

350  

351  
    bool check_timers = false;
351  
    bool check_timers = false;
352  
    op_queue local_ops;
352  
    op_queue local_ops;
353  

353  

354  
    for (int i = 0; i < nfds; ++i)
354  
    for (int i = 0; i < nfds; ++i)
355  
    {
355  
    {
356  
        if (event_buffer_[i].data.ptr == nullptr)
356  
        if (event_buffer_[i].data.ptr == nullptr)
357  
        {
357  
        {
358  
            std::uint64_t val;
358  
            std::uint64_t val;
359  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
359  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
360  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
360  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
361  
            eventfd_armed_.store(false, std::memory_order_relaxed);
361  
            eventfd_armed_.store(false, std::memory_order_relaxed);
362  
            continue;
362  
            continue;
363  
        }
363  
        }
364  

364  

365  
        if (event_buffer_[i].data.ptr == &timer_fd_)
365  
        if (event_buffer_[i].data.ptr == &timer_fd_)
366  
        {
366  
        {
367  
            std::uint64_t expirations;
367  
            std::uint64_t expirations;
368  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
368  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
369  
            [[maybe_unused]] auto r =
369  
            [[maybe_unused]] auto r =
370  
                ::read(timer_fd_, &expirations, sizeof(expirations));
370  
                ::read(timer_fd_, &expirations, sizeof(expirations));
371  
            check_timers = true;
371  
            check_timers = true;
372  
            continue;
372  
            continue;
373  
        }
373  
        }
374  

374  

375  
        auto* desc =
375  
        auto* desc =
376  
            static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
376  
            static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
377  
        desc->add_ready_events(event_buffer_[i].events);
377  
        desc->add_ready_events(event_buffer_[i].events);
378  

378  

379  
        bool expected = false;
379  
        bool expected = false;
380  
        if (desc->is_enqueued_.compare_exchange_strong(
380  
        if (desc->is_enqueued_.compare_exchange_strong(
381  
                expected, true, std::memory_order_release,
381  
                expected, true, std::memory_order_release,
382  
                std::memory_order_relaxed))
382  
                std::memory_order_relaxed))
383  
        {
383  
        {
384  
            local_ops.push(desc);
384  
            local_ops.push(desc);
385  
        }
385  
        }
386  
    }
386  
    }
387  

387  

388  
    if (check_timers)
388  
    if (check_timers)
389  
    {
389  
    {
390  
        timer_svc_->process_expired();
390  
        timer_svc_->process_expired();
391  
        update_timerfd();
391  
        update_timerfd();
392  
    }
392  
    }
393  

393  

394  
    lock.lock();
394  
    lock.lock();
395  

395  

396  
    if (!local_ops.empty())
396  
    if (!local_ops.empty())
397  
        completed_ops_.splice(local_ops);
397  
        completed_ops_.splice(local_ops);
398  
}
398  
}
399  

399  

400  
} // namespace boost::corosio::detail
400  
} // namespace boost::corosio::detail
401  

401  

402  
#endif // BOOST_COROSIO_HAS_EPOLL
402  
#endif // BOOST_COROSIO_HAS_EPOLL
403  

403  

404  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
404  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP