diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs
index 52c1e2f2ce..27950d3a20 100644
--- a/proxy/src/http/conn_pool.rs
+++ b/proxy/src/http/conn_pool.rs
@@ -2,16 +2,15 @@
 use parking_lot::Mutex;
 use pq_proto::StartupMessageParams;
 use std::fmt;
 use std::{collections::HashMap, sync::Arc};
-
-use futures::TryFutureExt;
+use tokio::time;
 
 use crate::config;
 use crate::{auth, console};
 
 use super::sql_over_http::MAX_RESPONSE_SIZE;
-use crate::proxy::invalidate_cache;
-use crate::proxy::NUM_RETRIES_WAKE_COMPUTE;
+use crate::proxy::try_wake;
+use crate::proxy::{BASE_RETRY_WAIT_DURATION, NUM_RETRIES_WAKE_COMPUTE};
 
 use tracing::error;
 use tracing::info;
@@ -223,32 +222,59 @@ async fn connect_to_compute(
     // This code is a copy of `connect_to_compute` from `src/proxy.rs` with
     // the difference that it uses `tokio_postgres` for the connection.
-    let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
+    let mut num_retries = 0;
+    let mut should_wake = true;
     loop {
         match connect_to_compute_once(node_info, conn_info).await {
-            Err(e) if num_retries > 0 => {
-                info!("compute node's state has changed; requesting a wake-up");
-                match creds.wake_compute(&extra).await? {
-                    // Update `node_info` and try one more time.
-                    Some(new) => {
-                        *node_info = new;
+            Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => {
+                if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
+                    error!(error = ?e, "could not connect to compute node");
+                    if should_wake {
+                        match try_wake(node_info, &extra, &creds).await {
+                            Ok(Some(x)) => should_wake = x,
+                            Ok(None) => return Err(e.into()),
+                            Err(e) => return Err(e.into()),
+                        }
                     }
-                    // Link auth doesn't work that way, so we just exit.
-                    None => return Err(e),
+                    if !wait_duration.is_zero() {
+                        time::sleep(wait_duration).await;
+                    }
+                } else {
+                    return Err(e.into());
                 }
             }
-            other => return other,
+            other => return Ok(other?),
         }
 
-        num_retries -= 1;
-        info!("retrying after wake-up ({num_retries} attempts left)");
+        num_retries += 1;
+        info!(retries_left = num_retries, "retrying connect");
+    }
+}
+
+fn retry_connect_in(err: &tokio_postgres::Error, num_retries: u32) -> Option<time::Duration> {
+    use tokio_postgres::error::SqlState;
+    match err.code() {
+        // retry all errors at least once immediately
+        _ if num_retries == 0 => Some(time::Duration::ZERO),
+        // keep retrying connection errors every 100ms
+        Some(
+            &SqlState::CONNECTION_FAILURE
+            | &SqlState::CONNECTION_EXCEPTION
+            | &SqlState::CONNECTION_DOES_NOT_EXIST
+            | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
+        ) => {
+            // 3/2 = 1.5 which seems to be an ok growth factor heuristic
+            Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
+        }
+        // otherwise, don't retry
+        _ => None,
     }
 }
 
 async fn connect_to_compute_once(
     node_info: &console::CachedNodeInfo,
     conn_info: &ConnInfo,
-) -> anyhow::Result<tokio_postgres::Client> {
+) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
     let mut config = (*node_info.config).clone();
 
     let (client, connection) = config
@@ -257,15 +283,6 @@ async fn connect_to_compute_once(
         .user(&conn_info.username)
         .password(&conn_info.password)
         .dbname(&conn_info.dbname)
         .max_backend_message_size(MAX_RESPONSE_SIZE)
         .connect(tokio_postgres::NoTls)
-        .inspect_err(|e: &tokio_postgres::Error| {
-            error!(
-                "failed to connect to compute node hosts={:?} ports={:?}: {}",
-                node_info.config.get_hosts(),
-                node_info.config.get_ports(),
-                e
-            );
-            invalidate_cache(node_info)
-        })
         .await?;
 
     tokio::spawn(async move {
diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs
index 2433412c4c..5c5353a63e 100644
--- a/proxy/src/proxy.rs
+++ b/proxy/src/proxy.rs
@@ -6,12 +6,17 @@ use crate::{
     cancellation::{self, CancelMap},
     compute::{self, PostgresConnection},
     config::{ProxyConfig, TlsConfig},
-    console::{self, messages::MetricsAuxInfo},
+    console::{
+        self,
+        errors::{ApiError, WakeComputeError},
+        messages::MetricsAuxInfo,
+    },
     error::io_error,
     stream::{PqStream, Stream},
 };
 use anyhow::{bail, Context};
 use futures::TryFutureExt;
+use hyper::StatusCode;
 use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
 use once_cell::sync::Lazy;
 use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
@@ -25,7 +30,9 @@ use tracing::{error, info, warn};
 use utils::measured_stream::MeasuredStream;
 
 /// Number of times we should retry the `/proxy_wake_compute` http request.
-pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
+/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
+pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10;
+pub const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
 
 const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
 const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -315,7 +322,6 @@ async fn connect_to_compute_once(
     node_info
         .config
         .connect(allow_self_signed_compute, timeout)
-        .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info))
         .await
 }
 
@@ -328,7 +334,8 @@ async fn connect_to_compute(
     extra: &console::ConsoleReqExtra<'_>,
     creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
 ) -> Result<PostgresConnection, compute::ConnectionError> {
-    let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
+    let mut num_retries = 0;
+    let mut should_wake = true;
     loop {
         // Apply startup params to the (possibly, cached) compute node info.
         node_info.config.set_startup_params(params);
@@ -346,30 +353,83 @@ async fn connect_to_compute(
         // We only use caching in case of scram proxy backed by the console, so reduce
         // the timeout only in that case.
         let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _));
-        let timeout = if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE {
+        let timeout = if is_scram_proxy && num_retries == 0 {
             time::Duration::from_secs(2)
         } else {
             time::Duration::from_secs(10)
         };
 
         match connect_to_compute_once(node_info, timeout).await {
-            Err(e) if num_retries > 0 => {
-                info!("compute node's state has changed; requesting a wake-up");
-                match creds.wake_compute(extra).map_err(io_error).await? {
-                    // Update `node_info` and try one more time.
-                    Some(mut new) => {
-                        new.config.reuse_password(&node_info.config);
-                        *node_info = new;
+            Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => {
+                if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
+                    error!(error = ?e, "could not connect to compute node");
+                    if should_wake {
+                        match try_wake(node_info, extra, creds).await {
+                            Ok(Some(x)) => {
+                                should_wake = x;
+                            }
+                            Ok(None) => return Err(e),
+                            Err(e) => return Err(io_error(e).into()),
+                        }
                     }
-                    // Link auth doesn't work that way, so we just exit.
-                    None => return Err(e),
+                    if !wait_duration.is_zero() {
+                        time::sleep(wait_duration).await;
+                    }
+                } else {
+                    return Err(e);
                 }
             }
             other => return other,
         }
 
-        num_retries -= 1;
-        info!("retrying after wake-up ({num_retries} attempts left)");
+        num_retries += 1;
+        info!(retries_left = num_retries, "retrying connect");
+    }
+}
+
+/// Attempts to wake up the compute node.
+/// * Returns Ok(Some(true)) if there was an error waking but retries are acceptable
+/// * Returns Ok(Some(false)) if the wakeup succeeded
+/// * Returns Ok(None) or Err(e) if there was an error
+pub async fn try_wake(
+    node_info: &mut console::CachedNodeInfo,
+    extra: &console::ConsoleReqExtra<'_>,
+    creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
+) -> Result<Option<bool>, WakeComputeError> {
+    info!("compute node's state has likely changed; requesting a wake-up");
+    invalidate_cache(node_info);
+    match creds.wake_compute(extra).await {
+        // retry wake if the compute was in an invalid state
+        Err(WakeComputeError::ApiError(ApiError::Console {
+            status: StatusCode::BAD_REQUEST,
+            ..
+        })) => Ok(Some(true)),
+        // Update `node_info` and try again.
+        Ok(Some(mut new)) => {
+            new.config.reuse_password(&node_info.config);
+            *node_info = new;
+            Ok(Some(false))
+        }
+        Err(e) => Err(e),
+        Ok(None) => Ok(None),
+    }
+}
+
+fn retry_connect_in(err: &compute::ConnectionError, num_retries: u32) -> Option<time::Duration> {
+    use std::io::ErrorKind;
+    match err {
+        // retry all errors at least once immediately
+        _ if num_retries == 0 => Some(time::Duration::ZERO),
+        // keep retrying connection errors every 100ms
+        compute::ConnectionError::CouldNotConnect(io_err) => match io_err.kind() {
+            ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => {
+                // 3/2 = 1.5 which seems to be an ok growth factor heuristic
+                Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
+            }
+            _ => None,
+        },
+        // otherwise, don't retry
+        _ => None,
     }
 }
diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs
index 3373c49676..a1f6cd3ed4 100644
--- a/proxy/src/proxy/tests.rs
+++ b/proxy/src/proxy/tests.rs
@@ -1,4 +1,6 @@
 //! A group of high-level tests for connection establishing logic and auth.
+use std::io;
+
 use super::*;
 use crate::{auth, sasl, scram};
 use async_trait::async_trait;
@@ -294,3 +296,18 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn connect_compute_total_wait() {
+    let err = compute::ConnectionError::CouldNotConnect(io::Error::new(
+        io::ErrorKind::ConnectionRefused,
+        "conn refused",
+    ));
+
+    let mut total_wait = tokio::time::Duration::ZERO;
+    for num_retries in 0..10 {
+        total_wait += retry_connect_in(&err, num_retries).unwrap();
+    }
+    assert!(total_wait < tokio::time::Duration::from_secs(12));
+    assert!(total_wait > tokio::time::Duration::from_secs(10));
+}
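
Note on the backoff arithmetic: both retry_connect_in helpers above compute the wait as BASE_RETRY_WAIT_DURATION * 3^n / 2^n, i.e. 100ms * 1.5^n, with the first retry (n = 0) happening immediately. The standalone sketch below is not part of the patch; it only mirrors those constants and the growth rule to make the resulting schedule visible and to reproduce the roughly 11-second total wait that the connect_compute_total_wait test asserts on.

use std::time::Duration;

// Mirror of the patch's constants (values copied from proxy.rs above).
const BASE_RETRY_WAIT_DURATION: Duration = Duration::from_millis(100);
const NUM_RETRIES_WAKE_COMPUTE: u32 = 10;

// Same growth rule as `retry_connect_in` applies to a retryable error:
// the first retry is immediate, later ones wait BASE * (3/2)^n,
// computed in integer arithmetic as BASE * 3^n / 2^n.
fn wait_for_attempt(num_retries: u32) -> Duration {
    if num_retries == 0 {
        Duration::ZERO
    } else {
        BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)
    }
}

fn main() {
    let mut total = Duration::ZERO;
    for n in 0..NUM_RETRIES_WAKE_COMPUTE {
        let wait = wait_for_attempt(n);
        total += wait;
        println!("retry {n}: wait {wait:?}, cumulative {total:?}");
    }
    // Matches the bounds asserted by `connect_compute_total_wait`.
    assert!(total > Duration::from_secs(10) && total < Duration::from_secs(12));
}

With these values the per-retry waits run 0, 150ms, 225ms, ... up to about 3.8s, summing to roughly 11.2s of sleep before the final error is returned to the client.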