diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs
index 480acb88d9..70b29679b9 100644
--- a/proxy/src/compute.rs
+++ b/proxy/src/compute.rs
@@ -136,18 +136,17 @@ impl Default for ConnCfg {
 
 impl ConnCfg {
     /// Establish a raw TCP connection to the compute node.
-    async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream, &str)> {
+    async fn connect_raw(&self, timeout: Duration) -> io::Result<(SocketAddr, TcpStream, &str)> {
         use tokio_postgres::config::Host;
 
         // wrap TcpStream::connect with timeout
         let connect_with_timeout = |host, port| {
-            let connection_timeout = Duration::from_millis(10000);
-            tokio::time::timeout(connection_timeout, TcpStream::connect((host, port))).map(
+            tokio::time::timeout(timeout, TcpStream::connect((host, port))).map(
                 move |res| match res {
                     Ok(tcpstream_connect_res) => tcpstream_connect_res,
                     Err(_) => Err(io::Error::new(
                         io::ErrorKind::TimedOut,
-                        format!("exceeded connection timeout {connection_timeout:?}"),
+                        format!("exceeded connection timeout {timeout:?}"),
                     )),
                 },
             )
@@ -223,8 +222,9 @@ impl ConnCfg {
     async fn do_connect(
         &self,
         allow_self_signed_compute: bool,
+        timeout: Duration,
     ) -> Result {
-        let (socket_addr, stream, host) = self.connect_raw().await?;
+        let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
 
         let tls_connector = native_tls::TlsConnector::builder()
             .danger_accept_invalid_certs(allow_self_signed_compute)
@@ -264,8 +264,9 @@ impl ConnCfg {
     pub async fn connect(
         &self,
         allow_self_signed_compute: bool,
+        timeout: Duration,
     ) -> Result {
-        self.do_connect(allow_self_signed_compute)
+        self.do_connect(allow_self_signed_compute, timeout)
             .inspect_err(|err| {
                 // Immediately log the error we have at our disposal.
                 error!("couldn't connect to compute node: {err}");
diff --git a/proxy/src/config.rs b/proxy/src/config.rs
index 6a26cea78e..00f561fcf2 100644
--- a/proxy/src/config.rs
+++ b/proxy/src/config.rs
@@ -212,7 +212,7 @@ pub struct CacheOptions {
 
 impl CacheOptions {
     /// Default options for [`crate::auth::caches::NodeInfoCache`].
-    pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=5m";
+    pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=4m";
 
     /// Parse cache options passed via cmdline.
     /// Example: [`Self::DEFAULT_OPTIONS_NODE_INFO`].
diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs
index 8efb7005c8..2433412c4c 100644
--- a/proxy/src/proxy.rs
+++ b/proxy/src/proxy.rs
@@ -16,7 +16,10 @@ use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCou
 use once_cell::sync::Lazy;
 use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
 use std::sync::Arc;
-use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+use tokio::{
+    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
+    time,
+};
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 use utils::measured_stream::MeasuredStream;
@@ -305,12 +308,13 @@ pub fn invalidate_cache(node_info: &console::CachedNodeInfo) {
 #[tracing::instrument(name = "connect_once", skip_all)]
 async fn connect_to_compute_once(
     node_info: &console::CachedNodeInfo,
+    timeout: time::Duration,
 ) -> Result {
     let allow_self_signed_compute = node_info.allow_self_signed_compute;
 
     node_info
         .config
-        .connect(allow_self_signed_compute)
+        .connect(allow_self_signed_compute, timeout)
         .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info))
         .await
 }
@@ -328,7 +332,27 @@ async fn connect_to_compute(
     loop {
         // Apply startup params to the (possibly, cached) compute node info.
         node_info.config.set_startup_params(params);
-        match connect_to_compute_once(node_info).await {
+
+        // Set a shorter timeout for the initial connection attempt.
+        //
+        // If we try to connect to an outdated address that is no longer valid,
+        // Kubernetes' default behavior is to drop the packets, leaving us to wait
+        // out the entire timeout period. We want to fail fast in such cases.
+        //
+        // A specific case to consider: we hold cached compute node information
+        // with a 4-minute TTL, but the user has since executed a `/suspend` API
+        // call, so the compute node no longer exists.
+        //
+        // Caching is only used by the scram proxy backed by the console, so we
+        // reduce the timeout only in that case.
+        let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _));
+        let timeout = if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE {
+            time::Duration::from_secs(2)
+        } else {
+            time::Duration::from_secs(10)
+        };
+
+        match connect_to_compute_once(node_info, timeout).await {
             Err(e) if num_retries > 0 => {
                 info!("compute node's state has changed; requesting a wake-up");
                 match creds.wake_compute(extra).map_err(io_error).await? {
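
For reference, the timeout-wrapping pattern from the compute.rs hunk can be reproduced standalone. This is a minimal sketch, not the proxy's actual code: it assumes only the tokio and futures crates (the `.map` combinator on the future comes from `futures::FutureExt`), and the helper name and the unroutable test address are illustrative.

use std::io;
use std::time::Duration;

use futures::FutureExt; // supplies the `.map` combinator used on the future
use tokio::net::TcpStream;

// Standalone version of the `connect_with_timeout` closure from `connect_raw`:
// wrap `TcpStream::connect` in `tokio::time::timeout` and fold the `Elapsed`
// error into an `io::Error`, so callers keep seeing a plain `io::Result`.
async fn connect_with_timeout(
    host: &str,
    port: u16,
    timeout: Duration,
) -> io::Result<TcpStream> {
    tokio::time::timeout(timeout, TcpStream::connect((host, port)))
        .map(move |res| match res {
            Ok(tcpstream_connect_res) => tcpstream_connect_res,
            Err(_) => Err(io::Error::new(
                io::ErrorKind::TimedOut,
                format!("exceeded connection timeout {timeout:?}"),
            )),
        })
        .await
}

#[tokio::main]
async fn main() {
    // 10.255.255.1 is conventionally unroutable, so packets are silently
    // dropped, mimicking the Kubernetes behavior described in the diff.
    match connect_with_timeout("10.255.255.1", 5432, Duration::from_secs(2)).await {
        Ok(_) => println!("connected (unexpected on most networks)"),
        Err(e) => println!("failed fast as intended: {e}"),
    }
}

Folding the timeout error into `io::ErrorKind::TimedOut` is what lets `connect_raw` keep its `io::Result` signature instead of exposing a second error type to its callers.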
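
The retry/timeout interplay from the proxy.rs hunk can likewise be sketched in isolation. Everything here is a hypothetical reduction for illustration: the backend check is collapsed into a plain bool, `pick_timeout` is not a real function in the codebase, and the value of `NUM_RETRIES_WAKE_COMPUTE` is invented (the diff references it but does not define it).

use std::time::Duration;

// Hypothetical value: the diff references this constant but does not show it.
const NUM_RETRIES_WAKE_COMPUTE: usize = 1;

// Mirrors the timeout selection in `connect_to_compute`: only the very first
// attempt (retry counter still at its starting value) against a cached address
// fails fast; attempts made after a wake-up get the full timeout again.
fn pick_timeout(is_scram_proxy: bool, num_retries: usize) -> Duration {
    if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE {
        Duration::from_secs(2)
    } else {
        Duration::from_secs(10)
    }
}

fn main() {
    // First attempt with a possibly stale cached address: fail fast.
    assert_eq!(pick_timeout(true, NUM_RETRIES_WAKE_COMPUTE).as_secs(), 2);
    // A retry after the counter has been decremented: full timeout.
    assert_eq!(pick_timeout(true, NUM_RETRIES_WAKE_COMPUTE - 1).as_secs(), 10);
    // Backends without console-backed caching always get the full timeout.
    assert_eq!(pick_timeout(false, NUM_RETRIES_WAKE_COMPUTE).as_secs(), 10);
}

The net effect is that a stale cached address costs at most 2 seconds before the proxy falls back to waking the compute node, while freshly woken computes still get the full 10 seconds to accept the connection.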