LibThreading: Simplify BackgroundAction callback invocation

Using a Promise in BackgroundAction was not doing anything since the
change to use a weak reference to the event loop, so let's just drop
that.

The thread will now always move itself (and therefore its callbacks)
over to the originating thread before completing, regardless of the
presence of callbacks. This ensures that ref counting remains on the
main thread.

In addition, BackgroundAction's completion callback can no longer
return errors. This functionality wasn't actually used anywhere, it was
a holdover from the behavior of Core::Promise.
This commit is contained in:
Zaggy1024
2026-02-26 22:50:26 -06:00
committed by Gregory Bertilson
parent 567ba4d0ad
commit 1c7aca7e07
Notes: github-actions[bot] 2026-03-02 23:08:52 +00:00
3 changed files with 28 additions and 126 deletions

View File

@@ -42,10 +42,6 @@ class BackgroundAction final
C_OBJECT(BackgroundAction);
public:
// Promise is an implementation detail of BackgroundAction in order to communicate with EventLoop.
// All of the promise's callbacks and state are either managed by us or by EventLoop.
using Promise = Core::Promise<NonnullRefPtr<Core::EventReceiver>>;
virtual ~BackgroundAction() = default;
Optional<Result> const& result() const { return m_result; }
@@ -58,85 +54,40 @@ public:
bool is_canceled() const { return m_canceled.load(AK::MemoryOrder::memory_order_relaxed); }
private:
BackgroundAction(ESCAPING Function<ErrorOr<Result>(BackgroundAction&)> action, ESCAPING Function<ErrorOr<void>(Result)> on_complete, ESCAPING Optional<Function<void(Error)>> on_error = {})
BackgroundAction(ESCAPING Function<ErrorOr<Result>(BackgroundAction&)> action, ESCAPING Function<void(Result)> on_complete, ESCAPING Function<void(Error)> on_error = {})
: m_action(move(action))
, m_on_complete(move(on_complete))
, m_on_error(move(on_error))
{
auto promise = Promise::construct();
if (m_on_complete) {
auto self = NonnullRefPtr(*this);
promise->on_resolution = [](NonnullRefPtr<Core::EventReceiver>& object) -> ErrorOr<void> {
auto self = static_ptr_cast<BackgroundAction<Result>>(object);
VERIFY(self->m_result.has_value());
if (auto maybe_error = self->m_on_complete(self->m_result.release_value()); maybe_error.is_error()) {
// If on_complete returns an error, we pass it along to your on_error handler.
if (self->m_on_error)
self->m_on_error(maybe_error.release_error());
}
return {};
};
promise->on_rejection = [self](Error& error) {
if (error.is_errno() && error.code() == ECANCELED)
self->m_canceled.store(true, AK::MemoryOrder::memory_order_relaxed);
};
Core::EventLoop::current().add_job(promise);
}
if (on_error.has_value())
m_on_error = on_error.release_value();
enqueue_work([self = NonnullRefPtr(*this), promise = move(promise), origin_event_loop = Core::EventLoop::current_weak()]() mutable {
auto* self_ptr = self.ptr();
auto post_to_origin = [&](StringView message_type, Function<void()> callback) {
if (auto origin = origin_event_loop->take()) {
origin->deferred_invoke(move(callback));
} else {
dbgln("BackgroundAction {:p}: dropped {} (origin loop gone)", self_ptr, message_type);
}
};
enqueue_work([self = NonnullRefPtr(*this), origin_event_loop = Core::EventLoop::current_weak()]() mutable {
auto result = self->m_action(*self);
auto const has_job = static_cast<bool>(self->m_on_complete);
auto const canceled = self->m_canceled.load(AK::MemoryOrder::memory_order_relaxed);
if (canceled) {
if (has_job) {
post_to_origin("promise rejection"sv, [promise = move(promise)]() mutable {
promise->reject(Error::from_errno(ECANCELED));
});
}
auto event_loop = origin_event_loop->take();
if (!event_loop) {
dbgln("BackgroundAction {:p} was dropped, the origin loop is gone.", self.ptr());
return;
}
if (!result.is_error()) {
self->m_result = result.release_value();
if (has_job) {
post_to_origin("on_complete"sv, [self = move(self), promise = move(promise)]() mutable {
// Our promise's resolution function will never error.
(void)promise->resolve(*self);
});
event_loop->deferred_invoke([self = move(self), result = move(result)]() mutable {
auto const canceled = self->m_canceled.load(AK::MemoryOrder::memory_order_relaxed);
if (canceled)
return;
if (result.is_error()) {
if (self->m_on_error)
self->m_on_error(result.release_error());
return;
}
return;
}
auto error = result.release_error();
if (has_job) {
post_to_origin("promise rejection"sv, [promise = move(promise), error = Error::copy(error)]() mutable {
promise->reject(Error::copy(error));
});
}
if (self->m_on_error) {
post_to_origin("on_error"sv, [self = move(self), error = Error::copy(error)]() mutable {
self->m_on_error(Error::copy(error));
});
}
if (self->m_on_complete)
self->m_on_complete(result.release_value());
});
});
}
Function<ErrorOr<Result>(BackgroundAction&)> m_action;
Function<ErrorOr<void>(Result)> m_on_complete;
Function<void(Error)> m_on_error = [](Error error) {
dbgln("Error occurred while running a BackgroundAction: {}", error);
};
Function<void(Result)> m_on_complete;
Function<void(Error)> m_on_error;
Optional<Result> m_result;
Atomic<bool> m_canceled { false };
};

View File

@@ -189,7 +189,7 @@ NonnullRefPtr<ConnectionFromClient::Job> ConnectionFromClient::make_decode_image
[encoded_buffer = move(encoded_buffer), ideal_size = move(ideal_size), mime_type = move(mime_type)](auto&) mutable -> ErrorOr<DecodeResult> {
return TRY(decode_image_to_details(move(encoded_buffer), ideal_size, mime_type));
},
[strong_this = NonnullRefPtr(*this), request_id](DecodeResult result) -> ErrorOr<void> {
[strong_this = NonnullRefPtr(*this), request_id](DecodeResult result) {
i64 session_id = 0;
if (result.decoder) {
@@ -204,9 +204,8 @@ NonnullRefPtr<ConnectionFromClient::Job> ConnectionFromClient::make_decode_image
strong_this->async_did_decode_image(request_id, result.is_animated, result.loop_count, move(result.bitmaps), move(result.durations), result.scale, move(result.color_profile), session_id);
strong_this->m_pending_jobs.remove(request_id);
return {};
},
[strong_this = NonnullRefPtr(*this), request_id](Error error) -> void {
[strong_this = NonnullRefPtr(*this), request_id](Error error) {
if (strong_this->is_open())
strong_this->async_did_fail_to_decode_image(request_id, MUST(String::formatted("Decoding failed: {}", error)));
strong_this->m_pending_jobs.remove(request_id);
@@ -263,16 +262,15 @@ void ConnectionFromClient::request_animation_frames(i64 session_id, u32 start_fr
}
return frames;
},
[strong_this = NonnullRefPtr(*this), session_id](Vector<Gfx::ImageFrameDescriptor> frames) -> ErrorOr<void> {
[strong_this = NonnullRefPtr(*this), session_id](Vector<Gfx::ImageFrameDescriptor> frames) {
Vector<RefPtr<Gfx::Bitmap>> bitmaps;
bitmaps.ensure_capacity(frames.size());
for (auto& frame : frames)
bitmaps.unchecked_append(move(frame.image));
strong_this->async_did_decode_animation_frames(session_id, Gfx::BitmapSequence { move(bitmaps) });
strong_this->m_pending_frame_jobs.remove(session_id);
return {};
},
[strong_this = NonnullRefPtr(*this), session_id](Error error) -> void {
[strong_this = NonnullRefPtr(*this), session_id](Error error) {
if (strong_this->is_open())
strong_this->async_did_fail_animation_decode(session_id, MUST(String::formatted("Frame decode failed: {}", error)));
strong_this->m_pending_frame_jobs.remove(session_id);

View File

@@ -48,10 +48,9 @@ TEST_CASE(background_action_on_error_called_on_action_failure_and_on_origin_thre
action_ran.store(true, AK::MemoryOrder::memory_order_relaxed);
return Error::from_string_literal("action failed");
},
[&](int) -> ErrorOr<void> {
[&](int) {
on_complete_called.store(true, AK::MemoryOrder::memory_order_relaxed);
loop.quit(1);
return {};
},
[&](Error error) {
EXPECT(pthread_equal(origin_thread_id, pthread_self()));
@@ -71,51 +70,6 @@ TEST_CASE(background_action_on_error_called_on_action_failure_and_on_origin_thre
(void)background_action;
}
TEST_CASE(background_action_on_error_called_when_on_complete_returns_error)
{
Core::EventLoop loop;
pthread_t const origin_thread_id = pthread_self();
IGNORE_USE_IN_ESCAPING_LAMBDA Atomic<int> on_error_count = 0;
IGNORE_USE_IN_ESCAPING_LAMBDA Atomic<int> on_complete_count = 0;
IGNORE_USE_IN_ESCAPING_LAMBDA Atomic<int> stage = 0;
Optional<pthread_t> action_thread_id;
auto background_action = Threading::BackgroundAction<int>::construct(
[&](auto&) -> ErrorOr<int> {
action_thread_id = pthread_self();
return 42;
},
[&](int value) -> ErrorOr<void> {
EXPECT(pthread_equal(origin_thread_id, pthread_self()));
EXPECT_EQ(value, 42);
on_complete_count.fetch_add(1, AK::MemoryOrder::memory_order_relaxed);
stage.store(1, AK::MemoryOrder::memory_order_relaxed);
return Error::from_string_literal("on_complete failed");
},
[&](Error error) {
EXPECT(pthread_equal(origin_thread_id, pthread_self()));
EXPECT_EQ(error.string_literal(), "on_complete failed"sv);
EXPECT_EQ(stage.load(AK::MemoryOrder::memory_order_relaxed), 1);
on_error_count.fetch_add(1, AK::MemoryOrder::memory_order_relaxed);
stage.store(2, AK::MemoryOrder::memory_order_relaxed);
loop.quit(0);
});
loop.exec();
EXPECT(action_thread_id.has_value());
EXPECT(!pthread_equal(action_thread_id.value(), origin_thread_id));
EXPECT_EQ(on_complete_count.load(AK::MemoryOrder::memory_order_relaxed), 1);
EXPECT_EQ(on_error_count.load(AK::MemoryOrder::memory_order_relaxed), 1);
EXPECT_EQ(stage.load(AK::MemoryOrder::memory_order_relaxed), 2);
(void)background_action;
}
TEST_CASE(background_action_cancel_suppresses_on_error_and_on_complete)
{
Core::EventLoop loop;
@@ -136,9 +90,8 @@ TEST_CASE(background_action_cancel_suppresses_on_error_and_on_complete)
finished.store(true, AK::MemoryOrder::memory_order_relaxed);
return Error::from_string_literal("error after cancel");
},
[&](int) -> ErrorOr<void> {
[&](int) {
on_complete_count.fetch_add(1, AK::MemoryOrder::memory_order_relaxed);
return {};
},
[&](Error) {
on_error_count.fetch_add(1, AK::MemoryOrder::memory_order_relaxed);