TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_sink.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/write_sink.hpp>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any BufferSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref BufferSink concept, enabling runtime polymorphism for
42 : buffer sink operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper exposes two interfaces for producing data:
46 : the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 : and the @ref WriteSink interface (`write_some`, `write`,
48 : `write_eof`). Choose the interface that matches how your data
49 : is produced:
50 :
51 : @par Choosing an Interface
52 :
53 : Use the **BufferSink** interface when you are a generator that
54 : produces data into externally-provided buffers. The sink owns
55 : the memory; you call @ref prepare to obtain writable buffers,
56 : fill them, then call @ref commit or @ref commit_eof.
57 :
58 : Use the **WriteSink** interface when you already have buffers
59 : containing the data to write:
60 : - If the entire body is available up front, call
61 : @ref write_eof(buffers) to send everything atomically.
62 : - If data arrives incrementally, call @ref write or
63 : @ref write_some in a loop, then @ref write_eof() when done.
64 : Prefer `write` (complete) unless your streaming pattern
65 : benefits from partial writes via `write_some`.
66 :
67 : If the wrapped type only satisfies @ref BufferSink, the
68 : @ref WriteSink operations are provided automatically.
69 :
70 : @par Construction Modes
71 :
72 : - **Owning**: Pass by value to transfer ownership. The wrapper
73 : allocates storage and owns the sink.
74 : - **Reference**: Pass a pointer to wrap without ownership. The
75 : pointed-to sink must outlive this wrapper.
76 :
77 : @par Awaitable Preallocation
78 : The constructor preallocates storage for the type-erased awaitable.
79 : This reserves all virtual address space at server startup
80 : so memory usage can be measured up front, rather than
81 : allocating piecemeal as traffic arrives.
82 :
83 : @par Thread Safety
84 : Not thread-safe. Concurrent operations on the same wrapper
85 : are undefined behavior.
86 :
87 : @par Example
88 : @code
89 : // Owning - takes ownership of the sink
90 : any_buffer_sink abs(some_buffer_sink{args...});
91 :
92 : // Reference - wraps without ownership
93 : some_buffer_sink sink;
94 : any_buffer_sink abs(&sink);
95 :
96 : // BufferSink interface: generate into callee-owned buffers
97 : mutable_buffer arr[16];
98 : auto bufs = abs.prepare(arr);
99 : // Write data into bufs[0..bufs.size())
100 : auto [ec] = co_await abs.commit(bytes_written);
101 : auto [ec2] = co_await abs.commit_eof(0);
102 :
103 : // WriteSink interface: send caller-owned buffers
104 : auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105 : auto [ec4] = co_await abs.write_eof();
106 :
107 : // Or send everything at once
108 : auto [ec5, n2] = co_await abs.write_eof(
109 : make_buffer(body_data));
110 : @endcode
111 :
112 : @see any_buffer_source, BufferSink, WriteSink
113 : */
class any_buffer_sink
{
    // Type-erased dispatch tables; defined after the class body.
    struct vtable;               // per-wrapped-type operation table
    struct awaitable_ops;        // ops for awaitables returning io_result<>
    struct write_awaitable_ops;  // ops for awaitables returning io_result<std::size_t>

    // Builds the vtable (and ops tables) for a concrete BufferSink S.
    template<BufferSink S>
    struct vtable_for_impl;

    // hot-path members first for cache locality
    void* sink_ = nullptr;                  // wrapped sink; owned iff storage_ != nullptr
    vtable const* vt_ = nullptr;            // dispatch table for the wrapped type
    void* cached_awaitable_ = nullptr;      // preallocated buffer sized for the largest awaitable
    awaitable_ops const* active_ops_ = nullptr;             // non-null while a commit-style await is in flight
    write_awaitable_ops const* active_write_ops_ = nullptr; // non-null while a write-style await is in flight
    void* storage_ = nullptr;               // owning storage for the sink; null in reference mode

public:
    /** Destructor.

        Destroys the owned sink (if any) and releases the cached
        awaitable storage.
    */
    ~any_buffer_sink();

    /** Construct a default instance.

        Constructs an empty wrapper. Operations on a default-constructed
        wrapper result in undefined behavior.
    */
    any_buffer_sink() = default;

