mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 11:30:37 +00:00
proxy: div by zero (#4845)
## Problem 1. In the CacheInvalid state loop, we weren't checking the `num_retries`. If this managed to get up to `32`, the retry_after procedure would compute 2^32 which would overflow to 0 and trigger a div by zero 2. When fixing the above, I started working on a flow diagram for the state machine logic and realised it was more complex than it had to be: a. We start in a `Cached` state b. `Cached`: call `connect_once`. After the first connect_once error, we always move to the `CacheInvalid` state, otherwise, we return the connection. c. `CacheInvalid`: we attempt to `wake_compute` and we either switch to Cached or we retry this step (or we error). d. `Cached`: call `connect_once`. We either retry this step or we have a connection (or we error) - After num_retries > 1 we never switch back to `CacheInvalid`. ## Summary of changes 1. Insert a `num_retries` check in the `handle_try_wake` procedure. Also using floats in the retry_after procedure to prevent the overflow entirely 2. Refactor connect_to_compute to be more linear in design.
This commit is contained in:
@@ -5,7 +5,7 @@ use crate::{
|
||||
auth::{self, AuthFlow, ClientCredentials},
|
||||
compute,
|
||||
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
|
||||
proxy::{try_wake, NUM_RETRIES_CONNECT},
|
||||
proxy::handle_try_wake,
|
||||
sasl, scram,
|
||||
stream::PqStream,
|
||||
};
|
||||
@@ -51,14 +51,15 @@ pub(super) async fn authenticate(
|
||||
}
|
||||
};
|
||||
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
let mut num_retries = 0;
|
||||
let mut node = loop {
|
||||
num_retries += 1;
|
||||
match try_wake(api, extra, creds).await? {
|
||||
let wake_res = api.wake_compute(extra, creds).await;
|
||||
match handle_try_wake(wake_res, num_retries)? {
|
||||
ControlFlow::Continue(_) => num_retries += 1,
|
||||
ControlFlow::Break(n) => break n,
|
||||
ControlFlow::Continue(_) if num_retries < NUM_RETRIES_CONNECT => continue,
|
||||
ControlFlow::Continue(e) => return Err(e.into()),
|
||||
}
|
||||
info!(num_retries, "retrying wake compute");
|
||||
};
|
||||
if let Some(keys) = scram_keys {
|
||||
use tokio_postgres::config::AuthKeys;
|
||||
|
||||
@@ -347,11 +347,6 @@ async fn connect_to_compute_once(
|
||||
.await
|
||||
}
|
||||
|
||||
enum ConnectionState<E> {
|
||||
Cached(console::CachedNodeInfo),
|
||||
Invalid(compute::ConnCfg, E),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ConnectMechanism {
|
||||
type Connection;
|
||||
@@ -407,70 +402,67 @@ where
|
||||
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
|
||||
let mut num_retries = 0;
|
||||
let mut state = ConnectionState::<M::ConnectError>::Cached(node_info);
|
||||
// try once
|
||||
let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
(invalidate_cache(node_info), e)
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
match state {
|
||||
ConnectionState::Invalid(config, err) => {
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
let mut num_retries = 1;
|
||||
|
||||
let wake_res = match creds {
|
||||
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
|
||||
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
|
||||
// nothing to do?
|
||||
auth::BackendType::Link(_) => return Err(err.into()),
|
||||
// test backend
|
||||
auth::BackendType::Test(x) => x.wake_compute(),
|
||||
};
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
let node_info = loop {
|
||||
let wake_res = match creds {
|
||||
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
|
||||
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
|
||||
// nothing to do?
|
||||
auth::BackendType::Link(_) => return Err(err.into()),
|
||||
// test backend
|
||||
auth::BackendType::Test(x) => x.wake_compute(),
|
||||
};
|
||||
|
||||
match handle_try_wake(wake_res) {
|
||||
// there was an error communicating with the control plane
|
||||
Err(e) => return Err(e.into()),
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(ControlFlow::Continue(_)) => {
|
||||
state = ConnectionState::Invalid(config, err);
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
info!(num_retries, "retrying wake compute");
|
||||
time::sleep(wait_duration).await;
|
||||
continue;
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
Ok(ControlFlow::Break(mut node_info)) => {
|
||||
node_info.config.reuse_password(&config);
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
state = ConnectionState::Cached(node_info)
|
||||
}
|
||||
}
|
||||
match handle_try_wake(wake_res, num_retries)? {
|
||||
// failed to wake up but we can continue to retry
|
||||
ControlFlow::Continue(_) => {}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
ControlFlow::Break(mut node_info) => {
|
||||
node_info.config.reuse_password(&config);
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
break node_info;
|
||||
}
|
||||
ConnectionState::Cached(node_info) => {
|
||||
match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if !e.should_retry(num_retries) {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
// after the first connect failure,
|
||||
// we should invalidate the cache and wake up a new compute node
|
||||
if num_retries == 0 {
|
||||
state = ConnectionState::Invalid(invalidate_cache(node_info), e);
|
||||
} else {
|
||||
state = ConnectionState::Cached(node_info);
|
||||
}
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
time::sleep(wait_duration).await;
|
||||
info!(num_retries, "retrying wake compute");
|
||||
};
|
||||
|
||||
info!(num_retries, "retrying wake compute");
|
||||
time::sleep(wait_duration).await;
|
||||
}
|
||||
// now that we have a new node, try connect to it repeatedly.
|
||||
// this can error for a few reasons, for instance:
|
||||
// * DNS connection settings haven't quite propagated yet
|
||||
info!("wake_compute success. attempting to connect");
|
||||
loop {
|
||||
match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if !e.should_retry(num_retries) {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
time::sleep(wait_duration).await;
|
||||
info!(num_retries, "retrying connect_once");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,12 +470,15 @@ where
|
||||
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
|
||||
/// * Returns Ok(Break(node)) if the wakeup succeeded
|
||||
/// * Returns Err(e) if there was an error
|
||||
fn handle_try_wake(
|
||||
pub fn handle_try_wake(
|
||||
result: Result<console::CachedNodeInfo, WakeComputeError>,
|
||||
num_retries: u32,
|
||||
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
|
||||
match result {
|
||||
Err(err) => match &err {
|
||||
WakeComputeError::ApiError(api) if api.could_retry() => Ok(ControlFlow::Continue(err)),
|
||||
WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
|
||||
Ok(ControlFlow::Continue(err))
|
||||
}
|
||||
_ => Err(err),
|
||||
},
|
||||
// Ready to try again.
|
||||
@@ -491,22 +486,10 @@ fn handle_try_wake(
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to wake up the compute node.
|
||||
pub async fn try_wake(
|
||||
api: &impl console::Api,
|
||||
extra: &console::ConsoleReqExtra<'_>,
|
||||
creds: &auth::ClientCredentials<'_>,
|
||||
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
handle_try_wake(api.wake_compute(extra, creds).await)
|
||||
}
|
||||
|
||||
pub trait ShouldRetry {
|
||||
fn could_retry(&self) -> bool;
|
||||
fn should_retry(&self, num_retries: u32) -> bool {
|
||||
match self {
|
||||
// retry all errors at least once
|
||||
_ if num_retries == 0 => true,
|
||||
_ if num_retries >= NUM_RETRIES_CONNECT => false,
|
||||
err => err.could_retry(),
|
||||
}
|
||||
@@ -558,14 +541,9 @@ impl ShouldRetry for compute::ConnectionError {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn retry_after(num_retries: u32) -> time::Duration {
|
||||
match num_retries {
|
||||
0 => time::Duration::ZERO,
|
||||
_ => {
|
||||
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
|
||||
BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)
|
||||
}
|
||||
}
|
||||
fn retry_after(num_retries: u32) -> time::Duration {
|
||||
// 1.5 seems to be an ok growth factor heuristic
|
||||
BASE_RETRY_WAIT_DURATION.mul_f64(1.5_f64.powi(num_retries as i32))
|
||||
}
|
||||
|
||||
/// Finish client connection initialization: confirm auth success, send params, etc.
|
||||
|
||||
@@ -302,7 +302,7 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
|
||||
#[test]
|
||||
fn connect_compute_total_wait() {
|
||||
let mut total_wait = tokio::time::Duration::ZERO;
|
||||
for num_retries in 0..10 {
|
||||
for num_retries in 1..10 {
|
||||
total_wait += retry_after(num_retries);
|
||||
}
|
||||
assert!(total_wait < tokio::time::Duration::from_secs(12));
|
||||
|
||||
Reference in New Issue
Block a user