diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index 52c1e2f2ce..fb53c663c8 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -1,17 +1,16 @@ use parking_lot::Mutex; use pq_proto::StartupMessageParams; use std::fmt; +use std::ops::ControlFlow; use std::{collections::HashMap, sync::Arc}; - -use futures::TryFutureExt; +use tokio::time; use crate::config; use crate::{auth, console}; use super::sql_over_http::MAX_RESPONSE_SIZE; -use crate::proxy::invalidate_cache; -use crate::proxy::NUM_RETRIES_WAKE_COMPUTE; +use crate::proxy::{invalidate_cache, retry_after, try_wake, NUM_RETRIES_WAKE_COMPUTE}; use tracing::error; use tracing::info; @@ -185,11 +184,10 @@ impl GlobalConnPool { } } -// // Wake up the destination if needed. Code here is a bit involved because // we reuse the code from the usual proxy and we need to prepare few structures // that this code expects. -// +#[tracing::instrument(skip_all)] async fn connect_to_compute( config: &config::ProxyConfig, conn_info: &ConnInfo, @@ -221,34 +219,79 @@ async fn connect_to_compute( let node_info = &mut creds.wake_compute(&extra).await?.expect("msg"); - // This code is a copy of `connect_to_compute` from `src/proxy.rs` with - // the difference that it uses `tokio_postgres` for the connection. - let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE; + let mut num_retries = 0; + let mut wait_duration = time::Duration::ZERO; + let mut should_wake_with_error = None; loop { - match connect_to_compute_once(node_info, conn_info).await { - Err(e) if num_retries > 0 => { - info!("compute node's state has changed; requesting a wake-up"); - match creds.wake_compute(&extra).await? { - // Update `node_info` and try one more time. - Some(new) => { - *node_info = new; - } - // Link auth doesn't work that way, so we just exit. - None => return Err(e), - } - } - other => return other, + if !wait_duration.is_zero() { + time::sleep(wait_duration).await; } - num_retries -= 1; - info!("retrying after wake-up ({num_retries} attempts left)"); + // try wake the compute node if we have determined it's sensible to do so + if let Some(err) = should_wake_with_error.take() { + match try_wake(node_info, &extra, &creds).await { + // we can't wake up the compute node + Ok(None) => return Err(err), + // there was an error communicating with the control plane + Err(e) => return Err(e.into()), + // failed to wake up but we can continue to retry + Ok(Some(ControlFlow::Continue(()))) => { + wait_duration = retry_after(num_retries); + should_wake_with_error = Some(err); + + num_retries += 1; + info!(num_retries, "retrying wake compute"); + continue; + } + // successfully woke up a compute node and can break the wakeup loop + Ok(Some(ControlFlow::Break(()))) => {} + } + } + + match connect_to_compute_once(node_info, conn_info).await { + Ok(res) => return Ok(res), + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + if !can_retry_error(&e, num_retries) { + return Err(e.into()); + } + wait_duration = retry_after(num_retries); + + // after the first connect failure, + // we should invalidate the cache and wake up a new compute node + if num_retries == 0 { + invalidate_cache(node_info); + should_wake_with_error = Some(e.into()); + } + } + } + + num_retries += 1; + info!(num_retries, "retrying connect"); + } +} + +fn can_retry_error(err: &tokio_postgres::Error, num_retries: u32) -> bool { + use tokio_postgres::error::SqlState; + match err.code() { + // retry all errors at least once + _ if num_retries == 0 => true, + // keep retrying connection errors + Some( + &SqlState::CONNECTION_FAILURE + | &SqlState::CONNECTION_EXCEPTION + | &SqlState::CONNECTION_DOES_NOT_EXIST + | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, + ) if num_retries < NUM_RETRIES_WAKE_COMPUTE => true, + // otherwise, don't retry + _ => false, } } async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, conn_info: &ConnInfo, -) -> anyhow::Result { +) -> Result { let mut config = (*node_info.config).clone(); let (client, connection) = config @@ -257,15 +300,6 @@ async fn connect_to_compute_once( .dbname(&conn_info.dbname) .max_backend_message_size(MAX_RESPONSE_SIZE) .connect(tokio_postgres::NoTls) - .inspect_err(|e: &tokio_postgres::Error| { - error!( - "failed to connect to compute node hosts={:?} ports={:?}: {}", - node_info.config.get_hosts(), - node_info.config.get_ports(), - e - ); - invalidate_cache(node_info) - }) .await?; tokio::spawn(async move { diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 2433412c4c..12ca9c5187 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -6,16 +6,21 @@ use crate::{ cancellation::{self, CancelMap}, compute::{self, PostgresConnection}, config::{ProxyConfig, TlsConfig}, - console::{self, messages::MetricsAuxInfo}, + console::{ + self, + errors::{ApiError, WakeComputeError}, + messages::MetricsAuxInfo, + }, error::io_error, stream::{PqStream, Stream}, }; use anyhow::{bail, Context}; use futures::TryFutureExt; +use hyper::StatusCode; use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; -use std::sync::Arc; +use std::{ops::ControlFlow, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt}, time, @@ -25,7 +30,9 @@ use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; /// Number of times we should retry the `/proxy_wake_compute` http request. -pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1; +/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n +pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10; +const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100); const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; @@ -315,7 +322,6 @@ async fn connect_to_compute_once( node_info .config .connect(allow_self_signed_compute, timeout) - .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info)) .await } @@ -328,11 +334,38 @@ async fn connect_to_compute( extra: &console::ConsoleReqExtra<'_>, creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, ) -> Result { - let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE; + let mut num_retries = 0; + let mut wait_duration = time::Duration::ZERO; + let mut should_wake_with_error = None; loop { // Apply startup params to the (possibly, cached) compute node info. node_info.config.set_startup_params(params); + if !wait_duration.is_zero() { + time::sleep(wait_duration).await; + } + + // try wake the compute node if we have determined it's sensible to do so + if let Some(err) = should_wake_with_error.take() { + match try_wake(node_info, extra, creds).await { + // we can't wake up the compute node + Ok(None) => return Err(err), + // there was an error communicating with the control plane + Err(e) => return Err(io_error(e).into()), + // failed to wake up but we can continue to retry + Ok(Some(ControlFlow::Continue(()))) => { + wait_duration = retry_after(num_retries); + should_wake_with_error = Some(err); + + num_retries += 1; + info!(num_retries, "retrying wake compute"); + continue; + } + // successfully woke up a compute node and can break the wakeup loop + Ok(Some(ControlFlow::Break(()))) => {} + } + } + // Set a shorter timeout for the initial connection attempt. // // In case we try to connect to an outdated address that is no longer valid, the @@ -346,30 +379,91 @@ async fn connect_to_compute( // We only use caching in case of scram proxy backed by the console, so reduce // the timeout only in that case. let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _)); - let timeout = if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE { + let timeout = if is_scram_proxy && num_retries == 0 { time::Duration::from_secs(2) } else { time::Duration::from_secs(10) }; + // do this again to ensure we have username? + node_info.config.set_startup_params(params); + match connect_to_compute_once(node_info, timeout).await { - Err(e) if num_retries > 0 => { - info!("compute node's state has changed; requesting a wake-up"); - match creds.wake_compute(extra).map_err(io_error).await? { - // Update `node_info` and try one more time. - Some(mut new) => { - new.config.reuse_password(&node_info.config); - *node_info = new; - } - // Link auth doesn't work that way, so we just exit. - None => return Err(e), + Ok(res) => return Ok(res), + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + if !can_retry_error(&e, num_retries) { + return Err(e); + } + wait_duration = retry_after(num_retries); + + // after the first connect failure, + // we should invalidate the cache and wake up a new compute node + if num_retries == 0 { + invalidate_cache(node_info); + should_wake_with_error = Some(e); } } - other => return other, } - num_retries -= 1; - info!("retrying after wake-up ({num_retries} attempts left)"); + num_retries += 1; + info!(num_retries, "retrying connect"); + } +} + +/// Attempts to wake up the compute node. +/// * Returns Ok(Some(true)) if there was an error waking but retries are acceptable +/// * Returns Ok(Some(false)) if the wakeup succeeded +/// * Returns Ok(None) or Err(e) if there was an error +pub async fn try_wake( + node_info: &mut console::CachedNodeInfo, + extra: &console::ConsoleReqExtra<'_>, + creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, +) -> Result>, WakeComputeError> { + info!("compute node's state has likely changed; requesting a wake-up"); + match creds.wake_compute(extra).await { + // retry wake if the compute was in an invalid state + Err(WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::BAD_REQUEST, + .. + })) => Ok(Some(ControlFlow::Continue(()))), + // Update `node_info` and try again. + Ok(Some(mut new)) => { + new.config.reuse_password(&node_info.config); + *node_info = new; + Ok(Some(ControlFlow::Break(()))) + } + Err(e) => Err(e), + Ok(None) => Ok(None), + } +} + +fn can_retry_error(err: &compute::ConnectionError, num_retries: u32) -> bool { + use std::io::ErrorKind; + match err { + // retry all errors at least once + _ if num_retries == 0 => true, + // keep retrying connection errors + compute::ConnectionError::CouldNotConnect(io_err) + if num_retries < NUM_RETRIES_WAKE_COMPUTE => + { + matches!( + io_err.kind(), + ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable + ) + } + // otherwise, don't retry + _ => false, + } +} + +pub fn retry_after(num_retries: u32) -> time::Duration { + match num_retries { + 0 => time::Duration::ZERO, + _ => { + // 3/2 = 1.5 which seems to be an ok growth factor heuristic + BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries) + } } } diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 3373c49676..b9215cd90e 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -294,3 +294,13 @@ async fn scram_auth_mock() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn connect_compute_total_wait() { + let mut total_wait = tokio::time::Duration::ZERO; + for num_retries in 0..10 { + total_wait += retry_after(num_retries); + } + assert!(total_wait < tokio::time::Duration::from_secs(12)); + assert!(total_wait > tokio::time::Duration::from_secs(10)); +}