libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

Coverage: 80.9% of lines (402/497), 89.6% of functions (43/48), 68.6% of branches (208/303)
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include "src/detail/epoll/scheduler.hpp"
#include "src/detail/epoll/op.hpp"
#include "src/detail/make_err.hpp"
#include "src/detail/posix/resolver_service.hpp"
#include "src/detail/posix/signals.hpp"

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <chrono>
#include <limits>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler coordinates threads to provide handler parallelism while
avoiding the thundering herd problem. Instead of every thread blocking
in epoll_wait(), one thread at a time becomes the "reactor" while the
others wait on a condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics where N posted items wake N threads

Event Loop Structure (do_one)
-----------------------------
1. Lock the mutex, try to pop a handler from the queue
2. If a handler was popped: execute it (unlocked), return
3. If the queue is empty and no reactor is running: become the reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If the queue is empty and a reactor is running: wait on the condvar

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try getting
a handler, giving priority to handler execution over more I/O polling.

Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination (see the worked example below):
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check whether already signaled before blocking (fast path)

Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if the reactor is running: interrupt it via an eventfd write
- Else: no-op (the thread will find the work when it checks the queue)

This avoids waking threads unnecessarily. With cascading wakes, each
handler execution wakes at most one additional thread if more work
exists in the queue.

Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments the count on start and decrements it
on completion.

Timer Integration
-----------------
Timers are handled by timer_service. The reactor adjusts the epoll_wait
timeout to wake for the nearest timer expiry. When a new timer is
scheduled earlier than the current earliest, timer_service calls
interrupt_reactor() so the timeout is re-evaluated.
*/
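
/*
Worked example of the state_ encoding (illustrative values, derived from
wait_for_signal() and maybe_unlock_and_signal_one() below):

- Two threads block in wait_for_signal(): each adds 2, so state_ == 4.
- A cross-thread post() reaches maybe_unlock_and_signal_one(), which sets
  bit 0 (state_ == 5) and, seeing state_ > 1, calls notify_one().
- The woken waiter subtracts 2 (state_ == 3), sees bit 0 set, and returns
  to look for work; the other waiter remains blocked.
- With no waiters (state_ == 0), the same post leaves state_ == 1 and
  skips the notify (it may interrupt the reactor instead; see
  wake_one_thread_and_unlock). The signaled bit persists until
  clear_signal(), so the next would-be waiter takes the fast path.
*/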

namespace boost::corosio::detail {

struct scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;
    int inline_budget;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
    {
    }
};

namespace {

corosio::detail::thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace

void
epoll_scheduler::
reset_inline_budget() const noexcept
{
    if (auto* ctx = find_context(this))
        ctx->inline_budget = max_inline_budget_;
}

bool
epoll_scheduler::
try_consume_inline_budget() const noexcept
{
    if (auto* ctx = find_context(this))
    {
        if (ctx->inline_budget > 0)
        {
            --ctx->inline_budget;
            return true;
        }
    }
    return false;
}
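
// A plausible call pattern for the inline budget (an assumption for
// illustration - the real call sites live in corosio's I/O objects, not
// in this file). reset_inline_budget() would refill the budget at the
// top of a handler; an operation that completes immediately may then run
// its continuation inline while budget remains, bounding recursion depth:
//
//     sched.reset_inline_budget();            // refill on handler entry
//     ...
//     if (sched.try_consume_inline_budget())
//         (*op)();                            // complete inline
//     else
//         sched.post(op);                     // defer to the run queue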

void
descriptor_state::
operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                read_ready = true;
            }
        }

        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }

            if (!had_write_op)
                write_ready = true;
        }

        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute the first handler inline - the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}
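
// Flow recap for the deferred-I/O pattern (descriptor_state::operator()
// above, reactor loop in run_task() below): the reactor never takes the
// per-descriptor mutex; it only ORs new ready bits into ready_events_ and
// enqueues the descriptor once (guarded by is_enqueued_). The operator()
// then runs on whichever thread pops it, performs the actual reads and
// writes under the descriptor mutex, and completes at most one handler
// inline. If nothing completed (spurious wake or EAGAIN), it reports
// compensating_work_started() so the work count stays balanced.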

epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) {
                auto* self = static_cast<epoll_scheduler*>(p);
                self->timerfd_stale_.store(true, std::memory_order_release);
                if (self->task_running_.load(std::memory_order_acquire))
                    self->interrupt_reactor();
            }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

epoll_scheduler::
~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

void
epoll_scheduler::
post(std::coroutine_handle<> h) const
{
    struct post_handler final
        : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit
        post_handler(std::coroutine_handle<> h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}
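
// Usage sketch for the two post() overloads (hypothetical caller; the
// surrounding run()-loop setup is assumed):
//
//     sched.post(handle);   // std::coroutine_handle<>: wrapped in a
//                           // heap-allocated op and resumed later
//     sched.post(op);       // scheduler_op*: queued as-is; the scheduler
//                           // invokes and owns it
//
// From a thread already inside run(), either overload appends to that
// thread's private queue with no locking; from any other thread it takes
// mutex_, pushes to the global queue, and may wake a sleeping worker or
// interrupt the reactor via wake_one_thread_and_unlock().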

void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

bool
epoll_scheduler::
running_in_this_thread() const noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

bool
epoll_scheduler::
stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

void
epoll_scheduler::
restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

std::size_t
epoll_scheduler::
run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}
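
// Typical driving pattern (hypothetical user code; any public io_context
// wrapper around this scheduler is outside this file): several threads
// call run() on the same scheduler. One becomes the reactor, the others
// park on cond_ until handlers arrive, and run() returns in each thread
// once outstanding_work_ drops to zero.
//
//     std::vector<std::thread> pool;
//     for (int i = 0; i < 4; ++i)
//         pool.emplace_back([&] { sched.run(); });
//     for (auto& t : pool)
//         t.join();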

std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}

void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}

void
epoll_scheduler::
compensating_work_started() const noexcept
{
    auto* ctx = find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
        std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}

void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}

void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

/** RAII guard for handler execution work accounting.

    A handler consumes 1 work item and may produce N new items via
    fast-path posts. Net change = N - 1:
    - If N > 1: add (N - 1) to the global count (more produced than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed; may trigger stop)

    Also drains the private queue to the global queue so other threads
    can process the new work.
*/
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};
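
// Worked example of the N - 1 accounting (illustrative): a handler that
// fast-path posts three continuations finishes with produced == 3, so
// 3 - 1 == 2 is added to outstanding_work_ (the item the handler itself
// consumed was already counted when it was posted). A handler that posts
// nothing leaves produced == 0, so work_finished() runs and may drop
// outstanding_work_ to zero, which stops the scheduler.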

/** RAII guard for reactor work accounting.

    The reactor only produces work, via timer/signal callbacks posting
    handlers. Unlike handler execution, which consumes 1 item, the
    reactor consumes nothing, so all produced work must be flushed to
    the global counter.
*/
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (!ctx)
            return;

        if (ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }

        if (!ctx->private_queue.empty())
        {
            if (!lock->owns_lock())
                lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
};

void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}
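
// Worked example (illustrative): a timer due 2.5ms from now gives
// nsec == 2500000, so ts.it_value holds tv_sec == 0, tv_nsec == 2500000;
// one due 3.2s out gives tv_sec == 3, tv_nsec == 200000000. With
// flags == 0 the value is relative to now, and an all-zero itimerspec
// would disarm the timerfd entirely - hence the 1ns floor above.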

void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}

std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif