net: consolidating timeout constants

This commit is contained in:
Sergey Skrobotov
2024-05-22 16:50:53 -07:00
committed by GitHub
parent 770dd1bbc2
commit 62f347e866
17 changed files with 164 additions and 93 deletions

View File

@@ -152,19 +152,13 @@ describe('chat service api', () => {
}).timeout(10000);
});
it('cannot connect through an invalid proxy', async () => {
it('cannot connect through an invalid proxy', () => {
// The default TLS proxy config doesn't support staging, so we connect to production.
const net = new Net(Environment.Production, userAgent);
expect(() => net.setProxy('signalfoundation.org', 0)).throws(Error);
expect(() => net.setProxy('signalfoundation.org', 100_000)).throws(Error);
expect(() => net.setProxy('signalfoundation.org', -1)).throws(Error);
expect(() => net.setProxy('signalfoundation.org', 0.1)).throws(Error);
const chatService = net.newChatService();
// Make sure we *can't* connect.
await expect(chatService.connectUnauthenticated()).to.be.rejectedWith(
LibSignalErrorBase
);
}).timeout(10000);
});

View File

@@ -71,6 +71,7 @@ strum = { version = "0.26", features = ["derive"] }
[dev-dependencies]
assert_matches = "1.5"
test-case = "3.3.1"
tokio = { version = "1", features = ["test-util", "time", "macros"] }
[features]
ffi = []

View File

@@ -6,7 +6,6 @@
use std::convert::TryInto as _;
use std::num::{NonZeroU16, NonZeroU32};
use std::panic::RefUnwindSafe;
use std::time::Duration;
use base64::prelude::{Engine, BASE64_STANDARD};
use http::uri::PathAndQuery;
@@ -26,6 +25,7 @@ use libsignal_net::infra::tcp_ssl::{
use libsignal_net::infra::{make_ws_config, EndpointConnection};
use libsignal_net::svr::{self, SvrConnection};
use libsignal_net::svr3::{self, OpaqueMaskedShareSet, PpssOps as _};
use libsignal_net::timeouts::ONE_ROUTE_CONNECTION_TIMEOUT;
use rand::rngs::OsRng;
use crate::support::*;
@@ -68,7 +68,6 @@ pub struct ConnectionManager {
impl RefUnwindSafe for ConnectionManager {}
impl ConnectionManager {
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
fn new(environment: Environment, user_agent: String) -> Self {
log::info!("Initializing connection manager for {}...", &environment);
let dns_resolver =
@@ -81,11 +80,11 @@ impl ConnectionManager {
.chat_domain_config
.connection_params_with_fallback();
let chat_connection_params = add_user_agent_header(chat_connection_params, &user_agent);
let chat_ws_config = make_ws_config(chat_endpoint, Self::DEFAULT_CONNECT_TIMEOUT);
let chat_ws_config = make_ws_config(chat_endpoint, ONE_ROUTE_CONNECTION_TIMEOUT);
Self {
chat: EndpointConnection::new_multi(
chat_connection_params,
Self::DEFAULT_CONNECT_TIMEOUT,
ONE_ROUTE_CONNECTION_TIMEOUT,
chat_ws_config,
),
cdsi: Self::endpoint_connection(&environment.env().cdsi, &user_agent),
@@ -107,7 +106,7 @@ impl ConnectionManager {
EnclaveEndpointConnection::new_multi(
endpoint.mr_enclave,
params,
Self::DEFAULT_CONNECT_TIMEOUT,
ONE_ROUTE_CONNECTION_TIMEOUT,
)
}
}
@@ -271,7 +270,6 @@ async fn svr3_connect<'a>(
#[cfg(test)]
mod test {
use super::*;
use test_case::test_case;
#[test_case(Environment::Staging; "staging")]

View File

@@ -466,3 +466,36 @@ async fn ServerMessageAck_Send(ack: &ServerMessageAck) -> Result<(), ChatService
let future = ack.inner.take().expect("a message is only acked once");
future.await
}
#[cfg(test)]
mod test {
use super::*;
use crate::net::{ConnectionManager, ConnectionManager_set_proxy, Environment};
use assert_matches::assert_matches;
use libsignal_net::chat::ChatServiceError;
#[tokio::test(start_paused = true)]
async fn cannon_connect_through_invalid_proxy() {
let cm = ConnectionManager::new(Environment::Staging, "test-user-agent".to_string());
assert_matches!(
ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), 0),
Err(_)
);
assert_matches!(
ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), 100_000),
Err(_)
);
assert_matches!(
ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), -1),
Err(_)
);
let chat = ChatService_new(&cm, "".to_string(), "".to_string());
assert_matches!(
ChatService_connect_unauth(&chat).await,
Err(ChatServiceError::AllConnectionRoutesFailed { .. })
);
}
}

