From eb7860312179ce8a9275dc50b90626747bfe35e5 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 31 Jul 2023 14:30:24 +0100 Subject: [PATCH] proxy: div by zero (#4845) ## Problem 1. In the CacheInvalid state loop, we weren't checking `num_retries`. If this managed to get up to `32`, the retry_after procedure would compute 2^32, which would overflow to 0 and trigger a div by zero. 2. When fixing the above, I started working on a flow diagram for the state machine logic and realised it was more complex than it had to be: a. We start in a `Cached` state. b. `Cached`: call `connect_once`. After the first connect_once error, we always move to the `CacheInvalid` state; otherwise, we return the connection. c. `CacheInvalid`: we attempt to `wake_compute` and we either switch to `Cached` or we retry this step (or we error). d. `Cached`: call `connect_once`. We either retry this step or we have a connection (or we error) - After num_retries > 1 we never switch back to `CacheInvalid`. ## Summary of changes 1. Insert a `num_retries` check in the `handle_try_wake` procedure. Also use floats in the retry_after procedure to prevent the overflow entirely. 2. Refactor connect_to_compute to be more linear in design.
--- proxy/src/auth/backend/classic.rs | 11 +-- proxy/src/proxy.rs | 140 +++++++++++++----------------- proxy/src/proxy/tests.rs | 2 +- 3 files changed, 66 insertions(+), 87 deletions(-) diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs index 4d20b9bf90..5ed0f747c2 100644 --- a/proxy/src/auth/backend/classic.rs +++ b/proxy/src/auth/backend/classic.rs @@ -5,7 +5,7 @@ use crate::{ auth::{self, AuthFlow, ClientCredentials}, compute, console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra}, - proxy::{try_wake, NUM_RETRIES_CONNECT}, + proxy::handle_try_wake, sasl, scram, stream::PqStream, }; @@ -51,14 +51,15 @@ pub(super) async fn authenticate( } }; + info!("compute node's state has likely changed; requesting a wake-up"); let mut num_retries = 0; let mut node = loop { - num_retries += 1; - match try_wake(api, extra, creds).await? { + let wake_res = api.wake_compute(extra, creds).await; + match handle_try_wake(wake_res, num_retries)? { + ControlFlow::Continue(_) => num_retries += 1, ControlFlow::Break(n) => break n, - ControlFlow::Continue(_) if num_retries < NUM_RETRIES_CONNECT => continue, - ControlFlow::Continue(e) => return Err(e.into()), } + info!(num_retries, "retrying wake compute"); }; if let Some(keys) = scram_keys { use tokio_postgres::config::AuthKeys; diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 6a0a65578c..5f59957b2d 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -347,11 +347,6 @@ async fn connect_to_compute_once( .await } -enum ConnectionState { - Cached(console::CachedNodeInfo), - Invalid(compute::ConnCfg, E), -} - #[async_trait] pub trait ConnectMechanism { type Connection; @@ -407,70 +402,67 @@ where mechanism.update_connect_config(&mut node_info.config); - let mut num_retries = 0; - let mut state = ConnectionState::::Cached(node_info); + // try once + let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { + Ok(res) => return Ok(res), + Err(e) => { + error!(error 
= ?e, "could not connect to compute node"); + (invalidate_cache(node_info), e) + } + }; - loop { - match state { - ConnectionState::Invalid(config, err) => { - info!("compute node's state has likely changed; requesting a wake-up"); + let mut num_retries = 1; - let wake_res = match creds { - auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, - auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, - // nothing to do? - auth::BackendType::Link(_) => return Err(err.into()), - // test backend - auth::BackendType::Test(x) => x.wake_compute(), - }; + // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node + info!("compute node's state has likely changed; requesting a wake-up"); + let node_info = loop { + let wake_res = match creds { + auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, + auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, + // nothing to do? + auth::BackendType::Link(_) => return Err(err.into()), + // test backend + auth::BackendType::Test(x) => x.wake_compute(), + }; - match handle_try_wake(wake_res) { - // there was an error communicating with the control plane - Err(e) => return Err(e.into()), - // failed to wake up but we can continue to retry - Ok(ControlFlow::Continue(_)) => { - state = ConnectionState::Invalid(config, err); - let wait_duration = retry_after(num_retries); - num_retries += 1; - - info!(num_retries, "retrying wake compute"); - time::sleep(wait_duration).await; - continue; - } - // successfully woke up a compute node and can break the wakeup loop - Ok(ControlFlow::Break(mut node_info)) => { - node_info.config.reuse_password(&config); - mechanism.update_connect_config(&mut node_info.config); - state = ConnectionState::Cached(node_info) - } - } + match handle_try_wake(wake_res, num_retries)? 
{ + // failed to wake up but we can continue to retry + ControlFlow::Continue(_) => {} + // successfully woke up a compute node and can break the wakeup loop + ControlFlow::Break(mut node_info) => { + node_info.config.reuse_password(&config); + mechanism.update_connect_config(&mut node_info.config); + break node_info; } - ConnectionState::Cached(node_info) => { - match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { - Ok(res) => return Ok(res), - Err(e) => { - error!(error = ?e, "could not connect to compute node"); - if !e.should_retry(num_retries) { - return Err(e.into()); - } + } - // after the first connect failure, - // we should invalidate the cache and wake up a new compute node - if num_retries == 0 { - state = ConnectionState::Invalid(invalidate_cache(node_info), e); - } else { - state = ConnectionState::Cached(node_info); - } + let wait_duration = retry_after(num_retries); + num_retries += 1; - let wait_duration = retry_after(num_retries); - num_retries += 1; + time::sleep(wait_duration).await; + info!(num_retries, "retrying wake compute"); + }; - info!(num_retries, "retrying wake compute"); - time::sleep(wait_duration).await; - } + // now that we have a new node, try connect to it repeatedly. + // this can error for a few reasons, for instance: + // * DNS connection settings haven't quite propagated yet + info!("wake_compute success. 
attempting to connect"); + loop { + match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { + Ok(res) => return Ok(res), + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + if !e.should_retry(num_retries) { + return Err(e.into()); } } } + + let wait_duration = retry_after(num_retries); + num_retries += 1; + + time::sleep(wait_duration).await; + info!(num_retries, "retrying connect_once"); } } @@ -478,12 +470,15 @@ where /// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable /// * Returns Ok(Break(node)) if the wakeup succeeded /// * Returns Err(e) if there was an error -fn handle_try_wake( +pub fn handle_try_wake( result: Result, + num_retries: u32, ) -> Result, WakeComputeError> { match result { Err(err) => match &err { - WakeComputeError::ApiError(api) if api.could_retry() => Ok(ControlFlow::Continue(err)), + WakeComputeError::ApiError(api) if api.should_retry(num_retries) => { + Ok(ControlFlow::Continue(err)) + } _ => Err(err), }, // Ready to try again. @@ -491,22 +486,10 @@ fn handle_try_wake( } } -/// Attempts to wake up the compute node. 
-pub async fn try_wake( - api: &impl console::Api, - extra: &console::ConsoleReqExtra<'_>, - creds: &auth::ClientCredentials<'_>, -) -> Result, WakeComputeError> { - info!("compute node's state has likely changed; requesting a wake-up"); - handle_try_wake(api.wake_compute(extra, creds).await) -} - pub trait ShouldRetry { fn could_retry(&self) -> bool; fn should_retry(&self, num_retries: u32) -> bool { match self { - // retry all errors at least once - _ if num_retries == 0 => true, _ if num_retries >= NUM_RETRIES_CONNECT => false, err => err.could_retry(), } @@ -558,14 +541,9 @@ impl ShouldRetry for compute::ConnectionError { } } -pub fn retry_after(num_retries: u32) -> time::Duration { - match num_retries { - 0 => time::Duration::ZERO, - _ => { - // 3/2 = 1.5 which seems to be an ok growth factor heuristic - BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries) - } - } +fn retry_after(num_retries: u32) -> time::Duration { + // 1.5 seems to be an ok growth factor heuristic + BASE_RETRY_WAIT_DURATION.mul_f64(1.5_f64.powi(num_retries as i32)) } /// Finish client connection initialization: confirm auth success, send params, etc. diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 27e14d442c..5653ec94dc 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -302,7 +302,7 @@ async fn scram_auth_mock() -> anyhow::Result<()> { #[test] fn connect_compute_total_wait() { let mut total_wait = tokio::time::Duration::ZERO; - for num_retries in 0..10 { + for num_retries in 1..10 { total_wait += retry_after(num_retries); } assert!(total_wait < tokio::time::Duration::from_secs(12));