TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/void_to_monostate.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/concept/io_awaitable.hpp>
17 : #include <coroutine>
18 : #include <boost/capy/ex/executor_ref.hpp>
19 : #include <boost/capy/ex/frame_allocator.hpp>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/task.hpp>
22 :
23 : #include <array>
24 : #include <atomic>
25 : #include <exception>
26 : #include <optional>
27 : #include <ranges>
28 : #include <stdexcept>
29 : #include <stop_token>
30 : #include <tuple>
31 : #include <type_traits>
32 : #include <utility>
33 : #include <variant>
34 : #include <vector>
35 :
36 : /*
37 : when_any - Race multiple tasks, return first completion
38 : ========================================================
39 :
40 : OVERVIEW:
41 : ---------
42 : when_any launches N tasks concurrently and completes when the FIRST task
43 : finishes (success or failure). It then requests stop for all siblings and
44 : waits for them to acknowledge before returning.
45 :
46 : ARCHITECTURE:
47 : -------------
48 : The design mirrors when_all but with inverted completion semantics:
49 :
50 : when_all: complete when remaining_count reaches 0 (all done)
51 : when_any: complete when has_winner becomes true (first done)
52 : BUT still wait for remaining_count to reach 0 for cleanup
53 :
54 : Key components:
55 : - when_any_state: Shared state tracking winner and completion
56 : - when_any_runner: Wrapper coroutine for each child task
57 : - when_any_launcher: Awaitable that starts all runners concurrently
58 :
59 : CRITICAL INVARIANTS:
60 : --------------------
61 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
62 : 2. All tasks must complete before parent resumes (cleanup safety)
63 : 3. Stop is requested immediately when winner is determined
64 : 4. Only the winner's result/exception is stored
65 :
66 : POSITIONAL VARIANT:
67 : -------------------
68 : The variadic overload returns a std::variant with one alternative per
69 : input task, preserving positional correspondence. Use .index() on
70 : the variant to identify which task won.
71 :
72 : Example: when_any(task<int>, task<string>, task<int>)
73 : - Raw types after void->monostate: int, string, int
74 : - Result variant: std::variant<int, string, int>
75 : - variant.index() tells you which task won (0, 1, or 2)
76 :
77 : VOID HANDLING:
78 : --------------
79 : void tasks contribute std::monostate to the variant.
80 : All-void tasks result in: variant<monostate, monostate, monostate>
81 :
82 : MEMORY MODEL:
83 : -------------
84 : Synchronization chain from winner's write to parent's read:
85 :
86 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
87 : 2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
88 : 3. Last runner (may be winner or non-winner) reaches final_suspend
89 : → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
90 : 4. Last runner returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
91 : 5. Parent coroutine resumes and reads result_/winner_exception_
92 :
93 : Synchronization analysis:
94 : - All fetch_sub operations on remaining_count_ form a release sequence
95 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
96 : in the modification order of remaining_count_
97 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
98 : modification order, establishing happens-before from winner's writes
99 : - Executor dispatch() is expected to provide queue-based synchronization
100 : (release-on-post, acquire-on-execute) completing the chain to parent
101 : - Even inline executors work (same thread = sequenced-before)
102 :
103 : Alternative considered: Adding winner_ready_ atomic (set with release after
104 : storing winner data, acquired before reading) would make synchronization
105 : self-contained and not rely on executor implementation details. Current
106 : approach is correct but requires careful reasoning about release sequences
107 : and executor behavior.
108 :
109 : EXCEPTION SEMANTICS:
110 : --------------------
111 : Unlike when_all (which captures first exception, discards others), when_any
112 : treats exceptions as valid completions. If the winning task threw, that
113 : exception is rethrown. Exceptions from non-winners are silently discarded.
114 : */
115 :
116 : namespace boost {
117 : namespace capy {
118 :
119 : namespace detail {
120 :
121 : /** Core shared state for when_any operations.
122 :
123 : Contains all members and methods common to both heterogeneous (variadic)
124 : and homogeneous (range) when_any implementations. State classes embed
125 : this via composition to avoid CRTP destructor ordering issues.
126 :
127 : @par Thread Safety
128 : Atomic operations protect winner selection and completion count.
129 : */
130 : struct when_any_core
131 : {
132 : std::atomic<std::size_t> remaining_count_;
133 : std::size_t winner_index_{0};
134 : std::exception_ptr winner_exception_;
135 : std::stop_source stop_source_;
136 :
137 : // Bridges parent's stop token to our stop_source
138 : struct stop_callback_fn
139 : {
140 : std::stop_source* source_;
141 HIT 9 : void operator()() const noexcept { source_->request_stop(); }
142 : };
143 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
144 : std::optional<stop_callback_t> parent_stop_callback_;
145 :
146 : std::coroutine_handle<> continuation_;
147 : io_env const* caller_env_ = nullptr;
148 :
149 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
150 : std::atomic<bool> has_winner_{false};
151 :
152 73 : explicit when_any_core(std::size_t count) noexcept
153 73 : : remaining_count_(count)
154 : {
155 73 : }
156 :
157 : /** Atomically claim winner status; exactly one task succeeds. */
158 206 : bool try_win(std::size_t index) noexcept
159 : {
160 206 : bool expected = false;
161 206 : if(has_winner_.compare_exchange_strong(
162 : expected, true, std::memory_order_acq_rel))
163 : {
164 73 : winner_index_ = index;
165 73 : stop_source_.request_stop();
166 73 : return true;
167 : }
168 133 : return false;
169 : }
170 :
171 : /** @pre try_win() returned true. */
172 9 : void set_winner_exception(std::exception_ptr ep) noexcept
173 : {
174 9 : winner_exception_ = ep;
175 9 : }
176 :
177 : // Runners signal completion directly via final_suspend; no member function needed.
178 : };
179 :
180 : /** Shared state for heterogeneous when_any operation.
181 :
182 : Coordinates winner selection, result storage, and completion tracking
183 : for all child tasks in a when_any operation. Uses composition with
184 : when_any_core for shared functionality.
185 :
186 : @par Lifetime
187 : Allocated on the parent coroutine's frame, outlives all runners.
188 :
189 : @tparam Ts Task result types.
190 : */
191 : template<typename... Ts>
192 : struct when_any_state
193 : {
194 : static constexpr std::size_t task_count = sizeof...(Ts);
195 : using variant_type = std::variant<void_to_monostate_t<Ts>...>;
196 :
197 : when_any_core core_;
198 : std::optional<variant_type> result_;
199 : std::array<std::coroutine_handle<>, task_count> runner_handles_{};
200 :
201 51 : when_any_state()
202 51 : : core_(task_count)
203 : {
204 51 : }
205 :
206 : // Runners self-destruct in final_suspend. No destruction needed here.
207 :
208 : /** @pre core_.try_win() returned true.
209 : @note Uses in_place_index (not type) for positional variant access.
210 : */
211 : template<std::size_t I, typename T>
212 37 : void set_winner_result(T value)
213 : noexcept(std::is_nothrow_move_constructible_v<T>)
214 : {
215 37 : result_.emplace(std::in_place_index<I>, std::move(value));
216 37 : }
217 :
218 : /** @pre core_.try_win() returned true. */
219 : template<std::size_t I>
220 8 : void set_winner_void() noexcept
221 : {
222 8 : result_.emplace(std::in_place_index<I>, std::monostate{});
223 8 : }
224 : };
225 :
226 : /** Wrapper coroutine that runs a single child task for when_any.
227 :
228 : Propagates executor/stop_token to the child, attempts to claim winner
229 : status on completion, and signals completion for cleanup coordination.
230 :
231 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
232 : */
233 : template<typename StateType>
234 : struct when_any_runner
235 : {
236 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
237 : {
238 : StateType* state_ = nullptr;
239 : std::size_t index_ = 0;
240 : io_env env_;
241 :
242 206 : when_any_runner get_return_object() noexcept
243 : {
244 206 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
245 : }
246 :
247 : // Starts suspended; launcher sets up state/ex/token then resumes
248 206 : std::suspend_always initial_suspend() noexcept
249 : {
250 206 : return {};
251 : }
252 :
253 206 : auto final_suspend() noexcept
254 : {
255 : struct awaiter
256 : {
257 : promise_type* p_;
258 206 : bool await_ready() const noexcept { return false; }
259 206 : auto await_suspend(std::coroutine_handle<> h) noexcept
260 : {
261 : // Extract everything needed before self-destruction.
262 206 : auto& core = p_->state_->core_;
263 206 : auto* counter = &core.remaining_count_;
264 206 : auto* caller_env = core.caller_env_;
265 206 : auto cont = core.continuation_;
266 :
267 206 : h.destroy();
268 :
269 : // If last runner, dispatch parent for symmetric transfer.
270 206 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
271 206 : if(remaining == 1)
272 73 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
273 133 : return detail::symmetric_transfer(std::noop_coroutine());
274 : }
275 MIS 0 : void await_resume() const noexcept {}
276 : };
277 HIT 206 : return awaiter{this};
278 : }
279 :
280 193 : void return_void() noexcept {}
281 :
282 : // Exceptions are valid completions in when_any (unlike when_all)
283 13 : void unhandled_exception()
284 : {
285 13 : if(state_->core_.try_win(index_))
286 9 : state_->core_.set_winner_exception(std::current_exception());
287 13 : }
288 :
289 : /** Injects executor and stop token into child awaitables. */
290 : template<class Awaitable>
291 : struct transform_awaiter
292 : {
293 : std::decay_t<Awaitable> a_;
294 : promise_type* p_;
295 :
296 206 : bool await_ready() { return a_.await_ready(); }
297 206 : auto await_resume() { return a_.await_resume(); }
298 :
299 : template<class Promise>
300 200 : auto await_suspend(std::coroutine_handle<Promise> h)
301 : {
302 : using R = decltype(a_.await_suspend(h, &p_->env_));
303 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
304 200 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
305 : else
306 : return a_.await_suspend(h, &p_->env_);
307 : }
308 : };
309 :
310 : template<class Awaitable>
311 206 : auto await_transform(Awaitable&& a)
312 : {
313 : using A = std::decay_t<Awaitable>;
314 : if constexpr (IoAwaitable<A>)
315 : {
316 : return transform_awaiter<Awaitable>{
317 412 : std::forward<Awaitable>(a), this};
318 : }
319 : else
320 : {
321 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
322 : }
323 206 : }
324 : };
325 :
326 : std::coroutine_handle<promise_type> h_;
327 :
328 206 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
329 206 : : h_(h)
330 : {
331 206 : }
332 :
333 : // Enable move for all clang versions - some versions need it
334 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
335 :
336 : // Non-copyable
337 : when_any_runner(when_any_runner const&) = delete;
338 : when_any_runner& operator=(when_any_runner const&) = delete;
339 : when_any_runner& operator=(when_any_runner&&) = delete;
340 :
341 206 : auto release() noexcept
342 : {
343 206 : return std::exchange(h_, nullptr);
344 : }
345 : };
346 :
347 : /** Indexed overload for heterogeneous when_any (compile-time index).
348 :
349 : Uses compile-time index I for variant construction via in_place_index.
350 : Called from when_any_launcher::launch_one<I>().
351 : */
352 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
353 : when_any_runner<StateType>
354 121 : make_when_any_runner(Awaitable inner, StateType* state)
355 : {
356 : using T = awaitable_result_t<Awaitable>;
357 : if constexpr (std::is_void_v<T>)
358 : {
359 : co_await std::move(inner);
360 : if(state->core_.try_win(I))
361 : state->template set_winner_void<I>();
362 : }
363 : else
364 : {
365 : auto result = co_await std::move(inner);
366 : if(state->core_.try_win(I))
367 : {
368 : try
369 : {
370 : state->template set_winner_result<I>(std::move(result));
371 : }
372 : catch(...)
373 : {
374 : state->core_.set_winner_exception(std::current_exception());
375 : }
376 : }
377 : }
378 242 : }
379 :
380 : /** Runtime-index overload for homogeneous when_any (range path).
381 :
382 : Uses requires-expressions to detect state capabilities:
383 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
384 : - set_winner_result(): for non-void tasks
385 : - Neither: for homogeneous void tasks (no result storage)
386 : */
387 : template<IoAwaitable Awaitable, typename StateType>
388 : when_any_runner<StateType>
389 85 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
390 : {
391 : using T = awaitable_result_t<Awaitable>;
392 : if constexpr (std::is_void_v<T>)
393 : {
394 : co_await std::move(inner);
395 : if(state->core_.try_win(index))
396 : {
397 : if constexpr (requires { state->set_winner_void(); })
398 : state->set_winner_void();
399 : }
400 : }
401 : else
402 : {
403 : auto result = co_await std::move(inner);
404 : if(state->core_.try_win(index))
405 : {
406 : try
407 : {
408 : state->set_winner_result(std::move(result));
409 : }
410 : catch(...)
411 : {
412 : state->core_.set_winner_exception(std::current_exception());
413 : }
414 : }
415 : }
416 170 : }
417 :
418 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
419 : template<IoAwaitable... Awaitables>
420 : class when_any_launcher
421 : {
422 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
423 :
424 : std::tuple<Awaitables...>* tasks_;
425 : state_type* state_;
426 :
427 : public:
428 51 : when_any_launcher(
429 : std::tuple<Awaitables...>* tasks,
430 : state_type* state)
431 51 : : tasks_(tasks)
432 51 : , state_(state)
433 : {
434 51 : }
435 :
436 51 : bool await_ready() const noexcept
437 : {
438 51 : return sizeof...(Awaitables) == 0;
439 : }
440 :
441 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
442 : destroys this object before await_suspend returns. Must not reference
443 : `this` after the final launch_one call.
444 : */
445 51 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
446 : {
447 51 : state_->core_.continuation_ = continuation;
448 51 : state_->core_.caller_env_ = caller_env;
449 :
450 51 : if(caller_env->stop_token.stop_possible())
451 : {
452 18 : state_->core_.parent_stop_callback_.emplace(
453 9 : caller_env->stop_token,
454 9 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
455 :
456 9 : if(caller_env->stop_token.stop_requested())
457 3 : state_->core_.stop_source_.request_stop();
458 : }
459 :
460 51 : auto token = state_->core_.stop_source_.get_token();
461 102 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
462 51 : (..., launch_one<Is>(caller_env->executor, token));
463 51 : }(std::index_sequence_for<Awaitables...>{});
464 :
465 102 : return std::noop_coroutine();
466 51 : }
467 :
468 51 : void await_resume() const noexcept
469 : {
470 51 : }
471 :
472 : private:
473 : /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
474 : template<std::size_t I>
475 121 : void launch_one(executor_ref caller_ex, std::stop_token token)
476 : {
477 121 : auto runner = make_when_any_runner<I>(
478 121 : std::move(std::get<I>(*tasks_)), state_);
479 :
480 121 : auto h = runner.release();
481 121 : h.promise().state_ = state_;
482 121 : h.promise().index_ = I;
483 121 : h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
484 :
485 121 : std::coroutine_handle<> ch{h};
486 121 : state_->runner_handles_[I] = ch;
487 121 : caller_ex.post(ch);
488 242 : }
489 : };
490 :
491 : } // namespace detail
492 :
493 : /** Wait for the first awaitable to complete.
494 :
495 : Races multiple heterogeneous awaitables concurrently and returns when the
496 : first one completes. The result is a variant with one alternative per
497 : input task, preserving positional correspondence.
498 :
499 : @par Suspends
500 : The calling coroutine suspends when co_await is invoked. All awaitables
501 : are launched concurrently and execute in parallel. The coroutine resumes
502 : only after all awaitables have completed, even though the winner is
503 : determined by the first to finish.
504 :
505 : @par Completion Conditions
506 : @li Winner is determined when the first awaitable completes (success or exception)
507 : @li Only one task can claim winner status via atomic compare-exchange
508 : @li Once a winner exists, stop is requested for all remaining siblings
509 : @li Parent coroutine resumes only after all siblings acknowledge completion
510 : @li The winner's result is returned; if the winner threw, the exception is rethrown
511 :
512 : @par Cancellation Semantics
513 : Cancellation is supported via stop_token propagated through the
514 : IoAwaitable protocol:
515 : @li Each child awaitable receives a stop_token derived from a shared stop_source
516 : @li When the parent's stop token is activated, the stop is forwarded to all children
517 : @li When a winner is determined, stop_source_.request_stop() is called immediately
518 : @li Siblings must handle cancellation gracefully and complete before parent resumes
519 : @li Stop requests are cooperative; tasks must check and respond to them
520 :
521 : @par Concurrency/Overlap
522 : All awaitables are launched concurrently before any can complete.
523 : The launcher iterates through the arguments, starting each task on the
524 : caller's executor. Tasks may execute in parallel on multi-threaded
525 : executors or interleave on single-threaded executors. There is no
526 : guaranteed ordering of task completion.
527 :
528 : @par Notable Error Conditions
529 : @li Winner exception: if the winning task threw, that exception is rethrown
530 : @li Non-winner exceptions: silently discarded (only winner's result matters)
531 : @li Cancellation: tasks may complete via cancellation without throwing
532 :
533 : @par Example
534 : @code
535 : task<void> example() {
536 : auto result = co_await when_any(
537 : fetch_int(), // task<int>
538 : fetch_string() // task<std::string>
539 : );
540 : // result.index() is 0 or 1
541 : if (result.index() == 0)
542 : std::cout << "Got int: " << std::get<0>(result) << "\n";
543 : else
544 : std::cout << "Got string: " << std::get<1>(result) << "\n";
545 : }
546 : @endcode
547 :
548 : @param as Awaitables to race concurrently (at least one required; each
549 : must satisfy IoAwaitable).
550 : @return A task yielding a std::variant with one alternative per awaitable.
551 : Use .index() to identify the winner. Void awaitables contribute
552 : std::monostate.
553 :
554 : @throws Rethrows the winner's exception if the winning task threw an exception.
555 :
556 : @par Remarks
557 : Awaitables are moved into the coroutine frame; original objects become
558 : empty after the call. The variant preserves one alternative per input
559 : task. Use .index() to determine which awaitable completed first.
560 : Void awaitables contribute std::monostate to the variant.
561 :
562 : @see when_all, IoAwaitable
563 : */
564 : template<IoAwaitable... As>
565 : requires (sizeof...(As) > 0)
566 51 : [[nodiscard]] auto when_any(As... as)
567 : -> task<std::variant<void_to_monostate_t<awaitable_result_t<As>>...>>
568 : {
569 : detail::when_any_state<awaitable_result_t<As>...> state;
570 : std::tuple<As...> awaitable_tuple(std::move(as)...);
571 :
572 : co_await detail::when_any_launcher<As...>(&awaitable_tuple, &state);
573 :
574 : if(state.core_.winner_exception_)
575 : std::rethrow_exception(state.core_.winner_exception_);
576 :
577 : co_return std::move(*state.result_);
578 102 : }
579 :
580 : /** Concept for ranges of full I/O awaitables.
581 :
582 : A range satisfies `IoAwaitableRange` if it is a sized input range
583 : whose value type satisfies @ref IoAwaitable. This enables when_any
584 : to accept any container or view of awaitables, not just std::vector.
585 :
586 : @tparam R The range type.
587 :
588 : @par Requirements
589 : @li `R` must satisfy `std::ranges::input_range`
590 : @li `R` must satisfy `std::ranges::sized_range`
591 : @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
592 :
593 : @par Syntactic Requirements
594 : Given `r` of type `R`:
595 : @li `std::ranges::begin(r)` is valid
596 : @li `std::ranges::end(r)` is valid
597 : @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
598 : @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
599 :
600 : @par Example
601 : @code
602 : template<IoAwaitableRange R>
603 : task<void> race_all(R&& awaitables) {
604 : auto winner = co_await when_any(std::forward<R>(awaitables));
605 : // Process winner...
606 : }
607 : @endcode
608 :
609 : @see when_any, IoAwaitable
610 : */
611 : template<typename R>
612 : concept IoAwaitableRange =
613 : std::ranges::input_range<R> &&
614 : std::ranges::sized_range<R> &&
615 : IoAwaitable<std::ranges::range_value_t<R>>;
616 :
617 : namespace detail {
618 :
619 : /** Shared state for homogeneous when_any (range overload).
620 :
621 : Uses composition with when_any_core for shared functionality.
622 : Simpler than heterogeneous: optional<T> instead of variant, vector
623 : instead of array for runner handles.
624 : */
625 : template<typename T>
626 : struct when_any_homogeneous_state
627 : {
628 : when_any_core core_;
629 : std::optional<T> result_;
630 : std::vector<std::coroutine_handle<>> runner_handles_;
631 :
632 19 : explicit when_any_homogeneous_state(std::size_t count)
633 19 : : core_(count)
634 38 : , runner_handles_(count)
635 : {
636 19 : }
637 :
638 : // Runners self-destruct in final_suspend. No destruction needed here.
639 :
640 : /** @pre core_.try_win() returned true. */
641 17 : void set_winner_result(T value)
642 : noexcept(std::is_nothrow_move_constructible_v<T>)
643 : {
644 17 : result_.emplace(std::move(value));
645 17 : }
646 : };
647 :
648 : /** Specialization for void tasks (no result storage needed). */
649 : template<>
650 : struct when_any_homogeneous_state<void>
651 : {
652 : when_any_core core_;
653 : std::vector<std::coroutine_handle<>> runner_handles_;
654 :
655 3 : explicit when_any_homogeneous_state(std::size_t count)
656 3 : : core_(count)
657 6 : , runner_handles_(count)
658 : {
659 3 : }
660 :
661 : // Runners self-destruct in final_suspend. No destruction needed here.
662 :
663 : // No set_winner_result - void tasks have no result to store
664 : };
665 :
666 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
667 : template<IoAwaitableRange Range>
668 : class when_any_homogeneous_launcher
669 : {
670 : using Awaitable = std::ranges::range_value_t<Range>;
671 : using T = awaitable_result_t<Awaitable>;
672 :
673 : Range* range_;
674 : when_any_homogeneous_state<T>* state_;
675 :
676 : public:
677 22 : when_any_homogeneous_launcher(
678 : Range* range,
679 : when_any_homogeneous_state<T>* state)
680 22 : : range_(range)
681 22 : , state_(state)
682 : {
683 22 : }
684 :
685 22 : bool await_ready() const noexcept
686 : {
687 22 : return std::ranges::empty(*range_);
688 : }
689 :
690 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
691 : destroys this object before await_suspend returns. Must not reference
692 : `this` after dispatching begins.
693 :
694 : Two-phase approach:
695 : 1. Create all runners (safe - no dispatch yet)
696 : 2. Dispatch all runners (any may complete synchronously)
697 : */
698 22 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
699 : {
700 22 : state_->core_.continuation_ = continuation;
701 22 : state_->core_.caller_env_ = caller_env;
702 :
703 22 : if(caller_env->stop_token.stop_possible())
704 : {
705 14 : state_->core_.parent_stop_callback_.emplace(
706 7 : caller_env->stop_token,
707 7 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
708 :
709 7 : if(caller_env->stop_token.stop_requested())
710 4 : state_->core_.stop_source_.request_stop();
711 : }
712 :
713 22 : auto token = state_->core_.stop_source_.get_token();
714 :
715 : // Phase 1: Create all runners without dispatching.
716 : // This iterates over *range_ safely because no runners execute yet.
717 22 : std::size_t index = 0;
718 107 : for(auto&& a : *range_)
719 : {
720 85 : auto runner = make_when_any_runner(
721 85 : std::move(a), state_, index);
722 :
723 85 : auto h = runner.release();
724 85 : h.promise().state_ = state_;
725 85 : h.promise().index_ = index;
726 85 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
727 :
728 85 : state_->runner_handles_[index] = std::coroutine_handle<>{h};
729 85 : ++index;
730 : }
731 :
732 : // Phase 2: Post all runners. Any may complete synchronously.
733 : // After last post, state_ and this may be destroyed.
734 : // Use raw pointer/count captured before posting.
735 22 : std::coroutine_handle<>* handles = state_->runner_handles_.data();
736 22 : std::size_t count = state_->runner_handles_.size();
737 107 : for(std::size_t i = 0; i < count; ++i)
738 85 : caller_env->executor.post(handles[i]);
739 :
740 44 : return std::noop_coroutine();
741 107 : }
742 :
743 22 : void await_resume() const noexcept
744 : {
745 22 : }
746 : };
747 :
748 : } // namespace detail
749 :
750 : /** Wait for the first awaitable to complete (range overload).
751 :
752 : Races a range of awaitables with the same result type. Accepts any
753 : sized input range of IoAwaitable types, enabling use with arrays,
754 : spans, or custom containers.
755 :
756 : @par Suspends
757 : The calling coroutine suspends when co_await is invoked. All awaitables
758 : in the range are launched concurrently and execute in parallel. The
759 : coroutine resumes only after all awaitables have completed, even though
760 : the winner is determined by the first to finish.
761 :
762 : @par Completion Conditions
763 : @li Winner is determined when the first awaitable completes (success or exception)
764 : @li Only one task can claim winner status via atomic compare-exchange
765 : @li Once a winner exists, stop is requested for all remaining siblings
766 : @li Parent coroutine resumes only after all siblings acknowledge completion
767 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
768 :
769 : @par Cancellation Semantics
770 : Cancellation is supported via stop_token propagated through the
771 : IoAwaitable protocol:
772 : @li Each child awaitable receives a stop_token derived from a shared stop_source
773 : @li When the parent's stop token is activated, the stop is forwarded to all children
774 : @li When a winner is determined, stop_source_.request_stop() is called immediately
775 : @li Siblings must handle cancellation gracefully and complete before parent resumes
776 : @li Stop requests are cooperative; tasks must check and respond to them
777 :
778 : @par Concurrency/Overlap
779 : All awaitables are launched concurrently before any can complete.
780 : The launcher iterates through the range, starting each task on the
781 : caller's executor. Tasks may execute in parallel on multi-threaded
782 : executors or interleave on single-threaded executors. There is no
783 : guaranteed ordering of task completion.
784 :
785 : @par Notable Error Conditions
786 : @li Empty range: std::invalid_argument is thrown inside the coroutine body, so it is captured by the returned task rather than thrown from the when_any call itself
787 : @li Winner exception: if the winning task threw, that exception is rethrown
788 : @li Non-winner exceptions: silently discarded (only winner's result matters)
789 : @li Cancellation: tasks may complete via cancellation without throwing
790 :
791 : @par Example
792 : @code
793 : task<void> example() {
794 : std::array<task<Response>, 3> requests = {
795 : fetch_from_server(0),
796 : fetch_from_server(1),
797 : fetch_from_server(2)
798 : };
799 :
800 : auto [index, response] = co_await when_any(std::move(requests));
801 : }
802 : @endcode
803 :
804 : @par Example with Vector
805 : @code
806 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
807 : std::vector<task<Response>> requests;
808 : for (auto const& server : servers)
809 : requests.push_back(fetch_from(server));
810 :
811 : auto [index, response] = co_await when_any(std::move(requests));
812 : co_return response;
813 : }
814 : @endcode
815 :
816 : @tparam R Range type satisfying IoAwaitableRange.
817 : @param awaitables Range of awaitables to race concurrently (must not be empty).
818 : @return A task yielding a pair of (winner_index, result).
819 :
820 : @throws std::invalid_argument if range is empty (raised in the coroutine body before any child is launched, and delivered through the returned task).
821 : @throws Rethrows the winner's exception if the winning task threw an exception.
822 :
823 : @par Remarks
824 : The range is first transferred into storage owned by the coroutine: an
825 : rvalue range is moved, an lvalue range is copied (its elements must be
826 : copyable) and the caller's container is left untouched. Unlike
827 : the variadic overload, no variant wrapper is needed since all tasks
828 : share the same return type.
829 :
830 : @see when_any, IoAwaitableRange
831 : */
832 : template<IoAwaitableRange R>
833 : requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
834 21 : [[nodiscard]] auto when_any(R&& awaitables)
835 : -> task<std::pair<std::size_t, awaitable_result_t<std::ranges::range_value_t<R>>>>
836 : {
837 : using Awaitable = std::ranges::range_value_t<R>;
838 : using T = awaitable_result_t<Awaitable>;
839 : using result_type = std::pair<std::size_t, T>;
840 : using OwnedRange = std::remove_cvref_t<R>;
841 :
842 : auto count = std::ranges::size(awaitables);
843 : if(count == 0)
844 : throw std::invalid_argument("when_any requires at least one awaitable");
845 :
846 : // Move/copy range onto coroutine frame to ensure lifetime
847 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
848 :
849 : detail::when_any_homogeneous_state<T> state(count);
850 :
851 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
852 :
853 : if(state.core_.winner_exception_)
854 : std::rethrow_exception(state.core_.winner_exception_);
855 :
856 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
857 42 : }
858 :
859 : /** Wait for the first awaitable to complete (void range overload).
860 :
861 : Races a range of void-returning awaitables. Since void awaitables have
862 : no result value, only the winner's index is returned.
863 :
864 : @par Suspends
865 : The calling coroutine suspends when co_await is invoked. All awaitables
866 : in the range are launched concurrently and execute in parallel. The
867 : coroutine resumes only after all awaitables have completed, even though
868 : the winner is determined by the first to finish.
869 :
870 : @par Completion Conditions
871 : @li Winner is determined when the first awaitable completes (success or exception)
872 : @li Only one task can claim winner status via atomic compare-exchange
873 : @li Once a winner exists, stop is requested for all remaining siblings
874 : @li Parent coroutine resumes only after all siblings acknowledge completion
875 : @li The winner's index is returned; if the winner threw, the exception is rethrown
876 :
877 : @par Cancellation Semantics
878 : Cancellation is supported via stop_token propagated through the
879 : IoAwaitable protocol:
880 : @li Each child awaitable receives a stop_token derived from a shared stop_source
881 : @li When the parent's stop token is activated, the stop is forwarded to all children
882 : @li When a winner is determined, stop_source_.request_stop() is called immediately
883 : @li Siblings must handle cancellation gracefully and complete before parent resumes
884 : @li Stop requests are cooperative; tasks must check and respond to them
885 :
886 : @par Concurrency/Overlap
887 : All awaitables are launched concurrently before any can complete.
888 : The launcher iterates through the range, starting each task on the
889 : caller's executor. Tasks may execute in parallel on multi-threaded
890 : executors or interleave on single-threaded executors. There is no
891 : guaranteed ordering of task completion.
892 :
893 : @par Notable Error Conditions
894 : @li Empty range: throws std::invalid_argument from the coroutine body before the first co_await (raised as an exception, not returned via co_return)
895 : @li Winner exception: if the winning task threw, that exception is rethrown
896 : @li Non-winner exceptions: silently discarded (only winner's result matters)
897 : @li Cancellation: tasks may complete via cancellation without throwing
898 :
899 : @par Example
900 : @code
901 : task<void> example() {
902 : std::vector<task<void>> tasks;
903 : for (int i = 0; i < 5; ++i)
904 : tasks.push_back(background_work(i));
905 :
906 : std::size_t winner = co_await when_any(std::move(tasks));
907 : // winner is the index of the first task to complete
908 : }
909 : @endcode
910 :
911 : @par Example with Timeout
912 : @code
913 : task<void> with_timeout() {
914 : std::vector<task<void>> tasks;
915 : tasks.push_back(long_running_operation());
916 : tasks.push_back(delay(std::chrono::seconds(5)));
917 :
918 : std::size_t winner = co_await when_any(std::move(tasks));
919 : if (winner == 1) {
920 : // Timeout occurred
921 : }
922 : }
923 : @endcode
924 :
925 : @tparam R Range type satisfying IoAwaitableRange with void result.
926 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
927 : @return A task yielding the winner's index (zero-based).
928 :
929 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
930 : @throws Rethrows the winner's exception if the winning task threw an exception.
931 :
932 : @par Remarks
933 : The range is forwarded into an object owned by the coroutine frame to
934 : ensure lifetime safety: an rvalue range is moved, while an lvalue range
935 : is copy-constructed and the awaitables are taken from that copy. Unlike
936 : the non-void overload, no result storage is needed since void tasks
937 : produce no value.
938 :
939 : @see when_any, IoAwaitableRange
940 : */
941 : template<IoAwaitableRange R>
942 : requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
943 3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
944 : {
// Coroutine body of the void range overload: races every awaitable in the
// range and yields only the winner's index.
// NOTE(review): `awaitables` is a forwarding reference that is first read
// inside this coroutine body, so the referenced range must still be alive
// when the body starts executing. Whether that happens at the call or when
// the returned task is first resumed depends on task's promise — confirm.
// Decayed range type: the frame-owned object below is a value, never a reference.
945 : using OwnedRange = std::remove_cvref_t<R>;
946 :
// Read the size before forwarding; an rvalue range is moved-from afterwards.
947 : auto count = std::ranges::size(awaitables);
948 : if(count == 0)
// Thrown from inside the coroutine body, ahead of the co_await below; how it
// is surfaced to the caller is decided by task's promise (not visible here).
949 : throw std::invalid_argument("when_any requires at least one awaitable");
950 :
951 : // Move (rvalue R) or copy (lvalue R) the range into a local so it stays alive across the co_await below
952 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
953 :
// Shared race state for the void case, constructed from the number of
// awaitables; only core_ (winner index / exception) is read from it below.
954 : detail::when_any_homogeneous_state<void> state(count);
955 :
// The launcher is handed pointers to the two locals above, so neither may be
// moved or destroyed until this co_await completes.
956 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
957 :
// A stored winner exception is rethrown instead of returning the index.
958 : if(state.core_.winner_exception_)
959 : std::rethrow_exception(state.core_.winner_exception_);
960 :
961 : co_return state.core_.winner_index_;
962 6 : }
963 :
964 : } // namespace capy
965 : } // namespace boost
966 :
967 : #endif
|