View File

@@ -99,12 +99,13 @@ async fn test_connection(
env: &libsignal_net::env::Env<'static, Svr3Env<'static>>,
connection_params: Vec<ConnectionParams>,
) -> Result<(), ChatServiceError> {
let one_route_connect_timeout = Duration::from_secs(5);
let dns_resolver = DnsResolver::new_with_static_fallback(env.static_fallback());
let transport_connector = DirectConnector::new(dns_resolver);
let chat_endpoint = PathAndQuery::from_static(WEB_SOCKET_PATH);
let chat_ws_config = make_ws_config(chat_endpoint, Duration::from_secs(5));
let chat_ws_config = make_ws_config(chat_endpoint, one_route_connect_timeout);
let connection =
EndpointConnection::new_multi(connection_params, Duration::from_secs(5), chat_ws_config);
EndpointConnection::new_multi(connection_params, one_route_connect_timeout, chat_ws_config);
let (incoming_tx, _incoming_rx) = mpsc::channel(1);
let chat = chat_service(

View File

@@ -21,7 +21,9 @@ use crate::utils::basic_authorization;
pub mod chat_reconnect;
mod error;
use crate::timeouts::MULTI_ROUTE_CONNECTION_TIMEOUT;
pub use error::ChatServiceError;
pub mod server_requests;
pub mod ws;
@@ -30,8 +32,6 @@ pub type RequestProto = proto::chat_websocket::WebSocketRequestMessage;
pub type ResponseProto = proto::chat_websocket::WebSocketResponseMessage;
pub type ChatMessageType = proto::chat_websocket::web_socket_message::Type;
const TOTAL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
#[async_trait]
pub trait ChatService {
/// Sends request and gets a response from the Chat Service.
@@ -440,7 +440,7 @@ fn build_authorized_chat_service(
header_auth_decorator.clone(),
),
connection_manager_ws.clone(),
TOTAL_CONNECTION_TIMEOUT,
MULTI_ROUTE_CONNECTION_TIMEOUT,
);
AuthorizedChatService {
@@ -458,7 +458,7 @@ fn build_anonymous_chat_service(
let chat_over_ws_anonymous = ServiceWithReconnect::new(
service_connector_ws.clone(),
connection_manager_ws.clone(),
TOTAL_CONNECTION_TIMEOUT,
MULTI_ROUTE_CONNECTION_TIMEOUT,
);
AnonymousChatService {

View File

@@ -308,13 +308,13 @@ impl<E: EnclaveKind> EnclaveEndpointConnection<E, MultiRouteConnectionManager> {
pub fn new_multi(
mr_enclave: MrEnclave<&'static [u8], E>,
connection_params: impl IntoIterator<Item = ConnectionParams>,
connect_timeout: Duration,
one_route_connect_timeout: Duration,
) -> Self {
Self {
endpoint_connection: EndpointConnection::new_multi(
connection_params,
connect_timeout,
make_ws_config(E::url_path(mr_enclave.as_ref()), connect_timeout),
one_route_connect_timeout,
make_ws_config(E::url_path(mr_enclave.as_ref()), one_route_connect_timeout),
),
params: EndpointParams {
mr_enclave,
@@ -411,7 +411,7 @@ mod test {
}
}
const CONNECT_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(10);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
async fn enclave_connect<C: ConnectionManager>(
manager: C,

View File

@@ -7,7 +7,6 @@ use std::collections::HashMap;
use std::iter;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::num::NonZeroU16;
use std::time::Duration;
use const_str::ip_addr;
use nonzero_ext::nonzero;
@@ -21,8 +20,6 @@ use crate::infra::{
ConnectionParams, DnsSource, HttpRequestDecorator, HttpRequestDecoratorSeq, RouteType,
};
pub(crate) const WS_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
pub(crate) const WS_MAX_IDLE_TIME: Duration = Duration::from_secs(15);
const DEFAULT_HTTPS_PORT: NonZeroU16 = nonzero!(443_u16);
const DOMAIN_CONFIG_CHAT: DomainConfig = DomainConfig {

View File

@@ -9,7 +9,7 @@ use std::string::ToString;
use std::sync::Arc;
use std::time::Duration;
use crate::env::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_TIME};
use crate::timeouts::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_INTERVAL};
use ::http::uri::PathAndQuery;
use ::http::Uri;
use async_trait::async_trait;
@@ -270,7 +270,7 @@ pub struct EndpointConnection<C> {
impl EndpointConnection<MultiRouteConnectionManager> {
pub fn new_multi(
connection_params: impl IntoIterator<Item = ConnectionParams>,
connect_timeout: Duration,
one_route_connect_timeout: Duration,
config: WebSocketConfig,
) -> Self {
Self {
@@ -278,7 +278,10 @@ impl EndpointConnection<MultiRouteConnectionManager> {
connection_params
.into_iter()
.map(|params| {
SingleRouteThrottlingConnectionManager::new(params, connect_timeout)
SingleRouteThrottlingConnectionManager::new(
params,
one_route_connect_timeout,
)
})
.collect(),
),
@@ -296,7 +299,7 @@ pub fn make_ws_config(
endpoint: websocket_endpoint,
max_connection_time: connect_timeout,
keep_alive_interval: WS_KEEP_ALIVE_INTERVAL,
max_idle_time: WS_MAX_IDLE_TIME,
max_idle_time: WS_MAX_IDLE_INTERVAL,
}
}

View File

@@ -11,6 +11,7 @@ use std::panic::RefUnwindSafe;
use std::sync::Arc;
use std::time::Duration;
use crate::timeouts::{CONNECTION_ROUTE_COOLDOWN_INTERVALS, CONNECTION_ROUTE_MAX_COOLDOWN};
use async_trait::async_trait;
use itertools::Itertools;
use tokio::sync::Mutex;
@@ -19,19 +20,6 @@ use tokio::time::{timeout_at, Instant};
use crate::infra::errors::LogSafeDisplay;
use crate::infra::ConnectionParams;
pub(crate) const MAX_COOLDOWN_INTERVAL: Duration = Duration::from_secs(64);
pub(crate) const COOLDOWN_INTERVALS: [Duration; 8] = [
Duration::from_secs(0),
Duration::from_secs(1),
Duration::from_secs(2),
Duration::from_secs(4),
Duration::from_secs(8),
Duration::from_secs(16),
Duration::from_secs(32),
MAX_COOLDOWN_INTERVAL,
];
/// Represents the outcome of the connection attempt
#[derive(Debug)]
pub enum ConnectionAttemptOutcome<T, E> {
@@ -47,7 +35,7 @@ pub enum ConnectionAttemptOutcome<T, E> {
}
/// Encapsulates the logic that for every connection attempt decides
/// whether or not an attempt is to be made in the first place, and, if yes,
/// whether an attempt is to be made in the first place, and, if yes,
/// which [ConnectionParams] are to be used for the attempt.
#[async_trait]
pub trait ConnectionManager: Clone + Send + Sync {
@@ -115,13 +103,15 @@ impl ThrottlingConnectionManagerState {
} else if attempt_start_time > s.latest_attempt || s.consecutive_fails > 0 {
s.latest_attempt = max(attempt_start_time, s.latest_attempt);
let idx: usize = s.consecutive_fails.into();
let cooldown_interval = COOLDOWN_INTERVALS
let cooldown_interval = CONNECTION_ROUTE_COOLDOWN_INTERVALS
.get(idx)
.unwrap_or(&MAX_COOLDOWN_INTERVAL);
.unwrap_or(&CONNECTION_ROUTE_MAX_COOLDOWN);
s.next_attempt = Instant::now() + *cooldown_interval;
s.consecutive_fails = min(
s.consecutive_fails.saturating_add(1),
(COOLDOWN_INTERVALS.len() - 1).try_into().unwrap(),
(CONNECTION_ROUTE_COOLDOWN_INTERVALS.len() - 1)
.try_into()
.unwrap(),
);
}
s
@@ -130,7 +120,7 @@ impl ThrottlingConnectionManagerState {
/// A connection manager that only attempts one route (i.e. one [ConnectionParams])
/// but keeps track of consecutive failed attempts and after each failure waits for a duration
/// chosen according to [COOLDOWN_INTERVALS] list.
/// chosen according to [CONNECTION_ROUTE_COOLDOWN_INTERVALS] list.
#[derive(Clone)]
pub struct SingleRouteThrottlingConnectionManager {
state: Arc<Mutex<ThrottlingConnectionManagerState>>,
@@ -251,7 +241,7 @@ impl SingleRouteThrottlingConnectionManager {
state: Arc::new(Mutex::new(ThrottlingConnectionManagerState {
consecutive_fails: 0,
next_attempt: Instant::now(),
latest_attempt: Instant::now(),
latest_attempt: Instant::now() - Duration::from_nanos(1),
})),
}
}
@@ -419,7 +409,7 @@ mod test {
assert_matches!(attempt_outcome, ConnectionAttemptOutcome::WaitUntil(_));
// now let's advance the time to the point after the cooldown period
time::advance(MAX_COOLDOWN_INTERVAL).await;
time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await;
let attempt_outcome: ConnectionAttemptOutcome<(), TestError> =
manager.connect_or_wait(|_| future::ready(Ok(()))).await;
assert_matches!(attempt_outcome, ConnectionAttemptOutcome::Attempted(Ok(())));
@@ -450,7 +440,7 @@ mod test {
validate_expected_route(&multi_route_manager, true, ROUTE_2).await;
// and now after a cooldown period, route1 should be used again
time::advance(MAX_COOLDOWN_INTERVAL).await;
time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await;
validate_expected_route(&multi_route_manager, true, ROUTE_1).await;
}
@@ -504,7 +494,9 @@ mod test {
n if n < self.attempts_until_cooldown => ConnectionAttemptOutcome::Attempted(
connection_fn(&self.connection_params).await,
),
_ => ConnectionAttemptOutcome::WaitUntil(Instant::now() + MAX_COOLDOWN_INTERVAL),
_ => ConnectionAttemptOutcome::WaitUntil(
Instant::now() + CONNECTION_ROUTE_MAX_COOLDOWN,
),
}
}

View File

@@ -9,6 +9,7 @@ use std::future;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::timeouts::{DNS_FALLBACK_LOOKUP_TIMEOUTS, DNS_SYSTEM_LOOKUP_TIMEOUT};
use nonzero_ext::nonzero;
use oneshot_broadcast::Sender;
use tokio::time::Instant;
@@ -97,14 +98,17 @@ impl DnsResolver {
RootCertificates::Native,
);
let custom_resolver = Box::new(CustomDnsResolver::<DohTransport>::new(connection_params));
let fallback_lookups = DNS_FALLBACK_LOOKUP_TIMEOUTS
.iter()
.map(|timeout| (custom_resolver.clone() as Box<dyn DnsLookup>, *timeout));
let mut lookup_options: Vec<(Box<dyn DnsLookup>, Duration)> =
Vec::with_capacity(fallback_lookups.len() + 2);
lookup_options.push((Box::new(SystemDnsLookup), DNS_SYSTEM_LOOKUP_TIMEOUT));
lookup_options.extend(fallback_lookups);
lookup_options.push((Box::new(StaticDnsMap(static_map)), Duration::from_secs(1)));
DnsResolver {
lookup_options: Arc::new(vec![
(Box::new(SystemDnsLookup), Duration::from_secs(2)),
(custom_resolver.clone(), Duration::from_secs(5)),
(custom_resolver.clone(), Duration::from_secs(10)),
(custom_resolver.clone(), Duration::from_secs(15)),
(Box::new(StaticDnsMap(static_map)), Duration::from_secs(1)),
]),
lookup_options: Arc::new(lookup_options),
state: Default::default(),
}
}

View File

@@ -7,8 +7,8 @@ use std::cmp::min;
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::time::Duration;
use crate::timeouts::{DNS_CALL_BACKGROUND_TIMEOUT, DNS_RESOLUTION_DELAY};
use async_trait::async_trait;
use either::Either;
use futures_util::stream::BoxStream;
@@ -27,9 +27,6 @@ pub type DnsIpv4Result = Expiring<Vec<Ipv4Addr>>;
pub type DnsIpv6Result = Expiring<Vec<Ipv6Addr>>;
pub type DnsQueryResult = Either<DnsIpv4Result, DnsIpv6Result>;
const RESOLUTION_DELAY: Duration = Duration::from_millis(50);
const LONG_TIMEOUT: Duration = Duration::from_secs(30);
/// Implementors of this trait encapsulate the logic of sending queries to the DNS server
/// and receiving resposnes.
#[async_trait]
@@ -120,7 +117,7 @@ impl<T: DnsTransport + Sync + 'static> CustomDnsResolver<T> {
.await?;
let (ipv4_res_rx, ipv6_res_rx) = self.send_dns_queries(transport, request);
let (maybe_ipv4, maybe_ipv6) =
results_within_interval(ipv4_res_rx, ipv6_res_rx, RESOLUTION_DELAY).await;
results_within_interval(ipv4_res_rx, ipv6_res_rx, DNS_RESOLUTION_DELAY).await;
let ipv4s = maybe_ipv4.map_or(vec![], |r| r.data);
let ipv6s = maybe_ipv6.map_or(vec![], |r| r.data);
match LookupResult::new(T::dns_source(), ipv4s, ipv6s) {
@@ -153,7 +150,7 @@ impl<T: DnsTransport + Sync + 'static> CustomDnsResolver<T> {
// Reference: https://datatracker.ietf.org/doc/html/rfc8305#section-3
tokio::spawn(async move {
let started_at = Instant::now();
let timeout_at = started_at + LONG_TIMEOUT;
let timeout_at = started_at + DNS_CALL_BACKGROUND_TIMEOUT;
let mut stream = match transport.send_queries(request.clone()).await {
Ok(stream) => stream,
@@ -254,7 +251,8 @@ impl<T: DnsTransport + Sync + 'static> CustomDnsResolver<T> {
#[cfg(test)]
pub(crate) mod test {
use crate::infra::dns::custom_resolver::{
CustomDnsResolver, DnsQueryResult, DnsTransport, LONG_TIMEOUT, RESOLUTION_DELAY,
CustomDnsResolver, DnsQueryResult, DnsTransport, DNS_CALL_BACKGROUND_TIMEOUT,
DNS_RESOLUTION_DELAY,
};
use crate::infra::dns::dns_errors::Error;
use crate::infra::dns::dns_lookup::DnsLookupRequest;
@@ -448,8 +446,8 @@ pub(crate) mod test {
async fn works_correctly_when_both_results_are_within_resolution_delay() {
let (transport, resolver) =
TestDnsTransportWithTwoResponses::transport_and_custom_dns_resolver(|_, q_num, txs| {
let first = LONG_TIMEOUT / 4;
let second = first + RESOLUTION_DELAY / 2;
let first = DNS_CALL_BACKGROUND_TIMEOUT / 4;
let second = first + DNS_RESOLUTION_DELAY / 2;
let (timeout_1, timeout_2) = if q_num == 1 {
(first, second)
} else {
@@ -471,8 +469,8 @@ pub(crate) mod test {
#[tokio::test(start_paused = true)]
async fn works_correctly_when_second_response_is_after_resolution_delay() {
let resolver = TestDnsTransportWithTwoResponses::custom_dns_resolver(|_, q_num, txs| {
let first = LONG_TIMEOUT / 4;
let second = first + RESOLUTION_DELAY * 2;
let first = DNS_CALL_BACKGROUND_TIMEOUT / 4;
let second = first + DNS_RESOLUTION_DELAY * 2;
let (timeout_1, timeout_2) = if q_num == 1 {
(first, second)
} else {
@@ -508,9 +506,9 @@ pub(crate) mod test {
let res_1 = ok_query_result_ipv4(Duration::ZERO, IP_V4_LIST_1);
let res_2 = ok_query_result_ipv6(Duration::ZERO, IP_V6_LIST_1);
let res_3 = Err(Error::NoData);
let timeout_1 = LONG_TIMEOUT / 4;
let timeout_2 = timeout_1 + RESOLUTION_DELAY / 3;
let timeout_3 = timeout_1 + RESOLUTION_DELAY / 2;
let timeout_1 = DNS_CALL_BACKGROUND_TIMEOUT / 4;
let timeout_2 = timeout_1 + DNS_RESOLUTION_DELAY / 3;
let timeout_3 = timeout_1 + DNS_RESOLUTION_DELAY / 2;
respond_after_timeout(timeout_1, tx_1, res_1);
respond_after_timeout(timeout_2, tx_2, res_2);
respond_after_timeout(timeout_3, tx_3, res_3);
@@ -522,7 +520,7 @@ pub(crate) mod test {
#[tokio::test(start_paused = true)]
async fn returns_second_result_if_first_result_fails() {
let resolver = TestDnsTransportWithTwoResponses::custom_dns_resolver(|_, _, txs| {
let timeout_2 = RESOLUTION_DELAY * 2;
let timeout_2 = DNS_RESOLUTION_DELAY * 2;
let [tx_1, tx_2] = txs;
let res_1 = Err(Error::LookupFailed);
let res_2 = ok_query_result_ipv6(Duration::ZERO, IP_V6_LIST_1);
@@ -578,8 +576,8 @@ pub(crate) mod test {
#[tokio::test(start_paused = true)]
async fn results_cached_even_if_received_late() {
// second result is sent within the `LONG_TIMEOUT`, but after the `RESOLUTION_DELAY`
let timeout_1 = LONG_TIMEOUT / 4;
let timeout_2 = LONG_TIMEOUT / 2;
let timeout_1 = DNS_CALL_BACKGROUND_TIMEOUT / 4;
let timeout_2 = DNS_CALL_BACKGROUND_TIMEOUT / 2;
let all_results_received_time = Instant::now() + timeout_2;
let (transport, resolver) =
TestDnsTransportWithTwoResponses::transport_and_custom_dns_resolver(

View File

@@ -505,6 +505,7 @@ mod test {
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::timeouts::{CONNECTION_ROUTE_COOLDOWN_INTERVALS, CONNECTION_ROUTE_MAX_COOLDOWN};
use assert_matches::assert_matches;
use async_trait::async_trait;
use futures_util::FutureExt;
@@ -514,9 +515,7 @@ mod test {
use super::*;
use crate::infra::certs::RootCertificates;
use crate::infra::connection_manager::{
SingleRouteThrottlingConnectionManager, COOLDOWN_INTERVALS, MAX_COOLDOWN_INTERVAL,
};
use crate::infra::connection_manager::SingleRouteThrottlingConnectionManager;
use crate::infra::test::shared::{
TestError, LONG_CONNECTION_TIME, NORMAL_CONNECTION_TIME, TIMEOUT_DURATION,
TIME_ADVANCE_VALUE,
@@ -813,7 +812,7 @@ mod test {
// At this point, `service_with_reconnect` tried multiple times to connect
// and hit the cooldown. Let's advance time to make sure next attempt will be made.
time::advance(MAX_COOLDOWN_INTERVAL).await;
time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await;
connector.set_service_healthy(true);
let connection_result = service_with_reconnect.connect_from_inactive().await;
@@ -833,7 +832,7 @@ mod test {
// At this point, `service_with_reconnect` tried multiple times to connect
// and hit the cooldown. Let's advance time to make sure next attempt will be made.
time::advance(MAX_COOLDOWN_INTERVAL).await;
time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await;
connector.set_time_to_connect(NORMAL_CONNECTION_TIME);
let connection_result = service_with_reconnect.connect_from_inactive().await;
@@ -859,11 +858,11 @@ mod test {
sleep_and_catch_up(NORMAL_CONNECTION_TIME).await;
assert_eq!(connector.attempts.load(Ordering::Relaxed), 2);
sleep_and_catch_up(COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await;
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await;
assert_eq!(connector.attempts.load(Ordering::Relaxed), 3);
assert_matches!(service_with_reconnect.service().await, Err(_));
sleep_and_catch_up(COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await;
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await;
assert_eq!(connector.attempts.load(Ordering::Relaxed), 4);
assert_matches!(service_with_reconnect.service().await, Err(_));
@@ -871,7 +870,7 @@ mod test {
// letting next cooldown interval pass and checking again
connector.set_service_healthy(true);
sleep_and_catch_up(COOLDOWN_INTERVALS[2] + NORMAL_CONNECTION_TIME).await;
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[2] + NORMAL_CONNECTION_TIME).await;
assert_eq!(connector.attempts.load(Ordering::Relaxed), 5);
assert_matches!(service_with_reconnect.service().await, Ok(_));
}
@@ -895,18 +894,18 @@ mod test {
sleep_and_catch_up(NORMAL_CONNECTION_TIME).await;
assert_eq!(connector.attempts.load(Ordering::Relaxed), 2);
sleep_and_catch_up(COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await;
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await;
assert_eq!(connector.attempts.load(Ordering::Relaxed), 3);
assert_matches!(service_with_reconnect.service().await, Err(_));
sleep_and_catch_up(COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await;
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await;
assert_eq!(connector.attempts.load(Ordering::Relaxed), 4);
assert_matches!(service_with_reconnect.service().await, Err(_));
// now we decide to disconnect, and we need to make sure we're not making
// any more attempts
service_with_reconnect.disconnect().await;
for interval in COOLDOWN_INTERVALS.into_iter().skip(2) {
for interval in CONNECTION_ROUTE_COOLDOWN_INTERVALS.into_iter().skip(2) {
sleep_and_catch_up(interval + NORMAL_CONNECTION_TIME).await;
assert_eq!(connector.attempts.load(Ordering::Relaxed), 4);
assert_matches!(

View File

@@ -6,8 +6,8 @@
use std::net::IpAddr;
use std::num::NonZeroU16;
use std::sync::Arc;
use std::time::Duration;
use crate::timeouts::TCP_CONNECTION_ATTEMPT_DELAY;
use async_trait::async_trait;
use boring::ssl::{ConnectConfiguration, SslConnector, SslMethod};
use futures_util::TryFutureExt;
@@ -24,8 +24,6 @@ use crate::infra::{
};
use crate::utils::first_ok;
const CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(200);
#[derive(Clone)]
pub enum TcpSslConnector {
Direct(DirectConnector),
@@ -240,7 +238,7 @@ async fn connect_tcp(
// that incorporates the delay based on its position in the list.
// This way we can start all futures at once and simply wait for the first one to complete successfully.
let staggered_futures = dns_lookup.into_iter().enumerate().map(|(idx, ip)| {
let delay = CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap();
let delay = TCP_CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap();
async move {
if !delay.is_zero() {
tokio::time::sleep(delay).await;

View File

@@ -613,8 +613,8 @@ pub(crate) mod testutil {
use tokio::io::DuplexStream;
use tokio_tungstenite::WebSocketStream;
use crate::env::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_TIME};
use crate::infra::{AsyncDuplexStream, DnsSource, RouteType};
use crate::timeouts::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_INTERVAL};
use super::*;
@@ -645,7 +645,7 @@ pub(crate) mod testutil {
channel,
mock_connection_info(),
WS_KEEP_ALIVE_INTERVAL,
WS_MAX_IDLE_TIME,
WS_MAX_IDLE_INTERVAL,
)
.0
}

View File

@@ -12,4 +12,5 @@ pub mod infra;
pub mod proto;
pub mod svr;
pub mod svr3;
pub mod timeouts;
pub mod utils;

52
rust/net/src/timeouts.rs Normal file
View File

@@ -0,0 +1,52 @@
use std::time::Duration;
/// Timeout for a system DNS lookup
pub const DNS_SYSTEM_LOOKUP_TIMEOUT: Duration = Duration::from_secs(5);
/// A list of timeouts per each fallback DNS lookup attempt
pub const DNS_FALLBACK_LOOKUP_TIMEOUTS: &[Duration] = &[
Duration::from_secs(5),
Duration::from_secs(10),
Duration::from_secs(15),
];
/// If during a DNS resolution we've sent multiple queries (one per IP type)
/// and one of them produced a result, we'll wait this time interval
/// to let the other query complete before proceeding
pub const DNS_RESOLUTION_DELAY: Duration = Duration::from_millis(50);
/// When making a DNS query, a caller is given a result future that it may or may not
/// await on until the result is ready (callers will likely await with a timeout).
/// Regardless of the caller's behavior, DNS resolver will wait this time interval
/// for results to arrive to cache them for the future lookups.
pub const DNS_CALL_BACKGROUND_TIMEOUT: Duration = Duration::from_secs(30);
/// Frequency of the WebSocket `PING` requests
pub const WS_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(15);
/// Maximum time of incoming packets inactivity allowed on a WebSocket connection
pub const WS_MAX_IDLE_INTERVAL: Duration = Duration::from_secs(45);
/// Timeout for a connect operation that attempts one route
/// (this includes DNS resolution, TCP connection, and SSL handshake)
pub const ONE_ROUTE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
/// Timeout for a connect operation that attempts multiple routes
pub const MULTI_ROUTE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(180);
/// When establishing a TCP connection, connections to different IP addresses are
/// raced between each other with each new attempt being given an additional delay
/// before it starts.
pub const TCP_CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(200);
/// A sequence of timeout values to be used as cooldown intervals between attempts
/// when a connection to a given route is consecutively failing to establish
pub const CONNECTION_ROUTE_COOLDOWN_INTERVALS: [Duration; 8] = [
Duration::from_secs(0),
Duration::from_secs(1),
Duration::from_secs(2),
Duration::from_secs(4),
Duration::from_secs(8),
Duration::from_secs(16),
Duration::from_secs(32),
CONNECTION_ROUTE_MAX_COOLDOWN,
];
/// Maximum value of a coolduwn interval between connection attempts
pub const CONNECTION_ROUTE_MAX_COOLDOWN: Duration = Duration::from_secs(64);