/* * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ #include #include namespace Media { NonnullRefPtr IncrementallyPopulatedStream::create_empty() { return adopt_ref(*new IncrementallyPopulatedStream({}, false)); } NonnullRefPtr IncrementallyPopulatedStream::create_from_buffer(ByteBuffer&& buffer) { return adopt_ref(*new IncrementallyPopulatedStream(move(buffer), true)); } void IncrementallyPopulatedStream::append(ByteBuffer&& buffer) { Threading::MutexLocker locker { m_mutex }; m_buffer.append(buffer); m_state_changed.broadcast(); } void IncrementallyPopulatedStream::close() { Threading::MutexLocker locker { m_mutex }; m_closed = true; m_state_changed.broadcast(); } u64 IncrementallyPopulatedStream::size() { Threading::MutexLocker locker { m_mutex }; while (!m_closed && !m_expected_size.has_value()) m_state_changed.wait(); if (m_closed) return m_buffer.size(); return m_expected_size.value(); } void IncrementallyPopulatedStream::set_expected_size(u64 expected_size) { Threading::MutexLocker locker { m_mutex }; m_expected_size = expected_size; m_buffer.ensure_capacity(expected_size); m_state_changed.broadcast(); } DecoderErrorOr IncrementallyPopulatedStream::read_at(Cursor& consumer, size_t position, Bytes& bytes, AllowPositionAtEnd allow_position_at_end) { Threading::MutexLocker locker { m_mutex }; while (position + bytes.size() > m_buffer.size() && !m_closed && !consumer.m_aborted) { consumer.m_blocked = true; m_state_changed.wait(); consumer.m_blocked = false; } if (consumer.m_aborted) return DecoderError::with_description(DecoderErrorCategory::Aborted, "Blocking read was aborted"sv); if (position > m_buffer.size() || (allow_position_at_end == AllowPositionAtEnd::No && position == m_buffer.size())) return DecoderError::with_description(DecoderErrorCategory::EndOfStream, "Blocking read reached end of stream"sv); return m_buffer.bytes().slice(position).copy_trimmed_to(bytes); } DecoderErrorOr IncrementallyPopulatedStream::Cursor::seek(size_t offset, SeekMode mode) { size_t new_position = m_position; switch (mode) { case SeekMode::SetPosition: new_position = offset; break; case SeekMode::FromCurrentPosition: new_position += offset; break; case SeekMode::FromEndPosition: new_position = this->size() + offset; break; default: VERIFY_NOT_REACHED(); } Bytes empty; TRY(m_stream->read_at(*this, new_position, empty, AllowPositionAtEnd::Yes)); m_position = new_position; return {}; } DecoderErrorOr IncrementallyPopulatedStream::Cursor::read_into(Bytes bytes) { auto read_count = TRY(m_stream->read_at(*this, m_position, bytes, AllowPositionAtEnd::No)); m_position += read_count; return read_count; } void IncrementallyPopulatedStream::Cursor::abort() { Threading::MutexLocker locker { m_stream->m_mutex }; m_aborted = true; m_stream->m_state_changed.broadcast(); } }