libs/corosio/src/corosio/src/detail/select/sockets.cpp

73.4% Lines (273/372) 94.1% Functions (32/34) 57.4% Branches (113/197)
libs/corosio/src/corosio/src/detail/select/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/dispatch_coro.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <boost/capy/buffers.hpp>
20
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
27
28 namespace boost::corosio::detail {
29
// Stop-token callback target: forwards the stop request to the
// operation's virtual cancel(), which routes through the owning socket
// when one is attached.
void
select_op::canceller::
operator()() const noexcept
{
    op->cancel();
}
36
37 void
38 select_connect_op::
39 cancel() noexcept
40 {
41 if (socket_impl_)
42 socket_impl_->cancel_single_op(*this);
43 else
44 request_cancel();
45 }
46
47 void
48 97 select_read_op::
49 cancel() noexcept
50 {
51
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (socket_impl_)
52 97 socket_impl_->cancel_single_op(*this);
53 else
54 request_cancel();
55 97 }
56
57 void
58 select_write_op::
59 cancel() noexcept
60 {
61 if (socket_impl_)
62 socket_impl_->cancel_single_op(*this);
63 else
64 request_cancel();
65 }
66
// Completion handler for an asynchronous connect. Runs on the scheduler
// after the op was posted: publishes the error code and byte count to the
// caller's out-pointers, caches endpoints on success, then resumes the
// awaiting coroutine via its executor.
void
select_connect_op::
operator()()
{
    // Drop the stop-token callback first so cancel() can no longer fire
    // while we complete.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        // Cancellation takes precedence over any OS error.
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex( std::move( ex ) );
    std::coroutine_handle<> saved_h( std::move( h ) );
    // Releasing impl_ptr may destroy the socket impl (and this op's
    // storage), hence the copies above.
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
107
// Construct an impl bound to its owning service. No fd is created here;
// open_socket() assigns fd_ later.
select_socket_impl::
select_socket_impl(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
113
// Final release of the impl: close the fd (which also cancels pending
// ops) and ask the service to drop its owning reference.
void
select_socket_impl::
release()
{
    close_socket();
    svc_.destroy_impl(*this);
}
121
// Begin an asynchronous connect to `ep`. The completion (the op) is
// always posted to the scheduler queue — never run inline — so this
// always returns std::noop_coroutine(). On EINPROGRESS the op is
// registered with the reactor for writability using a three-state
// registering/registered/unregistered protocol to close races with
// concurrent cancellation.
std::coroutine_handle<>
select_socket_impl::
connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Sync success - cache endpoints immediately
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        // impl_ptr keeps this impl alive until the posted op runs.
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Exchange claims the op exactly once; only the claimer posts.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate failure: post the error to the caller via the op.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
206
// Read into the caller's buffers. Tries an immediate readv(); on success,
// EOF, empty buffer, or hard error the op is completed and posted right
// away. On EAGAIN/EWOULDBLOCK the op is registered with the reactor for
// readability using the registering/registered/unregistered protocol.
// The completion is always posted, never run inline.
std::coroutine_handle<>
select_socket_impl::
read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Zero-length read: complete immediately with 0 bytes, flagged so the
    // completion path can distinguish it from EOF.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        // Data available immediately.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (n == 0)
    {
        // Peer closed the connection (EOF).
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Exchange claims the op exactly once; only the claimer posts.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error from readv.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
305
// Write from the caller's buffers. Tries an immediate sendmsg() (with
// MSG_NOSIGNAL so a broken pipe reports EPIPE instead of raising
// SIGPIPE); on EAGAIN/EWOULDBLOCK the op is registered with the reactor
// for writability using the registering/registered/unregistered
// protocol. The completion is always posted, never run inline.
std::coroutine_handle<>
select_socket_impl::
write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-length write: complete immediately with 0 bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        // Some bytes were written immediately.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Exchange claims the op exactly once; only the claimer posts.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error, or n <= 0 without errno set; fall back to EIO so the
    // caller never sees a zero-error failure.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
399
400 std::error_code
401 3 select_socket_impl::
402 shutdown(tcp_socket::shutdown_type what) noexcept
403 {
404 int how;
405
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
406 {
407 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
408 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
409 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
410 default:
411 return make_err(EINVAL);
412 }
413
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
414 return make_err(errno);
415 3 return {};
416 }
417
418 std::error_code
419 5 select_socket_impl::
420 set_no_delay(bool value) noexcept
421 {
422
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
423
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
424 return make_err(errno);
425 5 return {};
426 }
427
428 bool
429 5 select_socket_impl::
430 no_delay(std::error_code& ec) const noexcept
431 {
432 5 int flag = 0;
433 5 socklen_t len = sizeof(flag);
434
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
435 {
436 ec = make_err(errno);
437 return false;
438 }
439 5 ec = {};
440 5 return flag != 0;
441 }
442
443 std::error_code
444 4 select_socket_impl::
445 set_keep_alive(bool value) noexcept
446 {
447
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
448
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
449 return make_err(errno);
450 4 return {};
451 }
452
453 bool
454 4 select_socket_impl::
455 keep_alive(std::error_code& ec) const noexcept
456 {
457 4 int flag = 0;
458 4 socklen_t len = sizeof(flag);
459
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
460 {
461 ec = make_err(errno);
462 return false;
463 }
464 4 ec = {};
465 4 return flag != 0;
466 }
467
468 std::error_code
469 1 select_socket_impl::
470 set_receive_buffer_size(int size) noexcept
471 {
472
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
473 return make_err(errno);
474 1 return {};
475 }
476
477 int
478 3 select_socket_impl::
479 receive_buffer_size(std::error_code& ec) const noexcept
480 {
481 3 int size = 0;
482 3 socklen_t len = sizeof(size);
483
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
484 {
485 ec = make_err(errno);
486 return 0;
487 }
488 3 ec = {};
489 3 return size;
490 }
491
492 std::error_code
493 1 select_socket_impl::
494 set_send_buffer_size(int size) noexcept
495 {
496
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
497 return make_err(errno);
498 1 return {};
499 }
500
501 int
502 3 select_socket_impl::
503 send_buffer_size(std::error_code& ec) const noexcept
504 {
505 3 int size = 0;
506 3 socklen_t len = sizeof(size);
507
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
508 {
509 ec = make_err(errno);
510 return 0;
511 }
512 3 ec = {};
513 3 return size;
514 }
515
516 std::error_code
517 4 select_socket_impl::
518 set_linger(bool enabled, int timeout) noexcept
519 {
520
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
521 1 return make_err(EINVAL);
522 struct ::linger lg;
523
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
524 3 lg.l_linger = timeout;
525
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
526 return make_err(errno);
527 3 return {};
528 }
529
530 tcp_socket::linger_options
531 3 select_socket_impl::
532 linger(std::error_code& ec) const noexcept
533 {
534 3 struct ::linger lg{};
535 3 socklen_t len = sizeof(lg);
536
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
537 {
538 ec = make_err(errno);
539 return {};
540 }
541 3 ec = {};
542 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
543 }
544
// Cancel all three pending operations (connect, read, write). Each op is
// claimed exactly once via an atomic exchange on its registration state;
// only the claimer deregisters the fd and posts the op so it cannot be
// completed twice.
void
select_socket_impl::
cancel() noexcept
{
    std::shared_ptr<select_socket_impl> self;
    try {
        self = shared_from_this();
    } catch (const std::bad_weak_ptr&) {
        // No live shared_ptr owns us (e.g. during destruction); nothing
        // safe to do.
        return;
    }

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Claim the op: whoever moves the state to unregistered owns
        // posting the completion.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            // Keep the impl alive until the posted op runs.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
573
// Cancel one specific pending operation (invoked from the op's
// stop-token callback). Uses the same atomic claim-by-exchange as
// cancel(): only the caller that transitions the state away from
// unregistered deregisters the fd and posts the op.
void
select_socket_impl::
cancel_single_op(select_op& op) noexcept
{
    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep impl alive until op completes
        try {
            op.impl_ptr = shared_from_this();
        } catch (const std::bad_weak_ptr&) {
            // Impl is being destroyed, op will be orphaned but that's ok
        }

        svc_.post(&op);
        svc_.work_finished();
    }
}
605
// Cancel all pending ops, close the fd (if open), and clear the cached
// endpoints. Safe to call repeatedly; fd_ is reset to -1 after closing.
void
select_socket_impl::
close_socket() noexcept
{
    cancel();

    if (fd_ >= 0)
    {
        // Unconditionally remove from registered_fds_ to handle edge cases
        // where the fd might be registered but cancel() didn't clean it up
        // due to race conditions.
        svc_.scheduler().deregister_fd(fd_,
            select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    // Clear cached endpoints
    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
627
// Construct the service, acquiring (or creating) the context's
// select_scheduler and allocating the shared service state.
select_socket_service::
select_socket_service(capy::execution_context& ctx)
    : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
{
}
633
634 240 select_socket_service::
635 120 ~select_socket_service()
636 {
637 240 }
638
// Service shutdown: close every socket still tracked in socket_list_.
// Ownership (socket_ptrs_) is deliberately left intact — see the comment
// below for the ordering contract with the scheduler.
void
select_socket_service::
shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
653
654 tcp_socket::socket_impl&
655 4081 select_socket_service::
656 create_impl()
657 {
658
1/1
✓ Branch 1 taken 4081 times.
4081 auto impl = std::make_shared<select_socket_impl>(*this);
659 4081 auto* raw = impl.get();
660
661 {
662
1/1
✓ Branch 2 taken 4081 times.
4081 std::lock_guard lock(state_->mutex_);
663 4081 state_->socket_list_.push_back(raw);
664
1/1
✓ Branch 3 taken 4081 times.
4081 state_->socket_ptrs_.emplace(raw, std::move(impl));
665 4081 }
666
667 4081 return *raw;
668 4081 }
669
670 void
671 4081 select_socket_service::
672 destroy_impl(tcp_socket::socket_impl& impl)
673 {
674 4081 auto* select_impl = static_cast<select_socket_impl*>(&impl);
675
1/1
✓ Branch 2 taken 4081 times.
4081 std::lock_guard lock(state_->mutex_);
676 4081 state_->socket_list_.remove(select_impl);
677
1/1
✓ Branch 2 taken 4081 times.
4081 state_->socket_ptrs_.erase(select_impl);
678 4081 }
679
680 std::error_code
681 2047 select_socket_service::
682 open_socket(tcp_socket::socket_impl& impl)
683 {
684 2047 auto* select_impl = static_cast<select_socket_impl*>(&impl);
685 2047 select_impl->close_socket();
686
687 2047 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
688
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2047 times.
2047 if (fd < 0)
689 return make_err(errno);
690
691 // Set non-blocking and close-on-exec
692 2047 int flags = ::fcntl(fd, F_GETFL, 0);
693
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2047 times.
2047 if (flags == -1)
694 {
695 int errn = errno;
696 ::close(fd);
697 return make_err(errn);
698 }
699
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2047 times.
2047 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
700 {
701 int errn = errno;
702 ::close(fd);
703 return make_err(errn);
704 }
705
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2047 times.
2047 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
706 {
707 int errn = errno;
708 ::close(fd);
709 return make_err(errn);
710 }
711
712 // Check fd is within select() limits
713
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2047 times.
2047 if (fd >= FD_SETSIZE)
714 {
715 ::close(fd);
716 return make_err(EMFILE); // Too many open files
717 }
718
719 2047 select_impl->fd_ = fd;
720 2047 return {};
721 }
722
// Queue a completed op on the scheduler; it will be invoked from the
// scheduler's run loop, never inline.
void
select_socket_service::
post(select_op* op)
{
    state_->sched_.post(op);
}
729
// Forward outstanding-work accounting to the scheduler (keeps its run
// loop alive while an async op is pending).
void
select_socket_service::
work_started() noexcept
{
    state_->sched_.work_started();
}
736
// Balance a prior work_started() once the pending op has been claimed.
void
select_socket_service::
work_finished() noexcept
{
    state_->sched_.work_finished();
}
743
744 } // namespace boost::corosio::detail
745
746 #endif // BOOST_COROSIO_HAS_SELECT
747