LibIPC: Notify readers when thread exits

This commit is contained in:
Jonathan Gamble
2026-02-12 23:33:40 -06:00
committed by Gregory Bertilson
parent 19af2bebc7
commit fd0709b6ce
Notes: github-actions[bot] 2026-03-14 07:06:26 +00:00
5 changed files with 112 additions and 10 deletions

View File

@@ -81,11 +81,9 @@ TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
m_send_queue = adopt_ref(*new SendQueue);
{
auto fds = MUST(Core::System::pipe2(O_CLOEXEC | O_NONBLOCK));
m_wakeup_io_thread_read_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[0]));
m_wakeup_io_thread_write_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[1]));
}
auto fds = MUST(Core::System::pipe2(O_CLOEXEC | O_NONBLOCK));
m_wakeup_io_thread_read_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[0]));
m_wakeup_io_thread_write_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[1]));
{
auto fds = MUST(Core::System::pipe2(O_CLOEXEC | O_NONBLOCK));
@@ -164,6 +162,7 @@ intptr_t TransportSocket::io_thread_loop()
VERIFY(m_io_thread_state == IOThreadState::Stopped);
m_peer_eof = true;
m_incoming_cv.broadcast();
notify_read_available();
return 0;
}
@@ -188,6 +187,14 @@ void TransportSocket::stop_io_thread(IOThreadState desired_state)
(void)m_io_thread->join();
}
void TransportSocket::notify_read_available()
{
if (!m_notify_hook_write_fd)
return;
Array<u8, 1> bytes = { 0 };
(void)Core::System::write(m_notify_hook_write_fd->value(), bytes);
}
void TransportSocket::set_up_read_hook(Function<void()> hook)
{
m_on_read_hook = move(hook);
@@ -472,11 +479,6 @@ void TransportSocket::read_incoming_messages()
m_unprocessed_bytes.clear();
}
auto notify_read_available = [&] {
Array<u8, 1> bytes = { 0 };
(void)Core::System::write(m_notify_hook_write_fd->value(), bytes);
};
if (!batch.is_empty()) {
Threading::MutexLocker locker(m_incoming_mutex);
m_incoming_messages.extend(move(batch));

View File

@@ -91,6 +91,7 @@ private:
void stop_io_thread(IOThreadState desired_state);
void wake_io_thread();
void read_incoming_messages();
void notify_read_available();
NonnullOwnPtr<Core::LocalSocket> m_socket;

View File

@@ -6,6 +6,7 @@ add_subdirectory(LibDatabase)
add_subdirectory(LibDiff)
add_subdirectory(LibDNS)
add_subdirectory(LibHTTP)
add_subdirectory(LibIPC)
add_subdirectory(LibJS)
add_subdirectory(LibRegex)
add_subdirectory(LibTest)

View File

@@ -0,0 +1,3 @@
if (NOT WIN32)
ladybird_test("TestTransportSocket.cpp" LibIPC LIBS LibIPC)
endif()

View File

@@ -0,0 +1,95 @@
/*
* Copyright (c) 2026, The Ladybird developers
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#include <AK/Atomic.h>
#include <AK/Function.h>
#include <AK/Time.h>
#include <LibCore/EventLoop.h>
#include <LibCore/Socket.h>
#include <LibCore/System.h>
#include <LibIPC/TransportSocket.h>
#include <LibTest/TestCase.h>
using namespace AK::TimeLiterals;
static void spin_until(Core::EventLoop& loop, Function<bool()> condition, AK::Duration timeout = 2000_ms)
{
i64 const timeout_ms = timeout.to_milliseconds();
for (i64 elapsed_ms = 0; elapsed_ms < timeout_ms; elapsed_ms += 5) {
(void)loop.pump(Core::EventLoop::WaitMode::PollForEvents);
if (condition())
return;
MUST(Core::System::sleep_ms(5));
}
FAIL("Timed out waiting for condition");
}
TEST_CASE(read_hook_is_notified_on_peer_hangup)
{
Core::EventLoop loop;
int fds[2] = {};
MUST(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds));
auto reader_socket = TRY_OR_FAIL(Core::LocalSocket::adopt_fd(fds[0]));
auto peer_socket = TRY_OR_FAIL(Core::LocalSocket::adopt_fd(fds[1]));
MUST(reader_socket->set_blocking(false));
MUST(peer_socket->set_blocking(false));
IPC::TransportSocket transport(move(reader_socket));
IGNORE_USE_IN_ESCAPING_LAMBDA Atomic<bool> observed_shutdown = false;
transport.set_up_read_hook([&] {
auto should_shutdown = transport.read_as_many_messages_as_possible_without_blocking([](auto&&) {
});
if (should_shutdown == IPC::TransportSocket::ShouldShutdown::Yes)
observed_shutdown.store(true, AK::MemoryOrder::memory_order_relaxed);
});
peer_socket->close();
spin_until(loop, [&] {
return observed_shutdown.load(AK::MemoryOrder::memory_order_relaxed);
});
EXPECT(observed_shutdown.load(AK::MemoryOrder::memory_order_relaxed));
}
TEST_CASE(read_hook_is_notified_when_io_thread_exits_on_close)
{
Core::EventLoop loop;
int fds[2] = {};
MUST(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds));
auto reader_socket = TRY_OR_FAIL(Core::LocalSocket::adopt_fd(fds[0]));
auto peer_socket = TRY_OR_FAIL(Core::LocalSocket::adopt_fd(fds[1]));
MUST(reader_socket->set_blocking(false));
MUST(peer_socket->set_blocking(false));
IPC::TransportSocket transport(move(reader_socket));
IGNORE_USE_IN_ESCAPING_LAMBDA Atomic<bool> observed_shutdown = false;
transport.set_up_read_hook([&] {
auto should_shutdown = transport.read_as_many_messages_as_possible_without_blocking([](auto&&) {
});
if (should_shutdown == IPC::TransportSocket::ShouldShutdown::Yes)
observed_shutdown.store(true, AK::MemoryOrder::memory_order_relaxed);
});
transport.close();
spin_until(loop, [&] {
return observed_shutdown.load(AK::MemoryOrder::memory_order_relaxed);
});
EXPECT(observed_shutdown.load(AK::MemoryOrder::memory_order_relaxed));
}