    /** Non-copyable.

        The awaitable cache is per-instance and cannot be shared.
    */
    any_buffer_sink(any_buffer_sink const&) = delete;
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;

    /** Construct by moving.

        Transfers ownership of the wrapped sink (if owned) and
        cached awaitable storage from `other`. After the move, `other` is
        in a default-constructed state.

        @param other The wrapper to move from.
    */
    any_buffer_sink(any_buffer_sink&& other) noexcept
        : sink_(std::exchange(other.sink_, nullptr))
        , vt_(std::exchange(other.vt_, nullptr))
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
        , active_ops_(std::exchange(other.active_ops_, nullptr))
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
        , storage_(std::exchange(other.storage_, nullptr))
    {
    }

    /** Assign by moving.

        Destroys any owned sink and releases existing resources,
        then transfers ownership from `other`.

        @param other The wrapper to move from.
        @return Reference to this wrapper.
    */
    any_buffer_sink&
    operator=(any_buffer_sink&& other) noexcept;

    /** Construct by taking ownership of a BufferSink.

        Allocates storage and moves the sink into this wrapper.
        The wrapper owns the sink and will destroy it. If `S` also
        satisfies @ref WriteSink, native write operations are
        forwarded through the virtual boundary.

        @param s The sink to take ownership of.
    */
    template<BufferSink S>
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
    any_buffer_sink(S s);

    /** Construct by wrapping a BufferSink without ownership.

        Wraps the given sink by pointer. The sink must remain
        valid for the lifetime of this wrapper. If `S` also
        satisfies @ref WriteSink, native write operations are
        forwarded through the virtual boundary.

        @param s Pointer to the sink to wrap.
    */
    template<BufferSink S>
    any_buffer_sink(S* s);

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    bool
    has_value() const noexcept
    {
        return sink_ != nullptr;
    }

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    explicit
    operator bool() const noexcept
    {
        return has_value();
    }

