LCOV - code coverage report
Current view: top level - capy/io - any_write_sink.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.6 % 212 192 20
Test Date: 2026-03-11 22:31:12 Functions: 77.9 % 77 60 17

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      11                 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/buffer_array.hpp>
      17                 : #include <boost/capy/buffers/buffer_param.hpp>
      18                 : #include <boost/capy/concept/io_awaitable.hpp>
      19                 : #include <boost/capy/concept/write_sink.hpp>
      20                 : #include <coroutine>
      21                 : #include <boost/capy/ex/io_env.hpp>
      22                 : #include <boost/capy/io_result.hpp>
      23                 : #include <boost/capy/io_task.hpp>
      24                 : 
      25                 : #include <concepts>
      26                 : #include <coroutine>
      27                 : #include <cstddef>
      28                 : #include <exception>
      29                 : #include <new>
      30                 : #include <span>
      31                 : #include <stop_token>
      32                 : #include <system_error>
      33                 : #include <utility>
      34                 : 
      35                 : namespace boost {
      36                 : namespace capy {
      37                 : 
      38                 : /** Type-erased wrapper for any WriteSink.
      39                 : 
      40                 :     This class provides type erasure for any type satisfying the
      41                 :     @ref WriteSink concept, enabling runtime polymorphism for
      42                 :     sink write operations. It uses cached awaitable storage to achieve
      43                 :     zero steady-state allocation after construction.
      44                 : 
      45                 :     The wrapper supports two construction modes:
      46                 :     - **Owning**: Pass by value to transfer ownership. The wrapper
      47                 :       allocates storage and owns the sink.
      48                 :     - **Reference**: Pass a pointer to wrap without ownership. The
      49                 :       pointed-to sink must outlive this wrapper.
      50                 : 
      51                 :     @par Awaitable Preallocation
      52                 :     The constructor preallocates storage for the type-erased awaitable.
      53                 :     This reserves all virtual address space at server startup
      54                 :     so memory usage can be measured up front, rather than
      55                 :     allocating piecemeal as traffic arrives.
      56                 : 
      57                 :     @par Immediate Completion
      58                 :     Operations complete immediately without suspending when the
      59                 :     buffer sequence is empty, or when the underlying sink's
      60                 :     awaitable reports readiness via `await_ready`.
      61                 : 
      62                 :     @par Thread Safety
      63                 :     Not thread-safe. Concurrent operations on the same wrapper
      64                 :     are undefined behavior.
      65                 : 
      66                 :     @par Example
      67                 :     @code
      68                 :     // Owning - takes ownership of the sink
      69                 :     any_write_sink ws(some_sink{args...});
      70                 : 
      71                 :     // Reference - wraps without ownership
      72                 :     some_sink sink;
      73                 :     any_write_sink ws(&sink);
      74                 : 
      75                 :     const_buffer buf(data, size);
      76                 :     auto [ec, n] = co_await ws.write(std::span(&buf, 1));
      77                 :     auto [ec2] = co_await ws.write_eof();
      78                 :     @endcode
      79                 : 
      80                 :     @see any_write_stream, WriteSink
      81                 : */
      82                 : class any_write_sink
      83                 : {
      84                 :     struct vtable;
      85                 :     struct write_awaitable_ops;
      86                 :     struct eof_awaitable_ops;
      87                 : 
      88                 :     template<WriteSink S>
      89                 :     struct vtable_for_impl;
      90                 : 
      91                 :     void* sink_ = nullptr;
      92                 :     vtable const* vt_ = nullptr;
      93                 :     void* cached_awaitable_ = nullptr;
      94                 :     void* storage_ = nullptr;
      95                 :     write_awaitable_ops const* active_write_ops_ = nullptr;
      96                 :     eof_awaitable_ops const* active_eof_ops_ = nullptr;
      97                 : 
      98                 : public:
      99                 :     /** Destructor.
     100                 : 
     101                 :         Destroys the owned sink (if any) and releases the cached
     102                 :         awaitable storage.
     103                 :     */
     104                 :     ~any_write_sink();
     105                 : 
     106                 :     /** Construct a default instance.
     107                 : 
     108                 :         Constructs an empty wrapper. Operations on a default-constructed
     109                 :         wrapper result in undefined behavior.
     110                 :     */
     111                 :     any_write_sink() = default;
     112                 : 
     113                 :     /** Non-copyable.
     114                 : 
     115                 :         The awaitable cache is per-instance and cannot be shared.
     116                 :     */
     117                 :     any_write_sink(any_write_sink const&) = delete;
     118                 :     any_write_sink& operator=(any_write_sink const&) = delete;
     119                 : 
     120                 :     /** Construct by moving.
     121                 : 
     122                 :         Transfers ownership of the wrapped sink (if owned) and
     123                 :         cached awaitable storage from `other`. After the move, `other` is
     124                 :         in a default-constructed state.
     125                 : 
     126                 :         @param other The wrapper to move from.
     127                 :     */
     128 HIT           1 :     any_write_sink(any_write_sink&& other) noexcept
     129               1 :         : sink_(std::exchange(other.sink_, nullptr))
     130               1 :         , vt_(std::exchange(other.vt_, nullptr))
     131               1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     132               1 :         , storage_(std::exchange(other.storage_, nullptr))
     133               1 :         , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
     134               1 :         , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
     135                 :     {
     136               1 :     }
     137                 : 
     138                 :     /** Assign by moving.
     139                 : 
     140                 :         Destroys any owned sink and releases existing resources,
     141                 :         then transfers ownership from `other`.
     142                 : 
     143                 :         @param other The wrapper to move from.
     144                 :         @return Reference to this wrapper.
     145                 :     */
     146                 :     any_write_sink&
     147                 :     operator=(any_write_sink&& other) noexcept;
     148                 : 
     149                 :     /** Construct by taking ownership of a WriteSink.
     150                 : 
     151                 :         Allocates storage and moves the sink into this wrapper.
     152                 :         The wrapper owns the sink and will destroy it.
     153                 : 
     154                 :         @param s The sink to take ownership of.
     155                 :     */
     156                 :     template<WriteSink S>
     157                 :         requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     158                 :     any_write_sink(S s);
     159                 : 
     160                 :     /** Construct by wrapping a WriteSink without ownership.
     161                 : 
     162                 :         Wraps the given sink by pointer. The sink must remain
     163                 :         valid for the lifetime of this wrapper.
     164                 : 
     165                 :         @param s Pointer to the sink to wrap.
     166                 :     */
     167                 :     template<WriteSink S>
     168                 :     any_write_sink(S* s);
     169                 : 
     170                 :     /** Check if the wrapper contains a valid sink.
     171                 : 
     172                 :         @return `true` if wrapping a sink, `false` if default-constructed
     173                 :             or moved-from.
     174                 :     */
     175                 :     bool
     176              15 :     has_value() const noexcept
     177                 :     {
     178              15 :         return sink_ != nullptr;
     179                 :     }
     180                 : 
     181                 :     /** Check if the wrapper contains a valid sink.
     182                 : 
     183                 :         @return `true` if wrapping a sink, `false` if default-constructed
     184                 :             or moved-from.
     185                 :     */
     186                 :     explicit
     187               2 :     operator bool() const noexcept
     188                 :     {
     189               2 :         return has_value();
     190                 :     }
     191                 : 
     192                 :     /** Initiate a partial write operation.
     193                 : 
     194                 :         Attempt to write up to `buffer_size( buffers )` bytes from
     195                 :         the provided buffer sequence. May consume less than the
     196                 :         full sequence.
     197                 : 
     198                 :         @param buffers The buffer sequence containing data to write.
     199                 : 
     200                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     201                 : 
     202                 :         @par Immediate Completion
     203                 :         The operation completes immediately without suspending
     204                 :         the calling coroutine when:
     205                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     206                 :         @li The underlying sink's awaitable reports immediate
     207                 :             readiness via `await_ready`.
     208                 : 
     209                 :         @note This is a partial operation and may not process the
     210                 :         entire buffer sequence. Use @ref write for guaranteed
     211                 :         complete transfer.
     212                 : 
     213                 :         @par Preconditions
     214                 :         The wrapper must contain a valid sink (`has_value() == true`).
     215                 :     */
     216                 :     template<ConstBufferSequence CB>
     217                 :     auto
     218                 :     write_some(CB buffers);
     219                 : 
     220                 :     /** Initiate a complete write operation.
     221                 : 
     222                 :         Writes data from the provided buffer sequence. The operation
     223                 :         completes when all bytes have been consumed, or an error
     224                 :         occurs. Forwards to the underlying sink's `write` operation,
     225                 :         windowed through @ref buffer_param when the sequence exceeds
     226                 :         the per-call buffer limit.
     227                 : 
     228                 :         @param buffers The buffer sequence containing data to write.
     229                 : 
     230                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     231                 : 
     232                 :         @par Immediate Completion
     233                 :         The operation completes immediately without suspending
     234                 :         the calling coroutine when:
     235                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     236                 :         @li Every underlying `write` call completes
     237                 :             immediately (the wrapped sink reports readiness
     238                 :             via `await_ready` on each iteration).
     239                 : 
     240                 :         @par Preconditions
     241                 :         The wrapper must contain a valid sink (`has_value() == true`).
     242                 :     */
     243                 :     template<ConstBufferSequence CB>
     244                 :     io_task<std::size_t>
     245                 :     write(CB buffers);
     246                 : 
     247                 :     /** Atomically write data and signal end-of-stream.
     248                 : 
     249                 :         Writes all data from the buffer sequence and then signals
     250                 :         end-of-stream. The implementation decides how to partition
     251                 :         the data across calls to the underlying sink's @ref write
     252                 :         and `write_eof`. When the caller's buffer sequence is
     253                 :         non-empty, the final call to the underlying sink is always
     254                 :         `write_eof` with a non-empty buffer sequence. When the
     255                 :         caller's buffer sequence is empty, only `write_eof()` with
     256                 :         no data is called.
     257                 : 
     258                 :         @param buffers The buffer sequence containing data to write.
     259                 : 
     260                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     261                 : 
     262                 :         @par Immediate Completion
     263                 :         The operation completes immediately without suspending
     264                 :         the calling coroutine when:
     265                 :         @li The buffer sequence is empty. Only the @ref write_eof()
     266                 :             call is performed.
     267                 :         @li All underlying operations complete immediately (the
     268                 :             wrapped sink reports readiness via `await_ready`).
     269                 : 
     270                 :         @par Preconditions
     271                 :         The wrapper must contain a valid sink (`has_value() == true`).
     272                 :     */
     273                 :     template<ConstBufferSequence CB>
     274                 :     io_task<std::size_t>
     275                 :     write_eof(CB buffers);
     276                 : 
     277                 :     /** Signal end of data.
     278                 : 
     279                 :         Indicates that no more data will be written to the sink.
     280                 :         The operation completes when the sink is finalized, or
     281                 :         an error occurs.
     282                 : 
     283                 :         @return An awaitable that await-returns `(error_code)`.
     284                 : 
     285                 :         @par Immediate Completion
     286                 :         The operation completes immediately without suspending
     287                 :         the calling coroutine when the underlying sink's awaitable
     288                 :         reports immediate readiness via `await_ready`.
     289                 : 
     290                 :         @par Preconditions
     291                 :         The wrapper must contain a valid sink (`has_value() == true`).
     292                 :     */
     293                 :     auto
     294                 :     write_eof();
     295                 : 
     296                 : protected:
     297                 :     /** Rebind to a new sink after move.
     298                 : 
     299                 :         Updates the internal pointer to reference a new sink object.
     300                 :         Used by owning wrappers after move assignment when the owned
     301                 :         object has moved to a new location.
     302                 : 
     303                 :         @param new_sink The new sink to bind to. Must be the same
     304                 :             type as the original sink.
     305                 : 
     306                 :         @note Terminates if called with a sink of different type
     307                 :             than the original.
     308                 :     */
     309                 :     template<WriteSink S>
     310                 :     void
     311                 :     rebind(S& new_sink) noexcept
     312                 :     {
     313                 :         if(vt_ != &vtable_for_impl<S>::value)
     314                 :             std::terminate();
     315                 :         sink_ = &new_sink;
     316                 :     }
     317                 : 
     318                 : private:
     319                 :     auto
     320                 :     write_some_(std::span<const_buffer const> buffers);
     321                 : 
     322                 :     auto
     323                 :     write_(std::span<const_buffer const> buffers);
     324                 : 
     325                 :     auto
     326                 :     write_eof_buffers_(std::span<const_buffer const> buffers);
     327                 : };
     328                 : 
     329                 : struct any_write_sink::write_awaitable_ops
     330                 : {
     331                 :     bool (*await_ready)(void*);
     332                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     333                 :     io_result<std::size_t> (*await_resume)(void*);
     334                 :     void (*destroy)(void*) noexcept;
     335                 : };
     336                 : 
     337                 : struct any_write_sink::eof_awaitable_ops
     338                 : {
     339                 :     bool (*await_ready)(void*);
     340                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     341                 :     io_result<> (*await_resume)(void*);
     342                 :     void (*destroy)(void*) noexcept;
     343                 : };
     344                 : 
     345                 : struct any_write_sink::vtable
     346                 : {
     347                 :     write_awaitable_ops const* (*construct_write_some_awaitable)(
     348                 :         void* sink,
     349                 :         void* storage,
     350                 :         std::span<const_buffer const> buffers);
     351                 :     write_awaitable_ops const* (*construct_write_awaitable)(
     352                 :         void* sink,
     353                 :         void* storage,
     354                 :         std::span<const_buffer const> buffers);
     355                 :     write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
     356                 :         void* sink,
     357                 :         void* storage,
     358                 :         std::span<const_buffer const> buffers);
     359                 :     eof_awaitable_ops const* (*construct_eof_awaitable)(
     360                 :         void* sink,
     361                 :         void* storage);
     362                 :     std::size_t awaitable_size;
     363                 :     std::size_t awaitable_align;
     364                 :     void (*destroy)(void*) noexcept;
     365                 : };
     366                 : 
     367                 : template<WriteSink S>
     368                 : struct any_write_sink::vtable_for_impl
     369                 : {
     370                 :     using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
     371                 :         std::span<const_buffer const>{}));
     372                 :     using WriteAwaitable = decltype(std::declval<S&>().write(
     373                 :         std::span<const_buffer const>{}));
     374                 :     using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
     375                 :         std::span<const_buffer const>{}));
     376                 :     using EofAwaitable = decltype(std::declval<S&>().write_eof());
     377                 : 
     378                 :     static void
     379               6 :     do_destroy_impl(void* sink) noexcept
     380                 :     {
     381               6 :         static_cast<S*>(sink)->~S();
     382               6 :     }
     383                 : 
     384                 :     static write_awaitable_ops const*
     385              40 :     construct_write_some_awaitable_impl(
     386                 :         void* sink,
     387                 :         void* storage,
     388                 :         std::span<const_buffer const> buffers)
     389                 :     {
     390              40 :         auto& s = *static_cast<S*>(sink);
     391              40 :         ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
     392                 : 
     393                 :         static constexpr write_awaitable_ops ops = {
     394              40 :             +[](void* p) {
     395              40 :                 return static_cast<WriteSomeAwaitable*>(p)->await_ready();
     396                 :             },
     397               2 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     398               2 :                 return detail::call_await_suspend(
     399               2 :                     static_cast<WriteSomeAwaitable*>(p), h, env);
     400                 :             },
     401              38 :             +[](void* p) {
     402              38 :                 return static_cast<WriteSomeAwaitable*>(p)->await_resume();
     403                 :             },
     404              42 :             +[](void* p) noexcept {
     405               2 :                 static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
     406                 :             }
     407                 :         };
     408              40 :         return &ops;
     409                 :     }
     410                 : 
     411                 :     static write_awaitable_ops const*
     412              78 :     construct_write_awaitable_impl(
     413                 :         void* sink,
     414                 :         void* storage,
     415                 :         std::span<const_buffer const> buffers)
     416                 :     {
     417              78 :         auto& s = *static_cast<S*>(sink);
     418              78 :         ::new(storage) WriteAwaitable(s.write(buffers));
     419                 : 
     420                 :         static constexpr write_awaitable_ops ops = {
     421              78 :             +[](void* p) {
     422              78 :                 return static_cast<WriteAwaitable*>(p)->await_ready();
     423                 :             },
     424 MIS           0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     425               0 :                 return detail::call_await_suspend(
     426               0 :                     static_cast<WriteAwaitable*>(p), h, env);
     427                 :             },
     428 HIT          78 :             +[](void* p) {
     429              78 :                 return static_cast<WriteAwaitable*>(p)->await_resume();
     430                 :             },
     431              78 :             +[](void* p) noexcept {
     432 MIS           0 :                 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
     433                 :             }
     434                 :         };
     435 HIT          78 :         return &ops;
     436                 :     }
     437                 : 
     438                 :     static write_awaitable_ops const*
     439              16 :     construct_write_eof_buffers_awaitable_impl(
     440                 :         void* sink,
     441                 :         void* storage,
     442                 :         std::span<const_buffer const> buffers)
     443                 :     {
     444              16 :         auto& s = *static_cast<S*>(sink);
     445              16 :         ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
     446                 : 
     447                 :         static constexpr write_awaitable_ops ops = {
     448              16 :             +[](void* p) {
     449              16 :                 return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
     450                 :             },
     451 MIS           0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     452               0 :                 return detail::call_await_suspend(
     453               0 :                     static_cast<WriteEofBuffersAwaitable*>(p), h, env);
     454                 :             },
     455 HIT          16 :             +[](void* p) {
     456              16 :                 return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
     457                 :             },
     458              16 :             +[](void* p) noexcept {
     459 MIS           0 :                 static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
     460                 :             }
     461                 :         };
     462 HIT          16 :         return &ops;
     463                 :     }
     464                 : 
     465                 :     static eof_awaitable_ops const*
     466              17 :     construct_eof_awaitable_impl(
     467                 :         void* sink,
     468                 :         void* storage)
     469                 :     {
     470              17 :         auto& s = *static_cast<S*>(sink);
     471              17 :         ::new(storage) EofAwaitable(s.write_eof());
     472                 : 
     473                 :         static constexpr eof_awaitable_ops ops = {
     474              17 :             +[](void* p) {
     475              17 :                 return static_cast<EofAwaitable*>(p)->await_ready();
     476                 :             },
     477               1 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     478               1 :                 return detail::call_await_suspend(
     479               1 :                     static_cast<EofAwaitable*>(p), h, env);
     480                 :             },
     481              16 :             +[](void* p) {
     482              16 :                 return static_cast<EofAwaitable*>(p)->await_resume();
     483                 :             },
     484              18 :             +[](void* p) noexcept {
     485               1 :                 static_cast<EofAwaitable*>(p)->~EofAwaitable();
     486                 :             }
     487                 :         };
     488              17 :         return &ops;
     489                 :     }
     490                 : 
     491                 :     static constexpr std::size_t max4(
     492                 :         std::size_t a, std::size_t b,
     493                 :         std::size_t c, std::size_t d) noexcept
     494                 :     {
     495                 :         std::size_t ab = a > b ? a : b;
     496                 :         std::size_t cd = c > d ? c : d;
     497                 :         return ab > cd ? ab : cd;
     498                 :     }
     499                 : 
     500                 :     static constexpr std::size_t max_awaitable_size =
     501                 :         max4(sizeof(WriteSomeAwaitable),
     502                 :              sizeof(WriteAwaitable),
     503                 :              sizeof(WriteEofBuffersAwaitable),
     504                 :              sizeof(EofAwaitable));
     505                 : 
     506                 :     static constexpr std::size_t max_awaitable_align =
     507                 :         max4(alignof(WriteSomeAwaitable),
     508                 :              alignof(WriteAwaitable),
     509                 :              alignof(WriteEofBuffersAwaitable),
     510                 :              alignof(EofAwaitable));
     511                 : 
     512                 :     static constexpr vtable value = {
     513                 :         &construct_write_some_awaitable_impl,
     514                 :         &construct_write_awaitable_impl,
     515                 :         &construct_write_eof_buffers_awaitable_impl,
     516                 :         &construct_eof_awaitable_impl,
     517                 :         max_awaitable_size,
     518                 :         max_awaitable_align,
     519                 :         &do_destroy_impl
     520                 :     };
     521                 : };
     522                 : 
     523                 : inline
     524             129 : any_write_sink::~any_write_sink()
     525                 : {
     526             129 :     if(storage_)
     527                 :     {
     528               6 :         vt_->destroy(sink_);
     529               6 :         ::operator delete(storage_);
     530                 :     }
     531             129 :     if(cached_awaitable_)
     532                 :     {
     533             124 :         if(active_write_ops_)
     534               1 :             active_write_ops_->destroy(cached_awaitable_);
     535             123 :         else if(active_eof_ops_)
     536               1 :             active_eof_ops_->destroy(cached_awaitable_);
     537             124 :         ::operator delete(cached_awaitable_);
     538                 :     }
     539             129 : }
     540                 : 
     541                 : inline any_write_sink&
     542               2 : any_write_sink::operator=(any_write_sink&& other) noexcept
     543                 : {
     544               2 :     if(this != &other)
     545                 :     {
     546               2 :         if(storage_)
     547                 :         {
     548 MIS           0 :             vt_->destroy(sink_);
     549               0 :             ::operator delete(storage_);
     550                 :         }
     551 HIT           2 :         if(cached_awaitable_)
     552                 :         {
     553               1 :             if(active_write_ops_)
     554               1 :                 active_write_ops_->destroy(cached_awaitable_);
     555 MIS           0 :             else if(active_eof_ops_)
     556               0 :                 active_eof_ops_->destroy(cached_awaitable_);
     557 HIT           1 :             ::operator delete(cached_awaitable_);
     558                 :         }
     559               2 :         sink_ = std::exchange(other.sink_, nullptr);
     560               2 :         vt_ = std::exchange(other.vt_, nullptr);
     561               2 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     562               2 :         storage_ = std::exchange(other.storage_, nullptr);
     563               2 :         active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
     564               2 :         active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
     565                 :     }
     566               2 :     return *this;
     567                 : }
     568                 : 
     569                 : template<WriteSink S>
     570                 :     requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     571               6 : any_write_sink::any_write_sink(S s)
     572               6 :     : vt_(&vtable_for_impl<S>::value)
     573                 : {
     574                 :     struct guard {
     575                 :         any_write_sink* self;
     576                 :         bool committed = false;
     577               6 :         ~guard() {
     578               6 :             if(!committed && self->storage_) {
     579 MIS           0 :                 self->vt_->destroy(self->sink_);
     580               0 :                 ::operator delete(self->storage_);
     581               0 :                 self->storage_ = nullptr;
     582               0 :                 self->sink_ = nullptr;
     583                 :             }
     584 HIT           6 :         }
     585               6 :     } g{this};
     586                 : 
     587               6 :     storage_ = ::operator new(sizeof(S));
     588               6 :     sink_ = ::new(storage_) S(std::move(s));
     589                 : 
     590                 :     // Preallocate the awaitable storage (sized for max of write/eof)
     591               6 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     592                 : 
     593               6 :     g.committed = true;
     594               6 : }
     595                 : 
     596                 : template<WriteSink S>
     597             119 : any_write_sink::any_write_sink(S* s)
     598             119 :     : sink_(s)
     599             119 :     , vt_(&vtable_for_impl<S>::value)
     600                 : {
     601                 :     // Preallocate the awaitable storage (sized for max of write/eof)
     602             119 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     603             119 : }
     604                 : 
     605                 : inline auto
     606                 : any_write_sink::write_some_(
     607                 :     std::span<const_buffer const> buffers)
     608                 : {
     609                 :     struct awaitable
     610                 :     {
     611                 :         any_write_sink* self_;
     612                 :         std::span<const_buffer const> buffers_;
     613                 : 
     614                 :         bool
     615                 :         await_ready() const noexcept
     616                 :         {
     617                 :             return false;
     618                 :         }
     619                 : 
     620                 :         std::coroutine_handle<>
     621                 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     622                 :         {
     623                 :             self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
     624                 :                 self_->sink_,
     625                 :                 self_->cached_awaitable_,
     626                 :                 buffers_);
     627                 : 
     628                 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     629                 :                 return h;
     630                 : 
     631                 :             return self_->active_write_ops_->await_suspend(
     632                 :                 self_->cached_awaitable_, h, env);
     633                 :         }
     634                 : 
     635                 :         io_result<std::size_t>
     636                 :         await_resume()
     637                 :         {
     638                 :             struct guard {
     639                 :                 any_write_sink* self;
     640                 :                 ~guard() {
     641                 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     642                 :                     self->active_write_ops_ = nullptr;
     643                 :                 }
     644                 :             } g{self_};
     645                 :             return self_->active_write_ops_->await_resume(
     646                 :                 self_->cached_awaitable_);
     647                 :         }
     648                 :     };
     649                 :     return awaitable{this, buffers};
     650                 : }
     651                 : 
     652                 : inline auto
     653              78 : any_write_sink::write_(
     654                 :     std::span<const_buffer const> buffers)
     655                 : {
     656                 :     struct awaitable
     657                 :     {
     658                 :         any_write_sink* self_;
     659                 :         std::span<const_buffer const> buffers_;
     660                 : 
     661                 :         bool
     662              78 :         await_ready() const noexcept
     663                 :         {
     664              78 :             return false;
     665                 :         }
     666                 : 
     667                 :         std::coroutine_handle<>
     668              78 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     669                 :         {
     670             156 :             self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
     671              78 :                 self_->sink_,
     672              78 :                 self_->cached_awaitable_,
     673                 :                 buffers_);
     674                 : 
     675              78 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     676              78 :                 return h;
     677                 : 
     678 MIS           0 :             return self_->active_write_ops_->await_suspend(
     679               0 :                 self_->cached_awaitable_, h, env);
     680                 :         }
     681                 : 
     682                 :         io_result<std::size_t>
     683 HIT          78 :         await_resume()
     684                 :         {
     685                 :             struct guard {
     686                 :                 any_write_sink* self;
     687              78 :                 ~guard() {
     688              78 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     689              78 :                     self->active_write_ops_ = nullptr;
     690              78 :                 }
     691              78 :             } g{self_};
     692              78 :             return self_->active_write_ops_->await_resume(
     693             135 :                 self_->cached_awaitable_);
     694              78 :         }
     695                 :     };
     696              78 :     return awaitable{this, buffers};
     697                 : }
     698                 : 
     699                 : inline auto
     700              17 : any_write_sink::write_eof()
     701                 : {
     702                 :     struct awaitable
     703                 :     {
     704                 :         any_write_sink* self_;
     705                 : 
     706                 :         bool
     707              17 :         await_ready() const noexcept
     708                 :         {
     709              17 :             return false;
     710                 :         }
     711                 : 
     712                 :         std::coroutine_handle<>
     713              17 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     714                 :         {
     715                 :             // Construct the underlying awaitable into cached storage
     716              34 :             self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
     717              17 :                 self_->sink_,
     718              17 :                 self_->cached_awaitable_);
     719                 : 
     720                 :             // Check if underlying is immediately ready
     721              17 :             if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
     722              16 :                 return h;
     723                 : 
     724                 :             // Forward to underlying awaitable
     725               1 :             return self_->active_eof_ops_->await_suspend(
     726               1 :                 self_->cached_awaitable_, h, env);
     727                 :         }
     728                 : 
     729                 :         io_result<>
     730              16 :         await_resume()
     731                 :         {
     732                 :             struct guard {
     733                 :                 any_write_sink* self;
     734              16 :                 ~guard() {
     735              16 :                     self->active_eof_ops_->destroy(self->cached_awaitable_);
     736              16 :                     self->active_eof_ops_ = nullptr;
     737              16 :                 }
     738              16 :             } g{self_};
     739              16 :             return self_->active_eof_ops_->await_resume(
     740              27 :                 self_->cached_awaitable_);
     741              16 :         }
     742                 :     };
     743              17 :     return awaitable{this};
     744                 : }
     745                 : 
     746                 : inline auto
     747              16 : any_write_sink::write_eof_buffers_(
     748                 :     std::span<const_buffer const> buffers)
     749                 : {
     750                 :     struct awaitable
     751                 :     {
     752                 :         any_write_sink* self_;
     753                 :         std::span<const_buffer const> buffers_;
     754                 : 
     755                 :         bool
     756              16 :         await_ready() const noexcept
     757                 :         {
     758              16 :             return false;
     759                 :         }
     760                 : 
     761                 :         std::coroutine_handle<>
     762              16 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     763                 :         {
     764              32 :             self_->active_write_ops_ =
     765              32 :                 self_->vt_->construct_write_eof_buffers_awaitable(
     766              16 :                     self_->sink_,
     767              16 :                     self_->cached_awaitable_,
     768                 :                     buffers_);
     769                 : 
     770              16 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     771              16 :                 return h;
     772                 : 
     773 MIS           0 :             return self_->active_write_ops_->await_suspend(
     774               0 :                 self_->cached_awaitable_, h, env);
     775                 :         }
     776                 : 
     777                 :         io_result<std::size_t>
     778 HIT          16 :         await_resume()
     779                 :         {
     780                 :             struct guard {
     781                 :                 any_write_sink* self;
     782              16 :                 ~guard() {
     783              16 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     784              16 :                     self->active_write_ops_ = nullptr;
     785              16 :                 }
     786              16 :             } g{self_};
     787              16 :             return self_->active_write_ops_->await_resume(
     788              27 :                 self_->cached_awaitable_);
     789              16 :         }
     790                 :     };
     791              16 :     return awaitable{this, buffers};
     792                 : }
     793                 : 
     794                 : template<ConstBufferSequence CB>
     795                 : auto
     796              42 : any_write_sink::write_some(CB buffers)
     797                 : {
     798                 :     struct awaitable
     799                 :     {
     800                 :         any_write_sink* self_;
     801                 :         const_buffer_array<detail::max_iovec_> ba_;
     802                 : 
     803              42 :         awaitable(
     804                 :             any_write_sink* self,
     805                 :             CB const& buffers)
     806              42 :             : self_(self)
     807              42 :             , ba_(buffers)
     808                 :         {
     809              42 :         }
     810                 : 
     811                 :         bool
     812              42 :         await_ready() const noexcept
     813                 :         {
     814              42 :             return ba_.to_span().empty();
     815                 :         }
     816                 : 
     817                 :         std::coroutine_handle<>
     818              40 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     819                 :         {
     820              40 :             self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
     821              40 :                 self_->sink_,
     822              40 :                 self_->cached_awaitable_,
     823              40 :                 ba_.to_span());
     824                 : 
     825              40 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     826              38 :                 return h;
     827                 : 
     828               2 :             return self_->active_write_ops_->await_suspend(
     829               2 :                 self_->cached_awaitable_, h, env);
     830                 :         }
     831                 : 
     832                 :         io_result<std::size_t>
     833              40 :         await_resume()
     834                 :         {
     835              40 :             if(ba_.to_span().empty())
     836               2 :                 return {{}, 0};
     837                 : 
     838                 :             struct guard {
     839                 :                 any_write_sink* self;
     840              38 :                 ~guard() {
     841              38 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     842              38 :                     self->active_write_ops_ = nullptr;
     843              38 :                 }
     844              38 :             } g{self_};
     845              38 :             return self_->active_write_ops_->await_resume(
     846              38 :                 self_->cached_awaitable_);
     847              38 :         }
     848                 :     };
     849              42 :     return awaitable{this, buffers};
     850                 : }
     851                 : 
     852                 : template<ConstBufferSequence CB>
     853                 : io_task<std::size_t>
     854              68 : any_write_sink::write(CB buffers)
     855                 : {
     856                 :     buffer_param<CB> bp(buffers);
     857                 :     std::size_t total = 0;
     858                 : 
     859                 :     for(;;)
     860                 :     {
     861                 :         auto bufs = bp.data();
     862                 :         if(bufs.empty())
     863                 :             break;
     864                 : 
     865                 :         auto [ec, n] = co_await write_(bufs);
     866                 :         total += n;
     867                 :         if(ec)
     868                 :             co_return {ec, total};
     869                 :         bp.consume(n);
     870                 :     }
     871                 : 
     872                 :     co_return {{}, total};
     873             136 : }
     874                 : 
     875                 : template<ConstBufferSequence CB>
     876                 : io_task<std::size_t>
     877              26 : any_write_sink::write_eof(CB buffers)
     878                 : {
     879                 :     const_buffer_param<CB> bp(buffers);
     880                 :     std::size_t total = 0;
     881                 : 
     882                 :     for(;;)
     883                 :     {
     884                 :         auto bufs = bp.data();
     885                 :         if(bufs.empty())
     886                 :         {
     887                 :             auto [ec] = co_await write_eof();
     888                 :             co_return {ec, total};
     889                 :         }
     890                 : 
     891                 :         if(! bp.more())
     892                 :         {
     893                 :             // Last window — send atomically with EOF
     894                 :             auto [ec, n] = co_await write_eof_buffers_(bufs);
     895                 :             total += n;
     896                 :             co_return {ec, total};
     897                 :         }
     898                 : 
     899                 :         auto [ec, n] = co_await write_(bufs);
     900                 :         total += n;
     901                 :         if(ec)
     902                 :             co_return {ec, total};
     903                 :         bp.consume(n);
     904                 :     }
     905              52 : }
     906                 : 
     907                 : } // namespace capy
     908                 : } // namespace boost
     909                 : 
     910                 : #endif
        

Generated by: LCOV version 2.3