mirror of
https://github.com/signalapp/libsignal.git
synced 2026-05-08 16:13:04 +02:00
net: consolidating timeout constants
This commit is contained in:
@@ -152,19 +152,13 @@ describe('chat service api', () => {
|
||||
}).timeout(10000);
|
||||
});
|
||||
|
||||
it('cannot connect through an invalid proxy', async () => {
|
||||
it('cannot connect through an invalid proxy', () => {
|
||||
// The default TLS proxy config doesn't support staging, so we connect to production.
|
||||
const net = new Net(Environment.Production, userAgent);
|
||||
expect(() => net.setProxy('signalfoundation.org', 0)).throws(Error);
|
||||
expect(() => net.setProxy('signalfoundation.org', 100_000)).throws(Error);
|
||||
expect(() => net.setProxy('signalfoundation.org', -1)).throws(Error);
|
||||
expect(() => net.setProxy('signalfoundation.org', 0.1)).throws(Error);
|
||||
|
||||
const chatService = net.newChatService();
|
||||
// Make sure we *can't* connect.
|
||||
await expect(chatService.connectUnauthenticated()).to.be.rejectedWith(
|
||||
LibSignalErrorBase
|
||||
);
|
||||
}).timeout(10000);
|
||||
});
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ strum = { version = "0.26", features = ["derive"] }
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.5"
|
||||
test-case = "3.3.1"
|
||||
tokio = { version = "1", features = ["test-util", "time", "macros"] }
|
||||
|
||||
[features]
|
||||
ffi = []
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
use std::convert::TryInto as _;
|
||||
use std::num::{NonZeroU16, NonZeroU32};
|
||||
use std::panic::RefUnwindSafe;
|
||||
use std::time::Duration;
|
||||
|
||||
use base64::prelude::{Engine, BASE64_STANDARD};
|
||||
use http::uri::PathAndQuery;
|
||||
@@ -26,6 +25,7 @@ use libsignal_net::infra::tcp_ssl::{
|
||||
use libsignal_net::infra::{make_ws_config, EndpointConnection};
|
||||
use libsignal_net::svr::{self, SvrConnection};
|
||||
use libsignal_net::svr3::{self, OpaqueMaskedShareSet, PpssOps as _};
|
||||
use libsignal_net::timeouts::ONE_ROUTE_CONNECTION_TIMEOUT;
|
||||
use rand::rngs::OsRng;
|
||||
|
||||
use crate::support::*;
|
||||
@@ -68,7 +68,6 @@ pub struct ConnectionManager {
|
||||
impl RefUnwindSafe for ConnectionManager {}
|
||||
|
||||
impl ConnectionManager {
|
||||
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
fn new(environment: Environment, user_agent: String) -> Self {
|
||||
log::info!("Initializing connection manager for {}...", &environment);
|
||||
let dns_resolver =
|
||||
@@ -81,11 +80,11 @@ impl ConnectionManager {
|
||||
.chat_domain_config
|
||||
.connection_params_with_fallback();
|
||||
let chat_connection_params = add_user_agent_header(chat_connection_params, &user_agent);
|
||||
let chat_ws_config = make_ws_config(chat_endpoint, Self::DEFAULT_CONNECT_TIMEOUT);
|
||||
let chat_ws_config = make_ws_config(chat_endpoint, ONE_ROUTE_CONNECTION_TIMEOUT);
|
||||
Self {
|
||||
chat: EndpointConnection::new_multi(
|
||||
chat_connection_params,
|
||||
Self::DEFAULT_CONNECT_TIMEOUT,
|
||||
ONE_ROUTE_CONNECTION_TIMEOUT,
|
||||
chat_ws_config,
|
||||
),
|
||||
cdsi: Self::endpoint_connection(&environment.env().cdsi, &user_agent),
|
||||
@@ -107,7 +106,7 @@ impl ConnectionManager {
|
||||
EnclaveEndpointConnection::new_multi(
|
||||
endpoint.mr_enclave,
|
||||
params,
|
||||
Self::DEFAULT_CONNECT_TIMEOUT,
|
||||
ONE_ROUTE_CONNECTION_TIMEOUT,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -271,7 +270,6 @@ async fn svr3_connect<'a>(
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
use test_case::test_case;
|
||||
|
||||
#[test_case(Environment::Staging; "staging")]
|
||||
|
||||
@@ -466,3 +466,36 @@ async fn ServerMessageAck_Send(ack: &ServerMessageAck) -> Result<(), ChatService
|
||||
let future = ack.inner.take().expect("a message is only acked once");
|
||||
future.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::net::{ConnectionManager, ConnectionManager_set_proxy, Environment};
|
||||
use assert_matches::assert_matches;
|
||||
use libsignal_net::chat::ChatServiceError;
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn cannon_connect_through_invalid_proxy() {
|
||||
let cm = ConnectionManager::new(Environment::Staging, "test-user-agent".to_string());
|
||||
|
||||
assert_matches!(
|
||||
ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), 0),
|
||||
Err(_)
|
||||
);
|
||||
assert_matches!(
|
||||
ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), 100_000),
|
||||
Err(_)
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
ConnectionManager_set_proxy(&cm, "signalfoundation.org".to_string(), -1),
|
||||
Err(_)
|
||||
);
|
||||
|
||||
let chat = ChatService_new(&cm, "".to_string(), "".to_string());
|
||||
assert_matches!(
|
||||
ChatService_connect_unauth(&chat).await,
|
||||
Err(ChatServiceError::AllConnectionRoutesFailed { .. })
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,12 +99,13 @@ async fn test_connection(
|
||||
env: &libsignal_net::env::Env<'static, Svr3Env<'static>>,
|
||||
connection_params: Vec<ConnectionParams>,
|
||||
) -> Result<(), ChatServiceError> {
|
||||
let one_route_connect_timeout = Duration::from_secs(5);
|
||||
let dns_resolver = DnsResolver::new_with_static_fallback(env.static_fallback());
|
||||
let transport_connector = DirectConnector::new(dns_resolver);
|
||||
let chat_endpoint = PathAndQuery::from_static(WEB_SOCKET_PATH);
|
||||
let chat_ws_config = make_ws_config(chat_endpoint, Duration::from_secs(5));
|
||||
let chat_ws_config = make_ws_config(chat_endpoint, one_route_connect_timeout);
|
||||
let connection =
|
||||
EndpointConnection::new_multi(connection_params, Duration::from_secs(5), chat_ws_config);
|
||||
EndpointConnection::new_multi(connection_params, one_route_connect_timeout, chat_ws_config);
|
||||
|
||||
let (incoming_tx, _incoming_rx) = mpsc::channel(1);
|
||||
let chat = chat_service(
|
||||
|
||||
@@ -21,7 +21,9 @@ use crate::utils::basic_authorization;
|
||||
|
||||
pub mod chat_reconnect;
|
||||
mod error;
|
||||
use crate::timeouts::MULTI_ROUTE_CONNECTION_TIMEOUT;
|
||||
pub use error::ChatServiceError;
|
||||
|
||||
pub mod server_requests;
|
||||
pub mod ws;
|
||||
|
||||
@@ -30,8 +32,6 @@ pub type RequestProto = proto::chat_websocket::WebSocketRequestMessage;
|
||||
pub type ResponseProto = proto::chat_websocket::WebSocketResponseMessage;
|
||||
pub type ChatMessageType = proto::chat_websocket::web_socket_message::Type;
|
||||
|
||||
const TOTAL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
#[async_trait]
|
||||
pub trait ChatService {
|
||||
/// Sends request and gets a response from the Chat Service.
|
||||
@@ -440,7 +440,7 @@ fn build_authorized_chat_service(
|
||||
header_auth_decorator.clone(),
|
||||
),
|
||||
connection_manager_ws.clone(),
|
||||
TOTAL_CONNECTION_TIMEOUT,
|
||||
MULTI_ROUTE_CONNECTION_TIMEOUT,
|
||||
);
|
||||
|
||||
AuthorizedChatService {
|
||||
@@ -458,7 +458,7 @@ fn build_anonymous_chat_service(
|
||||
let chat_over_ws_anonymous = ServiceWithReconnect::new(
|
||||
service_connector_ws.clone(),
|
||||
connection_manager_ws.clone(),
|
||||
TOTAL_CONNECTION_TIMEOUT,
|
||||
MULTI_ROUTE_CONNECTION_TIMEOUT,
|
||||
);
|
||||
|
||||
AnonymousChatService {
|
||||
|
||||
@@ -308,13 +308,13 @@ impl<E: EnclaveKind> EnclaveEndpointConnection<E, MultiRouteConnectionManager> {
|
||||
pub fn new_multi(
|
||||
mr_enclave: MrEnclave<&'static [u8], E>,
|
||||
connection_params: impl IntoIterator<Item = ConnectionParams>,
|
||||
connect_timeout: Duration,
|
||||
one_route_connect_timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
endpoint_connection: EndpointConnection::new_multi(
|
||||
connection_params,
|
||||
connect_timeout,
|
||||
make_ws_config(E::url_path(mr_enclave.as_ref()), connect_timeout),
|
||||
one_route_connect_timeout,
|
||||
make_ws_config(E::url_path(mr_enclave.as_ref()), one_route_connect_timeout),
|
||||
),
|
||||
params: EndpointParams {
|
||||
mr_enclave,
|
||||
@@ -411,7 +411,7 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
const CONNECT_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(10);
|
||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
async fn enclave_connect<C: ConnectionManager>(
|
||||
manager: C,
|
||||
|
||||
@@ -7,7 +7,6 @@ use std::collections::HashMap;
|
||||
use std::iter;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
use std::num::NonZeroU16;
|
||||
use std::time::Duration;
|
||||
|
||||
use const_str::ip_addr;
|
||||
use nonzero_ext::nonzero;
|
||||
@@ -21,8 +20,6 @@ use crate::infra::{
|
||||
ConnectionParams, DnsSource, HttpRequestDecorator, HttpRequestDecoratorSeq, RouteType,
|
||||
};
|
||||
|
||||
pub(crate) const WS_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
|
||||
pub(crate) const WS_MAX_IDLE_TIME: Duration = Duration::from_secs(15);
|
||||
const DEFAULT_HTTPS_PORT: NonZeroU16 = nonzero!(443_u16);
|
||||
|
||||
const DOMAIN_CONFIG_CHAT: DomainConfig = DomainConfig {
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::string::ToString;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::env::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_TIME};
|
||||
use crate::timeouts::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_INTERVAL};
|
||||
use ::http::uri::PathAndQuery;
|
||||
use ::http::Uri;
|
||||
use async_trait::async_trait;
|
||||
@@ -270,7 +270,7 @@ pub struct EndpointConnection<C> {
|
||||
impl EndpointConnection<MultiRouteConnectionManager> {
|
||||
pub fn new_multi(
|
||||
connection_params: impl IntoIterator<Item = ConnectionParams>,
|
||||
connect_timeout: Duration,
|
||||
one_route_connect_timeout: Duration,
|
||||
config: WebSocketConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -278,7 +278,10 @@ impl EndpointConnection<MultiRouteConnectionManager> {
|
||||
connection_params
|
||||
.into_iter()
|
||||
.map(|params| {
|
||||
SingleRouteThrottlingConnectionManager::new(params, connect_timeout)
|
||||
SingleRouteThrottlingConnectionManager::new(
|
||||
params,
|
||||
one_route_connect_timeout,
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
@@ -296,7 +299,7 @@ pub fn make_ws_config(
|
||||
endpoint: websocket_endpoint,
|
||||
max_connection_time: connect_timeout,
|
||||
keep_alive_interval: WS_KEEP_ALIVE_INTERVAL,
|
||||
max_idle_time: WS_MAX_IDLE_TIME,
|
||||
max_idle_time: WS_MAX_IDLE_INTERVAL,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ use std::panic::RefUnwindSafe;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::timeouts::{CONNECTION_ROUTE_COOLDOWN_INTERVALS, CONNECTION_ROUTE_MAX_COOLDOWN};
|
||||
use async_trait::async_trait;
|
||||
use itertools::Itertools;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -19,19 +20,6 @@ use tokio::time::{timeout_at, Instant};
|
||||
use crate::infra::errors::LogSafeDisplay;
|
||||
use crate::infra::ConnectionParams;
|
||||
|
||||
pub(crate) const MAX_COOLDOWN_INTERVAL: Duration = Duration::from_secs(64);
|
||||
|
||||
pub(crate) const COOLDOWN_INTERVALS: [Duration; 8] = [
|
||||
Duration::from_secs(0),
|
||||
Duration::from_secs(1),
|
||||
Duration::from_secs(2),
|
||||
Duration::from_secs(4),
|
||||
Duration::from_secs(8),
|
||||
Duration::from_secs(16),
|
||||
Duration::from_secs(32),
|
||||
MAX_COOLDOWN_INTERVAL,
|
||||
];
|
||||
|
||||
/// Represents the outcome of the connection attempt
|
||||
#[derive(Debug)]
|
||||
pub enum ConnectionAttemptOutcome<T, E> {
|
||||
@@ -47,7 +35,7 @@ pub enum ConnectionAttemptOutcome<T, E> {
|
||||
}
|
||||
|
||||
/// Encapsulates the logic that for every connection attempt decides
|
||||
/// whether or not an attempt is to be made in the first place, and, if yes,
|
||||
/// whether an attempt is to be made in the first place, and, if yes,
|
||||
/// which [ConnectionParams] are to be used for the attempt.
|
||||
#[async_trait]
|
||||
pub trait ConnectionManager: Clone + Send + Sync {
|
||||
@@ -115,13 +103,15 @@ impl ThrottlingConnectionManagerState {
|
||||
} else if attempt_start_time > s.latest_attempt || s.consecutive_fails > 0 {
|
||||
s.latest_attempt = max(attempt_start_time, s.latest_attempt);
|
||||
let idx: usize = s.consecutive_fails.into();
|
||||
let cooldown_interval = COOLDOWN_INTERVALS
|
||||
let cooldown_interval = CONNECTION_ROUTE_COOLDOWN_INTERVALS
|
||||
.get(idx)
|
||||
.unwrap_or(&MAX_COOLDOWN_INTERVAL);
|
||||
.unwrap_or(&CONNECTION_ROUTE_MAX_COOLDOWN);
|
||||
s.next_attempt = Instant::now() + *cooldown_interval;
|
||||
s.consecutive_fails = min(
|
||||
s.consecutive_fails.saturating_add(1),
|
||||
(COOLDOWN_INTERVALS.len() - 1).try_into().unwrap(),
|
||||
(CONNECTION_ROUTE_COOLDOWN_INTERVALS.len() - 1)
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
s
|
||||
@@ -130,7 +120,7 @@ impl ThrottlingConnectionManagerState {
|
||||
|
||||
/// A connection manager that only attempts one route (i.e. one [ConnectionParams])
|
||||
/// but keeps track of consecutive failed attempts and after each failure waits for a duration
|
||||
/// chosen according to [COOLDOWN_INTERVALS] list.
|
||||
/// chosen according to [CONNECTION_ROUTE_COOLDOWN_INTERVALS] list.
|
||||
#[derive(Clone)]
|
||||
pub struct SingleRouteThrottlingConnectionManager {
|
||||
state: Arc<Mutex<ThrottlingConnectionManagerState>>,
|
||||
@@ -251,7 +241,7 @@ impl SingleRouteThrottlingConnectionManager {
|
||||
state: Arc::new(Mutex::new(ThrottlingConnectionManagerState {
|
||||
consecutive_fails: 0,
|
||||
next_attempt: Instant::now(),
|
||||
latest_attempt: Instant::now(),
|
||||
latest_attempt: Instant::now() - Duration::from_nanos(1),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -419,7 +409,7 @@ mod test {
|
||||
assert_matches!(attempt_outcome, ConnectionAttemptOutcome::WaitUntil(_));
|
||||
|
||||
// now let's advance the time to the point after the cooldown period
|
||||
time::advance(MAX_COOLDOWN_INTERVAL).await;
|
||||
time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await;
|
||||
let attempt_outcome: ConnectionAttemptOutcome<(), TestError> =
|
||||
manager.connect_or_wait(|_| future::ready(Ok(()))).await;
|
||||
assert_matches!(attempt_outcome, ConnectionAttemptOutcome::Attempted(Ok(())));
|
||||
@@ -450,7 +440,7 @@ mod test {
|
||||
validate_expected_route(&multi_route_manager, true, ROUTE_2).await;
|
||||
|
||||
// and now after a cooldown period, route1 should be used again
|
||||
time::advance(MAX_COOLDOWN_INTERVAL).await;
|
||||
time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await;
|
||||
validate_expected_route(&multi_route_manager, true, ROUTE_1).await;
|
||||
}
|
||||
|
||||
@@ -504,7 +494,9 @@ mod test {
|
||||
n if n < self.attempts_until_cooldown => ConnectionAttemptOutcome::Attempted(
|
||||
connection_fn(&self.connection_params).await,
|
||||
),
|
||||
_ => ConnectionAttemptOutcome::WaitUntil(Instant::now() + MAX_COOLDOWN_INTERVAL),
|
||||
_ => ConnectionAttemptOutcome::WaitUntil(
|
||||
Instant::now() + CONNECTION_ROUTE_MAX_COOLDOWN,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::future;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::timeouts::{DNS_FALLBACK_LOOKUP_TIMEOUTS, DNS_SYSTEM_LOOKUP_TIMEOUT};
|
||||
use nonzero_ext::nonzero;
|
||||
use oneshot_broadcast::Sender;
|
||||
use tokio::time::Instant;
|
||||
@@ -97,14 +98,17 @@ impl DnsResolver {
|
||||
RootCertificates::Native,
|
||||
);
|
||||
let custom_resolver = Box::new(CustomDnsResolver::<DohTransport>::new(connection_params));
|
||||
let fallback_lookups = DNS_FALLBACK_LOOKUP_TIMEOUTS
|
||||
.iter()
|
||||
.map(|timeout| (custom_resolver.clone() as Box<dyn DnsLookup>, *timeout));
|
||||
|
||||
let mut lookup_options: Vec<(Box<dyn DnsLookup>, Duration)> =
|
||||
Vec::with_capacity(fallback_lookups.len() + 2);
|
||||
lookup_options.push((Box::new(SystemDnsLookup), DNS_SYSTEM_LOOKUP_TIMEOUT));
|
||||
lookup_options.extend(fallback_lookups);
|
||||
lookup_options.push((Box::new(StaticDnsMap(static_map)), Duration::from_secs(1)));
|
||||
DnsResolver {
|
||||
lookup_options: Arc::new(vec![
|
||||
(Box::new(SystemDnsLookup), Duration::from_secs(2)),
|
||||
(custom_resolver.clone(), Duration::from_secs(5)),
|
||||
(custom_resolver.clone(), Duration::from_secs(10)),
|
||||
(custom_resolver.clone(), Duration::from_secs(15)),
|
||||
(Box::new(StaticDnsMap(static_map)), Duration::from_secs(1)),
|
||||
]),
|
||||
lookup_options: Arc::new(lookup_options),
|
||||
state: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,8 +7,8 @@ use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::timeouts::{DNS_CALL_BACKGROUND_TIMEOUT, DNS_RESOLUTION_DELAY};
|
||||
use async_trait::async_trait;
|
||||
use either::Either;
|
||||
use futures_util::stream::BoxStream;
|
||||
@@ -27,9 +27,6 @@ pub type DnsIpv4Result = Expiring<Vec<Ipv4Addr>>;
|
||||
pub type DnsIpv6Result = Expiring<Vec<Ipv6Addr>>;
|
||||
pub type DnsQueryResult = Either<DnsIpv4Result, DnsIpv6Result>;
|
||||
|
||||
const RESOLUTION_DELAY: Duration = Duration::from_millis(50);
|
||||
const LONG_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Implementors of this trait encapsulate the logic of sending queries to the DNS server
|
||||
/// and receiving resposnes.
|
||||
#[async_trait]
|
||||
@@ -120,7 +117,7 @@ impl<T: DnsTransport + Sync + 'static> CustomDnsResolver<T> {
|
||||
.await?;
|
||||
let (ipv4_res_rx, ipv6_res_rx) = self.send_dns_queries(transport, request);
|
||||
let (maybe_ipv4, maybe_ipv6) =
|
||||
results_within_interval(ipv4_res_rx, ipv6_res_rx, RESOLUTION_DELAY).await;
|
||||
results_within_interval(ipv4_res_rx, ipv6_res_rx, DNS_RESOLUTION_DELAY).await;
|
||||
let ipv4s = maybe_ipv4.map_or(vec![], |r| r.data);
|
||||
let ipv6s = maybe_ipv6.map_or(vec![], |r| r.data);
|
||||
match LookupResult::new(T::dns_source(), ipv4s, ipv6s) {
|
||||
@@ -153,7 +150,7 @@ impl<T: DnsTransport + Sync + 'static> CustomDnsResolver<T> {
|
||||
// Reference: https://datatracker.ietf.org/doc/html/rfc8305#section-3
|
||||
tokio::spawn(async move {
|
||||
let started_at = Instant::now();
|
||||
let timeout_at = started_at + LONG_TIMEOUT;
|
||||
let timeout_at = started_at + DNS_CALL_BACKGROUND_TIMEOUT;
|
||||
|
||||
let mut stream = match transport.send_queries(request.clone()).await {
|
||||
Ok(stream) => stream,
|
||||
@@ -254,7 +251,8 @@ impl<T: DnsTransport + Sync + 'static> CustomDnsResolver<T> {
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test {
|
||||
use crate::infra::dns::custom_resolver::{
|
||||
CustomDnsResolver, DnsQueryResult, DnsTransport, LONG_TIMEOUT, RESOLUTION_DELAY,
|
||||
CustomDnsResolver, DnsQueryResult, DnsTransport, DNS_CALL_BACKGROUND_TIMEOUT,
|
||||
DNS_RESOLUTION_DELAY,
|
||||
};
|
||||
use crate::infra::dns::dns_errors::Error;
|
||||
use crate::infra::dns::dns_lookup::DnsLookupRequest;
|
||||
@@ -448,8 +446,8 @@ pub(crate) mod test {
|
||||
async fn works_correctly_when_both_results_are_within_resolution_delay() {
|
||||
let (transport, resolver) =
|
||||
TestDnsTransportWithTwoResponses::transport_and_custom_dns_resolver(|_, q_num, txs| {
|
||||
let first = LONG_TIMEOUT / 4;
|
||||
let second = first + RESOLUTION_DELAY / 2;
|
||||
let first = DNS_CALL_BACKGROUND_TIMEOUT / 4;
|
||||
let second = first + DNS_RESOLUTION_DELAY / 2;
|
||||
let (timeout_1, timeout_2) = if q_num == 1 {
|
||||
(first, second)
|
||||
} else {
|
||||
@@ -471,8 +469,8 @@ pub(crate) mod test {
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn works_correctly_when_second_response_is_after_resolution_delay() {
|
||||
let resolver = TestDnsTransportWithTwoResponses::custom_dns_resolver(|_, q_num, txs| {
|
||||
let first = LONG_TIMEOUT / 4;
|
||||
let second = first + RESOLUTION_DELAY * 2;
|
||||
let first = DNS_CALL_BACKGROUND_TIMEOUT / 4;
|
||||
let second = first + DNS_RESOLUTION_DELAY * 2;
|
||||
let (timeout_1, timeout_2) = if q_num == 1 {
|
||||
(first, second)
|
||||
} else {
|
||||
@@ -508,9 +506,9 @@ pub(crate) mod test {
|
||||
let res_1 = ok_query_result_ipv4(Duration::ZERO, IP_V4_LIST_1);
|
||||
let res_2 = ok_query_result_ipv6(Duration::ZERO, IP_V6_LIST_1);
|
||||
let res_3 = Err(Error::NoData);
|
||||
let timeout_1 = LONG_TIMEOUT / 4;
|
||||
let timeout_2 = timeout_1 + RESOLUTION_DELAY / 3;
|
||||
let timeout_3 = timeout_1 + RESOLUTION_DELAY / 2;
|
||||
let timeout_1 = DNS_CALL_BACKGROUND_TIMEOUT / 4;
|
||||
let timeout_2 = timeout_1 + DNS_RESOLUTION_DELAY / 3;
|
||||
let timeout_3 = timeout_1 + DNS_RESOLUTION_DELAY / 2;
|
||||
respond_after_timeout(timeout_1, tx_1, res_1);
|
||||
respond_after_timeout(timeout_2, tx_2, res_2);
|
||||
respond_after_timeout(timeout_3, tx_3, res_3);
|
||||
@@ -522,7 +520,7 @@ pub(crate) mod test {
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn returns_second_result_if_first_result_fails() {
|
||||
let resolver = TestDnsTransportWithTwoResponses::custom_dns_resolver(|_, _, txs| {
|
||||
let timeout_2 = RESOLUTION_DELAY * 2;
|
||||
let timeout_2 = DNS_RESOLUTION_DELAY * 2;
|
||||
let [tx_1, tx_2] = txs;
|
||||
let res_1 = Err(Error::LookupFailed);
|
||||
let res_2 = ok_query_result_ipv6(Duration::ZERO, IP_V6_LIST_1);
|
||||
@@ -578,8 +576,8 @@ pub(crate) mod test {
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn results_cached_even_if_received_late() {
|
||||
// second result is sent within the `LONG_TIMEOUT`, but after the `RESOLUTION_DELAY`
|
||||
let timeout_1 = LONG_TIMEOUT / 4;
|
||||
let timeout_2 = LONG_TIMEOUT / 2;
|
||||
let timeout_1 = DNS_CALL_BACKGROUND_TIMEOUT / 4;
|
||||
let timeout_2 = DNS_CALL_BACKGROUND_TIMEOUT / 2;
|
||||
let all_results_received_time = Instant::now() + timeout_2;
|
||||
let (transport, resolver) =
|
||||
TestDnsTransportWithTwoResponses::transport_and_custom_dns_resolver(
|
||||
|
||||
@@ -505,6 +505,7 @@ mod test {
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::timeouts::{CONNECTION_ROUTE_COOLDOWN_INTERVALS, CONNECTION_ROUTE_MAX_COOLDOWN};
|
||||
use assert_matches::assert_matches;
|
||||
use async_trait::async_trait;
|
||||
use futures_util::FutureExt;
|
||||
@@ -514,9 +515,7 @@ mod test {
|
||||
|
||||
use super::*;
|
||||
use crate::infra::certs::RootCertificates;
|
||||
use crate::infra::connection_manager::{
|
||||
SingleRouteThrottlingConnectionManager, COOLDOWN_INTERVALS, MAX_COOLDOWN_INTERVAL,
|
||||
};
|
||||
use crate::infra::connection_manager::SingleRouteThrottlingConnectionManager;
|
||||
use crate::infra::test::shared::{
|
||||
TestError, LONG_CONNECTION_TIME, NORMAL_CONNECTION_TIME, TIMEOUT_DURATION,
|
||||
TIME_ADVANCE_VALUE,
|
||||
@@ -813,7 +812,7 @@ mod test {
|
||||
|
||||
// At this point, `service_with_reconnect` tried multiple times to connect
|
||||
// and hit the cooldown. Let's advance time to make sure next attempt will be made.
|
||||
time::advance(MAX_COOLDOWN_INTERVAL).await;
|
||||
time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await;
|
||||
|
||||
connector.set_service_healthy(true);
|
||||
let connection_result = service_with_reconnect.connect_from_inactive().await;
|
||||
@@ -833,7 +832,7 @@ mod test {
|
||||
|
||||
// At this point, `service_with_reconnect` tried multiple times to connect
|
||||
// and hit the cooldown. Let's advance time to make sure next attempt will be made.
|
||||
time::advance(MAX_COOLDOWN_INTERVAL).await;
|
||||
time::advance(CONNECTION_ROUTE_MAX_COOLDOWN).await;
|
||||
|
||||
connector.set_time_to_connect(NORMAL_CONNECTION_TIME);
|
||||
let connection_result = service_with_reconnect.connect_from_inactive().await;
|
||||
@@ -859,11 +858,11 @@ mod test {
|
||||
sleep_and_catch_up(NORMAL_CONNECTION_TIME).await;
|
||||
assert_eq!(connector.attempts.load(Ordering::Relaxed), 2);
|
||||
|
||||
sleep_and_catch_up(COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await;
|
||||
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await;
|
||||
assert_eq!(connector.attempts.load(Ordering::Relaxed), 3);
|
||||
assert_matches!(service_with_reconnect.service().await, Err(_));
|
||||
|
||||
sleep_and_catch_up(COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await;
|
||||
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await;
|
||||
assert_eq!(connector.attempts.load(Ordering::Relaxed), 4);
|
||||
assert_matches!(service_with_reconnect.service().await, Err(_));
|
||||
|
||||
@@ -871,7 +870,7 @@ mod test {
|
||||
// letting next cooldown interval pass and checking again
|
||||
connector.set_service_healthy(true);
|
||||
|
||||
sleep_and_catch_up(COOLDOWN_INTERVALS[2] + NORMAL_CONNECTION_TIME).await;
|
||||
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[2] + NORMAL_CONNECTION_TIME).await;
|
||||
assert_eq!(connector.attempts.load(Ordering::Relaxed), 5);
|
||||
assert_matches!(service_with_reconnect.service().await, Ok(_));
|
||||
}
|
||||
@@ -895,18 +894,18 @@ mod test {
|
||||
sleep_and_catch_up(NORMAL_CONNECTION_TIME).await;
|
||||
assert_eq!(connector.attempts.load(Ordering::Relaxed), 2);
|
||||
|
||||
sleep_and_catch_up(COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await;
|
||||
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[0] + NORMAL_CONNECTION_TIME).await;
|
||||
assert_eq!(connector.attempts.load(Ordering::Relaxed), 3);
|
||||
assert_matches!(service_with_reconnect.service().await, Err(_));
|
||||
|
||||
sleep_and_catch_up(COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await;
|
||||
sleep_and_catch_up(CONNECTION_ROUTE_COOLDOWN_INTERVALS[1] + NORMAL_CONNECTION_TIME).await;
|
||||
assert_eq!(connector.attempts.load(Ordering::Relaxed), 4);
|
||||
assert_matches!(service_with_reconnect.service().await, Err(_));
|
||||
|
||||
// now we decide to disconnect, and we need to make sure we're not making
|
||||
// any more attempts
|
||||
service_with_reconnect.disconnect().await;
|
||||
for interval in COOLDOWN_INTERVALS.into_iter().skip(2) {
|
||||
for interval in CONNECTION_ROUTE_COOLDOWN_INTERVALS.into_iter().skip(2) {
|
||||
sleep_and_catch_up(interval + NORMAL_CONNECTION_TIME).await;
|
||||
assert_eq!(connector.attempts.load(Ordering::Relaxed), 4);
|
||||
assert_matches!(
|
||||
|
||||
@@ -6,8 +6,8 @@
|
||||
use std::net::IpAddr;
|
||||
use std::num::NonZeroU16;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::timeouts::TCP_CONNECTION_ATTEMPT_DELAY;
|
||||
use async_trait::async_trait;
|
||||
use boring::ssl::{ConnectConfiguration, SslConnector, SslMethod};
|
||||
use futures_util::TryFutureExt;
|
||||
@@ -24,8 +24,6 @@ use crate::infra::{
|
||||
};
|
||||
use crate::utils::first_ok;
|
||||
|
||||
const CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(200);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum TcpSslConnector {
|
||||
Direct(DirectConnector),
|
||||
@@ -240,7 +238,7 @@ async fn connect_tcp(
|
||||
// that incorporates the delay based on its position in the list.
|
||||
// This way we can start all futures at once and simply wait for the first one to complete successfully.
|
||||
let staggered_futures = dns_lookup.into_iter().enumerate().map(|(idx, ip)| {
|
||||
let delay = CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap();
|
||||
let delay = TCP_CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap();
|
||||
async move {
|
||||
if !delay.is_zero() {
|
||||
tokio::time::sleep(delay).await;
|
||||
|
||||
@@ -613,8 +613,8 @@ pub(crate) mod testutil {
|
||||
use tokio::io::DuplexStream;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
|
||||
use crate::env::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_TIME};
|
||||
use crate::infra::{AsyncDuplexStream, DnsSource, RouteType};
|
||||
use crate::timeouts::{WS_KEEP_ALIVE_INTERVAL, WS_MAX_IDLE_INTERVAL};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -645,7 +645,7 @@ pub(crate) mod testutil {
|
||||
channel,
|
||||
mock_connection_info(),
|
||||
WS_KEEP_ALIVE_INTERVAL,
|
||||
WS_MAX_IDLE_TIME,
|
||||
WS_MAX_IDLE_INTERVAL,
|
||||
)
|
||||
.0
|
||||
}
|
||||
|
||||
@@ -12,4 +12,5 @@ pub mod infra;
|
||||
pub mod proto;
|
||||
pub mod svr;
|
||||
pub mod svr3;
|
||||
pub mod timeouts;
|
||||
pub mod utils;
|
||||
|
||||
52
rust/net/src/timeouts.rs
Normal file
52
rust/net/src/timeouts.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use std::time::Duration;
|
||||
|
||||
/// Timeout for a system DNS lookup
|
||||
pub const DNS_SYSTEM_LOOKUP_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
/// A list of timeouts per each fallback DNS lookup attempt
|
||||
pub const DNS_FALLBACK_LOOKUP_TIMEOUTS: &[Duration] = &[
|
||||
Duration::from_secs(5),
|
||||
Duration::from_secs(10),
|
||||
Duration::from_secs(15),
|
||||
];
|
||||
/// If during a DNS resolution we've sent multiple queries (one per IP type)
|
||||
/// and one of them produced a result, we'll wait this time interval
|
||||
/// to let the other query complete before proceeding
|
||||
pub const DNS_RESOLUTION_DELAY: Duration = Duration::from_millis(50);
|
||||
/// When making a DNS query, a caller is given a result future that it may or may not
|
||||
/// await on until the result is ready (callers will likely await with a timeout).
|
||||
/// Regardless of the caller's behavior, DNS resolver will wait this time interval
|
||||
/// for results to arrive to cache them for the future lookups.
|
||||
pub const DNS_CALL_BACKGROUND_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Frequency of the WebSocket `PING` requests
|
||||
pub const WS_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(15);
|
||||
/// Maximum time of incoming packets inactivity allowed on a WebSocket connection
|
||||
pub const WS_MAX_IDLE_INTERVAL: Duration = Duration::from_secs(45);
|
||||
|
||||
/// Timeout for a connect operation that attempts one route
|
||||
/// (this includes DNS resolution, TCP connection, and SSL handshake)
|
||||
pub const ONE_ROUTE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Timeout for a connect operation that attempts multiple routes
|
||||
pub const MULTI_ROUTE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(180);
|
||||
|
||||
/// When establishing a TCP connection, connections to different IP addresses are
|
||||
/// raced between each other with each new attempt being given an additional delay
|
||||
/// before it starts.
|
||||
pub const TCP_CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(200);
|
||||
|
||||
/// A sequence of timeout values to be used as cooldown intervals between attempts
|
||||
/// when a connection to a given route is consecutively failing to establish
|
||||
pub const CONNECTION_ROUTE_COOLDOWN_INTERVALS: [Duration; 8] = [
|
||||
Duration::from_secs(0),
|
||||
Duration::from_secs(1),
|
||||
Duration::from_secs(2),
|
||||
Duration::from_secs(4),
|
||||
Duration::from_secs(8),
|
||||
Duration::from_secs(16),
|
||||
Duration::from_secs(32),
|
||||
CONNECTION_ROUTE_MAX_COOLDOWN,
|
||||
];
|
||||
|
||||
/// Maximum value of a coolduwn interval between connection attempts
|
||||
pub const CONNECTION_ROUTE_MAX_COOLDOWN: Duration = Duration::from_secs(64);
|
||||
Reference in New Issue
Block a user