src/ex/detail/strand_queue.hpp
74.2% Lines (49/66)
85.7% List of functions (12/14)
76.9% Branches (10/13)
Functions (14)
Function
Calls
Lines
Branches
Blocks
boost::capy::detail::strand_op::promise_type::get_return_object()
:63
0
100.0%
–
–
boost::capy::detail::strand_op::promise_type::initial_suspend()
:69
0
100.0%
–
–
boost::capy::detail::strand_op::promise_type::final_suspend()
:75
0
100.0%
–
–
boost::capy::detail::strand_op::promise_type::return_void()
:81
0
100.0%
–
–
boost::capy::detail::strand_op::promise_type::unhandled_exception()
:86
0
0.0%
–
–
boost::capy::detail::strand_queue::make_strand_op(boost::capy::detail::strand_queue&, std::__n4861::coroutine_handle<void>)
:126
0
100.0%
100.0%
–
boost::capy::detail::strand_queue::strand_queue()
:136
0
100.0%
–
–
boost::capy::detail::strand_queue::~strand_queue()
:146
0
36.4%
50.0%
–
boost::capy::detail::strand_queue::empty() const
:170
0
100.0%
–
–
boost::capy::detail::strand_queue::push(std::__n4861::coroutine_handle<void>)
:184
0
100.0%
100.0%
–
boost::capy::detail::strand_queue::take_all()
:243
0
100.0%
–
–
boost::capy::detail::strand_queue::dispatch_batch(boost::capy::detail::strand_queue::taken_batch&)
:260
0
100.0%
100.0%
–
boost::capy::detail::strand_op::promise_type::operator new(unsigned long, boost::capy::detail::strand_queue&, std::__n4861::coroutine_handle<void>)
:284
0
75.0%
50.0%
–
boost::capy::detail::strand_op::promise_type::operator delete(void*, unsigned long)
:317
0
0.0%
–
–
| Line | Branch | TLA | Hits | Source Code |
|---|---|---|---|---|
| 1 | // | |||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | |||
| 3 | // | |||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||
| 6 | // | |||
| 7 | // Official repository: https://github.com/cppalliance/capy | |||
| 8 | // | |||
| 9 | ||||
| 10 | #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | |||
| 11 | #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | |||
| 12 | ||||
| 13 | #include <boost/capy/detail/config.hpp> | |||
| 14 | ||||
| 15 | #include <coroutine> | |||
| 16 | #include <cstddef> | |||
| 17 | #include <exception> | |||
| 18 | ||||
| 19 | namespace boost { | |||
| 20 | namespace capy { | |||
| 21 | namespace detail { | |||
| 22 | ||||
class strand_queue;

//----------------------------------------------------------

/** Bookkeeping header stored immediately before a wrapper
    coroutine frame.

    The frame allocator returns `prefix + 1` as the frame
    address, so the size of this struct determines how the
    frame is aligned relative to the start of the allocation.
    Three pointer-sized members alone (24 bytes on common
    64-bit targets) would leave the frame at a weaker
    alignment than the one `::operator new` provides. The
    explicit alignment pads the struct to a multiple of the
    fundamental alignment so the frame stays fully aligned.
*/
struct alignas(std::max_align_t) frame_prefix
{
    // Next block while this block sits on a queue's free list
    frame_prefix* next;

    // Queue whose free list receives this block on delete
    strand_queue* queue;

    // Total bytes of the allocation, prefix included
    std::size_t alloc_size;
};
| 34 | ||||
| 35 | //---------------------------------------------------------- | |||
| 36 | ||||
| 37 | /** Wrapper coroutine for strand queue dispatch operations. | |||
| 38 | ||||
| 39 | This coroutine wraps a target coroutine handle and resumes | |||
| 40 | it when dispatched. The wrapper ensures control returns to | |||
| 41 | the dispatch loop after the target suspends or completes. | |||
| 42 | ||||
| 43 | The promise contains an intrusive list node for queue | |||
| 44 | storage and supports a custom allocator that recycles | |||
| 45 | coroutine frames via a free list. | |||
| 46 | */ | |||
| 47 | struct strand_op | |||
| 48 | { | |||
| 49 | struct promise_type | |||
| 50 | { | |||
| 51 | promise_type* next = nullptr; | |||
| 52 | ||||
| 53 | void* | |||
| 54 | operator new( | |||
| 55 | std::size_t size, | |||
| 56 | strand_queue& q, | |||
| 57 | std::coroutine_handle<void>); | |||
| 58 | ||||
| 59 | void | |||
| 60 | operator delete(void* p, std::size_t); | |||
| 61 | ||||
| 62 | strand_op | |||
| 63 | 328x | get_return_object() noexcept | ||
| 64 | { | |||
| 65 | 328x | return {std::coroutine_handle<promise_type>::from_promise(*this)}; | ||
| 66 | } | |||
| 67 | ||||
| 68 | std::suspend_always | |||
| 69 | 328x | initial_suspend() noexcept | ||
| 70 | { | |||
| 71 | 328x | return {}; | ||
| 72 | } | |||
| 73 | ||||
| 74 | std::suspend_always | |||
| 75 | 328x | final_suspend() noexcept | ||
| 76 | { | |||
| 77 | 328x | return {}; | ||
| 78 | } | |||
| 79 | ||||
| 80 | void | |||
| 81 | 328x | return_void() noexcept | ||
| 82 | { | |||
| 83 | 328x | } | ||
| 84 | ||||
| 85 | void | |||
| 86 | ✗ | unhandled_exception() | ||
| 87 | { | |||
| 88 | ✗ | std::terminate(); | ||
| 89 | } | |||
| 90 | }; | |||
| 91 | ||||
| 92 | std::coroutine_handle<promise_type> h_; | |||
| 93 | }; | |||
| 94 | ||||
| 95 | //---------------------------------------------------------- | |||
| 96 | ||||
| 97 | /** Single-threaded dispatch queue for coroutine handles. | |||
| 98 | ||||
| 99 | This queue stores coroutine handles and resumes them | |||
| 100 | sequentially when dispatch() is called. Each pushed | |||
| 101 | handle is wrapped in a strand_op coroutine that ensures | |||
| 102 | control returns to the dispatch loop after the target | |||
| 103 | suspends or completes. | |||
| 104 | ||||
| 105 | The queue uses an intrusive singly-linked list through | |||
| 106 | the promise type to avoid separate node allocations. | |||
| 107 | A free list recycles wrapper coroutine frames to reduce | |||
| 108 | allocation overhead during repeated push/dispatch cycles. | |||
| 109 | ||||
| 110 | @par Thread Safety | |||
| 111 | This class is not thread-safe. All operations must be | |||
| 112 | called from a single thread. | |||
| 113 | */ | |||
| 114 | class strand_queue | |||
| 115 | { | |||
| 116 | using promise_type = strand_op::promise_type; | |||
| 117 | ||||
| 118 | promise_type* head_ = nullptr; | |||
| 119 | promise_type* tail_ = nullptr; | |||
| 120 | frame_prefix* free_list_ = nullptr; | |||
| 121 | ||||
| 122 | friend struct strand_op::promise_type; | |||
| 123 | ||||
| 124 | static | |||
| 125 | strand_op | |||
| 126 |
1/1✓ Branch 1 taken 328 times.
|
328x | make_strand_op( | |
| 127 | strand_queue& q, | |||
| 128 | std::coroutine_handle<void> target) | |||
| 129 | { | |||
| 130 | (void)q; | |||
| 131 | target.resume(); | |||
| 132 | co_return; | |||
| 133 | 656x | } | ||
| 134 | ||||
| 135 | public: | |||
| 136 | 4431x | strand_queue() = default; | ||
| 137 | ||||
| 138 | strand_queue(strand_queue const&) = delete; | |||
| 139 | strand_queue& operator=(strand_queue const&) = delete; | |||
| 140 | ||||
| 141 | /** Destructor. | |||
| 142 | ||||
| 143 | Destroys any pending wrappers without resuming them, | |||
| 144 | then frees all memory in the free list. | |||
| 145 | */ | |||
| 146 | 4431x | ~strand_queue() | ||
| 147 | { | |||
| 148 | // Destroy pending wrappers | |||
| 149 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4431 times.
|
4431x | while(head_) | |
| 150 | { | |||
| 151 | ✗ | promise_type* p = head_; | ||
| 152 | ✗ | head_ = p->next; | ||
| 153 | ||||
| 154 | ✗ | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | ||
| 155 | ✗ | h.destroy(); | ||
| 156 | } | |||
| 157 | ||||
| 158 | // Free the free list memory | |||
| 159 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4431 times.
|
4431x | while(free_list_) | |
| 160 | { | |||
| 161 | ✗ | frame_prefix* prefix = free_list_; | ||
| 162 | ✗ | free_list_ = prefix->next; | ||
| 163 | ✗ | ::operator delete(prefix); | ||
| 164 | } | |||
| 165 | 4431x | } | ||
| 166 | ||||
| 167 | /** Returns true if there are no pending operations. | |||
| 168 | */ | |||
| 169 | bool | |||
| 170 | 25x | empty() const noexcept | ||
| 171 | { | |||
| 172 | 25x | return head_ == nullptr; | ||
| 173 | } | |||
| 174 | ||||
| 175 | /** Push a coroutine handle to the queue. | |||
| 176 | ||||
| 177 | Creates a wrapper coroutine and appends it to the | |||
| 178 | queue. The wrapper will resume the target handle | |||
| 179 | when dispatch() processes it. | |||
| 180 | ||||
| 181 | @param h The coroutine handle to dispatch. | |||
| 182 | */ | |||
| 183 | void | |||
| 184 | 328x | push(std::coroutine_handle<void> h) | ||
| 185 | { | |||
| 186 |
1/1✓ Branch 1 taken 328 times.
|
328x | strand_op op = make_strand_op(*this, h); | |
| 187 | ||||
| 188 | 328x | promise_type* p = &op.h_.promise(); | ||
| 189 | 328x | p->next = nullptr; | ||
| 190 | ||||
| 191 |
2/2✓ Branch 0 taken 303 times.
✓ Branch 1 taken 25 times.
|
328x | if(tail_) | |
| 192 | 303x | tail_->next = p; | ||
| 193 | else | |||
| 194 | 25x | head_ = p; | ||
| 195 | 328x | tail_ = p; | ||
| 196 | 328x | } | ||
| 197 | ||||
| 198 | /** Resume all queued coroutines in sequence. | |||
| 199 | ||||
| 200 | Processes each wrapper in FIFO order, resuming its | |||
| 201 | target coroutine. After each target suspends or | |||
| 202 | completes, the wrapper is destroyed and its frame | |||
| 203 | is added to the free list for reuse. | |||
| 204 | ||||
| 205 | Coroutines resumed during dispatch may push new | |||
| 206 | handles, which will also be processed in the same | |||
| 207 | dispatch call. | |||
| 208 | ||||
| 209 | @warning Not thread-safe. Do not call while another | |||
| 210 | thread may be calling push(). | |||
| 211 | */ | |||
| 212 | void | |||
| 213 | dispatch() | |||
| 214 | { | |||
| 215 | while(head_) | |||
| 216 | { | |||
| 217 | promise_type* p = head_; | |||
| 218 | head_ = p->next; | |||
| 219 | if(!head_) | |||
| 220 | tail_ = nullptr; | |||
| 221 | ||||
| 222 | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | |||
| 223 | h.resume(); | |||
| 224 | h.destroy(); | |||
| 225 | } | |||
| 226 | } | |||
| 227 | ||||
| 228 | /** Batch of taken items for thread-safe dispatch. */ | |||
| 229 | struct taken_batch | |||
| 230 | { | |||
| 231 | promise_type* head = nullptr; | |||
| 232 | promise_type* tail = nullptr; | |||
| 233 | }; | |||
| 234 | ||||
| 235 | /** Take all pending items atomically. | |||
| 236 | ||||
| 237 | Removes all items from the queue and returns them | |||
| 238 | as a batch. The queue is left empty. | |||
| 239 | ||||
| 240 | @return The batch of taken items. | |||
| 241 | */ | |||
| 242 | taken_batch | |||
| 243 | 25x | take_all() noexcept | ||
| 244 | { | |||
| 245 | 25x | taken_batch batch{head_, tail_}; | ||
| 246 | 25x | head_ = tail_ = nullptr; | ||
| 247 | 25x | return batch; | ||
| 248 | } | |||
| 249 | ||||
| 250 | /** Dispatch a batch of taken items. | |||
| 251 | ||||
| 252 | @param batch The batch to dispatch. | |||
| 253 | ||||
| 254 | @note This is thread-safe w.r.t. push() because it doesn't | |||
| 255 | access the queue's free_list_. Frames are deleted directly | |||
| 256 | rather than recycled. | |||
| 257 | */ | |||
| 258 | static | |||
| 259 | void | |||
| 260 | 25x | dispatch_batch(taken_batch& batch) | ||
| 261 | { | |||
| 262 |
2/2✓ Branch 0 taken 328 times.
✓ Branch 1 taken 25 times.
|
353x | while(batch.head) | |
| 263 | { | |||
| 264 | 328x | promise_type* p = batch.head; | ||
| 265 | 328x | batch.head = p->next; | ||
| 266 | ||||
| 267 | 328x | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | ||
| 268 |
1/1✓ Branch 1 taken 328 times.
|
328x | h.resume(); | |
| 269 | // Don't use h.destroy() - it would call operator delete which | |||
| 270 | // accesses the queue's free_list_ (race with push). | |||
| 271 | // Instead, manually free the frame without recycling. | |||
| 272 | // h.address() returns the frame base (what operator new returned). | |||
| 273 | 328x | frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1; | ||
| 274 | 328x | ::operator delete(prefix); | ||
| 275 | } | |||
| 276 | 25x | batch.tail = nullptr; | ||
| 277 | 25x | } | ||
| 278 | }; | |||
| 279 | ||||
| 280 | //---------------------------------------------------------- | |||
| 281 | ||||
| 282 | inline | |||
| 283 | void* | |||
| 284 | 328x | strand_op::promise_type::operator new( | ||
| 285 | std::size_t size, | |||
| 286 | strand_queue& q, | |||
| 287 | std::coroutine_handle<void>) | |||
| 288 | { | |||
| 289 | // Total size includes prefix | |||
| 290 | 328x | std::size_t alloc_size = size + sizeof(frame_prefix); | ||
| 291 | void* raw; | |||
| 292 | ||||
| 293 | // Try to reuse from free list | |||
| 294 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 328 times.
|
328x | if(q.free_list_) | |
| 295 | { | |||
| 296 | ✗ | frame_prefix* prefix = q.free_list_; | ||
| 297 | ✗ | q.free_list_ = prefix->next; | ||
| 298 | ✗ | raw = prefix; | ||
| 299 | } | |||
| 300 | else | |||
| 301 | { | |||
| 302 | 328x | raw = ::operator new(alloc_size); | ||
| 303 | } | |||
| 304 | ||||
| 305 | // Initialize prefix | |||
| 306 | 328x | frame_prefix* prefix = static_cast<frame_prefix*>(raw); | ||
| 307 | 328x | prefix->next = nullptr; | ||
| 308 | 328x | prefix->queue = &q; | ||
| 309 | 328x | prefix->alloc_size = alloc_size; | ||
| 310 | ||||
| 311 | // Return pointer AFTER the prefix (this is where coroutine frame goes) | |||
| 312 | 328x | return prefix + 1; | ||
| 313 | } | |||
| 314 | ||||
| 315 | inline | |||
| 316 | void | |||
| 317 | ✗ | strand_op::promise_type::operator delete(void* p, std::size_t) | ||
| 318 | { | |||
| 319 | // Calculate back to get the prefix | |||
| 320 | ✗ | frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1; | ||
| 321 | ||||
| 322 | // Add to free list | |||
| 323 | ✗ | prefix->next = prefix->queue->free_list_; | ||
| 324 | ✗ | prefix->queue->free_list_ = prefix; | ||
| 325 | ✗ | } | ||
| 326 | ||||
| 327 | } // namespace detail | |||
| 328 | } // namespace capy | |||
| 329 | } // namespace boost | |||
| 330 | ||||
| 331 | #endif | |||
| 332 |