mirror of
https://github.com/signalapp/libsignal.git
synced 2026-04-25 17:25:18 +02:00
net: Make a hybrid chat's WS task cancel any shared H2 requests
...rather than relying on an explicit disconnect() call or waiting until Drop (which may happen well after the websocket is closed).
This commit is contained in:
@@ -394,6 +394,7 @@ impl Chat {
|
||||
connection,
|
||||
listener,
|
||||
response_tx.downgrade(),
|
||||
shared_h2_connection.clone(),
|
||||
));
|
||||
let state = TaskState::MaybeStillRunning {
|
||||
request_tx,
|
||||
@@ -418,17 +419,12 @@ impl TaskState {
|
||||
request_tx,
|
||||
response_tx,
|
||||
task,
|
||||
shared_h2_connection,
|
||||
shared_h2_connection: _,
|
||||
} => {
|
||||
// Signal to the task, if it's still running, that it should
|
||||
// quit. Do this by hanging up on it, at which point it will
|
||||
// exit.
|
||||
drop((request_tx, response_tx));
|
||||
// Similarly, cancel any outstanding H2 requests, and let the connection exit
|
||||
// naturally.
|
||||
if let Some(h2) = shared_h2_connection {
|
||||
h2.disconnect_all();
|
||||
}
|
||||
TaskState::SignaledToEnd(task)
|
||||
}
|
||||
state @ (TaskState::SignaledToEnd(_) | TaskState::Finished(_)) => state,
|
||||
@@ -721,6 +717,7 @@ async fn spawned_task_body<
|
||||
connection: ConnectionImpl<I, GCI>,
|
||||
listener: EventListener,
|
||||
weak_response_tx: mpsc::WeakUnboundedSender<OutgoingResponse>,
|
||||
shared_h2_connection: Option<Http2Client<GrpcBody>>,
|
||||
) -> Result<FinishReason, TaskErrorState> {
|
||||
pin_mut!(connection);
|
||||
let tokio_rt = tokio::runtime::Handle::current();
|
||||
@@ -733,6 +730,10 @@ async fn spawned_task_body<
|
||||
log::error!("[{log_tag}] chat handler task exited abnormally");
|
||||
listener_state.send_event_blocking(ListenerEvent::Finished(Err(FinishError::Unknown)));
|
||||
});
|
||||
|
||||
let cancel_h2_connection_on_exit =
|
||||
shared_h2_connection.map(|h2| scopeguard::guard(h2, |h2| h2.disconnect_all()));
|
||||
|
||||
let result = loop {
|
||||
let (id, incoming_request) = match connection.as_mut().handle_one_event().await {
|
||||
Outcome::Continue(None) => continue,
|
||||
@@ -760,7 +761,8 @@ async fn spawned_task_body<
|
||||
let task_result = result.as_ref().map_err(Into::into).copied();
|
||||
|
||||
// The loop is finishing. Make sure to tell the listener after disarming the
|
||||
// scope guard.
|
||||
// scope guard and performing other cleanup.
|
||||
drop(cancel_h2_connection_on_exit);
|
||||
let mut listener = scopeguard::ScopeGuard::into_inner(listener_state);
|
||||
listener
|
||||
.send_event(
|
||||
|
||||
Reference in New Issue
Block a user