/* * Copyright (c) 2024, Dan Klishch * * SPDX-License-Identifier: BSD-2-Clause */ #pragma once #include #include #include #include namespace Test { enum class StreamCloseExpectation { Reset, Close, }; class AsyncMemoryInputStream final : public AsyncInputStream { public: AsyncMemoryInputStream(StringView data, StreamCloseExpectation expectation, Vector&& chunks); ~AsyncMemoryInputStream(); void reset() override; Coroutine> close() override; bool is_open() const override; Coroutine> enqueue_some(Badge) override; ReadonlyBytes buffered_data_unchecked(Badge) const override; void dequeue(Badge, size_t bytes) override; private: StringView m_data; StreamCloseExpectation m_expectation; Vector m_chunks; bool m_is_closed { false }; bool m_is_reset { false }; bool m_encountered_eof { false }; size_t m_read_head { 0 }; size_t m_peek_head { 0 }; size_t m_next_chunk_index { 1 }; size_t m_last_enqueue { 0 }; std::coroutine_handle<> m_awaiter; }; class AsyncMemoryOutputStream final : public AsyncOutputStream { public: // FIXME: Support artificial atomic write limits similar to `chunks` parameter in // AsyncMemoryInputStream. AsyncMemoryOutputStream(StreamCloseExpectation expectation); ~AsyncMemoryOutputStream(); void reset() override; Coroutine> close() override; bool is_open() const override; Coroutine> write_some(ReadonlyBytes data) override; ReadonlyBytes view() const { return m_buffer; } private: StreamCloseExpectation m_expectation; bool m_is_closed { false }; bool m_is_reset { false }; ByteBuffer m_buffer; }; Coroutine> read_until_eof(AsyncInputStream& stream); Vector randomly_partition_input(u32 partition_probability_numerator, u32 partition_probability_denominator, size_t length); }