diff --git a/rust/bridge/shared/types/src/net/chat.rs b/rust/bridge/shared/types/src/net/chat.rs index 17b85ff32..57e9e7fdb 100644 --- a/rust/bridge/shared/types/src/net/chat.rs +++ b/rust/bridge/shared/types/src/net/chat.rs @@ -445,6 +445,7 @@ async fn establish_chat_connection( initial_request_id: 0, }, auth, + auth_type, ) .inspect(|r| match r { Ok(_) => log::info!("successfully connected {auth_type} chat"), diff --git a/rust/net/examples/https_proxy.rs b/rust/net/examples/https_proxy.rs index d312757ec..7dbf64c7d 100644 --- a/rust/net/examples/https_proxy.rs +++ b/rust/net/examples/https_proxy.rs @@ -137,7 +137,7 @@ async fn main() { async move { tokio::time::sleep(START_NEXT_DELAY * i).await; log::info!("connecting via: {route:?}"); - connector.connect(route).await + connector.connect(route, "main".into()).await } })); let mut connection = connect_attempts diff --git a/rust/net/examples/socks_proxy.rs b/rust/net/examples/socks_proxy.rs index 636e0d495..021a34ead 100644 --- a/rust/net/examples/socks_proxy.rs +++ b/rust/net/examples/socks_proxy.rs @@ -134,7 +134,7 @@ async fn main() { async move { tokio::time::sleep(START_NEXT_DELAY * i).await; log::info!("connecting via: {route:?}"); - connector.connect(route).await + connector.connect(route, "main".into()).await } })); // needed to scope the borrow of `connector` diff --git a/rust/net/infra/src/route.rs b/rust/net/infra/src/route.rs index c731190b7..895617c1e 100644 --- a/rust/net/infra/src/route.rs +++ b/rust/net/infra/src/route.rs @@ -196,6 +196,7 @@ pub async fn connect( ordered_routes: impl Iterator, resolver: &impl Resolver, connector: C, + log_tag: Arc, mut on_error: impl FnMut(C::Error) -> ControlFlow, ) -> ( Result>, @@ -272,7 +273,7 @@ where Event::NextRouteAvailable(Some(route)) => { connects_in_progress.push(async { let started = Instant::now(); - let result = connector.connect(route.clone()).await; + let result = connector.connect(route.clone(), log_tag.clone()).await; (route, result, started) }); poll_schedule_for_next = false; @@ -755,6 +756,7 @@ mod test { &self, (): (), route: R, + _log_tag: Arc, ) -> impl Future> + Send { let (sender, receiver) = oneshot::channel(); self.outgoing @@ -788,6 +790,7 @@ mod test { .map(|(h, _addr)| FakeRoute(UnresolvedHost::from(Arc::from(*h)))), &resolver, connector, + "test".into(), |_err: FakeConnectError| ControlFlow::::Continue(()), ) .await @@ -887,6 +890,7 @@ mod test { .map(|(h, _addr)| FakeRoute(UnresolvedHost::from(Arc::from(*h)))), &resolver, connector, + "test".into(), |_err: FakeConnectError| ControlFlow::::Continue(()), ) .await; diff --git a/rust/net/infra/src/route/connect.rs b/rust/net/infra/src/route/connect.rs index ec2891475..5b09472f8 100644 --- a/rust/net/infra/src/route/connect.rs +++ b/rust/net/infra/src/route/connect.rs @@ -7,6 +7,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::net::IpAddr; +use std::sync::Arc; use derive_where::derive_where; use futures_util::TryFutureExt; @@ -38,6 +39,7 @@ pub trait Connector { &self, over: Inner, route: R, + log_tag: Arc, ) -> impl Future> + Send; } @@ -46,8 +48,9 @@ pub trait ConnectorExt: Connector { fn connect( &self, route: R, + log_tag: Arc, ) -> impl Future> + Send { - self.connect_over((), route) + self.connect_over((), route, log_tag) } } impl> ConnectorExt for C {} @@ -139,6 +142,7 @@ where &self, over: Inner, route: WebSocketRoute>, + log_tag: Arc, ) -> impl Future> + Send { let Self { outer, @@ -155,11 +159,11 @@ where } = route; async move { let inner = inner - .connect_over(over, tls_route) + .connect_over(over, tls_route, log_tag.clone()) .await .map_err(Into::into)?; outer - .connect_over(inner, (ws_fragment, http_fragment)) + .connect_over(inner, (ws_fragment, http_fragment), log_tag) .await .map_err(Into::into) } @@ -184,6 +188,7 @@ where &self, over: Inner, route: TlsRoute, + log_tag: Arc, ) -> impl Future> + Send { let Self { outer, @@ -196,11 +201,11 @@ where } = route; async move { let inner = inner - .connect_over(over, tcp_route) + .connect_over(over, tcp_route, log_tag.clone()) .await .map_err(Into::into)?; outer - .connect_over(inner, tls_fragment) + .connect_over(inner, tls_fragment, log_tag) .await .map_err(Into::into) } @@ -228,17 +233,18 @@ where &self, over: Inner, route: DirectOrProxyRoute, + log_tag: Arc, ) -> impl Future> + Send { match route { DirectOrProxyRoute::Direct(d) => Either::Left( self.direct - .connect_over(over, d) + .connect_over(over, d, log_tag) .map_ok(Either::Left) .map_err(Into::into), ), DirectOrProxyRoute::Proxy(p) => Either::Right( self.proxy - .connect_over(over, p) + .connect_over(over, p, log_tag) .map_ok(Either::Right) .map_err(Into::into), ), @@ -255,8 +261,9 @@ impl, R, Inner> Connector for &C { &self, over: Inner, route: R, + log_tag: Arc, ) -> impl Future> + Send { - (*self).connect_over(over, route) + (*self).connect_over(over, route, log_tag) } } @@ -278,7 +285,7 @@ pub mod testutils { impl Connector for ConnectFn where - F: Fn(Inner, R) -> Fut, + F: Fn(Inner, R, Arc) -> Fut, Fut: Future> + Send, { type Connection = C; @@ -289,8 +296,9 @@ pub mod testutils { &self, over: Inner, route: R, + log_tag: Arc, ) -> impl Future> + Send { - self.0(over, route) + self.0(over, route, log_tag) } } } diff --git a/rust/net/infra/src/route/connect/throttle.rs b/rust/net/infra/src/route/connect/throttle.rs index 499b026f8..08c2e6c38 100644 --- a/rust/net/infra/src/route/connect/throttle.rs +++ b/rust/net/infra/src/route/connect/throttle.rs @@ -61,6 +61,7 @@ where &self, over: Inner, route: R, + log_tag: Arc, ) -> impl Future> + Send { let Self { inner, permits } = self; async move { @@ -68,7 +69,7 @@ where .acquire_owned() .await .expect("semaphore not closed"); - let connection = inner.connect_over(over, route).await?; + let connection = inner.connect_over(over, route, log_tag).await?; Ok(ThrottledConnection(connection, permit)) } } diff --git a/rust/net/infra/src/route/describe.rs b/rust/net/infra/src/route/describe.rs index 37d48fe9d..472e50b5e 100644 --- a/rust/net/infra/src/route/describe.rs +++ b/rust/net/infra/src/route/describe.rs @@ -113,9 +113,10 @@ impl, D: Send> &self, over: Inner, route: WithLoggableDescription, + log_tag: Arc, ) -> impl Future> + Send { self.0 - .connect_over(over, route.route) + .connect_over(over, route.route, log_tag) .map_ok(|c| (c, route.description)) } } diff --git a/rust/net/infra/src/tcp_ssl.rs b/rust/net/infra/src/tcp_ssl.rs index e21fe3370..863281552 100644 --- a/rust/net/infra/src/tcp_ssl.rs +++ b/rust/net/infra/src/tcp_ssl.rs @@ -116,15 +116,17 @@ impl TransportConnector for DirectConnector { connection_params: &TransportConnectionParams, alpn: Alpn, ) -> Result, TransportConnectError> { + let log_tag: Arc = "DirectConnector".into(); let StreamAndInfo(tcp_stream, remote_address) = connect_tcp( &self.dns_resolver, RouteType::Direct, connection_params.tcp_host.as_deref(), connection_params.port, + log_tag.clone(), ) .await?; - let ssl_stream = connect_tls(tcp_stream, connection_params, alpn).await?; + let ssl_stream = connect_tls(tcp_stream, connection_params, alpn, log_tag).await?; Ok(StreamAndInfo(ssl_stream, remote_address)) } @@ -150,6 +152,7 @@ impl Connector, ()> for StatelessDirect { &self, (): (), route: TcpRoute, + _log_tag: Arc, ) -> impl Future> { let TcpRoute { address, port } = route; @@ -170,6 +173,7 @@ where &self, inner: Inner, fragment: TlsRouteFragment, + _log_tag: Arc, ) -> impl Future> + Send { let TlsRouteFragment { root_certs, @@ -239,6 +243,7 @@ async fn connect_tls( transport: S, connection_params: &TransportConnectionParams, alpn: Alpn, + log_tag: Arc, ) -> Result, TransportConnectError> { let route = TlsRouteFragment { root_certs: connection_params.certs.clone(), @@ -246,7 +251,9 @@ async fn connect_tls( alpn: Some(alpn), }; - StatelessDirect.connect_over(transport, route).await + StatelessDirect + .connect_over(transport, route, log_tag) + .await } async fn connect_tcp( @@ -254,6 +261,7 @@ async fn connect_tcp( route_type: RouteType, host: Host<&str>, port: NonZeroU16, + log_tag: Arc, ) -> Result, TransportConnectError> { let dns_lookup = match host { Host::Ip(ip) => { @@ -291,13 +299,14 @@ async fn connect_tcp( let staggered_futures = dns_lookup.into_iter().enumerate().map(|(idx, ip)| { let delay = TCP_CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap(); let connector = &connector; + let log_tag = log_tag.clone(); async move { if !delay.is_zero() { tokio::time::sleep(delay).await; } let route = TcpRoute { address: ip, port }; connector - .connect(route) + .connect(route, log_tag) .inspect_err(|e| { log::debug!("failed to connect to IP [{ip}] with an error: {e:?}"); }) diff --git a/rust/net/infra/src/tcp_ssl/proxy.rs b/rust/net/infra/src/tcp_ssl/proxy.rs index d42faf351..f6dbca48d 100644 --- a/rust/net/infra/src/tcp_ssl/proxy.rs +++ b/rust/net/infra/src/tcp_ssl/proxy.rs @@ -4,6 +4,7 @@ // use std::net::IpAddr; +use std::sync::Arc; use futures_util::TryFutureExt; use tokio::net::TcpStream; @@ -33,6 +34,7 @@ impl Connector, ()> for StatelessProxied { &self, (): (), route: ConnectionProxyRoute, + log_tag: Arc, ) -> Result { match route { ConnectionProxyRoute::Tls { proxy } => { @@ -43,21 +45,25 @@ impl Connector, ()> for StatelessProxied { let connector = super::StatelessDirect; - let tcp = connector.connect(inner).await?; + let tcp = connector.connect(inner, log_tag.clone()).await?; connector - .connect_over(tcp, tls_fragment) + .connect_over(tcp, tls_fragment, log_tag) .await .map(Into::into) } ConnectionProxyRoute::Tcp { proxy } => { let connector = super::StatelessDirect; - match connector.connect(proxy).await { + match connector.connect(proxy, log_tag).await { Ok(connection) => Ok(connection.into()), Err(_io_error) => Err(TransportConnectError::TcpConnectionFailed), } } - ConnectionProxyRoute::Socks(route) => self.connect(route).map_ok(Into::into).await, - ConnectionProxyRoute::Https(route) => self.connect(route).map_ok(Into::into).await, + ConnectionProxyRoute::Socks(route) => { + self.connect(route, log_tag).map_ok(Into::into).await + } + ConnectionProxyRoute::Https(route) => { + self.connect(route, log_tag).map_ok(Into::into).await + } } } } diff --git a/rust/net/infra/src/tcp_ssl/proxy/https.rs b/rust/net/infra/src/tcp_ssl/proxy/https.rs index 8941288bd..56314f5ad 100644 --- a/rust/net/infra/src/tcp_ssl/proxy/https.rs +++ b/rust/net/infra/src/tcp_ssl/proxy/https.rs @@ -7,6 +7,7 @@ use std::fmt::Display; use std::future::Future; use std::net::IpAddr; use std::num::NonZeroU16; +use std::sync::Arc; use bytes::Bytes; use futures_util::TryFutureExt as _; @@ -57,6 +58,7 @@ impl Connector, ()> for super::StatelessProxied { &self, (): (), route: HttpsProxyRoute, + log_tag: Arc, ) -> impl Future> + Send { let HttpsProxyRoute { fragment, inner } = route; async move { @@ -64,8 +66,16 @@ impl Connector, ()> for super::StatelessProxied { let tcp_connector = StatelessTcpConnector::default(); let inner = inner .map_either( - |tls| tls_connector.connect(tls).map_ok(Either::Left), - |tcp| tcp_connector.connect(tcp).map_ok(Either::Right), + |tls| { + tls_connector + .connect(tls, log_tag.clone()) + .map_ok(Either::Left) + }, + |tcp| { + tcp_connector + .connect(tcp, log_tag.clone()) + .map_ok(Either::Right) + }, ) .await?; let info = inner.transport_info(); @@ -93,7 +103,7 @@ impl Connector, ()> for super::StatelessProxied { info, }), Err(e) => { - log::info!("failed to connect via HTTP proxy: {e}"); + log::info!("[{log_tag}] failed to connect via HTTP proxy: {e}"); Err(TransportConnectError::ProxyProtocol) } } @@ -411,7 +421,7 @@ mod test { }; let mut client_stream = super::super::StatelessProxied - .connect(route) + .connect(route, "test".into()) .await .expect("can connect"); @@ -491,7 +501,9 @@ mod test { inner: Either::Right(route_to_proxy), }; - let connect_result = super::super::StatelessProxied.connect(route).await; + let connect_result = super::super::StatelessProxied + .connect(route, "test".into()) + .await; assert_matches!(connect_result, Err(TransportConnectError::ProxyProtocol)); } diff --git a/rust/net/infra/src/tcp_ssl/proxy/socks.rs b/rust/net/infra/src/tcp_ssl/proxy/socks.rs index c148c8677..2b1b2c305 100644 --- a/rust/net/infra/src/tcp_ssl/proxy/socks.rs +++ b/rust/net/infra/src/tcp_ssl/proxy/socks.rs @@ -79,11 +79,14 @@ impl TransportConnector for SocksConnector { log::info!("connecting to {which_protocol:?} proxy over TCP"); log::debug!("connecting to {which_protocol:?} proxy at {proxy_host}:{proxy_port} over TCP"); + let log_tag: Arc = "SocksConnector".into(); + let StreamAndInfo(tcp_stream, remote_address) = crate::tcp_ssl::connect_tcp( dns_resolver, RouteType::SocksProxy, proxy_host.as_deref(), *proxy_port, + log_tag.clone(), ) .await?; let is_ipv6 = tcp_stream @@ -135,7 +138,8 @@ impl TransportConnector for SocksConnector { })?; log::debug!("connecting TLS through proxy"); - let stream = crate::tcp_ssl::connect_tls(socks_stream, connection_params, alpn).await?; + let stream = + crate::tcp_ssl::connect_tls(socks_stream, connection_params, alpn, log_tag).await?; log::info!("connection through SOCKS proxy established successfully"); Ok(StreamAndInfo( @@ -158,6 +162,7 @@ impl Connector, ()> for super::StatelessProxied { &self, (): (), route: SocksRoute, + log_tag: Arc, ) -> impl Future> + Send { let SocksRoute { protocol, @@ -167,18 +172,18 @@ impl Connector, ()> for super::StatelessProxied { } = route; async move { - log::info!("establishing connection to host over SOCKS proxy"); + log::info!("[{log_tag}] establishing connection to host over SOCKS proxy"); log::debug!( - "establishing connection to {:?} over SOCKS proxy", + "[{log_tag}] establishing connection to {:?} over SOCKS proxy", target_addr ); - log::info!("connecting to {protocol:?} proxy over TCP"); + log::info!("[{log_tag}] connecting to {protocol:?} proxy over TCP"); let TcpRoute { address: proxy_host, port: proxy_port, } = &proxy; - log::debug!("connecting to {protocol:?} proxy at {proxy_host}:{proxy_port} over TCP"); + log::debug!("[{log_tag}] connecting to {protocol:?} proxy at {proxy_host}:{proxy_port} over TCP"); let target = match &target_addr { crate::route::ProxyTarget::ResolvedLocally(ip) => { @@ -189,9 +194,11 @@ impl Connector, ()> for super::StatelessProxied { } }; - let stream = super::super::StatelessDirect.connect(proxy).await?; - log::info!("performing proxy handshake"); - log::debug!("performing proxy handshake with {target:?}"); + let stream = super::super::StatelessDirect + .connect(proxy, log_tag.clone()) + .await?; + log::info!("[{log_tag}] performing proxy handshake"); + log::debug!("[{log_tag}] performing proxy handshake with {target:?}"); protocol .connect_to_proxy(stream, target) .await diff --git a/rust/net/infra/src/tcp_ssl/proxy/tls.rs b/rust/net/infra/src/tcp_ssl/proxy/tls.rs index fb3ae7cda..151beef02 100644 --- a/rust/net/infra/src/tcp_ssl/proxy/tls.rs +++ b/rust/net/infra/src/tcp_ssl/proxy/tls.rs @@ -55,11 +55,13 @@ impl TransportConnector for TlsProxyConnector { connection_params: &TransportConnectionParams, alpn: Alpn, ) -> Result, TransportConnectError> { + let log_tag: Arc = "TlsProxyConnector".into(); let StreamAndInfo(tcp_stream, remote_address) = connect_tcp( &self.dns_resolver, RouteType::TlsProxy, self.proxy_host.as_deref(), self.proxy_port, + log_tag.clone(), ) .await?; @@ -92,7 +94,7 @@ impl TransportConnector for TlsProxyConnector { } }; - let tls_stream = connect_tls(inner_stream, connection_params, alpn).await?; + let tls_stream = connect_tls(inner_stream, connection_params, alpn, log_tag).await?; Ok(StreamAndInfo( tls_stream, diff --git a/rust/net/infra/src/ws.rs b/rust/net/infra/src/ws.rs index fef12c4e3..9e3ac1451 100644 --- a/rust/net/infra/src/ws.rs +++ b/rust/net/infra/src/ws.rs @@ -166,6 +166,7 @@ where &self, inner: Inner, route: (WebSocketRouteFragment, HttpRouteFragment), + _log_tag: Arc, ) -> impl std::future::Future> + Send { let ( WebSocketRouteFragment { @@ -638,6 +639,7 @@ pub mod testutil { front_name: None, }, ), + "test".into(), ); let server_future = tokio_tungstenite::accept_async(server); let (client_res, server_res) = tokio::join!(client_future, server_future); diff --git a/rust/net/infra/src/ws2.rs b/rust/net/infra/src/ws2.rs index 1aa673a33..054719e2e 100644 --- a/rust/net/infra/src/ws2.rs +++ b/rust/net/infra/src/ws2.rs @@ -6,6 +6,7 @@ use std::fmt::Display; use std::io::Error as IoError; use std::pin::Pin; +use std::sync::Arc; use futures_util::{Sink, SinkExt as _, Stream, StreamExt as _}; use pin_project::pin_project; @@ -100,6 +101,9 @@ pub struct Connection { /// Configuration for this websocket client's behavior. config: Config, + + /// A tag to include in log lines, to disambiguate multiple websockets. + log_tag: Arc, } /// Fatal error that causes a connection to be closed. @@ -201,7 +205,7 @@ impl Connection where R: Stream, { - pub fn new(stream: S, outgoing_rx: R, config: Config) -> Self { + pub fn new(stream: S, outgoing_rx: R, config: Config, log_tag: Arc) -> Self { Self { stream, outgoing_rx, @@ -211,6 +215,7 @@ where last_heard_from_server: None, last_sent_to_server: None, last_sent_ping_to_server: None, + log_tag, } } @@ -254,6 +259,7 @@ where last_sent_to_server, last_sent_ping_to_server, last_heard_from_server, + log_tag, } = self.project(); // For the first call this function, assume we just heard from & sent to @@ -386,7 +392,9 @@ where } Message::Close(close) => { let code = close.as_ref().map(|c| c.code); - log::info!("received a close frame from the server with code {code:?}",); + log::info!( + "[{log_tag}] received a close frame from the server with code {code:?}", + ); match close { None | Some(CloseFrame { @@ -651,6 +659,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -686,6 +695,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -735,6 +745,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -802,6 +813,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -824,6 +836,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -856,6 +869,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -887,6 +901,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -929,6 +944,7 @@ mod test { remote_idle_disconnect_timeout: REMOTE_DISCONNECT_TIMEOUT, local_idle_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -964,6 +980,7 @@ mod test { remote_idle_ping_timeout: REMOTE_IDLE_TIMEOUT, remote_idle_disconnect_timeout: REMOTE_DISCONNECT_TIMEOUT, }, + "test".into(), ); pin_mut!(connection); diff --git a/rust/net/infra/src/ws2/attested.rs b/rust/net/infra/src/ws2/attested.rs index d1967f43f..87989581c 100644 --- a/rust/net/infra/src/ws2/attested.rs +++ b/rust/net/infra/src/ws2/attested.rs @@ -4,6 +4,7 @@ // use std::borrow::Cow; +use std::sync::Arc; use attest::client_connection::ClientConnection; use tokio::sync::{mpsc, oneshot}; @@ -71,9 +72,10 @@ impl AttestedConnection { pub async fn connect( ws: WebSocketStream, ws_config: crate::ws2::Config, + log_tag: Arc, new_handshake: impl FnOnce(&[u8]) -> attest::enclave::Result, ) -> Result { - let mut ws_client = WsClient::new(ws, ws_config); + let mut ws_client = WsClient::new(ws, ws_config, log_tag); let client_connection = authenticate(&mut ws_client, new_handshake).await?; @@ -163,11 +165,18 @@ impl WsClient { fn new( ws: WebSocketStream, ws_config: crate::ws2::Config, + log_tag: Arc, ) -> Self { let (outgoing_tx, outgoing_rx) = mpsc::channel(WS_MESSAGE_BUFFER); let (incoming_tx, incoming_rx) = mpsc::channel(WS_MESSAGE_BUFFER); - let _task = tokio::spawn(spawned_task_body(ws, outgoing_rx, incoming_tx, ws_config)); + let _task = tokio::spawn(spawned_task_body( + ws, + outgoing_rx, + incoming_tx, + ws_config, + log_tag, + )); Self { outgoing_tx, @@ -240,9 +249,14 @@ async fn spawned_task_body( outgoing_rx: mpsc::Receiver<(TextOrBinary, oneshot::Sender>)>, incoming_tx: mpsc::Sender, ReceiveError>>, config: crate::ws2::Config, + log_tag: Arc, ) -> Result<(), TaskExitError> { - let mut connection = - crate::ws2::Connection::new(stream, ReceiverStream::new(outgoing_rx), config); + let mut connection = crate::ws2::Connection::new( + stream, + ReceiverStream::new(outgoing_rx), + config, + log_tag.clone(), + ); let mut connection = std::pin::pin!(connection); loop { @@ -250,7 +264,9 @@ async fn spawned_task_body( Outcome::Continue(event) => match event { MessageEvent::SentMessage(response_sender) => { if response_sender.send(Ok(())).is_err() { - log::debug!("failed to signal send because the sender was dropped"); + log::debug!( + "[{log_tag}] failed to signal send because the sender was dropped" + ); } } MessageEvent::SendFailed(response_sender, tungstenite_send_error) => { @@ -259,7 +275,7 @@ async fn spawned_task_body( .send(Err(tungstenite_send_error.into())) .is_err() { - log::debug!("failed to signal send error because the sender was dropped"); + log::debug!("[{log_tag}] failed to signal send error because the sender was dropped"); } return Err(task_err); } @@ -270,7 +286,7 @@ async fn spawned_task_body( .is_err() { log::debug!( - "failed to forward received message because the receiver was dropped" + "[{log_tag}] failed to forward received message because the receiver was dropped" ); // The receiver has been dropped, so we should exit. return Ok(()); @@ -284,7 +300,9 @@ async fn spawned_task_body( .await .is_err() { - log::debug!("failed to send close event because the receiver was dropped") + log::debug!( + "[{log_tag}] failed to send close event because the receiver was dropped" + ) } return Ok(()); } @@ -303,7 +321,7 @@ async fn spawned_task_body( .await .is_err() { - log::debug!("failed to send abnormal close event because the receiver was dropped"); + log::debug!("[{log_tag}] failed to send abnormal close event because the receiver was dropped"); } return Err(TaskExitError::AbnormalServerClose { code }); } @@ -329,7 +347,9 @@ async fn spawned_task_body( ), }; if incoming_tx.send(Err(tx_error)).await.is_err() { - log::debug!("failed to signal send error because the receiver was dropped") + log::debug!( + "[{log_tag}] failed to signal send error because the receiver was dropped" + ) } return Err(exit_error); } @@ -650,13 +670,17 @@ mod test { attest::sgx_session::testutil::private_key(), )); - let mut connection = - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { + let mut connection = AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { assert_eq!(fake_attestation, FAKE_ATTESTATION); attest::sgx_session::testutil::handshake_from_tests_data() - }) - .await - .unwrap(); + }, + ) + .await + .unwrap(); connection.send(Vec::from(ECHO_BYTES)).await.unwrap(); let response: Vec = connection.receive().await.unwrap().unwrap_next(); @@ -681,7 +705,8 @@ mod test { } assert_matches!( - AttestedConnection::connect(client, FAKE_WS_CONFIG, fail_to_handshake).await, + AttestedConnection::connect(client, FAKE_WS_CONFIG, "test".into(), fail_to_handshake) + .await, Err(_) ); } @@ -695,13 +720,17 @@ mod test { attest::sgx_session::testutil::private_key(), )); - let mut connection = - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { + let mut connection = AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { assert_eq!(fake_attestation, FAKE_ATTESTATION); attest::sgx_session::testutil::handshake_from_tests_data() - }) - .await - .unwrap(); + }, + ) + .await + .unwrap(); connection.send(Vec::from(ECHO_BYTES)).await.unwrap(); // Decoding a vec as a 32-bit float shouldn't work. diff --git a/rust/net/src/cdsi.rs b/rust/net/src/cdsi.rs index cc597fbd2..bf1fc1480 100644 --- a/rust/net/src/cdsi.rs +++ b/rust/net/src/cdsi.rs @@ -337,7 +337,7 @@ impl CdsiConnection { { log::info!("connecting to CDSI endpoint"); let (connection, _info) = endpoint - .connect(auth, transport_connector) + .connect(auth, transport_connector, "cdsi".into()) .inspect_err(|e| { log::warn!("CDSI connection failed: {e}"); }) @@ -363,6 +363,7 @@ impl CdsiConnection { resolver, confirmation_header_name, ws_config, + "cdsi".into(), move |attestation_message| Cdsi::new_handshake(params, attestation_message), ) .await?; @@ -736,10 +737,15 @@ mod test { )); let cdsi_connection = CdsiConnection( - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { - assert_eq!(fake_attestation, FAKE_ATTESTATION); - attest::sgx_session::testutil::handshake_from_tests_data() - }) + AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { + assert_eq!(fake_attestation, FAKE_ATTESTATION); + attest::sgx_session::testutil::handshake_from_tests_data() + }, + ) .await .expect("handshake failed"), ); @@ -790,10 +796,15 @@ mod test { )); let cdsi_connection = CdsiConnection( - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { - assert_eq!(fake_attestation, FAKE_ATTESTATION); - attest::sgx_session::testutil::handshake_from_tests_data() - }) + AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { + assert_eq!(fake_attestation, FAKE_ATTESTATION); + attest::sgx_session::testutil::handshake_from_tests_data() + }, + ) .await .expect("handshake failed"), ); @@ -836,10 +847,15 @@ mod test { )); let cdsi_connection = CdsiConnection( - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { - assert_eq!(fake_attestation, FAKE_ATTESTATION); - attest::sgx_session::testutil::handshake_from_tests_data() - }) + AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { + assert_eq!(fake_attestation, FAKE_ATTESTATION); + attest::sgx_session::testutil::handshake_from_tests_data() + }, + ) .await .expect("handshake failed"), ); @@ -912,10 +928,15 @@ mod test { )); let cdsi_connection = CdsiConnection( - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { - assert_eq!(fake_attestation, FAKE_ATTESTATION); - attest::sgx_session::testutil::handshake_from_tests_data() - }) + AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { + assert_eq!(fake_attestation, FAKE_ATTESTATION); + attest::sgx_session::testutil::handshake_from_tests_data() + }, + ) .await .expect("handshake failed"), ); diff --git a/rust/net/src/chat.rs b/rust/net/src/chat.rs index a6a9f2708..464e42055 100644 --- a/rust/net/src/chat.rs +++ b/rust/net/src/chat.rs @@ -535,6 +535,7 @@ pub struct PendingChatConnection { connection: ChatWebSocketConnection, ws_config: ws2::Config, connection_info: ConnectionInfo, + log_tag: Arc, } pub struct AuthenticatedChatHeaders { @@ -553,6 +554,7 @@ impl ChatConnection { user_agent: &UserAgent, ws_config: self::ws2::Config, auth: Option, + log_tag: &str, ) -> Result { let headers = auth .into_iter() @@ -573,6 +575,7 @@ impl ChatConnection { fragment: ws_fragment.clone(), }); + let log_tag: Arc = log_tag.into(); let result = ConnectState::connect_ws( connect, ws_routes, @@ -585,8 +588,9 @@ impl ChatConnection { ThrottlingConnector::new(crate::infra::ws::Stateless, 1), resolver, confirmation_header_name.as_ref(), + log_tag.clone(), |error| { - log::debug!("connection attempt failed with {error}"); + log::debug!("[{log_tag}] connection attempt failed with {error}"); match error.classify() { ErrorClass::Intermittent => ControlFlow::Continue(()), ErrorClass::Fatal | ErrorClass::RetryAt(_) => { @@ -608,6 +612,7 @@ impl ChatConnection { connection: ws_connection, connection_info, ws_config, + log_tag, } }), Err(e) => { @@ -634,9 +639,16 @@ impl ChatConnection { connection, ws_config, connection_info, + log_tag, } = pending; Self { - inner: crate::chat::ws2::Chat::new(tokio_runtime, connection, ws_config, listener), + inner: crate::chat::ws2::Chat::new( + tokio_runtime, + connection, + ws_config, + log_tag, + listener, + ), connection_info, } } @@ -668,7 +680,10 @@ impl PendingChatConnection { pub async fn disconnect(&mut self) { if let Err(error) = self.connection.close().await { - log::error!("pending chat connection disconnect failed with {error}"); + log::error!( + "[{}] pending chat connection disconnect failed with {error}", + &self.log_tag + ); } } } @@ -754,6 +769,7 @@ pub mod test_support { &user_agent, ws_config, None, + "test", ) .await?; diff --git a/rust/net/src/chat/ws2.rs b/rust/net/src/chat/ws2.rs index 7aa961ea4..59b85d31d 100644 --- a/rust/net/src/chat/ws2.rs +++ b/rust/net/src/chat/ws2.rs @@ -9,6 +9,7 @@ use std::future::Future; use std::io::ErrorKind as IoErrorKind; use std::panic::AssertUnwindSafe; use std::pin::Pin; +use std::sync::Arc; use futures_util::{pin_mut, Sink, Stream, StreamExt as _}; use http::uri::PathAndQuery; @@ -147,6 +148,7 @@ impl Chat { tokio_runtime: tokio::runtime::Handle, transport: T, config: Config, + log_tag: Arc, listener: EventListener, ) -> Self where @@ -173,6 +175,7 @@ impl Chat { }, ), initial_request_id, + log_tag, listener, tokio_runtime, ) @@ -274,6 +277,7 @@ impl Chat { fn new_inner( into_inner_connection: impl IntoInnerConnection, initial_request_id: u64, + log_tag: Arc, listener: EventListener, tokio_runtime: tokio::runtime::Handle, ) -> Self { @@ -282,6 +286,7 @@ impl Chat { let requests_in_flight = InFlightRequests { outstanding_reqs: Default::default(), + log_tag: log_tag.clone(), }; let mut request_id = initial_request_id; @@ -294,15 +299,21 @@ impl Chat { (message, meta) }); - let response_rx = UnboundedReceiverStream::new(response_rx).map(|response| { + let log_tag_for_responses = log_tag.clone(); + let response_rx = UnboundedReceiverStream::new(response_rx).map(move |response| { let OutgoingResponse { id, status } = response; - log::debug!("sending response for incoming request {}", id); + log::debug!( + "[{log_tag_for_responses}] sending response for incoming request {}", + id + ); let message = response_for_status(id, status); (message, OutgoingMeta::ResponseToIncoming) }); - let inner_connection = into_inner_connection - .into_inner_connection(tokio_stream::StreamExt::merge(request_rx, response_rx)); + let inner_connection = into_inner_connection.into_inner_connection( + tokio_stream::StreamExt::merge(request_rx, response_rx), + log_tag.clone(), + ); let connection = ConnectionImpl { inner: inner_connection, @@ -311,6 +322,7 @@ impl Chat { let task = tokio_runtime.spawn(spawned_task_body( connection, + log_tag, listener, response_tx.downgrade(), )); @@ -364,6 +376,7 @@ enum TaskState { struct InFlightRequests { outstanding_reqs: HashMap>>, + log_tag: Arc, } /// Why the task finished unexpectedly. @@ -576,6 +589,7 @@ impl ListenerState { /// [`Outcome::Finished`]. async fn spawned_task_body( connection: ConnectionImpl, + log_tag: Arc, listener: EventListener, weak_response_tx: mpsc::WeakUnboundedSender, ) -> Result { @@ -586,7 +600,7 @@ async fn spawned_task_body( // In case the task panics, make sure the callback at least knows about the // disconnection. let mut listener_state = scopeguard::guard_on_unwind(listener_state, |mut listener_state| { - log::error!("chat handler task exited abnormally"); + log::error!("[{log_tag}] chat handler task exited abnormally"); listener_state.send_event_blocking(ListenerEvent::Finished(Err(FinishError::Unknown))); }); let result = loop { @@ -598,7 +612,7 @@ async fn spawned_task_body( Outcome::Finished(result) => break result, }; - log::debug!("received incoming request from server: {id}"); + log::debug!("[{log_tag}] received incoming request from server: {id}"); let event = ListenerEvent::ReceivedMessage( incoming_request, @@ -610,8 +624,8 @@ async fn spawned_task_body( listener_state.send_event(&tokio_rt, event).await; }; match &result { - Ok(reason) => log::info!("chat handler task finishing after {reason}"), - Err(err) => log::info!("chat handler task is stopping due to {err}"), + Ok(reason) => log::info!("[{log_tag}] chat handler task finishing after {reason}"), + Err(err) => log::info!("[{log_tag}] chat handler task is stopping due to {err}"), } let task_result = result.as_ref().map_err(Into::into).copied(); @@ -746,7 +760,10 @@ impl InFlightRequests { id: RequestId, response_sender: oneshot::Sender>, ) { - let Self { outstanding_reqs } = self; + let Self { + outstanding_reqs, + log_tag: _, + } = self; let prev = outstanding_reqs.insert(id, response_sender); assert!( prev.is_none(), @@ -756,11 +773,17 @@ impl InFlightRequests { } fn finish_send(&mut self, id: RequestId, result: Result) { - let Self { outstanding_reqs } = self; + let Self { + outstanding_reqs, + log_tag, + } = self; if let Some(sender) = outstanding_reqs.remove(&id) { let _ignore_send_error = sender.send(result); } else { - log::error!("tried to send response to nonexistent request {}", id.0); + log::error!( + "[{log_tag}] tried to send response to nonexistent request {}", + id.0 + ); } } } @@ -774,7 +797,11 @@ impl InFlightRequests { /// [`InnerConnection`]. trait IntoInnerConnection { /// Turn `self` and an outgoing stream into an `InnerConnection` impl. - fn into_inner_connection(self, outgoing_stream: R) -> impl InnerConnection + Send + 'static + fn into_inner_connection( + self, + outgoing_stream: R, + log_tag: Arc, + ) -> impl InnerConnection + Send + 'static where R: Stream + Send + 'static; } @@ -786,12 +813,16 @@ where + Send + 'static, { - fn into_inner_connection(self, outgoing_stream: R) -> impl InnerConnection + Send + 'static + fn into_inner_connection( + self, + outgoing_stream: R, + log_tag: Arc, + ) -> impl InnerConnection + Send + 'static where R: Stream + Send + 'static, { let (stream, config) = self; - crate::infra::ws2::Connection::new(stream, outgoing_stream, config) + crate::infra::ws2::Connection::new(stream, outgoing_stream, config, log_tag) } } @@ -841,6 +872,7 @@ impl ConnectionImpl { requests_in_flight: &mut InFlightRequests, event: Outcome, Result>, ) -> Outcome, Result> { + let log_tag = &requests_in_flight.log_tag; match event { Outcome::Finished(Ok(finish)) => return Outcome::Finished(Ok(finish)), Outcome::Finished(Err(err)) => { @@ -875,7 +907,7 @@ impl ConnectionImpl { Err(TaskExitError::SendProtocol(protocol_error.clone())) } }; - log::warn!("shutting down after send failed: {send_error}"); + log::warn!("[{log_tag}] shutting down after send failed: {send_error}"); match meta { OutgoingMeta::SentRequest(_request_id, response_sender) => { // The server isn't going to get our response to an @@ -907,11 +939,11 @@ impl ConnectionImpl { // there's nothing to do here. We could be strict here // and close the connection, or ignore the message and // keep going. We choose the latter. - log::warn!("received invalid message: {e}"); + log::warn!("[{log_tag}] received invalid message: {e}"); } Err(ChatProtocolError::InvalidResponse(id)) => { log::warn!( - "received invalid response for outgoing request {id}", + "[{log_tag}] received invalid response for outgoing request {id}", id = id.0 ); requests_in_flight.finish_send(id, Err(TaskSendError::InvalidResponse)); @@ -920,7 +952,10 @@ impl ConnectionImpl { // incoming requests. } Ok(ChatMessage::Response(id, response)) => { - log::debug!("received response for outgoing request {id}", id = id.0); + log::debug!( + "[{log_tag}] received response for outgoing request {id}", + id = id.0 + ); requests_in_flight.finish_send(id, Ok(response)) } Ok(ChatMessage::Request(id, request_proto)) => { @@ -1219,6 +1254,7 @@ mod test { incoming_events: incoming_events_rx, }, initial_request_id, + "test".into(), listener, tokio::runtime::Handle::current(), ); @@ -1304,6 +1340,7 @@ mod test { fn into_inner_connection( self, outgoing_stream: R, + _log_tag: Arc, ) -> impl InnerConnection + Send + 'static where R: Stream + Send + 'static, diff --git a/rust/net/src/connect_state.rs b/rust/net/src/connect_state.rs index 49190357e..1c7f63f7e 100644 --- a/rust/net/src/connect_state.rs +++ b/rust/net/src/connect_state.rs @@ -5,6 +5,7 @@ use std::default::Default; use std::ops::ControlFlow; +use std::sync::Arc; use http::HeaderName; use itertools::Itertools as _; @@ -78,6 +79,7 @@ where ws_connector: WC, resolver: &DnsResolver, confirmation_header_name: Option<&HeaderName>, + log_tag: Arc, mut on_error: impl FnMut(WebSocketServiceConnectError) -> ControlFlow, ) -> Result<(WC::Connection, RouteInfo), ConnectError> where @@ -94,7 +96,10 @@ where .routes(&connect_read.route_provider_context) .collect_vec(); - log::info!("starting connection attempt with {} routes", routes.len()); + log::info!( + "[{log_tag}] starting connection attempt with {} routes", + routes.len() + ); let route_provider = routes.into_iter().map(ResolveWithSavedDescription); let connector = DescribedRouteConnector(ComposedConnector::new( @@ -110,6 +115,7 @@ where route_provider, resolver, connector, + log_tag.clone(), |error| { let error = WebSocketServiceConnectError::from_websocket_error( error, @@ -128,10 +134,10 @@ where match &result { Ok((_connection, route)) => log::info!( - "connection through {route} succeeded after {:.3?}", + "[{log_tag}] connection through {route} succeeded after {:.3?}", start.elapsed() ), - Err(e) => log::info!("connection failed with {e}"), + Err(e) => log::info!("[{log_tag}] connection failed with {e}"), } this.write().await.attempts_record.apply_outcome_updates( @@ -164,6 +170,7 @@ where resolver: &DnsResolver, confirmation_header_name: Option, ws_config: libsignal_net_infra::ws2::Config, + log_tag: Arc, new_handshake: impl FnOnce(&[u8]) -> Result, ) -> Result<(AttestedConnection, RouteInfo), crate::enclave::Error> where @@ -180,6 +187,7 @@ where crate::infra::ws::Stateless, resolver, confirmation_header_name.as_ref(), + log_tag.clone(), |error| match error.classify() { ErrorClass::Intermittent => ControlFlow::Continue(()), ErrorClass::RetryAt(_) | ErrorClass::Fatal => { @@ -195,7 +203,7 @@ where ConnectError::FatalConnect(e) => e, })?; - let connection = AttestedConnection::connect(ws, ws_config, new_handshake).await?; + let connection = AttestedConnection::connect(ws, ws_config, log_tag, new_handshake).await?; Ok((connection, route_info)) } } @@ -292,7 +300,7 @@ mod test { }, }; - let ws_connector = ConnectFn(|(), route| { + let ws_connector = ConnectFn(|(), route, _log_tag| { let (ws, http) = &route; std::future::ready( if (ws, http) == (&failing_route.fragment, &failing_route.inner.fragment) { @@ -308,7 +316,7 @@ mod test { )])); let fake_transport_connector = - ConnectFn(move |(), _| std::future::ready(Ok::<_, WebSocketConnectError>(()))); + ConnectFn(move |(), _, _| std::future::ready(Ok::<_, WebSocketConnectError>(()))); let state = ConnectState { route_resolver: RouteResolver::default(), @@ -324,6 +332,7 @@ mod test { ws_connector, &resolver, None, + "test".into(), |e| { let e = assert_matches!(e, WebSocketServiceConnectError::Connect(e, _) => e); assert_matches!( diff --git a/rust/net/src/enclave.rs b/rust/net/src/enclave.rs index af7c9a494..212da9d94 100644 --- a/rust/net/src/enclave.rs +++ b/rust/net/src/enclave.rs @@ -4,6 +4,7 @@ // use std::marker::PhantomData; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use attest::svr2::RaftConfig; @@ -311,6 +312,7 @@ impl EnclaveEndpointConnect &self, auth: Auth, transport_connector: T, + log_tag: Arc, ) -> Result<(AttestedConnection, ServiceConnectionInfo), Error> where C: ConnectionManager, @@ -323,6 +325,7 @@ impl EnclaveEndpointConnect &self.endpoint_connection, auth, transport_connector, + log_tag, &move |attestation_message| E::new_handshake(&self.params, attestation_message), ) .await @@ -360,6 +363,7 @@ async fn connect_attested( endpoint_connection: &EndpointConnection, auth: Auth, transport_connector: T, + log_tag: Arc, do_handshake: &(dyn Sync + Fn(&[u8]) -> enclave::Result), ) -> Result<(AttestedConnection, ServiceConnectionInfo), Error> { let connector = WebSocketStreamConnector::new( @@ -387,6 +391,7 @@ async fn connect_attested( let attested = AttestedConnection::connect( websocket, endpoint_connection.config.ws2_config(), + log_tag, do_handshake, ) .await?; @@ -582,6 +587,7 @@ mod test { username: "fdsa".to_string(), }, AlwaysFailingConnector, + "test".into(), ) .await .map(|(connection, _info)| connection) diff --git a/rust/net/src/svr.rs b/rust/net/src/svr.rs index 5638789be..dd6ed9a5c 100644 --- a/rust/net/src/svr.rs +++ b/rust/net/src/svr.rs @@ -46,7 +46,11 @@ where T: TransportConnector, { connection - .connect(auth, transport_connector) + .connect( + auth, + transport_connector, + format!("svr3:{}", std::any::type_name::()).into(), + ) .await .map(|(connection, info)| Self { inner: connection,