RequestServer: Avoid O(N^2) copy when draining response buffer

write_queued_bytes_without_blocking() used to allocate a Vector sized
to the entire queued response buffer and memcpy every queued byte into
it, on every curl on_data_received callback. When the client pipe was
slower than the network, the buffer grew, and each arriving chunk
triggered a full copy of everything still queued. With enough pending
data this added up to tens of gigabytes of memcpy per request and
stalled RequestServer for tens of seconds.

Drain the stream in a loop using peek_some_contiguous() + write()
directly from the underlying chunk, and discard exactly what the
socket accepted. No intermediate buffer, no copy. The loop exits on
EAGAIN/EWOULDBLOCK or a short write and enables the writer notifier,
matching the previous back-pressure behavior.
This commit is contained in:
Andreas Kling
2026-04-22 09:14:55 +02:00
committed by Jelle Raaijmakers
parent 612c61cc16
commit f0765e80f3
Notes: github-actions[bot] 2026-04-22 11:33:29 +00:00

View File

@@ -791,25 +791,22 @@ void Request::transfer_headers_to_client_if_needed()
ErrorOr<void> Request::write_queued_bytes_without_blocking()
{
Vector<u8> bytes_to_send;
bytes_to_send.resize(m_response_buffer.used_buffer_size());
m_response_buffer.peek_some(bytes_to_send);
auto write_bytes_to_disk_cache = [&](size_t byte_count) {
auto write_bytes_to_disk_cache = [&](ReadonlyBytes bytes) {
if (!m_cache_entry_writer.has_value())
return;
auto bytes_to_write = bytes_to_send.span().slice(0, byte_count);
if (m_cache_entry_writer->write_data(bytes_to_write).is_error())
if (m_cache_entry_writer->write_data(bytes).is_error())
m_cache_entry_writer.clear();
};
if (m_type == RequestType::BackgroundRevalidation) {
write_bytes_to_disk_cache(bytes_to_send.size());
MUST(m_response_buffer.discard(bytes_to_send.size()));
while (!m_response_buffer.is_eof()) {
auto bytes = m_response_buffer.peek_some_contiguous();
write_bytes_to_disk_cache(bytes);
MUST(m_response_buffer.discard(bytes.size()));
}
if (m_response_buffer.is_eof() && m_curl_result_code.has_value())
if (m_curl_result_code.has_value())
transition_to_state(State::Complete);
return {};
@@ -825,22 +822,32 @@ ErrorOr<void> Request::write_queued_bytes_without_blocking()
});
}
auto result = m_client_request_pipe->write(bytes_to_send);
if (result.is_error()) {
if (!first_is_one_of(result.error().code(), EAGAIN, EWOULDBLOCK))
return result.release_error();
while (!m_response_buffer.is_eof()) {
auto bytes = m_response_buffer.peek_some_contiguous();
m_client_writer_notifier->set_enabled(true);
return {};
auto result = m_client_request_pipe->write(bytes);
if (result.is_error()) {
if (!first_is_one_of(result.error().code(), EAGAIN, EWOULDBLOCK))
return result.release_error();
m_client_writer_notifier->set_enabled(true);
return {};
}
auto written = result.value();
write_bytes_to_disk_cache(bytes.slice(0, written));
MUST(m_response_buffer.discard(written));
m_bytes_transferred_to_client += written;
if (written < bytes.size()) {
m_client_writer_notifier->set_enabled(true);
return {};
}
}
write_bytes_to_disk_cache(result.value());
MUST(m_response_buffer.discard(result.value()));
m_bytes_transferred_to_client += result.value();
m_client_writer_notifier->set_enabled(!m_response_buffer.is_eof());
if (m_response_buffer.is_eof() && m_curl_result_code.has_value())
m_client_writer_notifier->set_enabled(false);
if (m_curl_result_code.has_value())
transition_to_state(State::Complete);
return {};