diff --git a/node/ts/test/NetTest.ts b/node/ts/test/NetTest.ts index 61c54ef88..3a2686030 100644 --- a/node/ts/test/NetTest.ts +++ b/node/ts/test/NetTest.ts @@ -152,19 +152,13 @@ describe('chat service api', () => { }).timeout(10000); }); - it('cannot connect through an invalid proxy', async () => { + it('cannot connect through an invalid proxy', () => { // The default TLS proxy config doesn't support staging, so we connect to production. const net = new Net(Environment.Production, userAgent); expect(() => net.setProxy('signalfoundation.org', 0)).throws(Error); expect(() => net.setProxy('signalfoundation.org', 100_000)).throws(Error); expect(() => net.setProxy('signalfoundation.org', -1)).throws(Error); expect(() => net.setProxy('signalfoundation.org', 0.1)).throws(Error); - - const chatService = net.newChatService(); - // Make sure we *can't* connect. - await expect(chatService.connectUnauthenticated()).to.be.rejectedWith( - LibSignalErrorBase - ); }).timeout(10000); }); diff --git a/rust/bridge/shared/Cargo.toml b/rust/bridge/shared/Cargo.toml index 2d114127e..307c4503f 100644 --- a/rust/bridge/shared/Cargo.toml +++ b/rust/bridge/shared/Cargo.toml @@ -71,6 +71,7 @@ strum = { version = "0.26", features = ["derive"] } [dev-dependencies] assert_matches = "1.5" test-case = "3.3.1" +tokio = { version = "1", features = ["test-util", "time", "macros"] } [features] ffi = [] diff --git a/rust/bridge/shared/src/net.rs b/rust/bridge/shared/src/net.rs index f7119a161..e544fbf88 100644 --- a/rust/bridge/shared/src/net.rs +++ b/rust/bridge/shared/src/net.rs @@ -6,7 +6,6 @@ use std::convert::TryInto as _; use std::num::{NonZeroU16, NonZeroU32}; use std::panic::RefUnwindSafe; -use std::time::Duration; use base64::prelude::{Engine, BASE64_STANDARD}; use http::uri::PathAndQuery; @@ -26,6 +25,7 @@ use libsignal_net::infra::tcp_ssl::{ use libsignal_net::infra::{make_ws_config, EndpointConnection}; use libsignal_net::svr::{self, SvrConnection}; use libsignal_net::svr3::{self, OpaqueMaskedShareSet, PpssOps as _}; +use libsignal_net::timeouts::ONE_ROUTE_CONNECTION_TIMEOUT; use rand::rngs::OsRng; use crate::support::*; @@ -68,7 +68,6 @@ pub struct ConnectionManager { impl RefUnwindSafe for ConnectionManager {} impl ConnectionManager { - const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); fn new(environment: Environment, user_agent: String) -> Self { log::info!("Initializing connection manager for {}...", &environment); let dns_resolver = @@ -81,11 +80,11 @@ impl ConnectionManager { .chat_domain_config .connection_params_with_fallback(); let chat_connection_params = add_user_agent_header(chat_connection_params, &user_agent); - let chat_ws_config = make_ws_config(chat_endpoint, Self::DEFAULT_CONNECT_TIMEOUT); + let chat_ws_config = make_ws_config(chat_endpoint, ONE_ROUTE_CONNECTION_TIMEOUT); Self { chat: EndpointConnection::new_multi( chat_connection_params, - Self::DEFAULT_CONNECT_TIMEOUT, + ONE_ROUTE_CONNECTION_TIMEOUT, chat_ws_config, ), cdsi: Self::endpoint_connection(&environment.env().cdsi, &user_agent), @@ -107,7 +106,7 @@ impl ConnectionManager { EnclaveEndpointConnection::new_multi( endpoint.mr_enclave, params, - Self::DEFAULT_CONNECT_TIMEOUT, + ONE_ROUTE_CONNECTION_TIMEOUT, ) } } @@ -271,7 +270,6 @@ async fn svr3_connect<'a>( #[cfg(test)] mod test { use super::*; - use test_case::test_case; #[test_case(Environment::Staging; "staging")] diff --git a/rust/bridge/shared/src/net/chat.rs b/rust/bridge/shared/src/net/chat.rs index 46d6cca43..18a4185d8 100644 --- a/rust/bridge/shared/src/net/chat.rs +++ b/rust/bridge/shared/src/net/chat.rs @@ -466,3 +466,36 @@ async fn ServerMessageAck_Send(ack: &ServerMessageAck) -> Result<(), ChatService let future = ack.inner.take().expect("a message is only acked once"); future.await } + +#[cfg(test)] +mod test { + use super::*; + use crate::net::{ConnectionManager, ConnectionManager_set_proxy, Environment}; + use assert_matches::assert_matches; + use libsignal_net::chat::ChatServiceError; + + #[tokio::test(start_paused = true)] + async fn cannon_connect_through_invalid_proxy() { + let cm = ConnectionManager::new(Environment::Staging, "test-user-agent".to_string()); + + assert_matches!( + ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), 0), + Err(_) + ); + assert_matches!( + ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), 100_000), + Err(_) + ); + + assert_matches!( + ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), -1), + Err(_) + ); + + let chat = ChatService_new(&cm, "".to_string(), "".to_string()); + assert_matches!( + ChatService_connect_unauth(&chat).await, + Err(ChatServiceError::AllConnectionRoutesFailed { .. }) + ); + } +} diff --git a/rust/net/examples/chat_smoke_test.rs b/rust/net/examples/chat_smoke_test.rs index a9f457f81..7ada62ef2 100644 --- a/rust/net/examples/chat_smoke_test.rs +++ b/rust/net/examples/chat_smoke_test.rs @@ -99,12 +99,13 @@ async fn test_connection( env: &libsignal_net::env::Env<'static, Svr3Env<'static>>, connection_params: Vec, ) -> Result<(), ChatServiceError> { + let one_route_connect_timeout = Duration::from_secs(5); let dns_resolver = DnsResolver::new_with_static_fallback(env.static_fallback()); let transport_connector = DirectConnector::new(dns_resolver); let chat_endpoint = PathAndQuery::from_static(WEB_SOCKET_PATH); - let chat_ws_config = make_ws_config(chat_endpoint, Duration::from_secs(5)); + let chat_ws_config = make_ws_config(chat_endpoint, one_route_connect_timeout); let connection = - EndpointConnection::new_multi(connection_params, Duration::from_secs(5), chat_ws_config); + EndpointConnection::new_multi(connection_params, one_route_connect_timeout, chat_ws_config); let (incoming_tx, _incoming_rx) = mpsc::channel(1); let chat = chat_service( diff --git a/rust/net/src/chat.rs b/rust/net/src/chat.rs index eb64ce498..76d3025e8 100644 --- a/rust/net/src/chat.rs +++ b/rust/net/src/chat.rs @@ -21,7 +21,9 @@ use crate::utils::basic_authorization; pub mod chat_reconnect; mod error; +use crate::timeouts::MULTI_ROUTE_CONNECTION_TIMEOUT; pub use error::ChatServiceError; + pub mod server_requests; pub mod ws; @@ -30,8 +32,6 @@ pub type RequestProto = proto::chat_websocket::WebSocketRequestMessage; pub type ResponseProto = proto::chat_websocket::WebSocketResponseMessage; pub type ChatMessageType = proto::chat_websocket::web_socket_message::Type; -const TOTAL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); - #[async_trait] pub trait ChatService { /// Sends request and gets a response from the Chat Service. @@ -440,7 +440,7 @@ fn build_authorized_chat_service( header_auth_decorator.clone(), ), connection_manager_ws.clone(), - TOTAL_CONNECTION_TIMEOUT, + MULTI_ROUTE_CONNECTION_TIMEOUT, ); AuthorizedChatService { @@ -458,7 +458,7 @@ fn build_anonymous_chat_service( let chat_over_ws_anonymous = ServiceWithReconnect::new( service_connector_ws.clone(), connection_manager_ws.clone(), - TOTAL_CONNECTION_TIMEOUT, + MULTI_ROUTE_CONNECTION_TIMEOUT, ); AnonymousChatService { diff --git a/rust/net/src/enclave.rs b/rust/net/src/enclave.rs index 82c623bd7..bfc352130 100644 --- a/rust/net/src/enclave.rs +++ b/rust/net/src/enclave.rs @@ -308,13 +308,13 @@ impl EnclaveEndpointConnection { pub fn new_multi( mr_enclave: MrEnclave<&'static [u8], E>, connection_params: impl IntoIterator, - connect_timeout: Duration, + one_route_connect_timeout: Duration, ) -> Self { Self { endpoint_connection: EndpointConnection::new_multi( connection_params, - connect_timeout, - make_ws_config(E::url_path(mr_enclave.as_ref()), connect_timeout), + one_route_connect_timeout, + make_ws_config(E::url_path(mr_enclave.as_ref()), one_route_connect_timeout), ), params: EndpointParams { mr_enclave, @@ -411,7 +411,7 @@ mod test { } } - const CONNECT_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(10); + const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); async fn enclave_connect( manager: C, diff --git a/rust/net/src/env.rs b/rust/net/src/env.rs index 6666179c3..0256d6da5 100644 --- a/rust/net/src/env.rs +++ b/rust/net/src/env.rs @@ -7,7 +7,6 @@ use std::collections::HashMap; use std::iter; use std::net::{Ipv4Addr, Ipv6Addr}; use std::num::NonZeroU16; -use std::time::Duration; use const_str::ip_addr; use nonzero_ext::nonzero; @@ -21,8 +20,6 @@ use crate::infra::{ ConnectionParams, DnsSource, HttpRequestDecorator, HttpRequestDecoratorSeq, RouteType, }; -pub(crate) const WS_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5); -pub(crate) const WS_MAX_IDLE_TIME: Duration = Duration::from_secs(15); const DEFAULT_HTTPS_PORT: NonZeroU16 = nonzero!(443_u16); const DOMAIN_CONFIG_CHAT: DomainConfig = DomainConfig { diff --git a/rust/net/src/infra.rs b/rust/net/src/infra.rs index 85fe6a7e7..126375789 100644 --- a/rust/net/src/infra.rs +++ b/rust/net/src/infra.rs @@ -9,7 +9,7 @@ use std::string::ToString; use std::sync::Arc; use std::time::Duration; -use crate::env::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_TIME}; +use crate::timeouts::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_INTERVAL}; use ::http::uri::PathAndQuery; use ::http::Uri; use async_trait::async_trait; @@ -270,7 +270,7 @@ pub struct EndpointConnection { impl EndpointConnection { pub fn new_multi( connection_params: impl IntoIterator, - connect_timeout: Duration, + one_route_connect_timeout: Duration, config: WebSocketConfig, ) -> Self { Self { @@ -278,7 +278,10 @@ impl EndpointConnection { connection_params .into_iter() .map(|params| { - SingleRouteThrottlingConnectionManager::new(params, connect_timeout) + SingleRouteThrottlingConnectionManager::new( + params, + one_route_connect_timeout, + ) }) .collect(), ), @@ -296,7 +299,7 @@ pub fn make_ws_config( endpoint: websocket_endpoint, max_connection_time: connect_timeout, keep_alive_interval: WS_KEEP_ALIVE_INTERVAL, - max_idle_time: WS_MAX_IDLE_TIME, + max_idle_time: WS_MAX_IDLE_INTERVAL, } } diff --git a/rust/net/src/infra/connection_manager.rs b/rust/net/src/infra/connection_manager.rs index 976d17db9..733d29bf5 100644 --- a/rust/net/src/infra/connection_manager.rs +++ b/rust/net/src/infra/connection_manager.rs @@ -11,6 +11,7 @@ use std::panic::RefUnwindSafe; use std::sync::Arc; use std::time::Duration; +use crate::timeouts::{CONNECTION_ROUTE_COOLDOWN_INTERVALS, CONNECTION_ROUTE_MAX_COOLDOWN}; use async_trait::async_trait; use itertools::Itertools; use tokio::sync::Mutex; @@ -19,19 +20,6 @@ use tokio::time::{timeout_at, Instant}; use crate::infra::errors::LogSafeDisplay; use crate::infra::ConnectionParams; -pub(crate) const MAX_COOLDOWN_INTERVAL: Duration = Duration::from_secs(64); - -pub(crate) const COOLDOWN_INTERVALS: [Duration; 8] = [ - Duration::from_secs(0), - Duration::from_secs(1), - Duration::from_secs(2), - Duration::from_secs(4), - Duration::from_secs(8), - Duration::from_secs(16), - Duration::from_secs(32), - MAX_COOLDOWN_INTERVAL, -]; - /// Represents the outcome of the connection attempt #[derive(Debug)] pub enum ConnectionAttemptOutcome { @@ -47,7 +35,7 @@ pub enum ConnectionAttemptOutcome { } /// Encapsulates the logic that for every connection attempt decides -/// whether or not an attempt is to be made in the first place, and, if yes, +/// whether an attempt is to be made in the first place, and, if yes, /// which [ConnectionParams] are to be used for the attempt. #[async_trait] pub trait ConnectionManager: Clone + Send + Sync { @@ -115,13 +103,15 @@ impl ThrottlingConnectionManagerState { } else if attempt_start_time > s.latest_attempt || s.consecutive_fails > 0 { s.latest_attempt = max(attempt_start_time, s.latest_attempt); let idx: usize = s.consecutive_fails.into(); - let cooldown_interval = COOLDOWN_INTERVALS + let cooldown_interval = CONNECTION_ROUTE_COOLDOWN_INTERVALS .get(idx) - .unwrap_or(&MAX_COOLDOWN_INTERVAL); + .unwrap_or(&CONNECTION_ROUTE_MAX_COOLDOWN); s.next_attempt = Instant::now() + *cooldown_interval; s.consecutive_fails = min( s.consecutive_fails.saturating_add(1), - (COOLDOWN_INTERVALS.len() - 1).try_into().unwrap(), + (CONNECTION_ROUTE_COOLDOWN_INTERVALS.len() - 1) + .try_into() + .unwrap(), ); } s @@ -130,7 +120,7 @@ impl ThrottlingConnectionManagerState { /// A connection manager that only attempts one route (i.e. one [ConnectionParams]) /// but keeps track of consecutive failed attempts and after each failure waits for a duration -/// chosen according to [COOLDOWN_INTERVALS] list. +/// chosen according to [CONNECTION_ROUTE_COOLDOWN_INTERVALS] list. #[derive(Clone)] pub struct SingleRouteThrottlingConnectionManager { state: Arc>, @@ -251,7 +241,7 @@ impl SingleRouteThrottlingConnectionManager { state: Arc::new(Mutex::new(ThrottlingConnectionManagerState { consecutive_fails: 0, next_attempt: Instant::now(), - latest_attempt: Instant::now(), + latest_attempt: Instant::now() - Duration::from_nanos(1), })), } } @@ -419,7 +409,7 @@ mod test { assert_matches!(attempt_outcome, ConnectionAttemptOutcome::WaitUntil(_)); // now let's advance the time to the point after the cooldown period - time::advance(MAX_COOLDOWN_INTERVAL).await; + time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await; let attempt_outcome: ConnectionAttemptOutcome<(), TestError> = manager.connect_or_wait(|_| future::ready(Ok(()))).await; assert_matches!(attempt_outcome, ConnectionAttemptOutcome::Attempted(Ok(()))); @@ -450,7 +440,7 @@ mod test { validate_expected_route(&multi_route_manager, true, ROUTE_2).await; // and now after a cooldown period, route1 should be used again - time::advance(MAX_COOLDOWN_INTERVAL).await; + time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await; validate_expected_route(&multi_route_manager, true, ROUTE_1).await; } @@ -504,7 +494,9 @@ mod test { n if n < self.attempts_until_cooldown => ConnectionAttemptOutcome::Attempted( connection_fn(&self.connection_params).await, ), - _ => ConnectionAttemptOutcome::WaitUntil(Instant::now() + MAX_COOLDOWN_INTERVAL), + _ => ConnectionAttemptOutcome::WaitUntil( + Instant::now() + CONNECTION_ROUTE_MAX_COOLDOWN, + ), } } diff --git a/rust/net/src/infra/dns.rs b/rust/net/src/infra/dns.rs index 848d08c94..15a9ed451 100644 --- a/rust/net/src/infra/dns.rs +++ b/rust/net/src/infra/dns.rs @@ -9,6 +9,7 @@ use std::future; use std::sync::{Arc, Mutex}; use std::time::Duration; +use crate::timeouts::{DNS_FALLBACK_LOOKUP_TIMEOUTS, DNS_SYSTEM_LOOKUP_TIMEOUT}; use nonzero_ext::nonzero; use oneshot_broadcast::Sender; use tokio::time::Instant; @@ -97,14 +98,17 @@ impl DnsResolver { RootCertificates::Native, ); let custom_resolver = Box::new(CustomDnsResolver::::new(connection_params)); + let fallback_lookups = DNS_FALLBACK_LOOKUP_TIMEOUTS + .iter() + .map(|timeout| (custom_resolver.clone() as Box, *timeout)); + + let mut lookup_options: Vec<(Box, Duration)> = + Vec::with_capacity(fallback_lookups.len() + 2); + lookup_options.push((Box::new(SystemDnsLookup), DNS_SYSTEM_LOOKUP_TIMEOUT)); + lookup_options.extend(fallback_lookups); + lookup_options.push((Box::new(StaticDnsMap(static_map)), Duration::from_secs(1))); DnsResolver { - lookup_options: Arc::new(vec![ - (Box::new(SystemDnsLookup), Duration::from_secs(2)), - (custom_resolver.clone(), Duration::from_secs(5)), - (custom_resolver.clone(), Duration::from_secs(10)), - (custom_resolver.clone(), Duration::from_secs(15)), - (Box::new(StaticDnsMap(static_map)), Duration::from_secs(1)), - ]), + lookup_options: Arc::new(lookup_options), state: Default::default(), } } diff --git a/rust/net/src/infra/dns/custom_resolver.rs b/rust/net/src/infra/dns/custom_resolver.rs index 1f1889ab4..f4477fa6c 100644 --- a/rust/net/src/infra/dns/custom_resolver.rs +++ b/rust/net/src/infra/dns/custom_resolver.rs @@ -7,8 +7,8 @@ use std::cmp::min; use std::collections::HashMap; use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::Arc; -use std::time::Duration; +use crate::timeouts::{DNS_CALL_BACKGROUND_TIMEOUT, DNS_RESOLUTION_DELAY}; use async_trait::async_trait; use either::Either; use futures_util::stream::BoxStream; @@ -27,9 +27,6 @@ pub type DnsIpv4Result = Expiring>; pub type DnsIpv6Result = Expiring>; pub type DnsQueryResult = Either; -const RESOLUTION_DELAY: Duration = Duration::from_millis(50); -const LONG_TIMEOUT: Duration = Duration::from_secs(30); - /// Implementors of this trait encapsulate the logic of sending queries to the DNS server /// and receiving resposnes. #[async_trait] @@ -120,7 +117,7 @@ impl CustomDnsResolver { .await?; let (ipv4_res_rx, ipv6_res_rx) = self.send_dns_queries(transport, request); let (maybe_ipv4, maybe_ipv6) = - results_within_interval(ipv4_res_rx, ipv6_res_rx, RESOLUTION_DELAY).await; + results_within_interval(ipv4_res_rx, ipv6_res_rx, DNS_RESOLUTION_DELAY).await; let ipv4s = maybe_ipv4.map_or(vec![], |r| r.data); let ipv6s = maybe_ipv6.map_or(vec![], |r| r.data); match LookupResult::new(T::dns_source(), ipv4s, ipv6s) { @@ -153,7 +150,7 @@ impl CustomDnsResolver { // Reference: https://datatracker.ietf.org/doc/html/rfc8305#section-3 tokio::spawn(async move { let started_at = Instant::now(); - let timeout_at = started_at + LONG_TIMEOUT; + let timeout_at = started_at + DNS_CALL_BACKGROUND_TIMEOUT; let mut stream = match transport.send_queries(request.clone()).await { Ok(stream) => stream, @@ -254,7 +251,8 @@ impl CustomDnsResolver { #[cfg(test)] pub(crate) mod test { use crate::infra::dns::custom_resolver::{ - CustomDnsResolver, DnsQueryResult, DnsTransport, LONG_TIMEOUT, RESOLUTION_DELAY, + CustomDnsResolver, DnsQueryResult, DnsTransport, DNS_CALL_BACKGROUND_TIMEOUT, + DNS_RESOLUTION_DELAY, }; use crate::infra::dns::dns_errors::Error; use crate::infra::dns::dns_lookup::DnsLookupRequest; @@ -448,8 +446,8 @@ pub(crate) mod test { async fn works_correctly_when_both_results_are_within_resolution_delay() { let (transport, resolver) = TestDnsTransportWithTwoResponses::transport_and_custom_dns_resolver(|_, q_num, txs| { - let first = LONG_TIMEOUT / 4; - let second = first + RESOLUTION_DELAY / 2; + let first = DNS_CALL_BACKGROUND_TIMEOUT / 4; + let second = first + DNS_RESOLUTION_DELAY / 2; let (timeout_1, timeout_2) = if q_num == 1 { (first, second) } else { @@ -471,8 +469,8 @@ pub(crate) mod test { #[tokio::test(start_paused = true)] async fn works_correctly_when_second_response_is_after_resolution_delay() { let resolver = TestDnsTransportWithTwoResponses::custom_dns_resolver(|_, q_num, txs| { - let first = LONG_TIMEOUT / 4; - let second = first + RESOLUTION_DELAY * 2; + let first = DNS_CALL_BACKGROUND_TIMEOUT / 4; + let second = first + DNS_RESOLUTION_DELAY * 2; let (timeout_1, timeout_2) = if q_num == 1 { (first, second) } else { @@ -508,9 +506,9 @@ pub(crate) mod test { let res_1 = ok_query_result_ipv4(Duration::ZERO, IP_V4_LIST_1); let res_2 = ok_query_result_ipv6(Duration::ZERO, IP_V6_LIST_1); let res_3 = Err(Error::NoData); - let timeout_1 = LONG_TIMEOUT / 4; - let timeout_2 = timeout_1 + RESOLUTION_DELAY / 3; - let timeout_3 = timeout_1 + RESOLUTION_DELAY / 2; + let timeout_1 = DNS_CALL_BACKGROUND_TIMEOUT / 4; + let timeout_2 = timeout_1 + DNS_RESOLUTION_DELAY / 3; + let timeout_3 = timeout_1 + DNS_RESOLUTION_DELAY / 2; respond_after_timeout(timeout_1, tx_1, res_1); respond_after_timeout(timeout_2, tx_2, res_2); respond_after_timeout(timeout_3, tx_3, res_3); @@ -522,7 +520,7 @@ pub(crate) mod test { #[tokio::test(start_paused = true)] async fn returns_second_result_if_first_result_fails() { let resolver = TestDnsTransportWithTwoResponses::custom_dns_resolver(|_, _, txs| { - let timeout_2 = RESOLUTION_DELAY * 2; + let timeout_2 = DNS_RESOLUTION_DELAY * 2; let [tx_1, tx_2] = txs; let res_1 = Err(Error::LookupFailed); let res_2 = ok_query_result_ipv6(Duration::ZERO, IP_V6_LIST_1); @@ -578,8 +576,8 @@ pub(crate) mod test { #[tokio::test(start_paused = true)] async fn results_cached_even_if_received_late() { // second result is sent within the `LONG_TIMEOUT`, but after the `RESOLUTION_DELAY` - let timeout_1 = LONG_TIMEOUT / 4; - let timeout_2 = LONG_TIMEOUT / 2; + let timeout_1 = DNS_CALL_BACKGROUND_TIMEOUT / 4; + let timeout_2 = DNS_CALL_BACKGROUND_TIMEOUT / 2; let all_results_received_time = Instant::now() + timeout_2; let (transport, resolver) = TestDnsTransportWithTwoResponses::transport_and_custom_dns_resolver( diff --git a/rust/net/src/infra/reconnect.rs b/rust/net/src/infra/reconnect.rs index d9d43a7ac..dee98d4bf 100644 --- a/rust/net/src/infra/reconnect.rs +++ b/rust/net/src/infra/reconnect.rs @@ -505,6 +505,7 @@ mod test { use std::sync::{Arc, Mutex}; use std::time::Duration; + use crate::timeouts::{CONNECTION_ROUTE_COOLDOWN_INTERVALS, CONNECTION_ROUTE_MAX_COOLDOWN}; use assert_matches::assert_matches; use async_trait::async_trait; use futures_util::FutureExt; @@ -514,9 +515,7 @@ mod test { use super::*; use crate::infra::certs::RootCertificates; - use crate::infra::connection_manager::{ - SingleRouteThrottlingConnectionManager, COOLDOWN_INTERVALS, MAX_COOLDOWN_INTERVAL, - }; + use crate::infra::connection_manager::SingleRouteThrottlingConnectionManager; use crate::infra::test::shared::{ TestError, LONG_CONNECTION_TIME, NORMAL_CONNECTION_TIME, TIMEOUT_DURATION, TIME_ADVANCE_VALUE, @@ -813,7 +812,7 @@ mod test { // At this point, `service_with_reconnect` tried multiple times to connect // and hit the cooldown. Let's advance time to make sure next attempt will be made. - time::advance(MAX_COOLDOWN_INTERVAL).await; + time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await; connector.set_service_healthy(true); let connection_result = service_with_reconnect.connect_from_inactive().await; @@ -833,7 +832,7 @@ mod test { // At this point, `service_with_reconnect` tried multiple times to connect // and hit the cooldown. Let's advance time to make sure next attempt will be made. - time::advance(MAX_COOLDOWN_INTERVAL).await; + time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await; connector.set_time_to_connect(NORMAL_CONNECTION_TIME); let connection_result = service_with_reconnect.connect_from_inactive().await; @@ -859,11 +858,11 @@ mod test { sleep_and_catch_up(NORMAL_CONNECTION_TIME).await; assert_eq!(connector.attempts.load(Ordering::Relaxed), 2); - sleep_and_catch_up(COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await; + sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await; assert_eq!(connector.attempts.load(Ordering::Relaxed), 3); assert_matches!(service_with_reconnect.service().await, Err(_)); - sleep_and_catch_up(COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await; + sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await; assert_eq!(connector.attempts.load(Ordering::Relaxed), 4); assert_matches!(service_with_reconnect.service().await, Err(_)); @@ -871,7 +870,7 @@ mod test { // letting next cooldown interval pass and checking again connector.set_service_healthy(true); - sleep_and_catch_up(COOLDOWN_INTERVALS[2] + NORMAL_CONNECTION_TIME).await; + sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[2] + NORMAL_CONNECTION_TIME).await; assert_eq!(connector.attempts.load(Ordering::Relaxed), 5); assert_matches!(service_with_reconnect.service().await, Ok(_)); } @@ -895,18 +894,18 @@ mod test { sleep_and_catch_up(NORMAL_CONNECTION_TIME).await; assert_eq!(connector.attempts.load(Ordering::Relaxed), 2); - sleep_and_catch_up(COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await; + sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await; assert_eq!(connector.attempts.load(Ordering::Relaxed), 3); assert_matches!(service_with_reconnect.service().await, Err(_)); - sleep_and_catch_up(COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await; + sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await; assert_eq!(connector.attempts.load(Ordering::Relaxed), 4); assert_matches!(service_with_reconnect.service().await, Err(_)); // now we decide to disconnect, and we need to make sure we're not making // any more attempts service_with_reconnect.disconnect().await; - for interval in COOLDOWN_INTERVALS.into_iter().skip(2) { + for interval in CONNECTION_ROUTE_COOLDOWN_INTERVALS.into_iter().skip(2) { sleep_and_catch_up(interval + NORMAL_CONNECTION_TIME).await; assert_eq!(connector.attempts.load(Ordering::Relaxed), 4); assert_matches!( diff --git a/rust/net/src/infra/tcp_ssl.rs b/rust/net/src/infra/tcp_ssl.rs index f28feed62..21dd1c9a6 100644 --- a/rust/net/src/infra/tcp_ssl.rs +++ b/rust/net/src/infra/tcp_ssl.rs @@ -6,8 +6,8 @@ use std::net::IpAddr; use std::num::NonZeroU16; use std::sync::Arc; -use std::time::Duration; +use crate::timeouts::TCP_CONNECTION_ATTEMPT_DELAY; use async_trait::async_trait; use boring::ssl::{ConnectConfiguration, SslConnector, SslMethod}; use futures_util::TryFutureExt; @@ -24,8 +24,6 @@ use crate::infra::{ }; use crate::utils::first_ok; -const CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(200); - #[derive(Clone)] pub enum TcpSslConnector { Direct(DirectConnector), @@ -240,7 +238,7 @@ async fn connect_tcp( // that incorporates the delay based on its position in the list. // This way we can start all futures at once and simply wait for the first one to complete successfully. let staggered_futures = dns_lookup.into_iter().enumerate().map(|(idx, ip)| { - let delay = CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap(); + let delay = TCP_CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap(); async move { if !delay.is_zero() { tokio::time::sleep(delay).await; diff --git a/rust/net/src/infra/ws.rs b/rust/net/src/infra/ws.rs index 548a9e480..3987d2016 100644 --- a/rust/net/src/infra/ws.rs +++ b/rust/net/src/infra/ws.rs @@ -613,8 +613,8 @@ pub(crate) mod testutil { use tokio::io::DuplexStream; use tokio_tungstenite::WebSocketStream; - use crate::env::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_TIME}; use crate::infra::{AsyncDuplexStream, DnsSource, RouteType}; + use crate::timeouts::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_INTERVAL}; use super::*; @@ -645,7 +645,7 @@ pub(crate) mod testutil { channel, mock_connection_info(), WS_KEEP_ALIVE_INTERVAL, - WS_MAX_IDLE_TIME, + WS_MAX_IDLE_INTERVAL, ) .0 } diff --git a/rust/net/src/lib.rs b/rust/net/src/lib.rs index 01de458d3..223e8979f 100644 --- a/rust/net/src/lib.rs +++ b/rust/net/src/lib.rs @@ -12,4 +12,5 @@ pub mod infra; pub mod proto; pub mod svr; pub mod svr3; +pub mod timeouts; pub mod utils; diff --git a/rust/net/src/timeouts.rs b/rust/net/src/timeouts.rs new file mode 100644 index 000000000..a7e2dacec --- /dev/null +++ b/rust/net/src/timeouts.rs @@ -0,0 +1,52 @@ +use std::time::Duration; + +/// Timeout for a system DNS lookup +pub const DNS_SYSTEM_LOOKUP_TIMEOUT: Duration = Duration::from_secs(5); +/// A list of timeouts per each fallback DNS lookup attempt +pub const DNS_FALLBACK_LOOKUP_TIMEOUTS: &[Duration] = &[ + Duration::from_secs(5), + Duration::from_secs(10), + Duration::from_secs(15), +]; +/// If during a DNS resolution we've sent multiple queries (one per IP type) +/// and one of them produced a result, we'll wait this time interval +/// to let the other query complete before proceeding +pub const DNS_RESOLUTION_DELAY: Duration = Duration::from_millis(50); +/// When making a DNS query, a caller is given a result future that it may or may not +/// await on until the result is ready (callers will likely await with a timeout). +/// Regardless of the caller's behavior, DNS resolver will wait this time interval +/// for results to arrive to cache them for the future lookups. +pub const DNS_CALL_BACKGROUND_TIMEOUT: Duration = Duration::from_secs(30); + +/// Frequency of the WebSocket `PING` requests +pub const WS_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(15); +/// Maximum time of incoming packets inactivity allowed on a WebSocket connection +pub const WS_MAX_IDLE_INTERVAL: Duration = Duration::from_secs(45); + +/// Timeout for a connect operation that attempts one route +/// (this includes DNS resolution, TCP connection, and SSL handshake) +pub const ONE_ROUTE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60); + +/// Timeout for a connect operation that attempts multiple routes +pub const MULTI_ROUTE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(180); + +/// When establishing a TCP connection, connections to different IP addresses are +/// raced between each other with each new attempt being given an additional delay +/// before it starts. +pub const TCP_CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(200); + +/// A sequence of timeout values to be used as cooldown intervals between attempts +/// when a connection to a given route is consecutively failing to establish +pub const CONNECTION_ROUTE_COOLDOWN_INTERVALS: [Duration; 8] = [ + Duration::from_secs(0), + Duration::from_secs(1), + Duration::from_secs(2), + Duration::from_secs(4), + Duration::from_secs(8), + Duration::from_secs(16), + Duration::from_secs(32), + CONNECTION_ROUTE_MAX_COOLDOWN, +]; + +/// Maximum value of a coolduwn interval between connection attempts +pub const CONNECTION_ROUTE_MAX_COOLDOWN: Duration = Duration::from_secs(64);