TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #include "src/ex/detail/strand_queue.hpp"
11 : #include <boost/capy/ex/detail/strand_service.hpp>
12 : #include <atomic>
13 : #include <coroutine>
14 : #include <mutex>
15 : #include <thread>
16 : #include <utility>
17 :
18 : namespace boost {
19 : namespace capy {
20 : namespace detail {
21 :
22 : //----------------------------------------------------------
23 :
24 : /** Implementation state for a strand.
25 :
26 : Each strand_impl provides serialization for coroutines
27 : dispatched through strands that share it.
28 : */
29 : // Sentinel stored in cached_frame_ after shutdown to prevent
30 : // in-flight invokers from repopulating a freed cache slot.
31 : inline void* const kCacheClosed = reinterpret_cast<void*>(1);
32 :
33 : struct strand_impl
34 : {
35 : std::mutex mutex_;
36 : strand_queue pending_;
37 : bool locked_ = false;
38 : std::atomic<std::thread::id> dispatch_thread_{};
39 : std::atomic<void*> cached_frame_{nullptr};
40 : };
41 :
42 : //----------------------------------------------------------
43 :
44 : /** Invoker coroutine for strand dispatch.
45 :
46 : Uses custom allocator to recycle frame - one allocation
47 : per strand_impl lifetime, stored in trailer for recovery.
48 : */
49 : struct strand_invoker
50 : {
51 : struct promise_type
52 : {
53 HIT 11 : void* operator new(std::size_t n, strand_impl& impl)
54 : {
55 11 : constexpr auto A = alignof(strand_impl*);
56 11 : std::size_t padded = (n + A - 1) & ~(A - 1);
57 11 : std::size_t total = padded + sizeof(strand_impl*);
58 :
59 11 : void* p = impl.cached_frame_.exchange(
60 : nullptr, std::memory_order_acquire);
61 11 : if(!p || p == kCacheClosed)
62 9 : p = ::operator new(total);
63 :
64 : // Trailer lets delete recover impl
65 11 : *reinterpret_cast<strand_impl**>(
66 11 : static_cast<char*>(p) + padded) = &impl;
67 11 : return p;
68 : }
69 :
70 11 : void operator delete(void* p, std::size_t n) noexcept
71 : {
72 11 : constexpr auto A = alignof(strand_impl*);
73 11 : std::size_t padded = (n + A - 1) & ~(A - 1);
74 :
75 11 : auto* impl = *reinterpret_cast<strand_impl**>(
76 : static_cast<char*>(p) + padded);
77 :
78 11 : void* expected = nullptr;
79 11 : if(!impl->cached_frame_.compare_exchange_strong(
80 : expected, p, std::memory_order_release))
81 MIS 0 : ::operator delete(p);
82 HIT 11 : }
83 :
84 11 : strand_invoker get_return_object() noexcept
85 11 : { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
86 :
87 11 : std::suspend_always initial_suspend() noexcept { return {}; }
88 11 : std::suspend_never final_suspend() noexcept { return {}; }
89 11 : void return_void() noexcept {}
90 MIS 0 : void unhandled_exception() { std::terminate(); }
91 : };
92 :
93 : std::coroutine_handle<promise_type> h_;
94 : };
95 :
96 : //----------------------------------------------------------
97 :
98 : /** Concrete implementation of strand_service.
99 :
100 : Holds the fixed pool of strand_impl objects.
101 : */
102 : class strand_service_impl : public strand_service
103 : {
104 : static constexpr std::size_t num_impls = 211;
105 :
106 : strand_impl impls_[num_impls];
107 : std::size_t salt_ = 0;
108 : std::mutex mutex_;
109 :
110 : public:
111 : explicit
112 HIT 21 : strand_service_impl(execution_context&)
113 4452 : {
114 21 : }
115 :
116 : strand_impl*
117 25 : get_implementation() override
118 : {
119 25 : std::lock_guard<std::mutex> lock(mutex_);
120 25 : std::size_t index = salt_++;
121 25 : index = index % num_impls;
122 25 : return &impls_[index];
123 25 : }
124 :
125 : protected:
126 : void
127 21 : shutdown() override
128 : {
129 4452 : for(std::size_t i = 0; i < num_impls; ++i)
130 : {
131 4431 : std::lock_guard<std::mutex> lock(impls_[i].mutex_);
132 4431 : impls_[i].locked_ = true;
133 :
134 4431 : void* p = impls_[i].cached_frame_.exchange(
135 : kCacheClosed, std::memory_order_acquire);
136 4431 : if(p)
137 9 : ::operator delete(p);
138 4431 : }
139 21 : }
140 :
141 : private:
142 : static bool
143 328 : enqueue(strand_impl& impl, std::coroutine_handle<> h)
144 : {
145 328 : std::lock_guard<std::mutex> lock(impl.mutex_);
146 328 : impl.pending_.push(h);
147 328 : if(!impl.locked_)
148 : {
149 11 : impl.locked_ = true;
150 11 : return true;
151 : }
152 317 : return false;
153 328 : }
154 :
155 : static void
156 25 : dispatch_pending(strand_impl& impl)
157 : {
158 25 : strand_queue::taken_batch batch;
159 : {
160 25 : std::lock_guard<std::mutex> lock(impl.mutex_);
161 25 : batch = impl.pending_.take_all();
162 25 : }
163 25 : impl.pending_.dispatch_batch(batch);
164 25 : }
165 :
166 : static bool
167 25 : try_unlock(strand_impl& impl)
168 : {
169 25 : std::lock_guard<std::mutex> lock(impl.mutex_);
170 25 : if(impl.pending_.empty())
171 : {
172 11 : impl.locked_ = false;
173 11 : return true;
174 : }
175 14 : return false;
176 25 : }
177 :
178 : static void
179 25 : set_dispatch_thread(strand_impl& impl) noexcept
180 : {
181 25 : impl.dispatch_thread_.store(std::this_thread::get_id());
182 25 : }
183 :
184 : static void
185 11 : clear_dispatch_thread(strand_impl& impl) noexcept
186 : {
187 11 : impl.dispatch_thread_.store(std::thread::id{});
188 11 : }
189 :
190 : // Loops until queue empty (aggressive). Alternative: per-batch fairness
191 : // (repost after each batch to let other work run) - explore if starvation observed.
192 : static strand_invoker
193 11 : make_invoker(strand_impl& impl)
194 : {
195 : strand_impl* p = &impl;
196 : for(;;)
197 : {
198 : set_dispatch_thread(*p);
199 : dispatch_pending(*p);
200 : if(try_unlock(*p))
201 : {
202 : clear_dispatch_thread(*p);
203 : co_return;
204 : }
205 : }
206 22 : }
207 :
208 : friend class strand_service;
209 : };
210 :
211 : //----------------------------------------------------------
212 :
213 21 : strand_service::
214 21 : strand_service()
215 21 : : service()
216 : {
217 21 : }
218 :
219 21 : strand_service::
220 : ~strand_service() = default;
221 :
222 : bool
223 6 : strand_service::
224 : running_in_this_thread(strand_impl& impl) noexcept
225 : {
226 6 : return impl.dispatch_thread_.load() == std::this_thread::get_id();
227 : }
228 :
229 : std::coroutine_handle<>
230 5 : strand_service::
231 : dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
232 : {
233 5 : if(running_in_this_thread(impl))
234 2 : return h;
235 :
236 3 : if(strand_service_impl::enqueue(impl, h))
237 3 : ex.post(strand_service_impl::make_invoker(impl).h_);
238 3 : return std::noop_coroutine();
239 : }
240 :
241 : void
242 325 : strand_service::
243 : post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
244 : {
245 325 : if(strand_service_impl::enqueue(impl, h))
246 8 : ex.post(strand_service_impl::make_invoker(impl).h_);
247 325 : }
248 :
249 : strand_service&
250 25 : get_strand_service(execution_context& ctx)
251 : {
252 25 : return ctx.use_service<strand_service_impl>();
253 : }
254 :
255 : } // namespace detail
256 : } // namespace capy
257 : } // namespace boost
|