From bcebd8ad4af364dd6ccf87a149b4fc71ea0fb264 Mon Sep 17 00:00:00 2001 From: Jordan Rose Date: Tue, 14 Jan 2025 12:54:02 -0800 Subject: [PATCH] net: Thread a "log tag" down into ws2/chat This is mainly important for chat connections, which are often paired auth and unauth, and thus result in two very similar log lines right next to each other. For now the tags are just the constant strings "auth", "unauth", and "cdsi", plus some hardcoded values for the old TransportConnector interfaces (and examples and tests); we may want to refine them in the future. --- rust/bridge/shared/types/src/net/chat.rs | 1 + rust/net/examples/https_proxy.rs | 2 +- rust/net/examples/socks_proxy.rs | 2 +- rust/net/infra/src/route.rs | 6 +- rust/net/infra/src/route/connect.rs | 28 +++++--- rust/net/infra/src/route/connect/throttle.rs | 3 +- rust/net/infra/src/route/describe.rs | 3 +- rust/net/infra/src/tcp_ssl.rs | 15 +++- rust/net/infra/src/tcp_ssl/proxy.rs | 16 +++-- rust/net/infra/src/tcp_ssl/proxy/https.rs | 22 ++++-- rust/net/infra/src/tcp_ssl/proxy/socks.rs | 23 +++--- rust/net/infra/src/tcp_ssl/proxy/tls.rs | 4 +- rust/net/infra/src/ws.rs | 2 + rust/net/infra/src/ws2.rs | 21 +++++- rust/net/infra/src/ws2/attested.rs | 71 +++++++++++++------ rust/net/src/cdsi.rs | 55 ++++++++++----- rust/net/src/chat.rs | 22 +++++- rust/net/src/chat/ws2.rs | 73 +++++++++++++++----- rust/net/src/connect_state.rs | 21 ++++-- rust/net/src/enclave.rs | 6 ++ rust/net/src/svr.rs | 6 +- 21 files changed, 297 insertions(+), 105 deletions(-) diff --git a/rust/bridge/shared/types/src/net/chat.rs b/rust/bridge/shared/types/src/net/chat.rs index 17b85ff32..57e9e7fdb 100644 --- a/rust/bridge/shared/types/src/net/chat.rs +++ b/rust/bridge/shared/types/src/net/chat.rs @@ -445,6 +445,7 @@ async fn establish_chat_connection( initial_request_id: 0, }, auth, + auth_type, ) .inspect(|r| match r { Ok(_) => log::info!("successfully connected {auth_type} chat"), diff --git a/rust/net/examples/https_proxy.rs b/rust/net/examples/https_proxy.rs index d312757ec..7dbf64c7d 100644 --- a/rust/net/examples/https_proxy.rs +++ b/rust/net/examples/https_proxy.rs @@ -137,7 +137,7 @@ async fn main() { async move { tokio::time::sleep(START_NEXT_DELAY * i).await; log::info!("connecting via: {route:?}"); - connector.connect(route).await + connector.connect(route, "main".into()).await } })); let mut connection = connect_attempts diff --git a/rust/net/examples/socks_proxy.rs b/rust/net/examples/socks_proxy.rs index 636e0d495..021a34ead 100644 --- a/rust/net/examples/socks_proxy.rs +++ b/rust/net/examples/socks_proxy.rs @@ -134,7 +134,7 @@ async fn main() { async move { tokio::time::sleep(START_NEXT_DELAY * i).await; log::info!("connecting via: {route:?}"); - connector.connect(route).await + connector.connect(route, "main".into()).await } })); // needed to scope the borrow of `connector` diff --git a/rust/net/infra/src/route.rs b/rust/net/infra/src/route.rs index c731190b7..895617c1e 100644 --- a/rust/net/infra/src/route.rs +++ b/rust/net/infra/src/route.rs @@ -196,6 +196,7 @@ pub async fn connect( ordered_routes: impl Iterator, resolver: &impl Resolver, connector: C, + log_tag: Arc, mut on_error: impl FnMut(C::Error) -> ControlFlow, ) -> ( Result>, @@ -272,7 +273,7 @@ where Event::NextRouteAvailable(Some(route)) => { connects_in_progress.push(async { let started = Instant::now(); - let result = connector.connect(route.clone()).await; + let result = connector.connect(route.clone(), log_tag.clone()).await; (route, result, started) }); poll_schedule_for_next = false; @@ -755,6 +756,7 @@ mod test { &self, (): (), route: R, + _log_tag: Arc, ) -> impl Future> + Send { let (sender, receiver) = oneshot::channel(); self.outgoing @@ -788,6 +790,7 @@ mod test { .map(|(h, _addr)| FakeRoute(UnresolvedHost::from(Arc::from(*h)))), &resolver, connector, + "test".into(), |_err: FakeConnectError| ControlFlow::::Continue(()), ) .await @@ -887,6 +890,7 @@ mod test { .map(|(h, _addr)| FakeRoute(UnresolvedHost::from(Arc::from(*h)))), &resolver, connector, + "test".into(), |_err: FakeConnectError| ControlFlow::::Continue(()), ) .await; diff --git a/rust/net/infra/src/route/connect.rs b/rust/net/infra/src/route/connect.rs index ec2891475..5b09472f8 100644 --- a/rust/net/infra/src/route/connect.rs +++ b/rust/net/infra/src/route/connect.rs @@ -7,6 +7,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::net::IpAddr; +use std::sync::Arc; use derive_where::derive_where; use futures_util::TryFutureExt; @@ -38,6 +39,7 @@ pub trait Connector { &self, over: Inner, route: R, + log_tag: Arc, ) -> impl Future> + Send; } @@ -46,8 +48,9 @@ pub trait ConnectorExt: Connector { fn connect( &self, route: R, + log_tag: Arc, ) -> impl Future> + Send { - self.connect_over((), route) + self.connect_over((), route, log_tag) } } impl> ConnectorExt for C {} @@ -139,6 +142,7 @@ where &self, over: Inner, route: WebSocketRoute>, + log_tag: Arc, ) -> impl Future> + Send { let Self { outer, @@ -155,11 +159,11 @@ where } = route; async move { let inner = inner - .connect_over(over, tls_route) + .connect_over(over, tls_route, log_tag.clone()) .await .map_err(Into::into)?; outer - .connect_over(inner, (ws_fragment, http_fragment)) + .connect_over(inner, (ws_fragment, http_fragment), log_tag) .await .map_err(Into::into) } @@ -184,6 +188,7 @@ where &self, over: Inner, route: TlsRoute, + log_tag: Arc, ) -> impl Future> + Send { let Self { outer, @@ -196,11 +201,11 @@ where } = route; async move { let inner = inner - .connect_over(over, tcp_route) + .connect_over(over, tcp_route, log_tag.clone()) .await .map_err(Into::into)?; outer - .connect_over(inner, tls_fragment) + .connect_over(inner, tls_fragment, log_tag) .await .map_err(Into::into) } @@ -228,17 +233,18 @@ where &self, over: Inner, route: DirectOrProxyRoute, + log_tag: Arc, ) -> impl Future> + Send { match route { DirectOrProxyRoute::Direct(d) => Either::Left( self.direct - .connect_over(over, d) + .connect_over(over, d, log_tag) .map_ok(Either::Left) .map_err(Into::into), ), DirectOrProxyRoute::Proxy(p) => Either::Right( self.proxy - .connect_over(over, p) + .connect_over(over, p, log_tag) .map_ok(Either::Right) .map_err(Into::into), ), @@ -255,8 +261,9 @@ impl, R, Inner> Connector for &C { &self, over: Inner, route: R, + log_tag: Arc, ) -> impl Future> + Send { - (*self).connect_over(over, route) + (*self).connect_over(over, route, log_tag) } } @@ -278,7 +285,7 @@ pub mod testutils { impl Connector for ConnectFn where - F: Fn(Inner, R) -> Fut, + F: Fn(Inner, R, Arc) -> Fut, Fut: Future> + Send, { type Connection = C; @@ -289,8 +296,9 @@ pub mod testutils { &self, over: Inner, route: R, + log_tag: Arc, ) -> impl Future> + Send { - self.0(over, route) + self.0(over, route, log_tag) } } } diff --git a/rust/net/infra/src/route/connect/throttle.rs b/rust/net/infra/src/route/connect/throttle.rs index 499b026f8..08c2e6c38 100644 --- a/rust/net/infra/src/route/connect/throttle.rs +++ b/rust/net/infra/src/route/connect/throttle.rs @@ -61,6 +61,7 @@ where &self, over: Inner, route: R, + log_tag: Arc, ) -> impl Future> + Send { let Self { inner, permits } = self; async move { @@ -68,7 +69,7 @@ where .acquire_owned() .await .expect("semaphore not closed"); - let connection = inner.connect_over(over, route).await?; + let connection = inner.connect_over(over, route, log_tag).await?; Ok(ThrottledConnection(connection, permit)) } } diff --git a/rust/net/infra/src/route/describe.rs b/rust/net/infra/src/route/describe.rs index 37d48fe9d..472e50b5e 100644 --- a/rust/net/infra/src/route/describe.rs +++ b/rust/net/infra/src/route/describe.rs @@ -113,9 +113,10 @@ impl, D: Send> &self, over: Inner, route: WithLoggableDescription, + log_tag: Arc, ) -> impl Future> + Send { self.0 - .connect_over(over, route.route) + .connect_over(over, route.route, log_tag) .map_ok(|c| (c, route.description)) } } diff --git a/rust/net/infra/src/tcp_ssl.rs b/rust/net/infra/src/tcp_ssl.rs index e21fe3370..863281552 100644 --- a/rust/net/infra/src/tcp_ssl.rs +++ b/rust/net/infra/src/tcp_ssl.rs @@ -116,15 +116,17 @@ impl TransportConnector for DirectConnector { connection_params: &TransportConnectionParams, alpn: Alpn, ) -> Result, TransportConnectError> { + let log_tag: Arc = "DirectConnector".into(); let StreamAndInfo(tcp_stream, remote_address) = connect_tcp( &self.dns_resolver, RouteType::Direct, connection_params.tcp_host.as_deref(), connection_params.port, + log_tag.clone(), ) .await?; - let ssl_stream = connect_tls(tcp_stream, connection_params, alpn).await?; + let ssl_stream = connect_tls(tcp_stream, connection_params, alpn, log_tag).await?; Ok(StreamAndInfo(ssl_stream, remote_address)) } @@ -150,6 +152,7 @@ impl Connector, ()> for StatelessDirect { &self, (): (), route: TcpRoute, + _log_tag: Arc, ) -> impl Future> { let TcpRoute { address, port } = route; @@ -170,6 +173,7 @@ where &self, inner: Inner, fragment: TlsRouteFragment, + _log_tag: Arc, ) -> impl Future> + Send { let TlsRouteFragment { root_certs, @@ -239,6 +243,7 @@ async fn connect_tls( transport: S, connection_params: &TransportConnectionParams, alpn: Alpn, + log_tag: Arc, ) -> Result, TransportConnectError> { let route = TlsRouteFragment { root_certs: connection_params.certs.clone(), @@ -246,7 +251,9 @@ async fn connect_tls( alpn: Some(alpn), }; - StatelessDirect.connect_over(transport, route).await + StatelessDirect + .connect_over(transport, route, log_tag) + .await } async fn connect_tcp( @@ -254,6 +261,7 @@ async fn connect_tcp( route_type: RouteType, host: Host<&str>, port: NonZeroU16, + log_tag: Arc, ) -> Result, TransportConnectError> { let dns_lookup = match host { Host::Ip(ip) => { @@ -291,13 +299,14 @@ async fn connect_tcp( let staggered_futures = dns_lookup.into_iter().enumerate().map(|(idx, ip)| { let delay = TCP_CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap(); let connector = &connector; + let log_tag = log_tag.clone(); async move { if !delay.is_zero() { tokio::time::sleep(delay).await; } let route = TcpRoute { address: ip, port }; connector - .connect(route) + .connect(route, log_tag) .inspect_err(|e| { log::debug!("failed to connect to IP [{ip}] with an error: {e:?}"); }) diff --git a/rust/net/infra/src/tcp_ssl/proxy.rs b/rust/net/infra/src/tcp_ssl/proxy.rs index d42faf351..f6dbca48d 100644 --- a/rust/net/infra/src/tcp_ssl/proxy.rs +++ b/rust/net/infra/src/tcp_ssl/proxy.rs @@ -4,6 +4,7 @@ // use std::net::IpAddr; +use std::sync::Arc; use futures_util::TryFutureExt; use tokio::net::TcpStream; @@ -33,6 +34,7 @@ impl Connector, ()> for StatelessProxied { &self, (): (), route: ConnectionProxyRoute, + log_tag: Arc, ) -> Result { match route { ConnectionProxyRoute::Tls { proxy } => { @@ -43,21 +45,25 @@ impl Connector, ()> for StatelessProxied { let connector = super::StatelessDirect; - let tcp = connector.connect(inner).await?; + let tcp = connector.connect(inner, log_tag.clone()).await?; connector - .connect_over(tcp, tls_fragment) + .connect_over(tcp, tls_fragment, log_tag) .await .map(Into::into) } ConnectionProxyRoute::Tcp { proxy } => { let connector = super::StatelessDirect; - match connector.connect(proxy).await { + match connector.connect(proxy, log_tag).await { Ok(connection) => Ok(connection.into()), Err(_io_error) => Err(TransportConnectError::TcpConnectionFailed), } } - ConnectionProxyRoute::Socks(route) => self.connect(route).map_ok(Into::into).await, - ConnectionProxyRoute::Https(route) => self.connect(route).map_ok(Into::into).await, + ConnectionProxyRoute::Socks(route) => { + self.connect(route, log_tag).map_ok(Into::into).await + } + ConnectionProxyRoute::Https(route) => { + self.connect(route, log_tag).map_ok(Into::into).await + } } } } diff --git a/rust/net/infra/src/tcp_ssl/proxy/https.rs b/rust/net/infra/src/tcp_ssl/proxy/https.rs index 8941288bd..56314f5ad 100644 --- a/rust/net/infra/src/tcp_ssl/proxy/https.rs +++ b/rust/net/infra/src/tcp_ssl/proxy/https.rs @@ -7,6 +7,7 @@ use std::fmt::Display; use std::future::Future; use std::net::IpAddr; use std::num::NonZeroU16; +use std::sync::Arc; use bytes::Bytes; use futures_util::TryFutureExt as _; @@ -57,6 +58,7 @@ impl Connector, ()> for super::StatelessProxied { &self, (): (), route: HttpsProxyRoute, + log_tag: Arc, ) -> impl Future> + Send { let HttpsProxyRoute { fragment, inner } = route; async move { @@ -64,8 +66,16 @@ impl Connector, ()> for super::StatelessProxied { let tcp_connector = StatelessTcpConnector::default(); let inner = inner .map_either( - |tls| tls_connector.connect(tls).map_ok(Either::Left), - |tcp| tcp_connector.connect(tcp).map_ok(Either::Right), + |tls| { + tls_connector + .connect(tls, log_tag.clone()) + .map_ok(Either::Left) + }, + |tcp| { + tcp_connector + .connect(tcp, log_tag.clone()) + .map_ok(Either::Right) + }, ) .await?; let info = inner.transport_info(); @@ -93,7 +103,7 @@ impl Connector, ()> for super::StatelessProxied { info, }), Err(e) => { - log::info!("failed to connect via HTTP proxy: {e}"); + log::info!("[{log_tag}] failed to connect via HTTP proxy: {e}"); Err(TransportConnectError::ProxyProtocol) } } @@ -411,7 +421,7 @@ mod test { }; let mut client_stream = super::super::StatelessProxied - .connect(route) + .connect(route, "test".into()) .await .expect("can connect"); @@ -491,7 +501,9 @@ mod test { inner: Either::Right(route_to_proxy), }; - let connect_result = super::super::StatelessProxied.connect(route).await; + let connect_result = super::super::StatelessProxied + .connect(route, "test".into()) + .await; assert_matches!(connect_result, Err(TransportConnectError::ProxyProtocol)); } diff --git a/rust/net/infra/src/tcp_ssl/proxy/socks.rs b/rust/net/infra/src/tcp_ssl/proxy/socks.rs index c148c8677..2b1b2c305 100644 --- a/rust/net/infra/src/tcp_ssl/proxy/socks.rs +++ b/rust/net/infra/src/tcp_ssl/proxy/socks.rs @@ -79,11 +79,14 @@ impl TransportConnector for SocksConnector { log::info!("connecting to {which_protocol:?} proxy over TCP"); log::debug!("connecting to {which_protocol:?} proxy at {proxy_host}:{proxy_port} over TCP"); + let log_tag: Arc = "SocksConnector".into(); + let StreamAndInfo(tcp_stream, remote_address) = crate::tcp_ssl::connect_tcp( dns_resolver, RouteType::SocksProxy, proxy_host.as_deref(), *proxy_port, + log_tag.clone(), ) .await?; let is_ipv6 = tcp_stream @@ -135,7 +138,8 @@ impl TransportConnector for SocksConnector { })?; log::debug!("connecting TLS through proxy"); - let stream = crate::tcp_ssl::connect_tls(socks_stream, connection_params, alpn).await?; + let stream = + crate::tcp_ssl::connect_tls(socks_stream, connection_params, alpn, log_tag).await?; log::info!("connection through SOCKS proxy established successfully"); Ok(StreamAndInfo( @@ -158,6 +162,7 @@ impl Connector, ()> for super::StatelessProxied { &self, (): (), route: SocksRoute, + log_tag: Arc, ) -> impl Future> + Send { let SocksRoute { protocol, @@ -167,18 +172,18 @@ impl Connector, ()> for super::StatelessProxied { } = route; async move { - log::info!("establishing connection to host over SOCKS proxy"); + log::info!("[{log_tag}] establishing connection to host over SOCKS proxy"); log::debug!( - "establishing connection to {:?} over SOCKS proxy", + "[{log_tag}] establishing connection to {:?} over SOCKS proxy", target_addr ); - log::info!("connecting to {protocol:?} proxy over TCP"); + log::info!("[{log_tag}] connecting to {protocol:?} proxy over TCP"); let TcpRoute { address: proxy_host, port: proxy_port, } = &proxy; - log::debug!("connecting to {protocol:?} proxy at {proxy_host}:{proxy_port} over TCP"); + log::debug!("[{log_tag}] connecting to {protocol:?} proxy at {proxy_host}:{proxy_port} over TCP"); let target = match &target_addr { crate::route::ProxyTarget::ResolvedLocally(ip) => { @@ -189,9 +194,11 @@ impl Connector, ()> for super::StatelessProxied { } }; - let stream = super::super::StatelessDirect.connect(proxy).await?; - log::info!("performing proxy handshake"); - log::debug!("performing proxy handshake with {target:?}"); + let stream = super::super::StatelessDirect + .connect(proxy, log_tag.clone()) + .await?; + log::info!("[{log_tag}] performing proxy handshake"); + log::debug!("[{log_tag}] performing proxy handshake with {target:?}"); protocol .connect_to_proxy(stream, target) .await diff --git a/rust/net/infra/src/tcp_ssl/proxy/tls.rs b/rust/net/infra/src/tcp_ssl/proxy/tls.rs index fb3ae7cda..151beef02 100644 --- a/rust/net/infra/src/tcp_ssl/proxy/tls.rs +++ b/rust/net/infra/src/tcp_ssl/proxy/tls.rs @@ -55,11 +55,13 @@ impl TransportConnector for TlsProxyConnector { connection_params: &TransportConnectionParams, alpn: Alpn, ) -> Result, TransportConnectError> { + let log_tag: Arc = "TlsProxyConnector".into(); let StreamAndInfo(tcp_stream, remote_address) = connect_tcp( &self.dns_resolver, RouteType::TlsProxy, self.proxy_host.as_deref(), self.proxy_port, + log_tag.clone(), ) .await?; @@ -92,7 +94,7 @@ impl TransportConnector for TlsProxyConnector { } }; - let tls_stream = connect_tls(inner_stream, connection_params, alpn).await?; + let tls_stream = connect_tls(inner_stream, connection_params, alpn, log_tag).await?; Ok(StreamAndInfo( tls_stream, diff --git a/rust/net/infra/src/ws.rs b/rust/net/infra/src/ws.rs index fef12c4e3..9e3ac1451 100644 --- a/rust/net/infra/src/ws.rs +++ b/rust/net/infra/src/ws.rs @@ -166,6 +166,7 @@ where &self, inner: Inner, route: (WebSocketRouteFragment, HttpRouteFragment), + _log_tag: Arc, ) -> impl std::future::Future> + Send { let ( WebSocketRouteFragment { @@ -638,6 +639,7 @@ pub mod testutil { front_name: None, }, ), + "test".into(), ); let server_future = tokio_tungstenite::accept_async(server); let (client_res, server_res) = tokio::join!(client_future, server_future); diff --git a/rust/net/infra/src/ws2.rs b/rust/net/infra/src/ws2.rs index 1aa673a33..054719e2e 100644 --- a/rust/net/infra/src/ws2.rs +++ b/rust/net/infra/src/ws2.rs @@ -6,6 +6,7 @@ use std::fmt::Display; use std::io::Error as IoError; use std::pin::Pin; +use std::sync::Arc; use futures_util::{Sink, SinkExt as _, Stream, StreamExt as _}; use pin_project::pin_project; @@ -100,6 +101,9 @@ pub struct Connection { /// Configuration for this websocket client's behavior. config: Config, + + /// A tag to include in log lines, to disambiguate multiple websockets. + log_tag: Arc, } /// Fatal error that causes a connection to be closed. @@ -201,7 +205,7 @@ impl Connection where R: Stream, { - pub fn new(stream: S, outgoing_rx: R, config: Config) -> Self { + pub fn new(stream: S, outgoing_rx: R, config: Config, log_tag: Arc) -> Self { Self { stream, outgoing_rx, @@ -211,6 +215,7 @@ where last_heard_from_server: None, last_sent_to_server: None, last_sent_ping_to_server: None, + log_tag, } } @@ -254,6 +259,7 @@ where last_sent_to_server, last_sent_ping_to_server, last_heard_from_server, + log_tag, } = self.project(); // For the first call this function, assume we just heard from & sent to @@ -386,7 +392,9 @@ where } Message::Close(close) => { let code = close.as_ref().map(|c| c.code); - log::info!("received a close frame from the server with code {code:?}",); + log::info!( + "[{log_tag}] received a close frame from the server with code {code:?}", + ); match close { None | Some(CloseFrame { @@ -651,6 +659,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -686,6 +695,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -735,6 +745,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -802,6 +813,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -824,6 +836,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -856,6 +869,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -887,6 +901,7 @@ mod test { remote_idle_ping_timeout: FOREVER, remote_idle_disconnect_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -929,6 +944,7 @@ mod test { remote_idle_disconnect_timeout: REMOTE_DISCONNECT_TIMEOUT, local_idle_timeout: FOREVER, }, + "test".into(), ); pin_mut!(connection); @@ -964,6 +980,7 @@ mod test { remote_idle_ping_timeout: REMOTE_IDLE_TIMEOUT, remote_idle_disconnect_timeout: REMOTE_DISCONNECT_TIMEOUT, }, + "test".into(), ); pin_mut!(connection); diff --git a/rust/net/infra/src/ws2/attested.rs b/rust/net/infra/src/ws2/attested.rs index d1967f43f..87989581c 100644 --- a/rust/net/infra/src/ws2/attested.rs +++ b/rust/net/infra/src/ws2/attested.rs @@ -4,6 +4,7 @@ // use std::borrow::Cow; +use std::sync::Arc; use attest::client_connection::ClientConnection; use tokio::sync::{mpsc, oneshot}; @@ -71,9 +72,10 @@ impl AttestedConnection { pub async fn connect( ws: WebSocketStream, ws_config: crate::ws2::Config, + log_tag: Arc, new_handshake: impl FnOnce(&[u8]) -> attest::enclave::Result, ) -> Result { - let mut ws_client = WsClient::new(ws, ws_config); + let mut ws_client = WsClient::new(ws, ws_config, log_tag); let client_connection = authenticate(&mut ws_client, new_handshake).await?; @@ -163,11 +165,18 @@ impl WsClient { fn new( ws: WebSocketStream, ws_config: crate::ws2::Config, + log_tag: Arc, ) -> Self { let (outgoing_tx, outgoing_rx) = mpsc::channel(WS_MESSAGE_BUFFER); let (incoming_tx, incoming_rx) = mpsc::channel(WS_MESSAGE_BUFFER); - let _task = tokio::spawn(spawned_task_body(ws, outgoing_rx, incoming_tx, ws_config)); + let _task = tokio::spawn(spawned_task_body( + ws, + outgoing_rx, + incoming_tx, + ws_config, + log_tag, + )); Self { outgoing_tx, @@ -240,9 +249,14 @@ async fn spawned_task_body( outgoing_rx: mpsc::Receiver<(TextOrBinary, oneshot::Sender>)>, incoming_tx: mpsc::Sender, ReceiveError>>, config: crate::ws2::Config, + log_tag: Arc, ) -> Result<(), TaskExitError> { - let mut connection = - crate::ws2::Connection::new(stream, ReceiverStream::new(outgoing_rx), config); + let mut connection = crate::ws2::Connection::new( + stream, + ReceiverStream::new(outgoing_rx), + config, + log_tag.clone(), + ); let mut connection = std::pin::pin!(connection); loop { @@ -250,7 +264,9 @@ async fn spawned_task_body( Outcome::Continue(event) => match event { MessageEvent::SentMessage(response_sender) => { if response_sender.send(Ok(())).is_err() { - log::debug!("failed to signal send because the sender was dropped"); + log::debug!( + "[{log_tag}] failed to signal send because the sender was dropped" + ); } } MessageEvent::SendFailed(response_sender, tungstenite_send_error) => { @@ -259,7 +275,7 @@ async fn spawned_task_body( .send(Err(tungstenite_send_error.into())) .is_err() { - log::debug!("failed to signal send error because the sender was dropped"); + log::debug!("[{log_tag}] failed to signal send error because the sender was dropped"); } return Err(task_err); } @@ -270,7 +286,7 @@ async fn spawned_task_body( .is_err() { log::debug!( - "failed to forward received message because the receiver was dropped" + "[{log_tag}] failed to forward received message because the receiver was dropped" ); // The receiver has been dropped, so we should exit. return Ok(()); @@ -284,7 +300,9 @@ async fn spawned_task_body( .await .is_err() { - log::debug!("failed to send close event because the receiver was dropped") + log::debug!( + "[{log_tag}] failed to send close event because the receiver was dropped" + ) } return Ok(()); } @@ -303,7 +321,7 @@ async fn spawned_task_body( .await .is_err() { - log::debug!("failed to send abnormal close event because the receiver was dropped"); + log::debug!("[{log_tag}] failed to send abnormal close event because the receiver was dropped"); } return Err(TaskExitError::AbnormalServerClose { code }); } @@ -329,7 +347,9 @@ async fn spawned_task_body( ), }; if incoming_tx.send(Err(tx_error)).await.is_err() { - log::debug!("failed to signal send error because the receiver was dropped") + log::debug!( + "[{log_tag}] failed to signal send error because the receiver was dropped" + ) } return Err(exit_error); } @@ -650,13 +670,17 @@ mod test { attest::sgx_session::testutil::private_key(), )); - let mut connection = - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { + let mut connection = AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { assert_eq!(fake_attestation, FAKE_ATTESTATION); attest::sgx_session::testutil::handshake_from_tests_data() - }) - .await - .unwrap(); + }, + ) + .await + .unwrap(); connection.send(Vec::from(ECHO_BYTES)).await.unwrap(); let response: Vec = connection.receive().await.unwrap().unwrap_next(); @@ -681,7 +705,8 @@ mod test { } assert_matches!( - AttestedConnection::connect(client, FAKE_WS_CONFIG, fail_to_handshake).await, + AttestedConnection::connect(client, FAKE_WS_CONFIG, "test".into(), fail_to_handshake) + .await, Err(_) ); } @@ -695,13 +720,17 @@ mod test { attest::sgx_session::testutil::private_key(), )); - let mut connection = - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { + let mut connection = AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { assert_eq!(fake_attestation, FAKE_ATTESTATION); attest::sgx_session::testutil::handshake_from_tests_data() - }) - .await - .unwrap(); + }, + ) + .await + .unwrap(); connection.send(Vec::from(ECHO_BYTES)).await.unwrap(); // Decoding a vec as a 32-bit float shouldn't work. diff --git a/rust/net/src/cdsi.rs b/rust/net/src/cdsi.rs index cc597fbd2..bf1fc1480 100644 --- a/rust/net/src/cdsi.rs +++ b/rust/net/src/cdsi.rs @@ -337,7 +337,7 @@ impl CdsiConnection { { log::info!("connecting to CDSI endpoint"); let (connection, _info) = endpoint - .connect(auth, transport_connector) + .connect(auth, transport_connector, "cdsi".into()) .inspect_err(|e| { log::warn!("CDSI connection failed: {e}"); }) @@ -363,6 +363,7 @@ impl CdsiConnection { resolver, confirmation_header_name, ws_config, + "cdsi".into(), move |attestation_message| Cdsi::new_handshake(params, attestation_message), ) .await?; @@ -736,10 +737,15 @@ mod test { )); let cdsi_connection = CdsiConnection( - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { - assert_eq!(fake_attestation, FAKE_ATTESTATION); - attest::sgx_session::testutil::handshake_from_tests_data() - }) + AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { + assert_eq!(fake_attestation, FAKE_ATTESTATION); + attest::sgx_session::testutil::handshake_from_tests_data() + }, + ) .await .expect("handshake failed"), ); @@ -790,10 +796,15 @@ mod test { )); let cdsi_connection = CdsiConnection( - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { - assert_eq!(fake_attestation, FAKE_ATTESTATION); - attest::sgx_session::testutil::handshake_from_tests_data() - }) + AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { + assert_eq!(fake_attestation, FAKE_ATTESTATION); + attest::sgx_session::testutil::handshake_from_tests_data() + }, + ) .await .expect("handshake failed"), ); @@ -836,10 +847,15 @@ mod test { )); let cdsi_connection = CdsiConnection( - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { - assert_eq!(fake_attestation, FAKE_ATTESTATION); - attest::sgx_session::testutil::handshake_from_tests_data() - }) + AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { + assert_eq!(fake_attestation, FAKE_ATTESTATION); + attest::sgx_session::testutil::handshake_from_tests_data() + }, + ) .await .expect("handshake failed"), ); @@ -912,10 +928,15 @@ mod test { )); let cdsi_connection = CdsiConnection( - AttestedConnection::connect(client, FAKE_WS_CONFIG, |fake_attestation| { - assert_eq!(fake_attestation, FAKE_ATTESTATION); - attest::sgx_session::testutil::handshake_from_tests_data() - }) + AttestedConnection::connect( + client, + FAKE_WS_CONFIG, + "test".into(), + |fake_attestation| { + assert_eq!(fake_attestation, FAKE_ATTESTATION); + attest::sgx_session::testutil::handshake_from_tests_data() + }, + ) .await .expect("handshake failed"), ); diff --git a/rust/net/src/chat.rs b/rust/net/src/chat.rs index a6a9f2708..464e42055 100644 --- a/rust/net/src/chat.rs +++ b/rust/net/src/chat.rs @@ -535,6 +535,7 @@ pub struct PendingChatConnection { connection: ChatWebSocketConnection, ws_config: ws2::Config, connection_info: ConnectionInfo, + log_tag: Arc, } pub struct AuthenticatedChatHeaders { @@ -553,6 +554,7 @@ impl ChatConnection { user_agent: &UserAgent, ws_config: self::ws2::Config, auth: Option, + log_tag: &str, ) -> Result { let headers = auth .into_iter() @@ -573,6 +575,7 @@ impl ChatConnection { fragment: ws_fragment.clone(), }); + let log_tag: Arc = log_tag.into(); let result = ConnectState::connect_ws( connect, ws_routes, @@ -585,8 +588,9 @@ impl ChatConnection { ThrottlingConnector::new(crate::infra::ws::Stateless, 1), resolver, confirmation_header_name.as_ref(), + log_tag.clone(), |error| { - log::debug!("connection attempt failed with {error}"); + log::debug!("[{log_tag}] connection attempt failed with {error}"); match error.classify() { ErrorClass::Intermittent => ControlFlow::Continue(()), ErrorClass::Fatal | ErrorClass::RetryAt(_) => { @@ -608,6 +612,7 @@ impl ChatConnection { connection: ws_connection, connection_info, ws_config, + log_tag, } }), Err(e) => { @@ -634,9 +639,16 @@ impl ChatConnection { connection, ws_config, connection_info, + log_tag, } = pending; Self { - inner: crate::chat::ws2::Chat::new(tokio_runtime, connection, ws_config, listener), + inner: crate::chat::ws2::Chat::new( + tokio_runtime, + connection, + ws_config, + log_tag, + listener, + ), connection_info, } } @@ -668,7 +680,10 @@ impl PendingChatConnection { pub async fn disconnect(&mut self) { if let Err(error) = self.connection.close().await { - log::error!("pending chat connection disconnect failed with {error}"); + log::error!( + "[{}] pending chat connection disconnect failed with {error}", + &self.log_tag + ); } } } @@ -754,6 +769,7 @@ pub mod test_support { &user_agent, ws_config, None, + "test", ) .await?; diff --git a/rust/net/src/chat/ws2.rs b/rust/net/src/chat/ws2.rs index 7aa961ea4..59b85d31d 100644 --- a/rust/net/src/chat/ws2.rs +++ b/rust/net/src/chat/ws2.rs @@ -9,6 +9,7 @@ use std::future::Future; use std::io::ErrorKind as IoErrorKind; use std::panic::AssertUnwindSafe; use std::pin::Pin; +use std::sync::Arc; use futures_util::{pin_mut, Sink, Stream, StreamExt as _}; use http::uri::PathAndQuery; @@ -147,6 +148,7 @@ impl Chat { tokio_runtime: tokio::runtime::Handle, transport: T, config: Config, + log_tag: Arc, listener: EventListener, ) -> Self where @@ -173,6 +175,7 @@ impl Chat { }, ), initial_request_id, + log_tag, listener, tokio_runtime, ) @@ -274,6 +277,7 @@ impl Chat { fn new_inner( into_inner_connection: impl IntoInnerConnection, initial_request_id: u64, + log_tag: Arc, listener: EventListener, tokio_runtime: tokio::runtime::Handle, ) -> Self { @@ -282,6 +286,7 @@ impl Chat { let requests_in_flight = InFlightRequests { outstanding_reqs: Default::default(), + log_tag: log_tag.clone(), }; let mut request_id = initial_request_id; @@ -294,15 +299,21 @@ impl Chat { (message, meta) }); - let response_rx = UnboundedReceiverStream::new(response_rx).map(|response| { + let log_tag_for_responses = log_tag.clone(); + let response_rx = UnboundedReceiverStream::new(response_rx).map(move |response| { let OutgoingResponse { id, status } = response; - log::debug!("sending response for incoming request {}", id); + log::debug!( + "[{log_tag_for_responses}] sending response for incoming request {}", + id + ); let message = response_for_status(id, status); (message, OutgoingMeta::ResponseToIncoming) }); - let inner_connection = into_inner_connection - .into_inner_connection(tokio_stream::StreamExt::merge(request_rx, response_rx)); + let inner_connection = into_inner_connection.into_inner_connection( + tokio_stream::StreamExt::merge(request_rx, response_rx), + log_tag.clone(), + ); let connection = ConnectionImpl { inner: inner_connection, @@ -311,6 +322,7 @@ impl Chat { let task = tokio_runtime.spawn(spawned_task_body( connection, + log_tag, listener, response_tx.downgrade(), )); @@ -364,6 +376,7 @@ enum TaskState { struct InFlightRequests { outstanding_reqs: HashMap>>, + log_tag: Arc, } /// Why the task finished unexpectedly. @@ -576,6 +589,7 @@ impl ListenerState { /// [`Outcome::Finished`]. async fn spawned_task_body( connection: ConnectionImpl, + log_tag: Arc, listener: EventListener, weak_response_tx: mpsc::WeakUnboundedSender, ) -> Result { @@ -586,7 +600,7 @@ async fn spawned_task_body( // In case the task panics, make sure the callback at least knows about the // disconnection. let mut listener_state = scopeguard::guard_on_unwind(listener_state, |mut listener_state| { - log::error!("chat handler task exited abnormally"); + log::error!("[{log_tag}] chat handler task exited abnormally"); listener_state.send_event_blocking(ListenerEvent::Finished(Err(FinishError::Unknown))); }); let result = loop { @@ -598,7 +612,7 @@ async fn spawned_task_body( Outcome::Finished(result) => break result, }; - log::debug!("received incoming request from server: {id}"); + log::debug!("[{log_tag}] received incoming request from server: {id}"); let event = ListenerEvent::ReceivedMessage( incoming_request, @@ -610,8 +624,8 @@ async fn spawned_task_body( listener_state.send_event(&tokio_rt, event).await; }; match &result { - Ok(reason) => log::info!("chat handler task finishing after {reason}"), - Err(err) => log::info!("chat handler task is stopping due to {err}"), + Ok(reason) => log::info!("[{log_tag}] chat handler task finishing after {reason}"), + Err(err) => log::info!("[{log_tag}] chat handler task is stopping due to {err}"), } let task_result = result.as_ref().map_err(Into::into).copied(); @@ -746,7 +760,10 @@ impl InFlightRequests { id: RequestId, response_sender: oneshot::Sender>, ) { - let Self { outstanding_reqs } = self; + let Self { + outstanding_reqs, + log_tag: _, + } = self; let prev = outstanding_reqs.insert(id, response_sender); assert!( prev.is_none(), @@ -756,11 +773,17 @@ impl InFlightRequests { } fn finish_send(&mut self, id: RequestId, result: Result) { - let Self { outstanding_reqs } = self; + let Self { + outstanding_reqs, + log_tag, + } = self; if let Some(sender) = outstanding_reqs.remove(&id) { let _ignore_send_error = sender.send(result); } else { - log::error!("tried to send response to nonexistent request {}", id.0); + log::error!( + "[{log_tag}] tried to send response to nonexistent request {}", + id.0 + ); } } } @@ -774,7 +797,11 @@ impl InFlightRequests { /// [`InnerConnection`]. trait IntoInnerConnection { /// Turn `self` and an outgoing stream into an `InnerConnection` impl. - fn into_inner_connection(self, outgoing_stream: R) -> impl InnerConnection + Send + 'static + fn into_inner_connection( + self, + outgoing_stream: R, + log_tag: Arc, + ) -> impl InnerConnection + Send + 'static where R: Stream + Send + 'static; } @@ -786,12 +813,16 @@ where + Send + 'static, { - fn into_inner_connection(self, outgoing_stream: R) -> impl InnerConnection + Send + 'static + fn into_inner_connection( + self, + outgoing_stream: R, + log_tag: Arc, + ) -> impl InnerConnection + Send + 'static where R: Stream + Send + 'static, { let (stream, config) = self; - crate::infra::ws2::Connection::new(stream, outgoing_stream, config) + crate::infra::ws2::Connection::new(stream, outgoing_stream, config, log_tag) } } @@ -841,6 +872,7 @@ impl ConnectionImpl { requests_in_flight: &mut InFlightRequests, event: Outcome, Result>, ) -> Outcome, Result> { + let log_tag = &requests_in_flight.log_tag; match event { Outcome::Finished(Ok(finish)) => return Outcome::Finished(Ok(finish)), Outcome::Finished(Err(err)) => { @@ -875,7 +907,7 @@ impl ConnectionImpl { Err(TaskExitError::SendProtocol(protocol_error.clone())) } }; - log::warn!("shutting down after send failed: {send_error}"); + log::warn!("[{log_tag}] shutting down after send failed: {send_error}"); match meta { OutgoingMeta::SentRequest(_request_id, response_sender) => { // The server isn't going to get our response to an @@ -907,11 +939,11 @@ impl ConnectionImpl { // there's nothing to do here. We could be strict here // and close the connection, or ignore the message and // keep going. We choose the latter. - log::warn!("received invalid message: {e}"); + log::warn!("[{log_tag}] received invalid message: {e}"); } Err(ChatProtocolError::InvalidResponse(id)) => { log::warn!( - "received invalid response for outgoing request {id}", + "[{log_tag}] received invalid response for outgoing request {id}", id = id.0 ); requests_in_flight.finish_send(id, Err(TaskSendError::InvalidResponse)); @@ -920,7 +952,10 @@ impl ConnectionImpl { // incoming requests. } Ok(ChatMessage::Response(id, response)) => { - log::debug!("received response for outgoing request {id}", id = id.0); + log::debug!( + "[{log_tag}] received response for outgoing request {id}", + id = id.0 + ); requests_in_flight.finish_send(id, Ok(response)) } Ok(ChatMessage::Request(id, request_proto)) => { @@ -1219,6 +1254,7 @@ mod test { incoming_events: incoming_events_rx, }, initial_request_id, + "test".into(), listener, tokio::runtime::Handle::current(), ); @@ -1304,6 +1340,7 @@ mod test { fn into_inner_connection( self, outgoing_stream: R, + _log_tag: Arc, ) -> impl InnerConnection + Send + 'static where R: Stream + Send + 'static, diff --git a/rust/net/src/connect_state.rs b/rust/net/src/connect_state.rs index 49190357e..1c7f63f7e 100644 --- a/rust/net/src/connect_state.rs +++ b/rust/net/src/connect_state.rs @@ -5,6 +5,7 @@ use std::default::Default; use std::ops::ControlFlow; +use std::sync::Arc; use http::HeaderName; use itertools::Itertools as _; @@ -78,6 +79,7 @@ where ws_connector: WC, resolver: &DnsResolver, confirmation_header_name: Option<&HeaderName>, + log_tag: Arc, mut on_error: impl FnMut(WebSocketServiceConnectError) -> ControlFlow, ) -> Result<(WC::Connection, RouteInfo), ConnectError> where @@ -94,7 +96,10 @@ where .routes(&connect_read.route_provider_context) .collect_vec(); - log::info!("starting connection attempt with {} routes", routes.len()); + log::info!( + "[{log_tag}] starting connection attempt with {} routes", + routes.len() + ); let route_provider = routes.into_iter().map(ResolveWithSavedDescription); let connector = DescribedRouteConnector(ComposedConnector::new( @@ -110,6 +115,7 @@ where route_provider, resolver, connector, + log_tag.clone(), |error| { let error = WebSocketServiceConnectError::from_websocket_error( error, @@ -128,10 +134,10 @@ where match &result { Ok((_connection, route)) => log::info!( - "connection through {route} succeeded after {:.3?}", + "[{log_tag}] connection through {route} succeeded after {:.3?}", start.elapsed() ), - Err(e) => log::info!("connection failed with {e}"), + Err(e) => log::info!("[{log_tag}] connection failed with {e}"), } this.write().await.attempts_record.apply_outcome_updates( @@ -164,6 +170,7 @@ where resolver: &DnsResolver, confirmation_header_name: Option, ws_config: libsignal_net_infra::ws2::Config, + log_tag: Arc, new_handshake: impl FnOnce(&[u8]) -> Result, ) -> Result<(AttestedConnection, RouteInfo), crate::enclave::Error> where @@ -180,6 +187,7 @@ where crate::infra::ws::Stateless, resolver, confirmation_header_name.as_ref(), + log_tag.clone(), |error| match error.classify() { ErrorClass::Intermittent => ControlFlow::Continue(()), ErrorClass::RetryAt(_) | ErrorClass::Fatal => { @@ -195,7 +203,7 @@ where ConnectError::FatalConnect(e) => e, })?; - let connection = AttestedConnection::connect(ws, ws_config, new_handshake).await?; + let connection = AttestedConnection::connect(ws, ws_config, log_tag, new_handshake).await?; Ok((connection, route_info)) } } @@ -292,7 +300,7 @@ mod test { }, }; - let ws_connector = ConnectFn(|(), route| { + let ws_connector = ConnectFn(|(), route, _log_tag| { let (ws, http) = &route; std::future::ready( if (ws, http) == (&failing_route.fragment, &failing_route.inner.fragment) { @@ -308,7 +316,7 @@ mod test { )])); let fake_transport_connector = - ConnectFn(move |(), _| std::future::ready(Ok::<_, WebSocketConnectError>(()))); + ConnectFn(move |(), _, _| std::future::ready(Ok::<_, WebSocketConnectError>(()))); let state = ConnectState { route_resolver: RouteResolver::default(), @@ -324,6 +332,7 @@ mod test { ws_connector, &resolver, None, + "test".into(), |e| { let e = assert_matches!(e, WebSocketServiceConnectError::Connect(e, _) => e); assert_matches!( diff --git a/rust/net/src/enclave.rs b/rust/net/src/enclave.rs index af7c9a494..212da9d94 100644 --- a/rust/net/src/enclave.rs +++ b/rust/net/src/enclave.rs @@ -4,6 +4,7 @@ // use std::marker::PhantomData; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use attest::svr2::RaftConfig; @@ -311,6 +312,7 @@ impl EnclaveEndpointConnect &self, auth: Auth, transport_connector: T, + log_tag: Arc, ) -> Result<(AttestedConnection, ServiceConnectionInfo), Error> where C: ConnectionManager, @@ -323,6 +325,7 @@ impl EnclaveEndpointConnect &self.endpoint_connection, auth, transport_connector, + log_tag, &move |attestation_message| E::new_handshake(&self.params, attestation_message), ) .await @@ -360,6 +363,7 @@ async fn connect_attested( endpoint_connection: &EndpointConnection, auth: Auth, transport_connector: T, + log_tag: Arc, do_handshake: &(dyn Sync + Fn(&[u8]) -> enclave::Result), ) -> Result<(AttestedConnection, ServiceConnectionInfo), Error> { let connector = WebSocketStreamConnector::new( @@ -387,6 +391,7 @@ async fn connect_attested( let attested = AttestedConnection::connect( websocket, endpoint_connection.config.ws2_config(), + log_tag, do_handshake, ) .await?; @@ -582,6 +587,7 @@ mod test { username: "fdsa".to_string(), }, AlwaysFailingConnector, + "test".into(), ) .await .map(|(connection, _info)| connection) diff --git a/rust/net/src/svr.rs b/rust/net/src/svr.rs index 5638789be..dd6ed9a5c 100644 --- a/rust/net/src/svr.rs +++ b/rust/net/src/svr.rs @@ -46,7 +46,11 @@ where T: TransportConnector, { connection - .connect(auth, transport_connector) + .connect( + auth, + transport_connector, + format!("svr3:{}", std::any::type_name::()).into(), + ) .await .map(|(connection, info)| Self { inner: connection,