From efa6aa134f4190d4ad9c10eaa55b17abd79b9457 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 7 Jul 2023 17:50:50 +0100 Subject: [PATCH 1/2] allow repeated IO errors from compute node (#4624) ## Problem #4598 compute nodes are not accessible some time after wake up due to kubernetes DNS not being fully propagated. ## Summary of changes Update connect retry mechanism to support handling IO errors and sleeping for 100ms ## Checklist before requesting a review - [x] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. --- proxy/src/http/conn_pool.rs | 69 +++++++++++++++++----------- proxy/src/proxy.rs | 92 ++++++++++++++++++++++++++++++------- proxy/src/proxy/tests.rs | 17 +++++++ 3 files changed, 136 insertions(+), 42 deletions(-) diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index 52c1e2f2ce..27950d3a20 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -2,16 +2,15 @@ use parking_lot::Mutex; use pq_proto::StartupMessageParams; use std::fmt; use std::{collections::HashMap, sync::Arc}; - -use futures::TryFutureExt; +use tokio::time; use crate::config; use crate::{auth, console}; use super::sql_over_http::MAX_RESPONSE_SIZE; -use crate::proxy::invalidate_cache; -use crate::proxy::NUM_RETRIES_WAKE_COMPUTE; +use crate::proxy::try_wake; +use crate::proxy::{BASE_RETRY_WAIT_DURATION, NUM_RETRIES_WAKE_COMPUTE}; use tracing::error; use tracing::info; @@ -223,32 +222,59 @@ async fn connect_to_compute( // This code is a copy of `connect_to_compute` from `src/proxy.rs` with // the difference that it uses `tokio_postgres` for the connection. - let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE; + let mut num_retries = 0; + let mut should_wake = true; loop { match connect_to_compute_once(node_info, conn_info).await { - Err(e) if num_retries > 0 => { - info!("compute node's state has changed; requesting a wake-up"); - match creds.wake_compute(&extra).await? { - // Update `node_info` and try one more time. - Some(new) => { - *node_info = new; + Err(e) if num_retries == NUM_RETRIES_WAKE_COMPUTE => { + if let Some(wait_duration) = retry_connect_in(&e, num_retries) { + error!(error = ?e, "could not connect to compute node"); + if should_wake { + match try_wake(node_info, &extra, &creds).await { + Ok(Some(x)) => should_wake = x, + Ok(None) => return Err(e.into()), + Err(e) => return Err(e.into()), + } } - // Link auth doesn't work that way, so we just exit. - None => return Err(e), + if !wait_duration.is_zero() { + time::sleep(wait_duration).await; + } + } else { + return Err(e.into()); } } - other => return other, + other => return Ok(other?), } - num_retries -= 1; - info!("retrying after wake-up ({num_retries} attempts left)"); + num_retries += 1; + info!(retries_left = num_retries, "retrying connect"); + } +} + +fn retry_connect_in(err: &tokio_postgres::Error, num_retries: u32) -> Option { + use tokio_postgres::error::SqlState; + match err.code() { + // retry all errors at least once immediately + _ if num_retries == 0 => Some(time::Duration::ZERO), + // keep retrying connection errors every 100ms + Some( + &SqlState::CONNECTION_FAILURE + | &SqlState::CONNECTION_EXCEPTION + | &SqlState::CONNECTION_DOES_NOT_EXIST + | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, + ) => { + // 3/2 = 1.5 which seems to be an ok growth factor heuristic + Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)) + } + // otherwise, don't retry + _ => None, } } async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, conn_info: &ConnInfo, -) -> anyhow::Result { +) -> Result { let mut config = (*node_info.config).clone(); let (client, connection) = config @@ -257,15 +283,6 @@ async fn connect_to_compute_once( .dbname(&conn_info.dbname) .max_backend_message_size(MAX_RESPONSE_SIZE) .connect(tokio_postgres::NoTls) - .inspect_err(|e: &tokio_postgres::Error| { - error!( - "failed to connect to compute node hosts={:?} ports={:?}: {}", - node_info.config.get_hosts(), - node_info.config.get_ports(), - e - ); - invalidate_cache(node_info) - }) .await?; tokio::spawn(async move { diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 2433412c4c..5c5353a63e 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -6,12 +6,17 @@ use crate::{ cancellation::{self, CancelMap}, compute::{self, PostgresConnection}, config::{ProxyConfig, TlsConfig}, - console::{self, messages::MetricsAuxInfo}, + console::{ + self, + errors::{ApiError, WakeComputeError}, + messages::MetricsAuxInfo, + }, error::io_error, stream::{PqStream, Stream}, }; use anyhow::{bail, Context}; use futures::TryFutureExt; +use hyper::StatusCode; use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; @@ -25,7 +30,9 @@ use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; /// Number of times we should retry the `/proxy_wake_compute` http request. -pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1; +/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n +pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10; +pub const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100); const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; @@ -315,7 +322,6 @@ async fn connect_to_compute_once( node_info .config .connect(allow_self_signed_compute, timeout) - .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info)) .await } @@ -328,7 +334,8 @@ async fn connect_to_compute( extra: &console::ConsoleReqExtra<'_>, creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, ) -> Result { - let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE; + let mut num_retries = 0; + let mut should_wake = true; loop { // Apply startup params to the (possibly, cached) compute node info. node_info.config.set_startup_params(params); @@ -346,30 +353,83 @@ async fn connect_to_compute( // We only use caching in case of scram proxy backed by the console, so reduce // the timeout only in that case. let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _)); - let timeout = if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE { + let timeout = if is_scram_proxy && num_retries == 0 { time::Duration::from_secs(2) } else { time::Duration::from_secs(10) }; match connect_to_compute_once(node_info, timeout).await { - Err(e) if num_retries > 0 => { - info!("compute node's state has changed; requesting a wake-up"); - match creds.wake_compute(extra).map_err(io_error).await? { - // Update `node_info` and try one more time. - Some(mut new) => { - new.config.reuse_password(&node_info.config); - *node_info = new; + Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => { + if let Some(wait_duration) = retry_connect_in(&e, num_retries) { + error!(error = ?e, "could not connect to compute node"); + if should_wake { + match try_wake(node_info, extra, creds).await { + Ok(Some(x)) => { + should_wake = x; + } + Ok(None) => return Err(e), + Err(e) => return Err(io_error(e).into()), + } } - // Link auth doesn't work that way, so we just exit. - None => return Err(e), + if !wait_duration.is_zero() { + time::sleep(wait_duration).await; + } + } else { + return Err(e); } } other => return other, } - num_retries -= 1; - info!("retrying after wake-up ({num_retries} attempts left)"); + num_retries += 1; + info!(retries_left = num_retries, "retrying connect"); + } +} + +/// Attempts to wake up the compute node. +/// * Returns Ok(Some(true)) if there was an error waking but retries are acceptable +/// * Returns Ok(Some(false)) if the wakeup succeeded +/// * Returns Ok(None) or Err(e) if there was an error +pub async fn try_wake( + node_info: &mut console::CachedNodeInfo, + extra: &console::ConsoleReqExtra<'_>, + creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, +) -> Result, WakeComputeError> { + info!("compute node's state has likely changed; requesting a wake-up"); + invalidate_cache(node_info); + match creds.wake_compute(extra).await { + // retry wake if the compute was in an invalid state + Err(WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::BAD_REQUEST, + .. + })) => Ok(Some(true)), + // Update `node_info` and try again. + Ok(Some(mut new)) => { + new.config.reuse_password(&node_info.config); + *node_info = new; + Ok(Some(false)) + } + Err(e) => Err(e), + Ok(None) => Ok(None), + } +} + +fn retry_connect_in(err: &compute::ConnectionError, num_retries: u32) -> Option { + use std::io::ErrorKind; + match err { + // retry all errors at least once immediately + _ if num_retries == 0 => Some(time::Duration::ZERO), + // keep retrying connection errors every 100ms + compute::ConnectionError::CouldNotConnect(io_err) => match io_err.kind() { + ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => { + // 3/2 = 1.5 which seems to be an ok growth factor heuristic + Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)) + } + _ => None, + }, + // otherwise, don't retry + _ => None, } } diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 3373c49676..a1f6cd3ed4 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -1,4 +1,6 @@ //! A group of high-level tests for connection establishing logic and auth. +use std::io; + use super::*; use crate::{auth, sasl, scram}; use async_trait::async_trait; @@ -294,3 +296,18 @@ async fn scram_auth_mock() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn connect_compute_total_wait() { + let err = compute::ConnectionError::CouldNotConnect(io::Error::new( + io::ErrorKind::ConnectionRefused, + "conn refused", + )); + + let mut total_wait = tokio::time::Duration::ZERO; + for num_retries in 0..10 { + total_wait += retry_connect_in(&err, num_retries).unwrap(); + } + assert!(total_wait < tokio::time::Duration::from_secs(12)); + assert!(total_wait > tokio::time::Duration::from_secs(10)); +} From 39a28d11083eefc72c5f302c21e0e74742129e14 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 12 Jul 2023 11:38:36 +0100 Subject: [PATCH 2/2] proxy wake_compute loop (#4675) ## Problem If we fail to wake up the compute node, a subsequent connect attempt will definitely fail. However, kubernetes won't fail the connection immediately, instead it hangs until we timeout (10s). ## Summary of changes Refactor the loop to allow fast retries of compute_wake and to skip a connect attempt. --- proxy/src/http/conn_pool.rs | 81 ++++++++++++++++----------- proxy/src/proxy.rs | 108 ++++++++++++++++++++++++------------ proxy/src/proxy/tests.rs | 9 +-- 3 files changed, 121 insertions(+), 77 deletions(-) diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index 27950d3a20..fb53c663c8 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -1,6 +1,7 @@ use parking_lot::Mutex; use pq_proto::StartupMessageParams; use std::fmt; +use std::ops::ControlFlow; use std::{collections::HashMap, sync::Arc}; use tokio::time; @@ -9,8 +10,7 @@ use crate::{auth, console}; use super::sql_over_http::MAX_RESPONSE_SIZE; -use crate::proxy::try_wake; -use crate::proxy::{BASE_RETRY_WAIT_DURATION, NUM_RETRIES_WAKE_COMPUTE}; +use crate::proxy::{invalidate_cache, retry_after, try_wake, NUM_RETRIES_WAKE_COMPUTE}; use tracing::error; use tracing::info; @@ -184,11 +184,10 @@ impl GlobalConnPool { } } -// // Wake up the destination if needed. Code here is a bit involved because // we reuse the code from the usual proxy and we need to prepare few structures // that this code expects. -// +#[tracing::instrument(skip_all)] async fn connect_to_compute( config: &config::ProxyConfig, conn_info: &ConnInfo, @@ -220,54 +219,72 @@ async fn connect_to_compute( let node_info = &mut creds.wake_compute(&extra).await?.expect("msg"); - // This code is a copy of `connect_to_compute` from `src/proxy.rs` with - // the difference that it uses `tokio_postgres` for the connection. let mut num_retries = 0; - let mut should_wake = true; + let mut wait_duration = time::Duration::ZERO; + let mut should_wake_with_error = None; loop { + if !wait_duration.is_zero() { + time::sleep(wait_duration).await; + } + + // try wake the compute node if we have determined it's sensible to do so + if let Some(err) = should_wake_with_error.take() { + match try_wake(node_info, &extra, &creds).await { + // we can't wake up the compute node + Ok(None) => return Err(err), + // there was an error communicating with the control plane + Err(e) => return Err(e.into()), + // failed to wake up but we can continue to retry + Ok(Some(ControlFlow::Continue(()))) => { + wait_duration = retry_after(num_retries); + should_wake_with_error = Some(err); + + num_retries += 1; + info!(num_retries, "retrying wake compute"); + continue; + } + // successfully woke up a compute node and can break the wakeup loop + Ok(Some(ControlFlow::Break(()))) => {} + } + } + match connect_to_compute_once(node_info, conn_info).await { - Err(e) if num_retries == NUM_RETRIES_WAKE_COMPUTE => { - if let Some(wait_duration) = retry_connect_in(&e, num_retries) { - error!(error = ?e, "could not connect to compute node"); - if should_wake { - match try_wake(node_info, &extra, &creds).await { - Ok(Some(x)) => should_wake = x, - Ok(None) => return Err(e.into()), - Err(e) => return Err(e.into()), - } - } - if !wait_duration.is_zero() { - time::sleep(wait_duration).await; - } - } else { + Ok(res) => return Ok(res), + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + if !can_retry_error(&e, num_retries) { return Err(e.into()); } + wait_duration = retry_after(num_retries); + + // after the first connect failure, + // we should invalidate the cache and wake up a new compute node + if num_retries == 0 { + invalidate_cache(node_info); + should_wake_with_error = Some(e.into()); + } } - other => return Ok(other?), } num_retries += 1; - info!(retries_left = num_retries, "retrying connect"); + info!(num_retries, "retrying connect"); } } -fn retry_connect_in(err: &tokio_postgres::Error, num_retries: u32) -> Option { +fn can_retry_error(err: &tokio_postgres::Error, num_retries: u32) -> bool { use tokio_postgres::error::SqlState; match err.code() { - // retry all errors at least once immediately - _ if num_retries == 0 => Some(time::Duration::ZERO), - // keep retrying connection errors every 100ms + // retry all errors at least once + _ if num_retries == 0 => true, + // keep retrying connection errors Some( &SqlState::CONNECTION_FAILURE | &SqlState::CONNECTION_EXCEPTION | &SqlState::CONNECTION_DOES_NOT_EXIST | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, - ) => { - // 3/2 = 1.5 which seems to be an ok growth factor heuristic - Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)) - } + ) if num_retries < NUM_RETRIES_WAKE_COMPUTE => true, // otherwise, don't retry - _ => None, + _ => false, } } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 5c5353a63e..12ca9c5187 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -20,7 +20,7 @@ use hyper::StatusCode; use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; -use std::sync::Arc; +use std::{ops::ControlFlow, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt}, time, @@ -32,7 +32,7 @@ use utils::measured_stream::MeasuredStream; /// Number of times we should retry the `/proxy_wake_compute` http request. /// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10; -pub const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100); +const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100); const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; @@ -335,11 +335,37 @@ async fn connect_to_compute( creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, ) -> Result { let mut num_retries = 0; - let mut should_wake = true; + let mut wait_duration = time::Duration::ZERO; + let mut should_wake_with_error = None; loop { // Apply startup params to the (possibly, cached) compute node info. node_info.config.set_startup_params(params); + if !wait_duration.is_zero() { + time::sleep(wait_duration).await; + } + + // try wake the compute node if we have determined it's sensible to do so + if let Some(err) = should_wake_with_error.take() { + match try_wake(node_info, extra, creds).await { + // we can't wake up the compute node + Ok(None) => return Err(err), + // there was an error communicating with the control plane + Err(e) => return Err(io_error(e).into()), + // failed to wake up but we can continue to retry + Ok(Some(ControlFlow::Continue(()))) => { + wait_duration = retry_after(num_retries); + should_wake_with_error = Some(err); + + num_retries += 1; + info!(num_retries, "retrying wake compute"); + continue; + } + // successfully woke up a compute node and can break the wakeup loop + Ok(Some(ControlFlow::Break(()))) => {} + } + } + // Set a shorter timeout for the initial connection attempt. // // In case we try to connect to an outdated address that is no longer valid, the @@ -359,31 +385,29 @@ async fn connect_to_compute( time::Duration::from_secs(10) }; + // do this again to ensure we have username? + node_info.config.set_startup_params(params); + match connect_to_compute_once(node_info, timeout).await { - Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => { - if let Some(wait_duration) = retry_connect_in(&e, num_retries) { - error!(error = ?e, "could not connect to compute node"); - if should_wake { - match try_wake(node_info, extra, creds).await { - Ok(Some(x)) => { - should_wake = x; - } - Ok(None) => return Err(e), - Err(e) => return Err(io_error(e).into()), - } - } - if !wait_duration.is_zero() { - time::sleep(wait_duration).await; - } - } else { + Ok(res) => return Ok(res), + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + if !can_retry_error(&e, num_retries) { return Err(e); } + wait_duration = retry_after(num_retries); + + // after the first connect failure, + // we should invalidate the cache and wake up a new compute node + if num_retries == 0 { + invalidate_cache(node_info); + should_wake_with_error = Some(e); + } } - other => return other, } num_retries += 1; - info!(retries_left = num_retries, "retrying connect"); + info!(num_retries, "retrying connect"); } } @@ -395,41 +419,51 @@ pub async fn try_wake( node_info: &mut console::CachedNodeInfo, extra: &console::ConsoleReqExtra<'_>, creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, -) -> Result, WakeComputeError> { +) -> Result>, WakeComputeError> { info!("compute node's state has likely changed; requesting a wake-up"); - invalidate_cache(node_info); match creds.wake_compute(extra).await { // retry wake if the compute was in an invalid state Err(WakeComputeError::ApiError(ApiError::Console { status: StatusCode::BAD_REQUEST, .. - })) => Ok(Some(true)), + })) => Ok(Some(ControlFlow::Continue(()))), // Update `node_info` and try again. Ok(Some(mut new)) => { new.config.reuse_password(&node_info.config); *node_info = new; - Ok(Some(false)) + Ok(Some(ControlFlow::Break(()))) } Err(e) => Err(e), Ok(None) => Ok(None), } } -fn retry_connect_in(err: &compute::ConnectionError, num_retries: u32) -> Option { +fn can_retry_error(err: &compute::ConnectionError, num_retries: u32) -> bool { use std::io::ErrorKind; match err { - // retry all errors at least once immediately - _ if num_retries == 0 => Some(time::Duration::ZERO), - // keep retrying connection errors every 100ms - compute::ConnectionError::CouldNotConnect(io_err) => match io_err.kind() { - ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => { - // 3/2 = 1.5 which seems to be an ok growth factor heuristic - Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)) - } - _ => None, - }, + // retry all errors at least once + _ if num_retries == 0 => true, + // keep retrying connection errors + compute::ConnectionError::CouldNotConnect(io_err) + if num_retries < NUM_RETRIES_WAKE_COMPUTE => + { + matches!( + io_err.kind(), + ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable + ) + } // otherwise, don't retry - _ => None, + _ => false, + } +} + +pub fn retry_after(num_retries: u32) -> time::Duration { + match num_retries { + 0 => time::Duration::ZERO, + _ => { + // 3/2 = 1.5 which seems to be an ok growth factor heuristic + BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries) + } } } diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index a1f6cd3ed4..b9215cd90e 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -1,6 +1,4 @@ //! A group of high-level tests for connection establishing logic and auth. -use std::io; - use super::*; use crate::{auth, sasl, scram}; use async_trait::async_trait; @@ -299,14 +297,9 @@ async fn scram_auth_mock() -> anyhow::Result<()> { #[test] fn connect_compute_total_wait() { - let err = compute::ConnectionError::CouldNotConnect(io::Error::new( - io::ErrorKind::ConnectionRefused, - "conn refused", - )); - let mut total_wait = tokio::time::Duration::ZERO; for num_retries in 0..10 { - total_wait += retry_connect_in(&err, num_retries).unwrap(); + total_wait += retry_after(num_retries); } assert!(total_wait < tokio::time::Duration::from_secs(12)); assert!(total_wait > tokio::time::Duration::from_secs(10));