LibIPC: Move message decoding from main thread to I/O thread

Previously, IPC messages were decoded on the main thread:

1. I/O thread received raw bytes and file descriptors
2. I/O thread stored them in a queue and notified main thread
3. Main thread decoded bytes into Message objects
4. Main thread processed the messages

Now, decoding happens on the I/O thread:

1. I/O thread receives raw bytes and file descriptors
2. I/O thread decodes them using a configurable MessageDecoder
3. I/O thread calls MessageHandler which stores decoded messages
4. I/O thread signals condition variable (for sync waiters)
5. I/O thread wakes main event loop via deferred_invoke()
6. Main thread processes already-decoded messages

This is achieved by:

- Adding MessageDecoder and MessageHandler callbacks to TransportSocket
- Connection template sets up the decoder (tries both endpoints)
- ConnectionBase::initialize_messaging() sets up the handler
- Storing a WeakEventLoopReference to wake the main thread
- Using mutex + condition variable for thread-safe queue access
- Sync message waiting now uses the CV directly instead of polling

The raw message API (read_as_many_messages_as_possible_without_blocking)
is preserved for MessagePort which uses its own decoding logic.

This architecture prepares for future multi-thread dispatch where
different message types could be routed to different handler threads
(e.g., scrolling messages to a dedicated scroll thread).
This commit is contained in:
Andreas Kling
2026-01-23 17:30:51 +01:00
committed by Andreas Kling
parent 96ce468b60
commit 757795ada4
Notes: github-actions[bot] 2026-02-06 11:08:12 +00:00
6 changed files with 265 additions and 103 deletions

View File

@@ -10,6 +10,7 @@
#include <AK/Types.h>
#include <LibIPC/HandleType.h>
#include <LibIPC/Limits.h>
#include <LibIPC/Message.h>
#include <LibIPC/TransportSocketWindows.h>
#include <AK/Windows.h>
@@ -21,6 +22,39 @@ TransportSocketWindows::TransportSocketWindows(NonnullOwnPtr<Core::LocalSocket>
{
}
void TransportSocketWindows::set_message_decoder(MessageDecoder decoder)
{
m_decoder = move(decoder);
}
void TransportSocketWindows::set_message_handler(MessageHandler handler)
{
m_message_handler = move(handler);
}
void TransportSocketWindows::set_peer_closed_handler(PeerClosedHandler handler)
{
m_peer_closed_handler = move(handler);
}
void TransportSocketWindows::start()
{
// Windows does not use a separate I/O thread.
// Instead, set up a read hook that decodes messages on the main thread.
VERIFY(m_decoder);
VERIFY(m_message_handler);
m_socket->on_ready_to_read = [this] {
auto should_shutdown = read_as_many_messages_as_possible_without_blocking([this](Message&& message) {
Queue<File> fds;
if (auto decoded = m_decoder(message.bytes.span(), fds)) {
m_message_handler(decoded.release_nonnull());
}
});
if (should_shutdown == ShouldShutdown::Yes && m_peer_closed_handler)
m_peer_closed_handler();
};
}
void TransportSocketWindows::set_peer_pid(int pid)
{
m_peer_pid = pid;
@@ -28,6 +62,7 @@ void TransportSocketWindows::set_peer_pid(int pid)
void TransportSocketWindows::set_up_read_hook(Function<void()> hook)
{
// Raw message path (used by MessagePort)
VERIFY(m_socket->is_open());
m_socket->on_ready_to_read = move(hook);
}