// libs/corosio/src/corosio/src/detail/epoll/op.hpp
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
#ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
#define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/io_object.hpp>
#include <boost/corosio/endpoint.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <coroutine>
#include <boost/capy/error.hpp>
#include <system_error>

#include "src/detail/make_err.hpp"
#include "src/detail/dispatch_coro.hpp"
#include "src/detail/scheduler_op.hpp"
#include "src/detail/endpoint_convert.hpp"

#include <unistd.h>
#include <errno.h>

#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <stop_token>

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>
/*
    epoll Operation State
    =====================

    Each async I/O operation has a corresponding epoll_op-derived struct that
    holds the operation's state while it's in flight. The socket impl owns
    fixed slots for each operation type (conn_, rd_, wr_), so only one
    operation of each type can be pending per socket at a time.

    Persistent Registration
    -----------------------
    File descriptors are registered with epoll once (via descriptor_state) and
    stay registered until closed. The descriptor_state tracks which operations
    are pending (read_op, write_op, connect_op). When an event arrives, the
    reactor dispatches to the appropriate pending operation.

    Impl Lifetime Management
    ------------------------
    When cancel() posts an op to the scheduler's ready queue, the socket impl
    might be destroyed before the scheduler processes the op. The `impl_ptr`
    member holds a shared_ptr to the impl, keeping it alive until the op
    completes. This is set by cancel() and cleared in operator() after the
    coroutine is resumed.

    EOF Detection
    -------------
    For reads, 0 bytes with no error means EOF. But an empty user buffer also
    returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.

    SIGPIPE Prevention
    ------------------
    Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
    SIGPIPE when the peer has closed.
*/
78
namespace boost::corosio::detail {

// Forward declarations
class epoll_socket_impl;
class epoll_acceptor_impl;
struct epoll_op;

// Forward declaration
class epoll_scheduler;
/** Per-descriptor state for persistent epoll registration.

    Tracks pending operations for a file descriptor. The fd is registered
    once with epoll and stays registered until closed.

    This struct extends scheduler_op to support deferred I/O processing.
    When epoll events arrive, the reactor sets ready_events and queues
    this descriptor for processing. When popped from the scheduler queue,
    operator() performs the actual I/O and queues completion handlers.

    @par Deferred I/O Model
    The reactor no longer performs I/O directly. Instead:
    1. Reactor sets ready_events and queues descriptor_state
    2. Scheduler pops descriptor_state and calls operator()
    3. operator() performs I/O under mutex and queues completions

    This eliminates per-descriptor mutex locking from the reactor hot path.

    @par Thread Safety
    The mutex protects operation pointers and ready flags during I/O.
    ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
*/
111 struct descriptor_state : scheduler_op
112 {
113 std::mutex mutex;
114
115 // Protected by mutex
116 epoll_op* read_op = nullptr;
117 epoll_op* write_op = nullptr;
118 epoll_op* connect_op = nullptr;
119
120 // Caches edge events that arrived before an op was registered
121 bool read_ready = false;
122 bool write_ready = false;
123
124 // Deferred cancellation: set by cancel() when the target op is not
125 // parked (e.g. completing inline via speculative I/O). Checked when
126 // the next op parks; if set, the op is immediately self-cancelled.
127 // This matches IOCP semantics where CancelIoEx always succeeds.
128 bool read_cancel_pending = false;
129 bool write_cancel_pending = false;
130
131 // Set during registration only (no mutex needed)
132 std::uint32_t registered_events = 0;
133 int fd = -1;
134
135 // For deferred I/O - set by reactor, read by scheduler
136 std::atomic<std::uint32_t> ready_events_{0};
137 std::atomic<bool> is_enqueued_{false};
138 epoll_scheduler const* scheduler_ = nullptr;
139
140 // Prevents impl destruction while this descriptor_state is queued.
141 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
142 std::shared_ptr<void> impl_ref_;
143
144 /// Add ready events atomically.
145 57007 void add_ready_events(std::uint32_t ev) noexcept
146 {
147 57007 ready_events_.fetch_or(ev, std::memory_order_relaxed);
148 57007 }
149
150 /// Perform deferred I/O and queue completions.
151 void operator()() override;
152
153 /// Destroy without invoking.
154 /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
155 /// the self-referential cycle set by close_socket().
156 void destroy() override { impl_ref_.reset(); }
157 };
158
159 struct epoll_op : scheduler_op
160 {
161 struct canceller
162 {
163 epoll_op* op;
164 void operator()() const noexcept;
165 };
166
167 std::coroutine_handle<> h;
168 capy::executor_ref ex;
169 std::error_code* ec_out = nullptr;
170 std::size_t* bytes_out = nullptr;
171
172 int fd = -1;
173 int errn = 0;
174 std::size_t bytes_transferred = 0;
175
176 std::atomic<bool> cancelled{false};
177 std::optional<std::stop_callback<canceller>> stop_cb;
178
179 // Prevents use-after-free when socket is closed with pending ops.
180 // See "Impl Lifetime Management" in file header.
181 std::shared_ptr<void> impl_ptr;
182
183 // For stop_token cancellation - pointer to owning socket/acceptor impl.
184 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
185 epoll_socket_impl* socket_impl_ = nullptr;
186 epoll_acceptor_impl* acceptor_impl_ = nullptr;
187
188 15958 epoll_op() = default;
189
190 232248 void reset() noexcept
191 {
192 232248 fd = -1;
193 232248 errn = 0;
194 232248 bytes_transferred = 0;
195 232248 cancelled.store(false, std::memory_order_relaxed);
196 232248 impl_ptr.reset();
197 232248 socket_impl_ = nullptr;
198 232248 acceptor_impl_ = nullptr;
199 232248 }
200
201 // Defined in sockets.cpp where epoll_socket_impl is complete
202 void operator()() override;
203
204 37724 virtual bool is_read_operation() const noexcept { return false; }
205 virtual void cancel() noexcept = 0;
206
207 void destroy() override
208 {
209 stop_cb.reset();
210 impl_ptr.reset();
211 }
212
213 24640 void request_cancel() noexcept
214 {
215 24640 cancelled.store(true, std::memory_order_release);
216 24640 }
217
218 78372 void start(std::stop_token token, epoll_socket_impl* impl)
219 {
220 78372 cancelled.store(false, std::memory_order_release);
221 78372 stop_cb.reset();
222 78372 socket_impl_ = impl;
223 78372 acceptor_impl_ = nullptr;
224
225
2/2
✓ Branch 1 taken 98 times.
✓ Branch 2 taken 78274 times.
78372 if (token.stop_possible())
226 98 stop_cb.emplace(token, canceller{this});
227 78372 }
228
229 2652 void start(std::stop_token token, epoll_acceptor_impl* impl)
230 {
231 2652 cancelled.store(false, std::memory_order_release);
232 2652 stop_cb.reset();
233 2652 socket_impl_ = nullptr;
234 2652 acceptor_impl_ = impl;
235
236
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2643 times.
2652 if (token.stop_possible())
237 9 stop_cb.emplace(token, canceller{this});
238 2652 }
239
240 80960 void complete(int err, std::size_t bytes) noexcept
241 {
242 80960 errn = err;
243 80960 bytes_transferred = bytes;
244 80960 }
245
246 virtual void perform_io() noexcept {}
247 };
248
249
250 struct epoll_connect_op : epoll_op
251 {
252 endpoint target_endpoint;
253
254 2644 void reset() noexcept
255 {
256 2644 epoll_op::reset();
257 2644 target_endpoint = endpoint{};
258 2644 }
259
260 2644 void perform_io() noexcept override
261 {
262 // connect() completion status is retrieved via SO_ERROR, not return value
263 2644 int err = 0;
264 2644 socklen_t len = sizeof(err);
265
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2644 times.
2644 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
266 err = errno;
267 2644 complete(err, 0);
268 2644 }
269
270 // Defined in sockets.cpp where epoll_socket_impl is complete
271 void operator()() override;
272 void cancel() noexcept override;
273 };
274
275
276 struct epoll_read_op : epoll_op
277 {
278 static constexpr std::size_t max_buffers = 16;
279 iovec iovecs[max_buffers];
280 int iovec_count = 0;
281 bool empty_buffer_read = false;
282
283 37802 bool is_read_operation() const noexcept override
284 {
285 37802 return !empty_buffer_read;
286 }
287
288 113574 void reset() noexcept
289 {
290 113574 epoll_op::reset();
291 113574 iovec_count = 0;
292 113574 empty_buffer_read = false;
293 113574 }
294
295 143 void perform_io() noexcept override
296 {
297 ssize_t n;
298 do {
299 143 n = ::readv(fd, iovecs, iovec_count);
300
3/4
✓ Branch 0 taken 139 times.
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 139 times.
143 } while (n < 0 && errno == EINTR);
301
302
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 139 times.
143 if (n >= 0)
303 4 complete(0, static_cast<std::size_t>(n));
304 else
305 139 complete(errno, 0);
306 143 }
307
308 void cancel() noexcept override;
309 };
310
311
312 struct epoll_write_op : epoll_op
313 {
314 static constexpr std::size_t max_buffers = 16;
315 iovec iovecs[max_buffers];
316 int iovec_count = 0;
317
318 113378 void reset() noexcept
319 {
320 113378 epoll_op::reset();
321 113378 iovec_count = 0;
322 113378 }
323
324 void perform_io() noexcept override
325 {
326 msghdr msg{};
327 msg.msg_iov = iovecs;
328 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
329
330 ssize_t n;
331 do {
332 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
333 } while (n < 0 && errno == EINTR);
334
335 if (n >= 0)
336 complete(0, static_cast<std::size_t>(n));
337 else
338 complete(errno, 0);
339 }
340
341 void cancel() noexcept override;
342 };
343
344
345 struct epoll_accept_op : epoll_op
346 {
347 int accepted_fd = -1;
348 io_object::io_object_impl** impl_out = nullptr;
349 sockaddr_in peer_addr{};
350
351 2652 void reset() noexcept
352 {
353 2652 epoll_op::reset();
354 2652 accepted_fd = -1;
355 2652 impl_out = nullptr;
356 2652 peer_addr = {};
357 2652 }
358
359 2641 void perform_io() noexcept override
360 {
361 2641 socklen_t addrlen = sizeof(peer_addr);
362 int new_fd;
363 do {
364 2641 new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&peer_addr),
365 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
366
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2641 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2641 } while (new_fd < 0 && errno == EINTR);
367
368
1/2
✓ Branch 0 taken 2641 times.
✗ Branch 1 not taken.
2641 if (new_fd >= 0)
369 {
370 2641 accepted_fd = new_fd;
371 2641 complete(0, 0);
372 }
373 else
374 {
375 complete(errno, 0);
376 }
377 2641 }
378
379 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
380 void operator()() override;
381 void cancel() noexcept override;
382 };
383
384 } // namespace boost::corosio::detail
385
386 #endif // BOOST_COROSIO_HAS_EPOLL
387
388 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
389