    /** Prepare writable buffers.

        Fills the provided span with mutable buffer descriptors
        pointing to the underlying sink's internal storage. This
        operation is synchronous.

        @param dest Span of mutable_buffer to fill.

        @return A span of filled buffers.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    std::span<mutable_buffer>
    prepare(std::span<mutable_buffer> dest);

    /** Commit bytes written to the prepared buffers.

        Commits `n` bytes written to the buffers returned by the
        most recent call to @ref prepare. The operation may trigger
        underlying I/O.

        @param n The number of bytes to commit.

        @return An awaitable that await-returns `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    commit(std::size_t n);

    /** Commit final bytes and signal end-of-stream.

        Commits `n` bytes written to the buffers returned by the
        most recent call to @ref prepare and finalizes the sink.
        After success, no further operations are permitted.

        @param n The number of bytes to commit.

        @return An awaitable that await-returns `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    commit_eof(std::size_t n);

    /** Write some data from a buffer sequence.

        Attempt to write up to `buffer_size( buffers )` bytes from
        the buffer sequence to the underlying sink. May consume less
        than the full sequence.

        When the wrapped type provides native @ref WriteSink support,
        the operation forwards directly. Otherwise it is synthesized
        from @ref prepare and @ref commit with a buffer copy.

        @param buffers The buffer sequence to write.

        @return An awaitable that await-returns `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_some(CB buffers);

    /** Write all data from a buffer sequence.

        Writes all data from the buffer sequence to the underlying
        sink. This method satisfies the @ref WriteSink concept.

        When the wrapped type provides native @ref WriteSink support,
        each window is forwarded directly. Otherwise the data is
        copied into the sink via @ref prepare and @ref commit.

        @param buffers The buffer sequence to write.

        @return An awaitable that await-returns `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write(CB buffers);

    /** Atomically write data and signal end-of-stream.

        Writes all data from the buffer sequence to the underlying
        sink and then signals end-of-stream.

        When the wrapped type provides native @ref WriteSink support,
        the final window is sent atomically via the underlying
        `write_eof(buffers)`. Otherwise the data is synthesized
        through @ref prepare, @ref commit, and @ref commit_eof.

        @param buffers The buffer sequence to write.

        @return An awaitable that await-returns `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_eof(CB buffers);

    /** Signal end-of-stream.

        Indicates that no more data will be written to the sink.
        This method satisfies the @ref WriteSink concept.

        When the wrapped type provides native @ref WriteSink support,
        the underlying `write_eof()` is called. Otherwise the
        operation is implemented as `commit_eof(0)`.

        @return An awaitable that await-returns `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    write_eof();

protected:
    /** Rebind to a new sink after move.

        Updates the internal pointer to reference a new sink object.
        Used by owning wrappers after move assignment when the owned
        object has moved to a new location.

        @param new_sink The new sink to bind to. Must be the same
        type as the original sink.

        @note Terminates if called with a sink of different type
        than the original.
    */
    template<BufferSink S>
    void
    rebind(S& new_sink) noexcept
    {
        // The vtable address uniquely identifies S; a mismatch means
        // the caller rebound to a different type, which is fatal.
        if(vt_ != &vtable_for_impl<S>::value)
            std::terminate();
        sink_ = &new_sink;
    }

private:
    /** Forward a partial write through the vtable.

        Constructs the underlying `write_some` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    write_some_(std::span<const_buffer const> buffers);

    /** Forward a complete write through the vtable.

        Constructs the underlying `write` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    write_(std::span<const_buffer const> buffers);

    /** Forward an atomic write-with-EOF through the vtable.

        Constructs the underlying `write_eof(buffers)` awaitable
        in cached storage and returns a type-erased awaitable.
    */
    auto
    write_eof_buffers_(std::span<const_buffer const> buffers);
};
403 :
/** Type-erased ops for awaitables that await-return `io_result<>`.

    Each function pointer receives the cached awaitable storage as
    `void*` and casts it back to the concrete awaitable type.
*/
struct any_buffer_sink::awaitable_ops
{
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;  // runs the awaitable's destructor in place
};
412 :
/** Type-erased ops for awaitables that await-return `io_result<std::size_t>`.

    Same shape as @ref awaitable_ops, but `await_resume` also yields
    a byte count.
*/
struct any_buffer_sink::write_awaitable_ops
{
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<std::size_t> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;  // runs the awaitable's destructor in place
};
421 :
/** Per-type dispatch table; one static instance exists per wrapped S. */
struct any_buffer_sink::vtable
{
    // Destroys the owned sink in place (owning mode only).
    void (*destroy)(void*) noexcept;
    // Synchronous forward of prepare() to the wrapped sink.
    std::span<mutable_buffer> (*do_prepare)(
        void* sink,
        std::span<mutable_buffer> dest);
    // Largest sizeof/alignof over every awaitable type S can produce;
    // used to size the cached awaitable buffer once at construction.
    // NOTE(review): awaitable_align is computed but the allocation
    // sites use plain ::operator new(awaitable_size) -- confirm
    // over-aligned awaitables cannot occur, or apply align_val_t.
    std::size_t awaitable_size;
    std::size_t awaitable_align;
    // Each constructor placement-builds the concrete awaitable in the
    // cached storage and returns the matching static ops table.
    awaitable_ops const* (*construct_commit_awaitable)(
        void* sink,
        void* storage,
        std::size_t n);
    awaitable_ops const* (*construct_commit_eof_awaitable)(
        void* sink,
        void* storage,
        std::size_t n);

