From cbf9a408898d7c5b598842cb2d9b91dd1de9a0a8 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Tue, 4 Jul 2023 14:28:27 +0300 Subject: [PATCH] Set a shorter timeout for the initial connection attempti in proxy. In case we try to connect to an outdated address that is no longer valid, the default behavior of Kubernetes is to drop the packets, causing us to wait for the entire timeout period. We want to fail fast in such cases. A specific case to consider is when we have cached compute node information with a 5-minute TTL (Time To Live), but the user has executed a `/suspend` API call, resulting in the nonexistence of the compute node. --- proxy/src/compute.rs | 13 +++++++------ proxy/src/config.rs | 2 +- proxy/src/proxy.rs | 30 +++++++++++++++++++++++++++--- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 480acb88d9..70b29679b9 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -136,18 +136,17 @@ impl Default for ConnCfg { impl ConnCfg { /// Establish a raw TCP connection to the compute node. - async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream, &str)> { + async fn connect_raw(&self, timeout: Duration) -> io::Result<(SocketAddr, TcpStream, &str)> { use tokio_postgres::config::Host; // wrap TcpStream::connect with timeout let connect_with_timeout = |host, port| { - let connection_timeout = Duration::from_millis(10000); - tokio::time::timeout(connection_timeout, TcpStream::connect((host, port))).map( + tokio::time::timeout(timeout, TcpStream::connect((host, port))).map( move |res| match res { Ok(tcpstream_connect_res) => tcpstream_connect_res, Err(_) => Err(io::Error::new( io::ErrorKind::TimedOut, - format!("exceeded connection timeout {connection_timeout:?}"), + format!("exceeded connection timeout {timeout:?}"), )), }, ) @@ -223,8 +222,9 @@ impl ConnCfg { async fn do_connect( &self, allow_self_signed_compute: bool, + timeout: Duration, ) -> Result { - let (socket_addr, stream, host) = self.connect_raw().await?; + let (socket_addr, stream, host) = self.connect_raw(timeout).await?; let tls_connector = native_tls::TlsConnector::builder() .danger_accept_invalid_certs(allow_self_signed_compute) @@ -264,8 +264,9 @@ impl ConnCfg { pub async fn connect( &self, allow_self_signed_compute: bool, + timeout: Duration, ) -> Result { - self.do_connect(allow_self_signed_compute) + self.do_connect(allow_self_signed_compute, timeout) .inspect_err(|err| { // Immediately log the error we have at our disposal. error!("couldn't connect to compute node: {err}"); diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 6a26cea78e..00f561fcf2 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -212,7 +212,7 @@ pub struct CacheOptions { impl CacheOptions { /// Default options for [`crate::auth::caches::NodeInfoCache`]. - pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=5m"; + pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=4m"; /// Parse cache options passed via cmdline. /// Example: [`Self::DEFAULT_OPTIONS_NODE_INFO`]. diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 8efb7005c8..2433412c4c 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -16,7 +16,10 @@ use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCou use once_cell::sync::Lazy; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::{ + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, + time, +}; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; @@ -305,12 +308,13 @@ pub fn invalidate_cache(node_info: &console::CachedNodeInfo) { #[tracing::instrument(name = "connect_once", skip_all)] async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, + timeout: time::Duration, ) -> Result { let allow_self_signed_compute = node_info.allow_self_signed_compute; node_info .config - .connect(allow_self_signed_compute) + .connect(allow_self_signed_compute, timeout) .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info)) .await } @@ -328,7 +332,27 @@ async fn connect_to_compute( loop { // Apply startup params to the (possibly, cached) compute node info. node_info.config.set_startup_params(params); - match connect_to_compute_once(node_info).await { + + // Set a shorter timeout for the initial connection attempt. + // + // In case we try to connect to an outdated address that is no longer valid, the + // default behavior of Kubernetes is to drop the packets, causing us to wait for + // the entire timeout period. We want to fail fast in such cases. + // + // A specific case to consider is when we have cached compute node information + // with a 4-minute TTL (Time To Live), but the user has executed a `/suspend` API + // call, resulting in the nonexistence of the compute node. + // + // We only use caching in case of scram proxy backed by the console, so reduce + // the timeout only in that case. + let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _)); + let timeout = if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE { + time::Duration::from_secs(2) + } else { + time::Duration::from_secs(10) + }; + + match connect_to_compute_once(node_info, timeout).await { Err(e) if num_retries > 0 => { info!("compute node's state has changed; requesting a wake-up"); match creds.wake_compute(extra).map_err(io_error).await? {