From 6a5650d40c82496ea5d3fc7b870cf0e6e130e91f Mon Sep 17 00:00:00 2001 From: Anna Khanova <32508607+khanova@users.noreply.github.com> Date: Mon, 22 Apr 2024 13:37:22 +0200 Subject: [PATCH] proxy: Make retries configurable and record it. (#7438) ## Problem Currently we cannot configure retries, also, we don't really have visibility of what's going on there. ## Summary of changes * Added cli params * Improved logging * Decrease the number of retries: it feels like most of retries doesn't help. Once there would be better errors handling, we can increase it back. --- proxy/src/bin/proxy.rs | 10 ++++ proxy/src/config.rs | 55 ++++++++++++++++++ proxy/src/metrics.rs | 17 ++++++ proxy/src/proxy.rs | 2 + proxy/src/proxy/connect_compute.rs | 40 ++++++++++++-- proxy/src/proxy/retry.rs | 18 +++--- proxy/src/proxy/tests.rs | 89 +++++++++++++++++++++++------- proxy/src/proxy/wake_compute.rs | 36 ++++++++++-- proxy/src/serverless/backend.rs | 2 + 9 files changed, 226 insertions(+), 43 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index b54f8c131c..7df320fd42 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -200,6 +200,12 @@ struct ProxyCliArgs { /// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression. #[clap(long, default_value = "4194304")] metric_backup_collection_chunk_size: usize, + /// Whether to retry the connection to the compute node + #[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)] + connect_to_compute_retry: String, + /// Whether to retry the wake_compute request + #[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)] + wake_compute_retry: String, } #[derive(clap::Args, Clone, Copy, Debug)] @@ -584,6 +590,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { handshake_timeout: args.handshake_timeout, region: args.region.clone(), aws_region: args.aws_region.clone(), + wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?, + connect_to_compute_retry_config: config::RetryConfig::parse( + &args.connect_to_compute_retry, + )?, })); Ok(config) diff --git a/proxy/src/config.rs b/proxy/src/config.rs index f9519c7645..ae7606e5d4 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -33,6 +33,8 @@ pub struct ProxyConfig { pub region: String, pub handshake_timeout: Duration, pub aws_region: String, + pub wake_compute_retry_config: RetryConfig, + pub connect_to_compute_retry_config: RetryConfig, } #[derive(Debug)] @@ -517,6 +519,59 @@ impl FromStr for ProjectInfoCacheOptions { } } +/// This is a config for connect to compute and wake compute. +#[derive(Clone, Copy, Debug)] +pub struct RetryConfig { + /// Number of times we should retry. + pub max_retries: u32, + /// Retry duration is base_delay * backoff_factor ^ n, where n starts at 0 + pub base_delay: tokio::time::Duration, + /// Exponential base for retry wait duration + pub backoff_factor: f64, +} + +impl RetryConfig { + /// Default options for RetryConfig. + + /// Total delay for 4 retries with 1s base delay and 2.0 backoff factor is 7s. + pub const CONNECT_TO_COMPUTE_DEFAULT_VALUES: &'static str = + "num_retries=4,base_retry_wait_duration=1s,retry_wait_exponent_base=2.0"; + /// Total delay for 4 retries with 1s base delay and 2.0 backoff factor is 7s. + /// Cplane has timeout of 60s on each request. + pub const WAKE_COMPUTE_DEFAULT_VALUES: &'static str = + "num_retries=4,base_retry_wait_duration=1s,retry_wait_exponent_base=2.0"; + + /// Parse retry options passed via cmdline. + /// Example: [`Self::CONNECT_TO_COMPUTE_DEFAULT_VALUES`]. + pub fn parse(options: &str) -> anyhow::Result { + let mut num_retries = None; + let mut base_retry_wait_duration = None; + let mut retry_wait_exponent_base = None; + + for option in options.split(',') { + let (key, value) = option + .split_once('=') + .with_context(|| format!("bad key-value pair: {option}"))?; + + match key { + "num_retries" => num_retries = Some(value.parse()?), + "base_retry_wait_duration" => { + base_retry_wait_duration = Some(humantime::parse_duration(value)?) + } + "retry_wait_exponent_base" => retry_wait_exponent_base = Some(value.parse()?), + unknown => bail!("unknown key: {unknown}"), + } + } + + Ok(Self { + max_retries: num_retries.context("missing `num_retries`")?, + base_delay: base_retry_wait_duration.context("missing `base_retry_wait_duration`")?, + backoff_factor: retry_wait_exponent_base + .context("missing `retry_wait_exponent_base`")?, + }) + } +} + /// Helper for cmdline cache options parsing. pub struct WakeComputeLockOptions { /// The number of shards the lock map should have diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 3a4e54aea0..530350008c 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -119,6 +119,10 @@ pub struct ProxyMetrics { /// Number of invalid endpoints (per protocol, per rejected). pub invalid_endpoints_total: CounterVec, + + /// Number of retries (per outcome, per retry_type). + #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))] + pub retries_metric: HistogramVec, } #[derive(MetricGroup)] @@ -480,3 +484,16 @@ pub struct InvalidEndpointsGroup { pub rejected: Bool, pub outcome: ConnectOutcome, } + +#[derive(LabelGroup)] +#[label(set = RetriesMetricSet)] +pub struct RetriesMetricGroup { + pub outcome: ConnectOutcome, + pub retry_type: RetryType, +} + +#[derive(FixedCardinalityLabel, Clone, Copy, Debug)] +pub enum RetryType { + WakeCompute, + ConnectToCompute, +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 4321bad968..a4554eef38 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -308,6 +308,8 @@ pub async fn handle_client( &TcpMechanism { params: ¶ms }, &user_info, mode.allow_self_signed_compute(config), + config.wake_compute_retry_config, + config.connect_to_compute_retry_config, ) .or_else(|e| stream.throw_error(e)) .await?; diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 33f394c550..8a220aaa0c 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -1,10 +1,11 @@ use crate::{ auth::backend::ComputeCredentialKeys, compute::{self, PostgresConnection}, + config::RetryConfig, console::{self, errors::WakeComputeError, CachedNodeInfo, NodeInfo}, context::RequestMonitoring, error::ReportableError, - metrics::{ConnectionFailureKind, Metrics}, + metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType}, proxy::{ retry::{retry_after, ShouldRetry}, wake_compute::wake_compute, @@ -93,19 +94,23 @@ pub async fn connect_to_compute( mechanism: &M, user_info: &B, allow_self_signed_compute: bool, + wake_compute_retry_config: RetryConfig, + connect_to_compute_retry_config: RetryConfig, ) -> Result where M::ConnectError: ShouldRetry + std::fmt::Debug, M::Error: From, { let mut num_retries = 0; - let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?; + let mut node_info = + wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?; if let Some(keys) = user_info.get_keys() { node_info.set_keys(keys); } node_info.allow_self_signed_compute = allow_self_signed_compute; // let mut node_info = credentials.get_node_info(ctx, user_info).await?; mechanism.update_connect_config(&mut node_info.config); + let retry_type = RetryType::ConnectToCompute; // try once let err = match mechanism @@ -114,6 +119,13 @@ where { Ok(res) => { ctx.latency_timer.success(); + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Success, + retry_type, + }, + num_retries.into(), + ); return Ok(res); } Err(e) => e, @@ -124,7 +136,7 @@ where let node_info = if !node_info.cached() { // If we just recieved this from cplane and dodn't get it from cache, we shouldn't retry. // Do not need to retrieve a new node_info, just return the old one. - if !err.should_retry(num_retries) { + if !err.should_retry(num_retries, connect_to_compute_retry_config) { return Err(err.into()); } node_info @@ -132,7 +144,8 @@ where // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node info!("compute node's state has likely changed; requesting a wake-up"); let old_node_info = invalidate_cache(node_info); - let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?; + let mut node_info = + wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?; node_info.reuse_settings(old_node_info); mechanism.update_connect_config(&mut node_info.config); @@ -151,19 +164,34 @@ where { Ok(res) => { ctx.latency_timer.success(); + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Success, + retry_type, + }, + num_retries.into(), + ); + info!(?num_retries, "connected to compute node after"); return Ok(res); } Err(e) => { - let retriable = e.should_retry(num_retries); + let retriable = e.should_retry(num_retries, connect_to_compute_retry_config); if !retriable { error!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Failed, + retry_type, + }, + num_retries.into(), + ); return Err(e.into()); } warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); } } - let wait_duration = retry_after(num_retries); + let wait_duration = retry_after(num_retries, connect_to_compute_retry_config); num_retries += 1; time::sleep(wait_duration).await; diff --git a/proxy/src/proxy/retry.rs b/proxy/src/proxy/retry.rs index a85ed380b0..082e06caa3 100644 --- a/proxy/src/proxy/retry.rs +++ b/proxy/src/proxy/retry.rs @@ -1,18 +1,12 @@ -use crate::compute; +use crate::{compute, config::RetryConfig}; use std::{error::Error, io}; use tokio::time; -/// Number of times we should retry the `/proxy_wake_compute` http request. -/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0 -pub const NUM_RETRIES_CONNECT: u32 = 16; -const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25); -const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; - pub trait ShouldRetry { fn could_retry(&self) -> bool; - fn should_retry(&self, num_retries: u32) -> bool { + fn should_retry(&self, num_retries: u32, config: RetryConfig) -> bool { match self { - _ if num_retries >= NUM_RETRIES_CONNECT => false, + _ if num_retries >= config.max_retries => false, err => err.could_retry(), } } @@ -63,6 +57,8 @@ impl ShouldRetry for compute::ConnectionError { } } -pub fn retry_after(num_retries: u32) -> time::Duration { - BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1)) +pub fn retry_after(num_retries: u32, config: RetryConfig) -> time::Duration { + config + .base_delay + .mul_f64(config.backoff_factor.powi((num_retries as i32) - 1)) } diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 849e9bd33c..e0ec90cb44 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -10,13 +10,13 @@ use super::*; use crate::auth::backend::{ ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned, TestBackend, }; -use crate::config::CertResolver; +use crate::config::{CertResolver, RetryConfig}; use crate::console::caches::NodeInfoCache; use crate::console::messages::MetricsAuxInfo; use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend}; use crate::console::{self, CachedNodeInfo, NodeInfo}; use crate::error::ErrorKind; -use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT}; +use crate::proxy::retry::retry_after; use crate::{http, sasl, scram, BranchId, EndpointId, ProjectId}; use anyhow::{bail, Context}; use async_trait::async_trait; @@ -361,11 +361,15 @@ async fn scram_auth_mock() -> anyhow::Result<()> { #[test] fn connect_compute_total_wait() { let mut total_wait = tokio::time::Duration::ZERO; - for num_retries in 1..NUM_RETRIES_CONNECT { - total_wait += retry_after(num_retries); + let config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 5, + backoff_factor: 2.0, + }; + for num_retries in 1..config.max_retries { + total_wait += retry_after(num_retries, config); } - assert!(total_wait < tokio::time::Duration::from_secs(12)); - assert!(total_wait > tokio::time::Duration::from_secs(10)); + assert!(f64::abs(total_wait.as_secs_f64() - 15.0) < 0.1); } #[derive(Clone, Copy, Debug)] @@ -549,7 +553,12 @@ async fn connect_to_compute_success() { let mut ctx = RequestMonitoring::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Connect]); let user_info = helper_create_connect_info(&mechanism); - connect_to_compute(&mut ctx, &mechanism, &user_info, false) + let config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 5, + backoff_factor: 2.0, + }; + connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config) .await .unwrap(); mechanism.verify(); @@ -562,7 +571,12 @@ async fn connect_to_compute_retry() { let mut ctx = RequestMonitoring::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]); let user_info = helper_create_connect_info(&mechanism); - connect_to_compute(&mut ctx, &mechanism, &user_info, false) + let config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 5, + backoff_factor: 2.0, + }; + connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config) .await .unwrap(); mechanism.verify(); @@ -576,7 +590,12 @@ async fn connect_to_compute_non_retry_1() { let mut ctx = RequestMonitoring::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]); let user_info = helper_create_connect_info(&mechanism); - connect_to_compute(&mut ctx, &mechanism, &user_info, false) + let config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 5, + backoff_factor: 2.0, + }; + connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config) .await .unwrap_err(); mechanism.verify(); @@ -590,7 +609,12 @@ async fn connect_to_compute_non_retry_2() { let mut ctx = RequestMonitoring::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]); let user_info = helper_create_connect_info(&mechanism); - connect_to_compute(&mut ctx, &mechanism, &user_info, false) + let config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 5, + backoff_factor: 2.0, + }; + connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config) .await .unwrap(); mechanism.verify(); @@ -600,17 +624,32 @@ async fn connect_to_compute_non_retry_2() { #[tokio::test] async fn connect_to_compute_non_retry_3() { let _ = env_logger::try_init(); - assert_eq!(NUM_RETRIES_CONNECT, 16); + tokio::time::pause(); use ConnectAction::*; let mut ctx = RequestMonitoring::test(); - let mechanism = TestConnectMechanism::new(vec![ - Wake, Retry, Wake, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, - Retry, Retry, Retry, Retry, Retry, /* the 17th time */ Retry, - ]); + let mechanism = + TestConnectMechanism::new(vec![Wake, Retry, Wake, Retry, Retry, Retry, Retry, Retry]); let user_info = helper_create_connect_info(&mechanism); - connect_to_compute(&mut ctx, &mechanism, &user_info, false) - .await - .unwrap_err(); + let wake_compute_retry_config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 1, + backoff_factor: 2.0, + }; + let connect_to_compute_retry_config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 5, + backoff_factor: 2.0, + }; + connect_to_compute( + &mut ctx, + &mechanism, + &user_info, + false, + wake_compute_retry_config, + connect_to_compute_retry_config, + ) + .await + .unwrap_err(); mechanism.verify(); } @@ -622,7 +661,12 @@ async fn wake_retry() { let mut ctx = RequestMonitoring::test(); let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]); let user_info = helper_create_connect_info(&mechanism); - connect_to_compute(&mut ctx, &mechanism, &user_info, false) + let config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 5, + backoff_factor: 2.0, + }; + connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config) .await .unwrap(); mechanism.verify(); @@ -636,7 +680,12 @@ async fn wake_non_retry() { let mut ctx = RequestMonitoring::test(); let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]); let user_info = helper_create_connect_info(&mechanism); - connect_to_compute(&mut ctx, &mechanism, &user_info, false) + let config = RetryConfig { + base_delay: Duration::from_secs(1), + max_retries: 5, + backoff_factor: 2.0, + }; + connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config) .await .unwrap_err(); mechanism.verify(); diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index fe228ab33d..cfedf0e98a 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -1,10 +1,14 @@ +use crate::config::RetryConfig; use crate::console::{errors::WakeComputeError, provider::CachedNodeInfo}; use crate::context::RequestMonitoring; -use crate::metrics::{ConnectionFailuresBreakdownGroup, Metrics, WakeupFailureKind}; +use crate::metrics::{ + ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType, + WakeupFailureKind, +}; use crate::proxy::retry::retry_after; use hyper::StatusCode; use std::ops::ControlFlow; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use super::connect_compute::ComputeConnectBackend; use super::retry::ShouldRetry; @@ -13,23 +17,42 @@ pub async fn wake_compute( num_retries: &mut u32, ctx: &mut RequestMonitoring, api: &B, + config: RetryConfig, ) -> Result { + let retry_type = RetryType::WakeCompute; loop { let wake_res = api.wake_compute(ctx).await; - match handle_try_wake(wake_res, *num_retries) { + match handle_try_wake(wake_res, *num_retries, config) { Err(e) => { error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); report_error(&e, false); + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Failed, + retry_type, + }, + (*num_retries).into(), + ); return Err(e); } Ok(ControlFlow::Continue(e)) => { warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node"); report_error(&e, true); } - Ok(ControlFlow::Break(n)) => return Ok(n), + Ok(ControlFlow::Break(n)) => { + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Success, + retry_type, + }, + (*num_retries).into(), + ); + info!(?num_retries, "compute node woken up after"); + return Ok(n); + } } - let wait_duration = retry_after(*num_retries); + let wait_duration = retry_after(*num_retries, config); *num_retries += 1; tokio::time::sleep(wait_duration).await; } @@ -42,10 +65,11 @@ pub async fn wake_compute( pub fn handle_try_wake( result: Result, num_retries: u32, + config: RetryConfig, ) -> Result, WakeComputeError> { match result { Err(err) => match &err { - WakeComputeError::ApiError(api) if api.should_retry(num_retries) => { + WakeComputeError::ApiError(api) if api.should_retry(num_retries, config) => { Ok(ControlFlow::Continue(err)) } _ => Err(err), diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index e74c63599a..b91c0e62ed 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -108,6 +108,8 @@ impl PoolingBackend { }, &backend, false, // do not allow self signed compute for http flow + self.config.wake_compute_retry_config, + self.config.connect_to_compute_retry_config, ) .await }