TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_array.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <boost/capy/concept/write_sink.hpp>
20 : #include <coroutine>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any WriteSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref WriteSink concept, enabling runtime polymorphism for
42 : sink write operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper supports two construction modes:
46 : - **Owning**: Pass by value to transfer ownership. The wrapper
47 : allocates storage and owns the sink.
48 : - **Reference**: Pass a pointer to wrap without ownership. The
49 : pointed-to sink must outlive this wrapper.
50 :
51 : @par Awaitable Preallocation
52 : The constructor preallocates storage for the type-erased awaitable.
53 : This reserves all virtual address space at server startup
54 : so memory usage can be measured up front, rather than
55 : allocating piecemeal as traffic arrives.
56 :
57 : @par Immediate Completion
58 : Operations complete immediately without suspending when the
59 : buffer sequence is empty, or when the underlying sink's
60 : awaitable reports readiness via `await_ready`.
61 :
62 : @par Thread Safety
63 : Not thread-safe. Concurrent operations on the same wrapper
64 : are undefined behavior.
65 :
66 : @par Example
67 : @code
68 : // Owning - takes ownership of the sink
69 : any_write_sink ws(some_sink{args...});
70 :
71 : // Reference - wraps without ownership
72 : some_sink sink;
73 : any_write_sink ws(&sink);
74 :
75 : const_buffer buf(data, size);
76 : auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77 : auto [ec2] = co_await ws.write_eof();
78 : @endcode
79 :
80 : @see any_write_stream, WriteSink
81 : */
82 : class any_write_sink
83 : {
84 : struct vtable;
85 : struct write_awaitable_ops;
86 : struct eof_awaitable_ops;
87 :
88 : template<WriteSink S>
89 : struct vtable_for_impl;
90 :
91 : void* sink_ = nullptr;
92 : vtable const* vt_ = nullptr;
93 : void* cached_awaitable_ = nullptr;
94 : void* storage_ = nullptr;
95 : write_awaitable_ops const* active_write_ops_ = nullptr;
96 : eof_awaitable_ops const* active_eof_ops_ = nullptr;
97 :
98 : public:
99 : /** Destructor.
100 :
101 : Destroys the owned sink (if any) and releases the cached
102 : awaitable storage.
103 : */
104 : ~any_write_sink();
105 :
106 : /** Construct a default instance.
107 :
108 : Constructs an empty wrapper. Operations on a default-constructed
109 : wrapper result in undefined behavior.
110 : */
111 : any_write_sink() = default;
112 :
113 : /** Non-copyable.
114 :
115 : The awaitable cache is per-instance and cannot be shared.
116 : */
117 : any_write_sink(any_write_sink const&) = delete;
118 : any_write_sink& operator=(any_write_sink const&) = delete;
119 :
120 : /** Construct by moving.
121 :
122 : Transfers ownership of the wrapped sink (if owned) and
123 : cached awaitable storage from `other`. After the move, `other` is
124 : in a default-constructed state.
125 :
126 : @param other The wrapper to move from.
127 : */
128 HIT 1 : any_write_sink(any_write_sink&& other) noexcept
129 1 : : sink_(std::exchange(other.sink_, nullptr))
130 1 : , vt_(std::exchange(other.vt_, nullptr))
131 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132 1 : , storage_(std::exchange(other.storage_, nullptr))
133 1 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134 1 : , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135 : {
136 1 : }
137 :
138 : /** Assign by moving.
139 :
140 : Destroys any owned sink and releases existing resources,
141 : then transfers ownership from `other`.
142 :
143 : @param other The wrapper to move from.
144 : @return Reference to this wrapper.
145 : */
146 : any_write_sink&
147 : operator=(any_write_sink&& other) noexcept;
148 :
149 : /** Construct by taking ownership of a WriteSink.
150 :
151 : Allocates storage and moves the sink into this wrapper.
152 : The wrapper owns the sink and will destroy it.
153 :
154 : @param s The sink to take ownership of.
155 : */
156 : template<WriteSink S>
157 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158 : any_write_sink(S s);
159 :
160 : /** Construct by wrapping a WriteSink without ownership.
161 :
162 : Wraps the given sink by pointer. The sink must remain
163 : valid for the lifetime of this wrapper.
164 :
165 : @param s Pointer to the sink to wrap.
166 : */
167 : template<WriteSink S>
168 : any_write_sink(S* s);
169 :
170 : /** Check if the wrapper contains a valid sink.
171 :
172 : @return `true` if wrapping a sink, `false` if default-constructed
173 : or moved-from.
174 : */
175 : bool
176 15 : has_value() const noexcept
177 : {
178 15 : return sink_ != nullptr;
179 : }
180 :
181 : /** Check if the wrapper contains a valid sink.
182 :
183 : @return `true` if wrapping a sink, `false` if default-constructed
184 : or moved-from.
185 : */
186 : explicit
187 2 : operator bool() const noexcept
188 : {
189 2 : return has_value();
190 : }
191 :
192 : /** Initiate a partial write operation.
193 :
194 : Attempt to write up to `buffer_size( buffers )` bytes from
195 : the provided buffer sequence. May consume less than the
196 : full sequence.
197 :
198 : @param buffers The buffer sequence containing data to write.
199 :
200 : @return An awaitable that await-returns `(error_code,std::size_t)`.
201 :
202 : @par Immediate Completion
203 : The operation completes immediately without suspending
204 : the calling coroutine when:
205 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
206 : @li The underlying sink's awaitable reports immediate
207 : readiness via `await_ready`.
208 :
209 : @note This is a partial operation and may not process the
210 : entire buffer sequence. Use @ref write for guaranteed
211 : complete transfer.
212 :
213 : @par Preconditions
214 : The wrapper must contain a valid sink (`has_value() == true`).
215 : */
216 : template<ConstBufferSequence CB>
217 : auto
218 : write_some(CB buffers);
219 :
220 : /** Initiate a complete write operation.
221 :
222 : Writes data from the provided buffer sequence. The operation
223 : completes when all bytes have been consumed, or an error
224 : occurs. Forwards to the underlying sink's `write` operation,
225 : windowed through @ref buffer_param when the sequence exceeds
226 : the per-call buffer limit.
227 :
228 : @param buffers The buffer sequence containing data to write.
229 :
230 : @return An awaitable that await-returns `(error_code,std::size_t)`.
231 :
232 : @par Immediate Completion
233 : The operation completes immediately without suspending
234 : the calling coroutine when:
235 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
236 : @li Every underlying `write` call completes
237 : immediately (the wrapped sink reports readiness
238 : via `await_ready` on each iteration).
239 :
240 : @par Preconditions
241 : The wrapper must contain a valid sink (`has_value() == true`).
242 : */
243 : template<ConstBufferSequence CB>
244 : io_task<std::size_t>
245 : write(CB buffers);
246 :
247 : /** Atomically write data and signal end-of-stream.
248 :
249 : Writes all data from the buffer sequence and then signals
250 : end-of-stream. The implementation decides how to partition
251 : the data across calls to the underlying sink's @ref write
252 : and `write_eof`. When the caller's buffer sequence is
253 : non-empty, the final call to the underlying sink is always
254 : `write_eof` with a non-empty buffer sequence. When the
255 : caller's buffer sequence is empty, only `write_eof()` with
256 : no data is called.
257 :
258 : @param buffers The buffer sequence containing data to write.
259 :
260 : @return An awaitable that await-returns `(error_code,std::size_t)`.
261 :
262 : @par Immediate Completion
263 : The operation completes immediately without suspending
264 : the calling coroutine when:
265 : @li The buffer sequence is empty. Only the @ref write_eof()
266 : call is performed.
267 : @li All underlying operations complete immediately (the
268 : wrapped sink reports readiness via `await_ready`).
269 :
270 : @par Preconditions
271 : The wrapper must contain a valid sink (`has_value() == true`).
272 : */
273 : template<ConstBufferSequence CB>
274 : io_task<std::size_t>
275 : write_eof(CB buffers);
276 :
277 : /** Signal end of data.
278 :
279 : Indicates that no more data will be written to the sink.
280 : The operation completes when the sink is finalized, or
281 : an error occurs.
282 :
283 : @return An awaitable that await-returns `(error_code)`.
284 :
285 : @par Immediate Completion
286 : The operation completes immediately without suspending
287 : the calling coroutine when the underlying sink's awaitable
288 : reports immediate readiness via `await_ready`.
289 :
290 : @par Preconditions
291 : The wrapper must contain a valid sink (`has_value() == true`).
292 : */
293 : auto
294 : write_eof();
295 :
296 : protected:
297 : /** Rebind to a new sink after move.
298 :
299 : Updates the internal pointer to reference a new sink object.
300 : Used by owning wrappers after move assignment when the owned
301 : object has moved to a new location.
302 :
303 : @param new_sink The new sink to bind to. Must be the same
304 : type as the original sink.
305 :
306 : @note Terminates if called with a sink of different type
307 : than the original.
308 : */
309 : template<WriteSink S>
310 : void
311 : rebind(S& new_sink) noexcept
312 : {
313 : if(vt_ != &vtable_for_impl<S>::value)
314 : std::terminate();
315 : sink_ = &new_sink;
316 : }
317 :
318 : private:
319 : auto
320 : write_some_(std::span<const_buffer const> buffers);
321 :
322 : auto
323 : write_(std::span<const_buffer const> buffers);
324 :
325 : auto
326 : write_eof_buffers_(std::span<const_buffer const> buffers);
327 : };
328 :
329 : struct any_write_sink::write_awaitable_ops
330 : {
331 : bool (*await_ready)(void*);
332 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
333 : io_result<std::size_t> (*await_resume)(void*);
334 : void (*destroy)(void*) noexcept;
335 : };
336 :
337 : struct any_write_sink::eof_awaitable_ops
338 : {
339 : bool (*await_ready)(void*);
340 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
341 : io_result<> (*await_resume)(void*);
342 : void (*destroy)(void*) noexcept;
343 : };
344 :
345 : struct any_write_sink::vtable
346 : {
347 : write_awaitable_ops const* (*construct_write_some_awaitable)(
348 : void* sink,
349 : void* storage,
350 : std::span<const_buffer const> buffers);
351 : write_awaitable_ops const* (*construct_write_awaitable)(
352 : void* sink,
353 : void* storage,
354 : std::span<const_buffer const> buffers);
355 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
356 : void* sink,
357 : void* storage,
358 : std::span<const_buffer const> buffers);
359 : eof_awaitable_ops const* (*construct_eof_awaitable)(
360 : void* sink,
361 : void* storage);
362 : std::size_t awaitable_size;
363 : std::size_t awaitable_align;
364 : void (*destroy)(void*) noexcept;
365 : };
366 :
367 : template<WriteSink S>
368 : struct any_write_sink::vtable_for_impl
369 : {
370 : using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
371 : std::span<const_buffer const>{}));
372 : using WriteAwaitable = decltype(std::declval<S&>().write(
373 : std::span<const_buffer const>{}));
374 : using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
375 : std::span<const_buffer const>{}));
376 : using EofAwaitable = decltype(std::declval<S&>().write_eof());
377 :
378 : static void
379 6 : do_destroy_impl(void* sink) noexcept
380 : {
381 6 : static_cast<S*>(sink)->~S();
382 6 : }
383 :
384 : static write_awaitable_ops const*
385 40 : construct_write_some_awaitable_impl(
386 : void* sink,
387 : void* storage,
388 : std::span<const_buffer const> buffers)
389 : {
390 40 : auto& s = *static_cast<S*>(sink);
391 40 : ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
392 :
393 : static constexpr write_awaitable_ops ops = {
394 40 : +[](void* p) {
395 40 : return static_cast<WriteSomeAwaitable*>(p)->await_ready();
396 : },
397 2 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
398 2 : return detail::call_await_suspend(
399 2 : static_cast<WriteSomeAwaitable*>(p), h, env);
400 : },
401 38 : +[](void* p) {
402 38 : return static_cast<WriteSomeAwaitable*>(p)->await_resume();
403 : },
404 42 : +[](void* p) noexcept {
405 2 : static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
406 : }
407 : };
408 40 : return &ops;
409 : }
410 :
411 : static write_awaitable_ops const*
412 78 : construct_write_awaitable_impl(
413 : void* sink,
414 : void* storage,
415 : std::span<const_buffer const> buffers)
416 : {
417 78 : auto& s = *static_cast<S*>(sink);
418 78 : ::new(storage) WriteAwaitable(s.write(buffers));
419 :
420 : static constexpr write_awaitable_ops ops = {
421 78 : +[](void* p) {
422 78 : return static_cast<WriteAwaitable*>(p)->await_ready();
423 : },
424 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
425 0 : return detail::call_await_suspend(
426 0 : static_cast<WriteAwaitable*>(p), h, env);
427 : },
428 HIT 78 : +[](void* p) {
429 78 : return static_cast<WriteAwaitable*>(p)->await_resume();
430 : },
431 78 : +[](void* p) noexcept {
432 MIS 0 : static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
433 : }
434 : };
435 HIT 78 : return &ops;
436 : }
437 :
438 : static write_awaitable_ops const*
439 16 : construct_write_eof_buffers_awaitable_impl(
440 : void* sink,
441 : void* storage,
442 : std::span<const_buffer const> buffers)
443 : {
444 16 : auto& s = *static_cast<S*>(sink);
445 16 : ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
446 :
447 : static constexpr write_awaitable_ops ops = {
448 16 : +[](void* p) {
449 16 : return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
450 : },
451 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
452 0 : return detail::call_await_suspend(
453 0 : static_cast<WriteEofBuffersAwaitable*>(p), h, env);
454 : },
455 HIT 16 : +[](void* p) {
456 16 : return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
457 : },
458 16 : +[](void* p) noexcept {
459 MIS 0 : static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
460 : }
461 : };
462 HIT 16 : return &ops;
463 : }
464 :
465 : static eof_awaitable_ops const*
466 17 : construct_eof_awaitable_impl(
467 : void* sink,
468 : void* storage)
469 : {
470 17 : auto& s = *static_cast<S*>(sink);
471 17 : ::new(storage) EofAwaitable(s.write_eof());
472 :
473 : static constexpr eof_awaitable_ops ops = {
474 17 : +[](void* p) {
475 17 : return static_cast<EofAwaitable*>(p)->await_ready();
476 : },
477 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
478 1 : return detail::call_await_suspend(
479 1 : static_cast<EofAwaitable*>(p), h, env);
480 : },
481 16 : +[](void* p) {
482 16 : return static_cast<EofAwaitable*>(p)->await_resume();
483 : },
484 18 : +[](void* p) noexcept {
485 1 : static_cast<EofAwaitable*>(p)->~EofAwaitable();
486 : }
487 : };
488 17 : return &ops;
489 : }
490 :
491 : static constexpr std::size_t max4(
492 : std::size_t a, std::size_t b,
493 : std::size_t c, std::size_t d) noexcept
494 : {
495 : std::size_t ab = a > b ? a : b;
496 : std::size_t cd = c > d ? c : d;
497 : return ab > cd ? ab : cd;
498 : }
499 :
500 : static constexpr std::size_t max_awaitable_size =
501 : max4(sizeof(WriteSomeAwaitable),
502 : sizeof(WriteAwaitable),
503 : sizeof(WriteEofBuffersAwaitable),
504 : sizeof(EofAwaitable));
505 :
506 : static constexpr std::size_t max_awaitable_align =
507 : max4(alignof(WriteSomeAwaitable),
508 : alignof(WriteAwaitable),
509 : alignof(WriteEofBuffersAwaitable),
510 : alignof(EofAwaitable));
511 :
512 : static constexpr vtable value = {
513 : &construct_write_some_awaitable_impl,
514 : &construct_write_awaitable_impl,
515 : &construct_write_eof_buffers_awaitable_impl,
516 : &construct_eof_awaitable_impl,
517 : max_awaitable_size,
518 : max_awaitable_align,
519 : &do_destroy_impl
520 : };
521 : };
522 :
523 : inline
524 129 : any_write_sink::~any_write_sink()
525 : {
526 129 : if(storage_)
527 : {
528 6 : vt_->destroy(sink_);
529 6 : ::operator delete(storage_);
530 : }
531 129 : if(cached_awaitable_)
532 : {
533 124 : if(active_write_ops_)
534 1 : active_write_ops_->destroy(cached_awaitable_);
535 123 : else if(active_eof_ops_)
536 1 : active_eof_ops_->destroy(cached_awaitable_);
537 124 : ::operator delete(cached_awaitable_);
538 : }
539 129 : }
540 :
541 : inline any_write_sink&
542 2 : any_write_sink::operator=(any_write_sink&& other) noexcept
543 : {
544 2 : if(this != &other)
545 : {
546 2 : if(storage_)
547 : {
548 MIS 0 : vt_->destroy(sink_);
549 0 : ::operator delete(storage_);
550 : }
551 HIT 2 : if(cached_awaitable_)
552 : {
553 1 : if(active_write_ops_)
554 1 : active_write_ops_->destroy(cached_awaitable_);
555 MIS 0 : else if(active_eof_ops_)
556 0 : active_eof_ops_->destroy(cached_awaitable_);
557 HIT 1 : ::operator delete(cached_awaitable_);
558 : }
559 2 : sink_ = std::exchange(other.sink_, nullptr);
560 2 : vt_ = std::exchange(other.vt_, nullptr);
561 2 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
562 2 : storage_ = std::exchange(other.storage_, nullptr);
563 2 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
564 2 : active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
565 : }
566 2 : return *this;
567 : }
568 :
569 : template<WriteSink S>
570 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
571 6 : any_write_sink::any_write_sink(S s)
572 6 : : vt_(&vtable_for_impl<S>::value)
573 : {
574 : struct guard {
575 : any_write_sink* self;
576 : bool committed = false;
577 6 : ~guard() {
578 6 : if(!committed && self->storage_) {
579 MIS 0 : self->vt_->destroy(self->sink_);
580 0 : ::operator delete(self->storage_);
581 0 : self->storage_ = nullptr;
582 0 : self->sink_ = nullptr;
583 : }
584 HIT 6 : }
585 6 : } g{this};
586 :
587 6 : storage_ = ::operator new(sizeof(S));
588 6 : sink_ = ::new(storage_) S(std::move(s));
589 :
590 : // Preallocate the awaitable storage (sized for max of write/eof)
591 6 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
592 :
593 6 : g.committed = true;
594 6 : }
595 :
596 : template<WriteSink S>
597 119 : any_write_sink::any_write_sink(S* s)
598 119 : : sink_(s)
599 119 : , vt_(&vtable_for_impl<S>::value)
600 : {
601 : // Preallocate the awaitable storage (sized for max of write/eof)
602 119 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
603 119 : }
604 :
605 : inline auto
606 : any_write_sink::write_some_(
607 : std::span<const_buffer const> buffers)
608 : {
609 : struct awaitable
610 : {
611 : any_write_sink* self_;
612 : std::span<const_buffer const> buffers_;
613 :
614 : bool
615 : await_ready() const noexcept
616 : {
617 : return false;
618 : }
619 :
620 : std::coroutine_handle<>
621 : await_suspend(std::coroutine_handle<> h, io_env const* env)
622 : {
623 : self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
624 : self_->sink_,
625 : self_->cached_awaitable_,
626 : buffers_);
627 :
628 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
629 : return h;
630 :
631 : return self_->active_write_ops_->await_suspend(
632 : self_->cached_awaitable_, h, env);
633 : }
634 :
635 : io_result<std::size_t>
636 : await_resume()
637 : {
638 : struct guard {
639 : any_write_sink* self;
640 : ~guard() {
641 : self->active_write_ops_->destroy(self->cached_awaitable_);
642 : self->active_write_ops_ = nullptr;
643 : }
644 : } g{self_};
645 : return self_->active_write_ops_->await_resume(
646 : self_->cached_awaitable_);
647 : }
648 : };
649 : return awaitable{this, buffers};
650 : }
651 :
652 : inline auto
653 78 : any_write_sink::write_(
654 : std::span<const_buffer const> buffers)
655 : {
656 : struct awaitable
657 : {
658 : any_write_sink* self_;
659 : std::span<const_buffer const> buffers_;
660 :
661 : bool
662 78 : await_ready() const noexcept
663 : {
664 78 : return false;
665 : }
666 :
667 : std::coroutine_handle<>
668 78 : await_suspend(std::coroutine_handle<> h, io_env const* env)
669 : {
670 156 : self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
671 78 : self_->sink_,
672 78 : self_->cached_awaitable_,
673 : buffers_);
674 :
675 78 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
676 78 : return h;
677 :
678 MIS 0 : return self_->active_write_ops_->await_suspend(
679 0 : self_->cached_awaitable_, h, env);
680 : }
681 :
682 : io_result<std::size_t>
683 HIT 78 : await_resume()
684 : {
685 : struct guard {
686 : any_write_sink* self;
687 78 : ~guard() {
688 78 : self->active_write_ops_->destroy(self->cached_awaitable_);
689 78 : self->active_write_ops_ = nullptr;
690 78 : }
691 78 : } g{self_};
692 78 : return self_->active_write_ops_->await_resume(
693 135 : self_->cached_awaitable_);
694 78 : }
695 : };
696 78 : return awaitable{this, buffers};
697 : }
698 :
699 : inline auto
700 17 : any_write_sink::write_eof()
701 : {
702 : struct awaitable
703 : {
704 : any_write_sink* self_;
705 :
706 : bool
707 17 : await_ready() const noexcept
708 : {
709 17 : return false;
710 : }
711 :
712 : std::coroutine_handle<>
713 17 : await_suspend(std::coroutine_handle<> h, io_env const* env)
714 : {
715 : // Construct the underlying awaitable into cached storage
716 34 : self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
717 17 : self_->sink_,
718 17 : self_->cached_awaitable_);
719 :
720 : // Check if underlying is immediately ready
721 17 : if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
722 16 : return h;
723 :
724 : // Forward to underlying awaitable
725 1 : return self_->active_eof_ops_->await_suspend(
726 1 : self_->cached_awaitable_, h, env);
727 : }
728 :
729 : io_result<>
730 16 : await_resume()
731 : {
732 : struct guard {
733 : any_write_sink* self;
734 16 : ~guard() {
735 16 : self->active_eof_ops_->destroy(self->cached_awaitable_);
736 16 : self->active_eof_ops_ = nullptr;
737 16 : }
738 16 : } g{self_};
739 16 : return self_->active_eof_ops_->await_resume(
740 27 : self_->cached_awaitable_);
741 16 : }
742 : };
743 17 : return awaitable{this};
744 : }
745 :
746 : inline auto
747 16 : any_write_sink::write_eof_buffers_(
748 : std::span<const_buffer const> buffers)
749 : {
750 : struct awaitable
751 : {
752 : any_write_sink* self_;
753 : std::span<const_buffer const> buffers_;
754 :
755 : bool
756 16 : await_ready() const noexcept
757 : {
758 16 : return false;
759 : }
760 :
761 : std::coroutine_handle<>
762 16 : await_suspend(std::coroutine_handle<> h, io_env const* env)
763 : {
764 32 : self_->active_write_ops_ =
765 32 : self_->vt_->construct_write_eof_buffers_awaitable(
766 16 : self_->sink_,
767 16 : self_->cached_awaitable_,
768 : buffers_);
769 :
770 16 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
771 16 : return h;
772 :
773 MIS 0 : return self_->active_write_ops_->await_suspend(
774 0 : self_->cached_awaitable_, h, env);
775 : }
776 :
777 : io_result<std::size_t>
778 HIT 16 : await_resume()
779 : {
780 : struct guard {
781 : any_write_sink* self;
782 16 : ~guard() {
783 16 : self->active_write_ops_->destroy(self->cached_awaitable_);
784 16 : self->active_write_ops_ = nullptr;
785 16 : }
786 16 : } g{self_};
787 16 : return self_->active_write_ops_->await_resume(
788 27 : self_->cached_awaitable_);
789 16 : }
790 : };
791 16 : return awaitable{this, buffers};
792 : }
793 :
794 : template<ConstBufferSequence CB>
795 : auto
796 42 : any_write_sink::write_some(CB buffers)
797 : {
798 : struct awaitable
799 : {
800 : any_write_sink* self_;
801 : const_buffer_array<detail::max_iovec_> ba_;
802 :
803 42 : awaitable(
804 : any_write_sink* self,
805 : CB const& buffers)
806 42 : : self_(self)
807 42 : , ba_(buffers)
808 : {
809 42 : }
810 :
811 : bool
812 42 : await_ready() const noexcept
813 : {
814 42 : return ba_.to_span().empty();
815 : }
816 :
817 : std::coroutine_handle<>
818 40 : await_suspend(std::coroutine_handle<> h, io_env const* env)
819 : {
820 40 : self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
821 40 : self_->sink_,
822 40 : self_->cached_awaitable_,
823 40 : ba_.to_span());
824 :
825 40 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
826 38 : return h;
827 :
828 2 : return self_->active_write_ops_->await_suspend(
829 2 : self_->cached_awaitable_, h, env);
830 : }
831 :
832 : io_result<std::size_t>
833 40 : await_resume()
834 : {
835 40 : if(ba_.to_span().empty())
836 2 : return {{}, 0};
837 :
838 : struct guard {
839 : any_write_sink* self;
840 38 : ~guard() {
841 38 : self->active_write_ops_->destroy(self->cached_awaitable_);
842 38 : self->active_write_ops_ = nullptr;
843 38 : }
844 38 : } g{self_};
845 38 : return self_->active_write_ops_->await_resume(
846 38 : self_->cached_awaitable_);
847 38 : }
848 : };
849 42 : return awaitable{this, buffers};
850 : }
851 :
852 : template<ConstBufferSequence CB>
853 : io_task<std::size_t>
854 68 : any_write_sink::write(CB buffers)
855 : {
856 : buffer_param<CB> bp(buffers);
857 : std::size_t total = 0;
858 :
859 : for(;;)
860 : {
861 : auto bufs = bp.data();
862 : if(bufs.empty())
863 : break;
864 :
865 : auto [ec, n] = co_await write_(bufs);
866 : total += n;
867 : if(ec)
868 : co_return {ec, total};
869 : bp.consume(n);
870 : }
871 :
872 : co_return {{}, total};
873 136 : }
874 :
875 : template<ConstBufferSequence CB>
876 : io_task<std::size_t>
877 26 : any_write_sink::write_eof(CB buffers)
878 : {
879 : const_buffer_param<CB> bp(buffers);
880 : std::size_t total = 0;
881 :
882 : for(;;)
883 : {
884 : auto bufs = bp.data();
885 : if(bufs.empty())
886 : {
887 : auto [ec] = co_await write_eof();
888 : co_return {ec, total};
889 : }
890 :
891 : if(! bp.more())
892 : {
893 : // Last window — send atomically with EOF
894 : auto [ec, n] = co_await write_eof_buffers_(bufs);
895 : total += n;
896 : co_return {ec, total};
897 : }
898 :
899 : auto [ec, n] = co_await write_(bufs);
900 : total += n;
901 : if(ec)
902 : co_return {ec, total};
903 : bp.consume(n);
904 : }
905 52 : }
906 :
907 : } // namespace capy
908 : } // namespace boost
909 :
910 : #endif
|