    // WriteSink forwarding (null when wrapped type is BufferSink-only)
    write_awaitable_ops const* (*construct_write_some_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    awaitable_ops const* (*construct_write_eof_awaitable)(
        void* sink,
        void* storage);
};
456 :
/** Builds the static vtable and ops tables for a concrete sink type S. */
template<BufferSink S>
struct any_buffer_sink::vtable_for_impl
{
    // Concrete awaitable types produced by S's BufferSink interface.
    using CommitAwaitable = decltype(std::declval<S&>().commit(
        std::size_t{}));
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
        std::size_t{}));

    // Run ~S() on the owned sink (owning mode only).
    static void
    do_destroy_impl(void* sink) noexcept
    {
        static_cast<S*>(sink)->~S();
    }

    // Synchronous forward of prepare() to the concrete sink.
    static std::span<mutable_buffer>
    do_prepare_impl(
        void* sink,
        std::span<mutable_buffer> dest)
    {
        auto& s = *static_cast<S*>(sink);
        return s.prepare(dest);
    }

    // Placement-construct S's commit awaitable in `storage` and return
    // the static ops table that knows how to drive/destroy it.
    static awaitable_ops const*
    construct_commit_awaitable_impl(
        void* sink,
        void* storage,
        std::size_t n)
    {
        auto& s = *static_cast<S*>(sink);
        ::new(storage) CommitAwaitable(s.commit(n));

        // One immutable ops table per awaitable type; the lambdas
        // recover the concrete type from the erased pointer.
        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<CommitAwaitable*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<CommitAwaitable*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<CommitAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
            }
        };
        return &ops;
    }

    // Same as above for commit_eof.
    static awaitable_ops const*
    construct_commit_eof_awaitable_impl(
        void* sink,
        void* storage,
        std::size_t n)
    {
        auto& s = *static_cast<S*>(sink);
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<CommitEofAwaitable*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
            }
        };
        return &ops;
    }

    // Native WriteSink forwarders: only instantiated when S satisfies
    // WriteSink; otherwise the vtable slots stay null and the wrapper
    // synthesizes writes via prepare/commit.
    static write_awaitable_ops const*
    construct_write_some_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_some(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_some(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    // Native forward of write(buffers).
    static write_awaitable_ops const*
    construct_write_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    // Native forward of the atomic write_eof(buffers).
    static write_awaitable_ops const*
    construct_write_eof_buffers_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_eof(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_eof(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    // Native forward of the no-argument write_eof().
    static awaitable_ops const*
    construct_write_eof_awaitable_impl(
        void* sink,
        void* storage)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_eof());
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_eof());

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    // Largest sizeof over every awaitable S can produce, so one
    // cached buffer can hold any of them.
    static consteval std::size_t
    compute_max_size() noexcept
    {
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
            ? sizeof(CommitAwaitable)
            : sizeof(CommitEofAwaitable);
        if constexpr (WriteSink<S>)
        {
            using WS = decltype(std::declval<S&>().write_some(
                std::span<const_buffer const>{}));
            using W = decltype(std::declval<S&>().write(
                std::span<const_buffer const>{}));
            using WEB = decltype(std::declval<S&>().write_eof(
                std::span<const_buffer const>{}));
            using WE = decltype(std::declval<S&>().write_eof());

            if(sizeof(WS) > s) s = sizeof(WS);
            if(sizeof(W) > s) s = sizeof(W);
            if(sizeof(WEB) > s) s = sizeof(WEB);
            if(sizeof(WE) > s) s = sizeof(WE);
        }
        return s;
    }

    // Largest alignof over every awaitable S can produce.
    static consteval std::size_t
    compute_max_align() noexcept
    {
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
            ? alignof(CommitAwaitable)
            : alignof(CommitEofAwaitable);
        if constexpr (WriteSink<S>)
        {
            using WS = decltype(std::declval<S&>().write_some(
                std::span<const_buffer const>{}));
            using W = decltype(std::declval<S&>().write(
                std::span<const_buffer const>{}));
            using WEB = decltype(std::declval<S&>().write_eof(
                std::span<const_buffer const>{}));
            using WE = decltype(std::declval<S&>().write_eof());

            if(alignof(WS) > a) a = alignof(WS);
            if(alignof(W) > a) a = alignof(W);
            if(alignof(WEB) > a) a = alignof(WEB);
            if(alignof(WE) > a) a = alignof(WE);
        }
        return a;
    }

    // Assemble the full table at compile time. WriteSink slots are
    // null for BufferSink-only types, which the wrapper tests at
    // runtime to choose the native vs synthesized path.
    static consteval vtable
    make_vtable() noexcept
    {
        vtable v{};
        v.destroy = &do_destroy_impl;
        v.do_prepare = &do_prepare_impl;
        v.awaitable_size = compute_max_size();
        v.awaitable_align = compute_max_align();
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
        v.construct_write_some_awaitable = nullptr;
        v.construct_write_awaitable = nullptr;
        v.construct_write_eof_buffers_awaitable = nullptr;
        v.construct_write_eof_awaitable = nullptr;

        if constexpr (WriteSink<S>)
        {
            v.construct_write_some_awaitable =
                &construct_write_some_awaitable_impl;
            v.construct_write_awaitable =
                &construct_write_awaitable_impl;
            v.construct_write_eof_buffers_awaitable =
                &construct_write_eof_buffers_awaitable_impl;
            v.construct_write_eof_awaitable =
                &construct_write_eof_awaitable_impl;
        }
        return v;
    }

    // The single per-S table; its address also identifies S in rebind().
    static constexpr vtable value = make_vtable();
};
731 :
inline
any_buffer_sink::~any_buffer_sink()
{
    // Owning mode: run the sink's destructor through the vtable,
    // then free its raw storage.
    if(storage_)
    {
        vt_->destroy(sink_);
        ::operator delete(storage_);
    }
    // Release the preallocated awaitable buffer. Any in-flight
    // awaitable was already destroyed by the await_resume guard.
    // NOTE(review): freed with plain ::operator delete although
    // vtable::awaitable_align is computed at construction time --
    // confirm awaitables never exceed default alignment, or switch
    // both sites to the align_val_t overloads.
    if(cached_awaitable_)
        ::operator delete(cached_awaitable_);
}
743 :
744 : inline any_buffer_sink&
745 5 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
746 : {
747 5 : if(this != &other)
748 : {
749 4 : if(storage_)
750 : {
751 1 : vt_->destroy(sink_);
752 1 : ::operator delete(storage_);
753 : }
754 4 : if(cached_awaitable_)
755 2 : ::operator delete(cached_awaitable_);
756 4 : sink_ = std::exchange(other.sink_, nullptr);
757 4 : vt_ = std::exchange(other.vt_, nullptr);
758 4 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
759 4 : storage_ = std::exchange(other.storage_, nullptr);
760 4 : active_ops_ = std::exchange(other.active_ops_, nullptr);
761 4 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
762 : }
763 5 : return *this;
764 : }
765 :
template<BufferSink S>
requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
any_buffer_sink::any_buffer_sink(S s)
    : vt_(&vtable_for_impl<S>::value)
{
    // Exception-safety guard: if the cached-awaitable allocation
    // below throws, roll back by destroying the moved-in sink and
    // releasing its storage, so the wrapper is left empty.
    struct guard {
        any_buffer_sink* self;
        bool committed = false;
        ~guard() {
            if(!committed && self->storage_) {
                self->vt_->destroy(self->sink_);
                ::operator delete(self->storage_);
                self->storage_ = nullptr;
                self->sink_ = nullptr;
            }
        }
    } g{this};

    // Owning mode: allocate raw storage and move the sink into it.
    // NOTE(review): plain ::operator new ignores alignof(S) and
    // vt_->awaitable_align -- confirm neither the sink type nor its
    // awaitables are over-aligned, or use the align_val_t overloads.
    storage_ = ::operator new(sizeof(S));
    sink_ = ::new(storage_) S(std::move(s));

    // Preallocate one buffer large enough for any awaitable S yields.
    cached_awaitable_ = ::operator new(vt_->awaitable_size);

    g.committed = true;
}
791 :
792 : template<BufferSink S>
793 194 : any_buffer_sink::any_buffer_sink(S* s)
794 194 : : sink_(s)
795 194 : , vt_(&vtable_for_impl<S>::value)
796 : {
797 194 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
798 194 : }
799 :
800 : inline std::span<mutable_buffer>
801 130 : any_buffer_sink::prepare(std::span<mutable_buffer> dest)
802 : {
803 130 : return vt_->do_prepare(sink_, dest);
804 : }
805 :
inline auto
any_buffer_sink::commit(std::size_t n)
{
    // Thin awaitable handed to the caller. The underlying sink's
    // commit awaitable is placement-built into the cached storage
    // lazily, inside await_ready, and torn down in await_resume.
    struct awaitable
    {
        any_buffer_sink* self_;
        std::size_t n_;

        bool
        await_ready()
        {
            // Construct the type-erased awaitable now and forward
            // its readiness result through the ops table.
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
                self_->sink_,
                self_->cached_awaitable_,
                n_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            // Destroy the cached awaitable and clear active_ops_
            // even if the underlying await_resume throws.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, n};
}
847 :
inline auto
any_buffer_sink::commit_eof(std::size_t n)
{
    // Mirrors commit(): lazy construction in await_ready, guarded
    // destruction in await_resume, all in the cached storage.
    struct awaitable
    {
        any_buffer_sink* self_;
        std::size_t n_;

        bool
        await_ready()
        {
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
                self_->sink_,
                self_->cached_awaitable_,
                n_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            // Destroy the cached awaitable even on a throwing resume.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, n};
}
889 :
inline auto
any_buffer_sink::write_some_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        // Always report not-ready: construction is deferred to
        // await_suspend, where the caller's io_env is available.
        bool
        await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            // Build the underlying write_some awaitable in the cache.
            self_->active_write_ops_ =
                self_->vt_->construct_write_some_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            // Already ready: resume immediately by returning the
            // caller's own handle (symmetric transfer).
            if(self_->active_write_ops_->await_ready(
                self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Destroy the cached awaitable even on a throwing resume.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
939 :
inline auto
any_buffer_sink::write_(
    std::span<const_buffer const> buffers)
{
    // Same construct-in-await_suspend pattern as write_some_.
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        // Defer construction until await_suspend supplies the io_env.
        bool
        await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            // Ready immediately: hand the caller's handle back so it
            // resumes without an actual suspension.
            if(self_->active_write_ops_->await_ready(
                self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Destroy the cached awaitable even on a throwing resume.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
989 :
inline auto
any_buffer_sink::write_eof_buffers_(
    std::span<const_buffer const> buffers)
{
    // Same construct-in-await_suspend pattern as write_some_, for the
    // atomic write_eof(buffers) operation.
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        // Defer construction until await_suspend supplies the io_env.
        bool
        await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_eof_buffers_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            // Ready immediately: resume the caller without suspending.
            if(self_->active_write_ops_->await_ready(
                self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Destroy the cached awaitable even on a throwing resume.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
1039 :
1040 : template<ConstBufferSequence CB>
1041 : io_task<std::size_t>
1042 22 : any_buffer_sink::write_some(CB buffers)
1043 : {
1044 : buffer_param<CB> bp(buffers);
1045 : auto src = bp.data();
1046 : if(src.empty())
1047 : co_return {{}, 0};
1048 :
1049 : // Native WriteSink path
1050 : if(vt_->construct_write_some_awaitable)
1051 : co_return co_await write_some_(src);
1052 :
1053 : // Synthesized path: prepare + buffer_copy + commit
1054 : mutable_buffer arr[detail::max_iovec_];
1055 : auto dst_bufs = prepare(arr);
1056 : if(dst_bufs.empty())
1057 : {
1058 : auto [ec] = co_await commit(0);
1059 : if(ec)
1060 : co_return {ec, 0};
1061 : dst_bufs = prepare(arr);
1062 : if(dst_bufs.empty())
1063 : co_return {{}, 0};
1064 : }
1065 :
1066 : auto n = buffer_copy(dst_bufs, src);
1067 : auto [ec] = co_await commit(n);
1068 : if(ec)
1069 : co_return {ec, 0};
1070 : co_return {{}, n};
1071 44 : }
1072 :
1073 : template<ConstBufferSequence CB>
1074 : io_task<std::size_t>
1075 38 : any_buffer_sink::write(CB buffers)
1076 : {
1077 : buffer_param<CB> bp(buffers);
1078 : std::size_t total = 0;
1079 :
1080 : // Native WriteSink path
1081 : if(vt_->construct_write_awaitable)
1082 : {
1083 : for(;;)
1084 : {
1085 : auto bufs = bp.data();
1086 : if(bufs.empty())
1087 : break;
1088 :
1089 : auto [ec, n] = co_await write_(bufs);
1090 : total += n;
1091 : if(ec)
1092 : co_return {ec, total};
1093 : bp.consume(n);
1094 : }
1095 : co_return {{}, total};
1096 : }
1097 :
1098 : // Synthesized path: prepare + buffer_copy + commit
1099 : for(;;)
1100 : {
1101 : auto src = bp.data();
1102 : if(src.empty())
1103 : break;
1104 :
1105 : mutable_buffer arr[detail::max_iovec_];
1106 : auto dst_bufs = prepare(arr);
1107 : if(dst_bufs.empty())
1108 : {
1109 : auto [ec] = co_await commit(0);
1110 : if(ec)
1111 : co_return {ec, total};
1112 : continue;
1113 : }
1114 :
1115 : auto n = buffer_copy(dst_bufs, src);
1116 : auto [ec] = co_await commit(n);
1117 : if(ec)
1118 : co_return {ec, total};
1119 : bp.consume(n);
1120 : total += n;
1121 : }
1122 :
1123 : co_return {{}, total};
1124 76 : }
1125 :
// Returns an awaitable that signals end-of-stream with no data.
// Uses the native write_eof() when the wrapped type is a WriteSink,
// otherwise synthesizes it as commit_eof(0). The underlying
// awaitable is built into the cached storage, so no allocation
// occurs in steady state.
inline auto
any_buffer_sink::write_eof()
{
    struct awaitable
    {
        any_buffer_sink* self_;

        // Unlike the data-carrying awaitables, the inner operation
        // is constructed here (no buffers depend on the io_env), and
        // its readiness is forwarded directly.
        bool
        await_ready()
        {
            if(self_->vt_->construct_write_eof_awaitable)
            {
                // Native WriteSink: forward to underlying write_eof()
                self_->active_ops_ =
                    self_->vt_->construct_write_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_);
            }
            else
            {
                // Synthesized: commit_eof(0)
                self_->active_ops_ =
                    self_->vt_->construct_commit_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_,
                        0);
            }
            return self_->active_ops_->await_ready(
                self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            // Guard destroys the cached inner awaitable even if
            // await_resume throws, and clears the active-op pointer.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this};
}
1180 :
1181 : template<ConstBufferSequence CB>
1182 : io_task<std::size_t>
1183 40 : any_buffer_sink::write_eof(CB buffers)
1184 : {
1185 : // Native WriteSink path
1186 : if(vt_->construct_write_eof_buffers_awaitable)
1187 : {
1188 : const_buffer_param<CB> bp(buffers);
1189 : std::size_t total = 0;
1190 :
1191 : for(;;)
1192 : {
1193 : auto bufs = bp.data();
1194 : if(bufs.empty())
1195 : {
1196 : auto [ec] = co_await write_eof();
1197 : co_return {ec, total};
1198 : }
1199 :
1200 : if(!bp.more())
1201 : {
1202 : // Last window: send atomically with EOF
1203 : auto [ec, n] = co_await write_eof_buffers_(bufs);
1204 : total += n;
1205 : co_return {ec, total};
1206 : }
1207 :
1208 : auto [ec, n] = co_await write_(bufs);
1209 : total += n;
1210 : if(ec)
1211 : co_return {ec, total};
1212 : bp.consume(n);
1213 : }
1214 : }
1215 :
1216 : // Synthesized path: prepare + buffer_copy + commit + commit_eof
1217 : buffer_param<CB> bp(buffers);
1218 : std::size_t total = 0;
1219 :
1220 : for(;;)
1221 : {
1222 : auto src = bp.data();
1223 : if(src.empty())
1224 : break;
1225 :
1226 : mutable_buffer arr[detail::max_iovec_];
1227 : auto dst_bufs = prepare(arr);
1228 : if(dst_bufs.empty())
1229 : {
1230 : auto [ec] = co_await commit(0);
1231 : if(ec)
1232 : co_return {ec, total};
1233 : continue;
1234 : }
1235 :
1236 : auto n = buffer_copy(dst_bufs, src);
1237 : auto [ec] = co_await commit(n);
1238 : if(ec)
1239 : co_return {ec, total};
1240 : bp.consume(n);
1241 : total += n;
1242 : }
1243 :
1244 : auto [ec] = co_await commit_eof(0);
1245 : if(ec)
1246 : co_return {ec, total};
1247 :
1248 : co_return {{}, total};
1249 80 : }
1250 :
// Compile-time check that the type-erased wrapper itself models
// both of the interfaces it forwards to.
static_assert(BufferSink<any_buffer_sink>);
static_assert(WriteSink<any_buffer_sink>);
1253 :
1254 : } // namespace capy
1255 : } // namespace boost
1256 :
1257 : #endif
|