From 39a28d11083eefc72c5f302c21e0e74742129e14 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 12 Jul 2023 11:38:36 +0100 Subject: [PATCH] proxy wake_compute loop (#4675) ## Problem If we fail to wake up the compute node, a subsequent connect attempt will definitely fail. However, kubernetes won't fail the connection immediately, instead it hangs until we timeout (10s). ## Summary of changes Refactor the loop to allow fast retries of compute_wake and to skip a connect attempt. --- proxy/src/http/conn_pool.rs | 81 ++++++++++++++++----------- proxy/src/proxy.rs | 108 ++++++++++++++++++++++++------------ proxy/src/proxy/tests.rs | 9 +-- 3 files changed, 121 insertions(+), 77 deletions(-) diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index 27950d3a20..fb53c663c8 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -1,6 +1,7 @@ use parking_lot::Mutex; use pq_proto::StartupMessageParams; use std::fmt; +use std::ops::ControlFlow; use std::{collections::HashMap, sync::Arc}; use tokio::time; @@ -9,8 +10,7 @@ use crate::{auth, console}; use super::sql_over_http::MAX_RESPONSE_SIZE; -use crate::proxy::try_wake; -use crate::proxy::{BASE_RETRY_WAIT_DURATION, NUM_RETRIES_WAKE_COMPUTE}; +use crate::proxy::{invalidate_cache, retry_after, try_wake, NUM_RETRIES_WAKE_COMPUTE}; use tracing::error; use tracing::info; @@ -184,11 +184,10 @@ impl GlobalConnPool { } } -// // Wake up the destination if needed. Code here is a bit involved because // we reuse the code from the usual proxy and we need to prepare few structures // that this code expects. -// +#[tracing::instrument(skip_all)] async fn connect_to_compute( config: &config::ProxyConfig, conn_info: &ConnInfo, @@ -220,54 +219,72 @@ async fn connect_to_compute( let node_info = &mut creds.wake_compute(&extra).await?.expect("msg"); - // This code is a copy of `connect_to_compute` from `src/proxy.rs` with - // the difference that it uses `tokio_postgres` for the connection. let mut num_retries = 0; - let mut should_wake = true; + let mut wait_duration = time::Duration::ZERO; + let mut should_wake_with_error = None; loop { + if !wait_duration.is_zero() { + time::sleep(wait_duration).await; + } + + // try wake the compute node if we have determined it's sensible to do so + if let Some(err) = should_wake_with_error.take() { + match try_wake(node_info, &extra, &creds).await { + // we can't wake up the compute node + Ok(None) => return Err(err), + // there was an error communicating with the control plane + Err(e) => return Err(e.into()), + // failed to wake up but we can continue to retry + Ok(Some(ControlFlow::Continue(()))) => { + wait_duration = retry_after(num_retries); + should_wake_with_error = Some(err); + + num_retries += 1; + info!(num_retries, "retrying wake compute"); + continue; + } + // successfully woke up a compute node and can break the wakeup loop + Ok(Some(ControlFlow::Break(()))) => {} + } + } + match connect_to_compute_once(node_info, conn_info).await { - Err(e) if num_retries == NUM_RETRIES_WAKE_COMPUTE => { - if let Some(wait_duration) = retry_connect_in(&e, num_retries) { - error!(error = ?e, "could not connect to compute node"); - if should_wake { - match try_wake(node_info, &extra, &creds).await { - Ok(Some(x)) => should_wake = x, - Ok(None) => return Err(e.into()), - Err(e) => return Err(e.into()), - } - } - if !wait_duration.is_zero() { - time::sleep(wait_duration).await; - } - } else { + Ok(res) => return Ok(res), + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + if !can_retry_error(&e, num_retries) { return Err(e.into()); } + wait_duration = retry_after(num_retries); + + // after the first connect failure, + // we should invalidate the cache and wake up a new compute node + if num_retries == 0 { + invalidate_cache(node_info); + should_wake_with_error = Some(e.into()); + } } - other => return Ok(other?), } num_retries += 1; - info!(retries_left = num_retries, "retrying connect"); + info!(num_retries, "retrying connect"); } } -fn retry_connect_in(err: &tokio_postgres::Error, num_retries: u32) -> Option { +fn can_retry_error(err: &tokio_postgres::Error, num_retries: u32) -> bool { use tokio_postgres::error::SqlState; match err.code() { - // retry all errors at least once immediately - _ if num_retries == 0 => Some(time::Duration::ZERO), - // keep retrying connection errors every 100ms + // retry all errors at least once + _ if num_retries == 0 => true, + // keep retrying connection errors Some( &SqlState::CONNECTION_FAILURE | &SqlState::CONNECTION_EXCEPTION | &SqlState::CONNECTION_DOES_NOT_EXIST | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, - ) => { - // 3/2 = 1.5 which seems to be an ok growth factor heuristic - Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)) - } + ) if num_retries < NUM_RETRIES_WAKE_COMPUTE => true, // otherwise, don't retry - _ => None, + _ => false, } } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 5c5353a63e..12ca9c5187 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -20,7 +20,7 @@ use hyper::StatusCode; use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; -use std::sync::Arc; +use std::{ops::ControlFlow, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt}, time, @@ -32,7 +32,7 @@ use utils::measured_stream::MeasuredStream; /// Number of times we should retry the `/proxy_wake_compute` http request. /// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10; -pub const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100); +const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100); const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; @@ -335,11 +335,37 @@ async fn connect_to_compute( creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, ) -> Result { let mut num_retries = 0; - let mut should_wake = true; + let mut wait_duration = time::Duration::ZERO; + let mut should_wake_with_error = None; loop { // Apply startup params to the (possibly, cached) compute node info. node_info.config.set_startup_params(params); + if !wait_duration.is_zero() { + time::sleep(wait_duration).await; + } + + // try wake the compute node if we have determined it's sensible to do so + if let Some(err) = should_wake_with_error.take() { + match try_wake(node_info, extra, creds).await { + // we can't wake up the compute node + Ok(None) => return Err(err), + // there was an error communicating with the control plane + Err(e) => return Err(io_error(e).into()), + // failed to wake up but we can continue to retry + Ok(Some(ControlFlow::Continue(()))) => { + wait_duration = retry_after(num_retries); + should_wake_with_error = Some(err); + + num_retries += 1; + info!(num_retries, "retrying wake compute"); + continue; + } + // successfully woke up a compute node and can break the wakeup loop + Ok(Some(ControlFlow::Break(()))) => {} + } + } + // Set a shorter timeout for the initial connection attempt. // // In case we try to connect to an outdated address that is no longer valid, the @@ -359,31 +385,29 @@ async fn connect_to_compute( time::Duration::from_secs(10) }; + // do this again to ensure we have username? + node_info.config.set_startup_params(params); + match connect_to_compute_once(node_info, timeout).await { - Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => { - if let Some(wait_duration) = retry_connect_in(&e, num_retries) { - error!(error = ?e, "could not connect to compute node"); - if should_wake { - match try_wake(node_info, extra, creds).await { - Ok(Some(x)) => { - should_wake = x; - } - Ok(None) => return Err(e), - Err(e) => return Err(io_error(e).into()), - } - } - if !wait_duration.is_zero() { - time::sleep(wait_duration).await; - } - } else { + Ok(res) => return Ok(res), + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + if !can_retry_error(&e, num_retries) { return Err(e); } + wait_duration = retry_after(num_retries); + + // after the first connect failure, + // we should invalidate the cache and wake up a new compute node + if num_retries == 0 { + invalidate_cache(node_info); + should_wake_with_error = Some(e); + } } - other => return other, } num_retries += 1; - info!(retries_left = num_retries, "retrying connect"); + info!(num_retries, "retrying connect"); } } @@ -395,41 +419,51 @@ pub async fn try_wake( node_info: &mut console::CachedNodeInfo, extra: &console::ConsoleReqExtra<'_>, creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, -) -> Result, WakeComputeError> { +) -> Result>, WakeComputeError> { info!("compute node's state has likely changed; requesting a wake-up"); - invalidate_cache(node_info); match creds.wake_compute(extra).await { // retry wake if the compute was in an invalid state Err(WakeComputeError::ApiError(ApiError::Console { status: StatusCode::BAD_REQUEST, .. - })) => Ok(Some(true)), + })) => Ok(Some(ControlFlow::Continue(()))), // Update `node_info` and try again. Ok(Some(mut new)) => { new.config.reuse_password(&node_info.config); *node_info = new; - Ok(Some(false)) + Ok(Some(ControlFlow::Break(()))) } Err(e) => Err(e), Ok(None) => Ok(None), } } -fn retry_connect_in(err: &compute::ConnectionError, num_retries: u32) -> Option { +fn can_retry_error(err: &compute::ConnectionError, num_retries: u32) -> bool { use std::io::ErrorKind; match err { - // retry all errors at least once immediately - _ if num_retries == 0 => Some(time::Duration::ZERO), - // keep retrying connection errors every 100ms - compute::ConnectionError::CouldNotConnect(io_err) => match io_err.kind() { - ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => { - // 3/2 = 1.5 which seems to be an ok growth factor heuristic - Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)) - } - _ => None, - }, + // retry all errors at least once + _ if num_retries == 0 => true, + // keep retrying connection errors + compute::ConnectionError::CouldNotConnect(io_err) + if num_retries < NUM_RETRIES_WAKE_COMPUTE => + { + matches!( + io_err.kind(), + ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable + ) + } // otherwise, don't retry - _ => None, + _ => false, + } +} + +pub fn retry_after(num_retries: u32) -> time::Duration { + match num_retries { + 0 => time::Duration::ZERO, + _ => { + // 3/2 = 1.5 which seems to be an ok growth factor heuristic + BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries) + } } } diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index a1f6cd3ed4..b9215cd90e 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -1,6 +1,4 @@ //! A group of high-level tests for connection establishing logic and auth. -use std::io; - use super::*; use crate::{auth, sasl, scram}; use async_trait::async_trait; @@ -299,14 +297,9 @@ async fn scram_auth_mock() -> anyhow::Result<()> { #[test] fn connect_compute_total_wait() { - let err = compute::ConnectionError::CouldNotConnect(io::Error::new( - io::ErrorKind::ConnectionRefused, - "conn refused", - )); - let mut total_wait = tokio::time::Duration::ZERO; for num_retries in 0..10 { - total_wait += retry_connect_in(&err, num_retries).unwrap(); + total_wait += retry_after(num_retries); } assert!(total_wait < tokio::time::Duration::from_secs(12)); assert!(total_wait > tokio::time::Duration::from_secs(10));