libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

80.6% Lines (345/428) 94.4% Functions (34/36) 64.5% Branches (149/231)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 // Register an op with the reactor, handling cached edge events.
34 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
35 void
36 2842 epoll_socket_impl::
37 register_op(
38 epoll_op& op,
39 epoll_op*& desc_slot,
40 bool& ready_flag) noexcept
41 {
42 2842 svc_.work_started();
43
44 2842 std::lock_guard lock(desc_state_.mutex);
45 2842 bool io_done = false;
46
2/2
✓ Branch 0 taken 139 times.
✓ Branch 1 taken 2703 times.
2842 if (ready_flag)
47 {
48 139 ready_flag = false;
49 139 op.perform_io();
50
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 139 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
139 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
51
1/2
✓ Branch 0 taken 139 times.
✗ Branch 1 not taken.
139 if (!io_done)
52 139 op.errn = 0;
53 }
54
55 // Check deferred cancellation (read/write only; connect has no pending flag)
56 2842 bool cancel_pending = false;
57
2/2
✓ Branch 0 taken 198 times.
✓ Branch 1 taken 2644 times.
2842 if (&desc_slot == &desc_state_.read_op)
58 198 cancel_pending = std::exchange(desc_state_.read_cancel_pending, false);
59
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2644 times.
2644 else if (&desc_slot == &desc_state_.write_op)
60 cancel_pending = std::exchange(desc_state_.write_cancel_pending, false);
61
62
2/2
✓ Branch 0 taken 93 times.
✓ Branch 1 taken 2749 times.
2842 if (cancel_pending)
63 93 op.cancelled.store(true, std::memory_order_relaxed);
64
65
5/6
✓ Branch 0 taken 2842 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 93 times.
✓ Branch 4 taken 2749 times.
✓ Branch 5 taken 93 times.
✓ Branch 6 taken 2749 times.
2842 if (io_done || op.cancelled.load(std::memory_order_acquire))
66 {
67 93 svc_.post(&op);
68 93 svc_.work_finished();
69 }
70 else
71 {
72 2749 desc_slot = &op;
73 }
74 2842 }
75
76 void
77 103 epoll_op::canceller::
78 operator()() const noexcept
79 {
80 103 op->cancel();
81 103 }
82
83 void
84 epoll_connect_op::
85 cancel() noexcept
86 {
87 if (socket_impl_)
88 socket_impl_->cancel_single_op(*this);
89 else
90 request_cancel();
91 }
92
93 void
94 97 epoll_read_op::
95 cancel() noexcept
96 {
97
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (socket_impl_)
98 97 socket_impl_->cancel_single_op(*this);
99 else
100 request_cancel();
101 97 }
102
103 void
104 epoll_write_op::
105 cancel() noexcept
106 {
107 if (socket_impl_)
108 socket_impl_->cancel_single_op(*this);
109 else
110 request_cancel();
111 }
112
113 void
114 75728 epoll_op::
115 operator()()
116 {
117 75728 stop_cb.reset();
118
119 75728 socket_impl_->svc_.scheduler().reset_inline_budget();
120
121
2/2
✓ Branch 1 taken 202 times.
✓ Branch 2 taken 75526 times.
75728 if (cancelled.load(std::memory_order_acquire))
122 202 *ec_out = capy::error::canceled;
123
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 75526 times.
75526 else if (errn != 0)
124 *ec_out = make_err(errn);
125
4/6
✓ Branch 1 taken 37801 times.
✓ Branch 2 taken 37725 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 37801 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 75526 times.
75526 else if (is_read_operation() && bytes_transferred == 0)
126 *ec_out = capy::error::eof;
127 else
128 75526 *ec_out = {};
129
130 75728 *bytes_out = bytes_transferred;
131
132 // Move to stack before resuming coroutine. The coroutine might close
133 // the socket, releasing the last wrapper ref. If impl_ptr were the
134 // last ref and we destroyed it while still in operator(), we'd have
135 // use-after-free. Moving to local ensures destruction happens at
136 // function exit, after all member accesses are complete.
137 75728 capy::executor_ref saved_ex( std::move( ex ) );
138 75728 std::coroutine_handle<> saved_h( std::move( h ) );
139 75728 auto prevent_premature_destruction = std::move(impl_ptr);
140
2/2
✓ Branch 1 taken 75728 times.
✓ Branch 4 taken 75728 times.
75728 dispatch_coro(saved_ex, saved_h).resume();
141 75728 }
142
143 void
144 2644 epoll_connect_op::
145 operator()()
146 {
147 2644 stop_cb.reset();
148
149 2644 socket_impl_->svc_.scheduler().reset_inline_budget();
150
151
3/4
✓ Branch 0 taken 2643 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 2643 times.
✗ Branch 4 not taken.
2644 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
152
153 // Cache endpoints on successful connect
154
3/4
✓ Branch 0 taken 2643 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 2643 times.
✗ Branch 3 not taken.
2644 if (success && socket_impl_)
155 {
156 // Query local endpoint via getsockname (may fail, but remote is always known)
157 2643 endpoint local_ep;
158 2643 sockaddr_in local_addr{};
159 2643 socklen_t local_len = sizeof(local_addr);
160
1/2
✓ Branch 1 taken 2643 times.
✗ Branch 2 not taken.
2643 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
161 2643 local_ep = from_sockaddr_in(local_addr);
162 // Always cache remote endpoint; local may be default if getsockname failed
163 2643 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
164 }
165
166
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2644 times.
2644 if (cancelled.load(std::memory_order_acquire))
167 *ec_out = capy::error::canceled;
168
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2643 times.
2644 else if (errn != 0)
169 1 *ec_out = make_err(errn);
170 else
171 2643 *ec_out = {};
172
173 // Move to stack before resuming. See epoll_op::operator()() for rationale.
174 2644 capy::executor_ref saved_ex( std::move( ex ) );
175 2644 std::coroutine_handle<> saved_h( std::move( h ) );
176 2644 auto prevent_premature_destruction = std::move(impl_ptr);
177
2/2
✓ Branch 1 taken 2644 times.
✓ Branch 4 taken 2644 times.
2644 dispatch_coro(saved_ex, saved_h).resume();
178 2644 }
179
180 5298 epoll_socket_impl::
181 5298 epoll_socket_impl(epoll_socket_service& svc) noexcept
182 5298 : svc_(svc)
183 {
184 5298 }
185
186 5298 epoll_socket_impl::
187 ~epoll_socket_impl() = default;
188
189 void
190 5298 epoll_socket_impl::
191 release()
192 {
193 5298 close_socket();
194 5298 svc_.destroy_impl(*this);
195 5298 }
196
197 std::coroutine_handle<>
198 2644 epoll_socket_impl::
199 connect(
200 std::coroutine_handle<> h,
201 capy::executor_ref ex,
202 endpoint ep,
203 std::stop_token token,
204 std::error_code* ec)
205 {
206 2644 auto& op = conn_;
207
208 2644 sockaddr_in addr = detail::to_sockaddr_in(ep);
209
1/1
✓ Branch 1 taken 2644 times.
2644 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
210
211
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2644 times.
2644 if (result == 0)
212 {
213 sockaddr_in local_addr{};
214 socklen_t local_len = sizeof(local_addr);
215 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
216 local_endpoint_ = detail::from_sockaddr_in(local_addr);
217 remote_endpoint_ = ep;
218 }
219
220
2/4
✓ Branch 0 taken 2644 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2644 times.
2644 if (result == 0 || errno != EINPROGRESS)
221 {
222 int err = (result < 0) ? errno : 0;
223 if (svc_.scheduler().try_consume_inline_budget())
224 {
225 *ec = err ? make_err(err) : std::error_code{};
226 return ex.dispatch(h);
227 }
228 op.reset();
229 op.h = h;
230 op.ex = ex;
231 op.ec_out = ec;
232 op.fd = fd_;
233 op.target_endpoint = ep;
234 op.start(token, this);
235 op.impl_ptr = shared_from_this();
236 op.complete(err, 0);
237 svc_.post(&op);
238 return std::noop_coroutine();
239 }
240
241 // EINPROGRESS — register with reactor
242 2644 op.reset();
243 2644 op.h = h;
244 2644 op.ex = ex;
245 2644 op.ec_out = ec;
246 2644 op.fd = fd_;
247 2644 op.target_endpoint = ep;
248 2644 op.start(token, this);
249
1/1
✓ Branch 1 taken 2644 times.
2644 op.impl_ptr = shared_from_this();
250
251 2644 register_op(op, desc_state_.connect_op, desc_state_.write_ready);
252 2644 return std::noop_coroutine();
253 }
254
255 std::coroutine_handle<>
256 113574 epoll_socket_impl::
257 read_some(
258 std::coroutine_handle<> h,
259 capy::executor_ref ex,
260 io_buffer_param param,
261 std::stop_token token,
262 std::error_code* ec,
263 std::size_t* bytes_out)
264 {
265 113574 auto& op = rd_;
266 113574 op.reset();
267
268 113574 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
269 113574 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
270
271
6/8
✓ Branch 0 taken 113573 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 113573 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 113573 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 113573 times.
113574 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
272 {
273 1 op.empty_buffer_read = true;
274 1 op.h = h;
275 1 op.ex = ex;
276 1 op.ec_out = ec;
277 1 op.bytes_out = bytes_out;
278 1 op.start(token, this);
279
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
280 1 op.complete(0, 0);
281
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
282 1 return std::noop_coroutine();
283 }
284
285
2/2
✓ Branch 0 taken 113573 times.
✓ Branch 1 taken 113573 times.
227146 for (int i = 0; i < op.iovec_count; ++i)
286 {
287 113573 op.iovecs[i].iov_base = bufs[i].data();
288 113573 op.iovecs[i].iov_len = bufs[i].size();
289 }
290
291 // Speculative read
292 ssize_t n;
293 do {
294
1/1
✓ Branch 1 taken 113573 times.
113573 n = ::readv(fd_, op.iovecs, op.iovec_count);
295
3/4
✓ Branch 0 taken 198 times.
✓ Branch 1 taken 113375 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 198 times.
113573 } while (n < 0 && errno == EINTR);
296
297
3/6
✓ Branch 0 taken 198 times.
✓ Branch 1 taken 113375 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 198 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
113573 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
298 {
299
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 113375 times.
113375 int err = (n < 0) ? errno : 0;
300 113375 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
301
302
2/2
✓ Branch 2 taken 75574 times.
✓ Branch 3 taken 37801 times.
113375 if (svc_.scheduler().try_consume_inline_budget())
303 {
304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 75574 times.
75574 if (err)
305 *ec = make_err(err);
306
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 75569 times.
75574 else if (n == 0)
307 5 *ec = capy::error::eof;
308 else
309 75569 *ec = {};
310 75574 *bytes_out = bytes;
311
1/1
✓ Branch 1 taken 75574 times.
75574 return ex.dispatch(h);
312 }
313 37801 op.h = h;
314 37801 op.ex = ex;
315 37801 op.ec_out = ec;
316 37801 op.bytes_out = bytes_out;
317 37801 op.start(token, this);
318
1/1
✓ Branch 1 taken 37801 times.
37801 op.impl_ptr = shared_from_this();
319 37801 op.complete(err, bytes);
320
1/1
✓ Branch 1 taken 37801 times.
37801 svc_.post(&op);
321 37801 return std::noop_coroutine();
322 }
323
324 // EAGAIN — register with reactor
325 198 op.h = h;
326 198 op.ex = ex;
327 198 op.ec_out = ec;
328 198 op.bytes_out = bytes_out;
329 198 op.fd = fd_;
330 198 op.start(token, this);
331
1/1
✓ Branch 1 taken 198 times.
198 op.impl_ptr = shared_from_this();
332
333 198 register_op(op, desc_state_.read_op, desc_state_.read_ready);
334 198 return std::noop_coroutine();
335 }
336
337 std::coroutine_handle<>
338 113378 epoll_socket_impl::
339 write_some(
340 std::coroutine_handle<> h,
341 capy::executor_ref ex,
342 io_buffer_param param,
343 std::stop_token token,
344 std::error_code* ec,
345 std::size_t* bytes_out)
346 {
347 113378 auto& op = wr_;
348 113378 op.reset();
349
350 113378 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
351 113378 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
352
353
6/8
✓ Branch 0 taken 113377 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 113377 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 113377 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 113377 times.
113378 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
354 {
355 1 op.h = h;
356 1 op.ex = ex;
357 1 op.ec_out = ec;
358 1 op.bytes_out = bytes_out;
359 1 op.start(token, this);
360
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
361 1 op.complete(0, 0);
362
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
363 1 return std::noop_coroutine();
364 }
365
366
2/2
✓ Branch 0 taken 113377 times.
✓ Branch 1 taken 113377 times.
226754 for (int i = 0; i < op.iovec_count; ++i)
367 {
368 113377 op.iovecs[i].iov_base = bufs[i].data();
369 113377 op.iovecs[i].iov_len = bufs[i].size();
370 }
371
372 // Speculative write
373 113377 msghdr msg{};
374 113377 msg.msg_iov = op.iovecs;
375 113377 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
376
377 ssize_t n;
378 do {
379
1/1
✓ Branch 1 taken 113377 times.
113377 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
380
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 113376 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
113377 } while (n < 0 && errno == EINTR);
381
382
4/6
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 113376 times.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 time.
✗ Branch 5 not taken.
113377 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
383 {
384
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 113376 times.
113377 int err = (n < 0) ? errno : 0;
385 113377 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
386
387
2/2
✓ Branch 2 taken 75650 times.
✓ Branch 3 taken 37727 times.
113377 if (svc_.scheduler().try_consume_inline_budget())
388 {
389
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 75649 times.
75650 *ec = err ? make_err(err) : std::error_code{};
390 75650 *bytes_out = bytes;
391
1/1
✓ Branch 1 taken 75650 times.
75650 return ex.dispatch(h);
392 }
393 37727 op.h = h;
394 37727 op.ex = ex;
395 37727 op.ec_out = ec;
396 37727 op.bytes_out = bytes_out;
397 37727 op.start(token, this);
398
1/1
✓ Branch 1 taken 37727 times.
37727 op.impl_ptr = shared_from_this();
399 37727 op.complete(err, bytes);
400
1/1
✓ Branch 1 taken 37727 times.
37727 svc_.post(&op);
401 37727 return std::noop_coroutine();
402 }
403
404 // EAGAIN — register with reactor
405 op.h = h;
406 op.ex = ex;
407 op.ec_out = ec;
408 op.bytes_out = bytes_out;
409 op.fd = fd_;
410 op.start(token, this);
411 op.impl_ptr = shared_from_this();
412
413 register_op(op, desc_state_.write_op, desc_state_.write_ready);
414 return std::noop_coroutine();
415 }
416
417 std::error_code
418 3 epoll_socket_impl::
419 shutdown(tcp_socket::shutdown_type what) noexcept
420 {
421 int how;
422
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
423 {
424 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
425 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
426 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
427 default:
428 return make_err(EINVAL);
429 }
430
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
431 return make_err(errno);
432 3 return {};
433 }
434
435 std::error_code
436 5 epoll_socket_impl::
437 set_no_delay(bool value) noexcept
438 {
439
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
440
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
441 return make_err(errno);
442 5 return {};
443 }
444
445 bool
446 5 epoll_socket_impl::
447 no_delay(std::error_code& ec) const noexcept
448 {
449 5 int flag = 0;
450 5 socklen_t len = sizeof(flag);
451
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
452 {
453 ec = make_err(errno);
454 return false;
455 }
456 5 ec = {};
457 5 return flag != 0;
458 }
459
460 std::error_code
461 4 epoll_socket_impl::
462 set_keep_alive(bool value) noexcept
463 {
464
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
465
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
466 return make_err(errno);
467 4 return {};
468 }
469
470 bool
471 4 epoll_socket_impl::
472 keep_alive(std::error_code& ec) const noexcept
473 {
474 4 int flag = 0;
475 4 socklen_t len = sizeof(flag);
476
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
477 {
478 ec = make_err(errno);
479 return false;
480 }
481 4 ec = {};
482 4 return flag != 0;
483 }
484
485 std::error_code
486 1 epoll_socket_impl::
487 set_receive_buffer_size(int size) noexcept
488 {
489
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
490 return make_err(errno);
491 1 return {};
492 }
493
494 int
495 3 epoll_socket_impl::
496 receive_buffer_size(std::error_code& ec) const noexcept
497 {
498 3 int size = 0;
499 3 socklen_t len = sizeof(size);
500
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
501 {
502 ec = make_err(errno);
503 return 0;
504 }
505 3 ec = {};
506 3 return size;
507 }
508
509 std::error_code
510 1 epoll_socket_impl::
511 set_send_buffer_size(int size) noexcept
512 {
513
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
514 return make_err(errno);
515 1 return {};
516 }
517
518 int
519 3 epoll_socket_impl::
520 send_buffer_size(std::error_code& ec) const noexcept
521 {
522 3 int size = 0;
523 3 socklen_t len = sizeof(size);
524
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
525 {
526 ec = make_err(errno);
527 return 0;
528 }
529 3 ec = {};
530 3 return size;
531 }
532
533 std::error_code
534 8 epoll_socket_impl::
535 set_linger(bool enabled, int timeout) noexcept
536 {
537
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 7 times.
8 if (timeout < 0)
538 1 return make_err(EINVAL);
539 struct ::linger lg;
540
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 1 time.
7 lg.l_onoff = enabled ? 1 : 0;
541 7 lg.l_linger = timeout;
542
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
543 return make_err(errno);
544 7 return {};
545 }
546
547 tcp_socket::linger_options
548 3 epoll_socket_impl::
549 linger(std::error_code& ec) const noexcept
550 {
551 3 struct ::linger lg{};
552 3 socklen_t len = sizeof(lg);
553
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
554 {
555 ec = make_err(errno);
556 return {};
557 }
558 3 ec = {};
559 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
560 }
561
562 void
563 8136 epoll_socket_impl::
564 cancel() noexcept
565 {
566 8136 std::shared_ptr<epoll_socket_impl> self;
567 try {
568
1/1
✓ Branch 1 taken 8136 times.
8136 self = shared_from_this();
569 } catch (const std::bad_weak_ptr&) {
570 return;
571 }
572
573 8136 conn_.request_cancel();
574 8136 rd_.request_cancel();
575 8136 wr_.request_cancel();
576
577 8136 epoll_op* conn_claimed = nullptr;
578 8136 epoll_op* rd_claimed = nullptr;
579 8136 epoll_op* wr_claimed = nullptr;
580 {
581 8136 std::lock_guard lock(desc_state_.mutex);
582
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8136 times.
8136 if (desc_state_.connect_op == &conn_)
583 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
584
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 8132 times.
8136 if (desc_state_.read_op == &rd_)
585 4 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
586 else
587 8132 desc_state_.read_cancel_pending = true;
588
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8136 times.
8136 if (desc_state_.write_op == &wr_)
589 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
590 else
591 8136 desc_state_.write_cancel_pending = true;
592 8136 }
593
594
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8136 times.
8136 if (conn_claimed)
595 {
596 conn_.impl_ptr = self;
597 svc_.post(&conn_);
598 svc_.work_finished();
599 }
600
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 8132 times.
8136 if (rd_claimed)
601 {
602 4 rd_.impl_ptr = self;
603 4 svc_.post(&rd_);
604 4 svc_.work_finished();
605 }
606
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8136 times.
8136 if (wr_claimed)
607 {
608 wr_.impl_ptr = self;
609 svc_.post(&wr_);
610 svc_.work_finished();
611 }
612 8136 }
613
614 void
615 97 epoll_socket_impl::
616 cancel_single_op(epoll_op& op) noexcept
617 {
618 97 op.request_cancel();
619
620 97 epoll_op** desc_op_ptr = nullptr;
621
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 97 times.
97 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
622
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
623 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
624
625
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (desc_op_ptr)
626 {
627 97 epoll_op* claimed = nullptr;
628 {
629 97 std::lock_guard lock(desc_state_.mutex);
630
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (*desc_op_ptr == &op)
631 97 claimed = std::exchange(*desc_op_ptr, nullptr);
632 else if (&op == &rd_)
633 desc_state_.read_cancel_pending = true;
634 else if (&op == &wr_)
635 desc_state_.write_cancel_pending = true;
636 97 }
637
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (claimed)
638 {
639 try {
640
1/1
✓ Branch 1 taken 97 times.
97 op.impl_ptr = shared_from_this();
641 } catch (const std::bad_weak_ptr&) {}
642 97 svc_.post(&op);
643 97 svc_.work_finished();
644 }
645 }
646 97 }
647
648 void
649 7953 epoll_socket_impl::
650 close_socket() noexcept
651 {
652 7953 cancel();
653
654 // Keep impl alive if descriptor_state is queued in the scheduler.
655 // Without this, destroy_impl() drops the last shared_ptr while
656 // the queued descriptor_state node would become dangling.
657
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 7946 times.
7953 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
658 {
659 try {
660
1/1
✓ Branch 1 taken 7 times.
7 desc_state_.impl_ref_ = shared_from_this();
661 } catch (std::bad_weak_ptr const&) {}
662 }
663
664
2/2
✓ Branch 0 taken 5298 times.
✓ Branch 1 taken 2655 times.
7953 if (fd_ >= 0)
665 {
666
1/2
✓ Branch 0 taken 5298 times.
✗ Branch 1 not taken.
5298 if (desc_state_.registered_events != 0)
667 5298 svc_.scheduler().deregister_descriptor(fd_);
668 5298 ::close(fd_);
669 5298 fd_ = -1;
670 }
671
672 7953 desc_state_.fd = -1;
673 {
674 7953 std::lock_guard lock(desc_state_.mutex);
675 7953 desc_state_.read_op = nullptr;
676 7953 desc_state_.write_op = nullptr;
677 7953 desc_state_.connect_op = nullptr;
678 7953 desc_state_.read_ready = false;
679 7953 desc_state_.write_ready = false;
680 7953 desc_state_.read_cancel_pending = false;
681 7953 desc_state_.write_cancel_pending = false;
682 7953 }
683 7953 desc_state_.registered_events = 0;
684
685 7953 local_endpoint_ = endpoint{};
686 7953 remote_endpoint_ = endpoint{};
687 7953 }
688
689 190 epoll_socket_service::
690 190 epoll_socket_service(capy::execution_context& ctx)
691
2/2
✓ Branch 2 taken 190 times.
✓ Branch 5 taken 190 times.
190 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
692 {
693 190 }
694
695 380 epoll_socket_service::
696 190 ~epoll_socket_service()
697 {
698 380 }
699
700 void
701 190 epoll_socket_service::
702 shutdown()
703 {
704
1/1
✓ Branch 2 taken 190 times.
190 std::lock_guard lock(state_->mutex_);
705
706
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 190 times.
190 while (auto* impl = state_->socket_list_.pop_front())
707 impl->close_socket();
708
709 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
710 // drains completed_ops_, calling destroy() on each queued op. If we
711 // released our shared_ptrs now, an epoll_op::destroy() could free the
712 // last ref to an impl whose embedded descriptor_state is still linked
713 // in the queue — use-after-free on the next pop(). Letting ~state_
714 // release the ptrs (during service destruction, after scheduler
715 // shutdown) keeps every impl alive until all ops have been drained.
716 190 }
717
718 tcp_socket::socket_impl&
719 5298 epoll_socket_service::
720 create_impl()
721 {
722
1/1
✓ Branch 1 taken 5298 times.
5298 auto impl = std::make_shared<epoll_socket_impl>(*this);
723 5298 auto* raw = impl.get();
724
725 {
726
1/1
✓ Branch 2 taken 5298 times.
5298 std::lock_guard lock(state_->mutex_);
727 5298 state_->socket_list_.push_back(raw);
728
1/1
✓ Branch 3 taken 5298 times.
5298 state_->socket_ptrs_.emplace(raw, std::move(impl));
729 5298 }
730
731 5298 return *raw;
732 5298 }
733
734 void
735 5298 epoll_socket_service::
736 destroy_impl(tcp_socket::socket_impl& impl)
737 {
738 5298 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
739
1/1
✓ Branch 2 taken 5298 times.
5298 std::lock_guard lock(state_->mutex_);
740 5298 state_->socket_list_.remove(epoll_impl);
741
1/1
✓ Branch 2 taken 5298 times.
5298 state_->socket_ptrs_.erase(epoll_impl);
742 5298 }
743
744 std::error_code
745 2655 epoll_socket_service::
746 open_socket(tcp_socket::socket_impl& impl)
747 {
748 2655 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
749 2655 epoll_impl->close_socket();
750
751 2655 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
752
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2655 times.
2655 if (fd < 0)
753 return make_err(errno);
754
755 2655 epoll_impl->fd_ = fd;
756
757 // Register fd with epoll (edge-triggered mode)
758 2655 epoll_impl->desc_state_.fd = fd;
759 {
760
1/1
✓ Branch 1 taken 2655 times.
2655 std::lock_guard lock(epoll_impl->desc_state_.mutex);
761 2655 epoll_impl->desc_state_.read_op = nullptr;
762 2655 epoll_impl->desc_state_.write_op = nullptr;
763 2655 epoll_impl->desc_state_.connect_op = nullptr;
764 2655 }
765 2655 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
766
767 2655 return {};
768 }
769
770 void
771 75724 epoll_socket_service::
772 post(epoll_op* op)
773 {
774 75724 state_->sched_.post(op);
775 75724 }
776
777 void
778 2842 epoll_socket_service::
779 work_started() noexcept
780 {
781 2842 state_->sched_.work_started();
782 2842 }
783
784 void
785 194 epoll_socket_service::
786 work_finished() noexcept
787 {
788 194 state_->sched_.work_finished();
789 194 }
790
791 } // namespace boost::corosio::detail
792
793 #endif // BOOST_COROSIO_HAS_EPOLL
794