libs/corosio/src/corosio/src/detail/epoll/scheduler.hpp

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/detail/scheduler.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include "src/detail/scheduler_op.hpp"
#include "src/detail/timer_service.hpp"

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>

namespace boost::corosio::detail {

struct epoll_op;
struct descriptor_state;
struct scheduler_context;

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single-reactor model
    where one thread runs epoll_wait while other threads wait on a
    condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: the condition variable wakes exactly one thread
    - IOCP parity: behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
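
    @par Example
    A sketch of multi-threaded use. The `use_service` accessor is an
    assumption modeled on Asio-style execution contexts; the actual
    capy API may differ.
    @code
    capy::execution_context ctx;
    auto& sched = capy::use_service<epoll_scheduler>(ctx); // hypothetical accessor

    std::vector<std::thread> pool;
    for(int i = 0; i < 4; ++i)
        pool.emplace_back([&]{ sched.run(); }); // one thread becomes the reactor

    sched.post(h); // h: a std::coroutine_handle<> resumed on some pool thread

    sched.stop();
    for(auto& t : pool)
        t.join();
    @endcode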
*/
class epoll_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, an eventfd for reactor interruption,
        and a timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(
        capy::execution_context& ctx,
        int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler();

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    void on_work_started() noexcept override;
    void on_work_finished() noexcept override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
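
        @par Example
        A socket service could use the descriptor directly (a sketch;
        services normally go through register_descriptor() instead, and
        the event mask shown is only illustrative):
        @code
        epoll_event ev{};
        ev.events = EPOLLIN | EPOLLET;
        ev.data.ptr = state; // state: a descriptor_state* owned by the service
        ::epoll_ctl(sched.epoll_fd(), EPOLL_CTL_ADD, fd, &ev);
        @endcode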
    */
    int epoll_fd() const noexcept { return epoll_fd_; }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
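
        @par Example
        A sketch of the intended pattern in an I/O completion path;
        `complete_inline()` is a hypothetical name for running the
        handler on the current thread:
        @code
        if(sched.try_consume_inline_budget())
            op->complete_inline(); // budget available: run now
        else
            sched.post(op);        // budget exhausted: defer to the queue
        @endcode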
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state, which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
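
        @par Example
        A sketch of the registration lifecycle from a socket service's
        perspective; `make_descriptor_state()` is a hypothetical factory:
        @code
        descriptor_state* state = make_descriptor_state(fd);
        sched.register_descriptor(fd, state); // monitored from here on
        // ... initiate reads/writes; events dispatch through `state` ...
        sched.deregister_descriptor(fd);      // before closing the fd
        @endcode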
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** For use by I/O operations to track pending work. */
    void work_started() const noexcept override;

    /** For use by I/O operations to track completed work. */
    void work_finished() const noexcept override;

    /** Offset a forthcoming work_finished() call from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed, so the outstanding work count would
        otherwise be decremented without a matching increment. Must be
        called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from a thread context's private queue to the global queue.

        Called by the thread_context_guard destructor when a thread exits
        run(). Transfers pending work to the global queue under mutex
        protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads
            if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
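
        @par Example
        A sketch of the expected call sequence; `op_queue::push` is an
        assumed interface name for enqueueing an operation:
        @code
        op_queue ops;
        sched.work_started(); // precondition: one call per operation
        ops.push(op);         // op: a completed scheduler_op*
        sched.post_deferred_completions(ops);
        @endcode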
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    friend struct work_cleanup;
    friend struct task_cleanup;

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        scheduler_context* ctx);
    void run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if the lock is still held.
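
        @par Example
        The intended pattern around the fallback (a sketch):
        @code
        if(! maybe_unlock_and_signal_one(lock))
        {
            // No waiter was woken; the lock is still held.
            lock.unlock();
            interrupt_reactor(); // fall back to waking the reactor
        }
        @endcode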
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
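
        @par Example
        The documented behavior corresponds roughly to this shape (a
        sketch based on the member documentation, not the actual body):
        @code
        if((state_ & 1) == 0)                // not signaled: slow path
        {
            state_ += 2;                     // register as a waiter
            cond_.wait(lock, [this]{ return (state_ & 1) != 0; });
            state_ -= 2;                     // deregister after waking
        }
        @endcode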
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or the timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock,
        long timeout_us) const;

    int epoll_fd_;
    int event_fd_; // eventfd for interrupting the reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    int max_inline_budget_ = 2;
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    bool stopped_;
    bool shutdown_;
    timer_service* timer_svc_ = nullptr;

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled flag, upper bits = waiter count
    // (each waiter adds 2).
    mutable std::size_t state_ = 0;
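
    // Illustration of the encoding described above (a reading aid,
    // not additional state):
    //   state_ & 1   -> nonzero when signaled
    //   state_ >> 1  -> number of threads currently waiting
    // A waiter registers itself with state_ += 2 before blocking and
    // deregisters with state_ -= 2 after waking.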

    // Edge-triggered eventfd state.
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
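    // (Presumably, a thread that pops this sentinel from the queue runs
    // the reactor instead of executing a handler; this mirrors the
    // task_operation pattern used by Asio-style schedulers.)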
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
297