diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs
index 6753e7ed7f..4d20b9bf90 100644
--- a/proxy/src/auth/backend/classic.rs
+++ b/proxy/src/auth/backend/classic.rs
@@ -1,8 +1,11 @@
+use std::ops::ControlFlow;
+
 use super::AuthSuccess;
 use crate::{
     auth::{self, AuthFlow, ClientCredentials},
     compute,
     console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
+    proxy::{try_wake, NUM_RETRIES_CONNECT},
     sasl, scram,
     stream::PqStream,
 };
@@ -48,7 +51,15 @@ pub(super) async fn authenticate(
         }
     };
 
-    let mut node = api.wake_compute(extra, creds).await?;
+    let mut num_retries = 0;
+    let mut node = loop {
+        num_retries += 1;
+        match try_wake(api, extra, creds).await? {
+            ControlFlow::Break(n) => break n,
+            ControlFlow::Continue(_) if num_retries < NUM_RETRIES_CONNECT => continue,
+            ControlFlow::Continue(e) => return Err(e.into()),
+        }
+    };
     if let Some(keys) = scram_keys {
         use tokio_postgres::config::AuthKeys;
         node.config.auth_keys(AuthKeys::ScramSha256(keys));
diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs
index 3eaed1b82b..37190c76b8 100644
--- a/proxy/src/console/provider.rs
+++ b/proxy/src/console/provider.rs
@@ -14,6 +14,7 @@ pub mod errors {
     use crate::{
         error::{io_error, UserFacingError},
         http,
+        proxy::ShouldRetry,
     };
     use thiserror::Error;
 
@@ -72,6 +73,24 @@ pub mod errors {
         }
     }
 
+    impl ShouldRetry for ApiError {
+        fn could_retry(&self) -> bool {
+            match self {
+                // retry some transport errors
+                Self::Transport(io) => io.could_retry(),
+                // retry some temporary failures because the compute was in a bad state
+                // (bad request can be returned when the endpoint was in transition)
+                Self::Console {
+                    status: http::StatusCode::BAD_REQUEST | http::StatusCode::LOCKED,
+                    ..
+                } => true,
+                // retry server errors
+                Self::Console { status, .. } if status.is_server_error() => true,
+                _ => false,
+            }
+        }
+    }
+
     impl From<reqwest::Error> for ApiError {
         fn from(e: reqwest::Error) -> Self {
             io_error(e).into()
diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs
index e37e7dc44b..b0c5a41efb 100644
--- a/proxy/src/proxy.rs
+++ b/proxy/src/proxy.rs
@@ -6,17 +6,12 @@ use crate::{
     cancellation::{self, CancelMap},
     compute::{self, PostgresConnection},
     config::{ProxyConfig, TlsConfig},
-    console::{
-        self,
-        errors::{ApiError, WakeComputeError},
-        messages::MetricsAuxInfo,
-    },
+    console::{self, errors::WakeComputeError, messages::MetricsAuxInfo},
     stream::{PqStream, Stream},
 };
 use anyhow::{bail, Context};
 use async_trait::async_trait;
 use futures::TryFutureExt;
-use hyper::StatusCode;
 use metrics::{
     exponential_buckets, register_histogram, register_int_counter_vec, Histogram, IntCounterVec,
 };
@@ -33,7 +28,7 @@ use utils::measured_stream::MeasuredStream;
 
 /// Number of times we should retry the `/proxy_wake_compute` http request.
 /// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
-const NUM_RETRIES_CONNECT: u32 = 10;
+pub const NUM_RETRIES_CONNECT: u32 = 10;
 const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
 const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
 
@@ -418,13 +413,22 @@ where
     loop {
         match state {
             ConnectionState::Invalid(config, err) => {
-                match try_wake(&config, extra, creds).await {
-                    // we can't wake up the compute node
-                    Ok(None) => return Err(err.into()),
+                let wake_res = match creds {
+                    auth::BackendType::Console(api, creds) => {
+                        try_wake(api.as_ref(), extra, creds).await
+                    }
+                    auth::BackendType::Postgres(api, creds) => {
+                        try_wake(api.as_ref(), extra, creds).await
+                    }
+                    // the link flow has no console API to wake the compute, so keep the original error
+                    auth::BackendType::Link(_) => return Err(err.into()),
+                };
+
+                match wake_res {
                     // there was an error communicating with the control plane
                     Err(e) => return Err(e.into()),
                     // failed to wake up but we can continue to retry
-                    Ok(Some(ControlFlow::Continue(()))) => {
+                    Ok(ControlFlow::Continue(_)) => {
                         state = ConnectionState::Invalid(config, err);
                         let wait_duration = retry_after(num_retries);
                         num_retries += 1;
@@ -434,7 +438,8 @@ where
                         continue;
                     }
                     // successfully woke up a compute node and can break the wakeup loop
-                    Ok(Some(ControlFlow::Break(mut node_info))) => {
+                    Ok(ControlFlow::Break(mut node_info)) => {
+                        node_info.config.reuse_password(&config);
                         mechanism.update_connect_config(&mut node_info.config);
                         state = ConnectionState::Cached(node_info)
                     }
@@ -470,28 +475,22 @@ where
     }
 }
 
 /// Attempts to wake up the compute node.
-/// * Returns Ok(Some(true)) if there was an error waking but retries are acceptable
-/// * Returns Ok(Some(false)) if the wakeup succeeded
-/// * Returns Ok(None) or Err(e) if there was an error
-async fn try_wake(
-    config: &compute::ConnCfg,
+/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
+/// * Returns Ok(Break(node)) if the wakeup succeeded
+/// * Returns Err(e) if there was an error
+pub async fn try_wake(
+    api: &impl console::Api,
     extra: &console::ConsoleReqExtra<'_>,
-    creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
-) -> Result<Option<ControlFlow<CachedNodeInfo>>, WakeComputeError> {
+    creds: &auth::ClientCredentials<'_>,
+) -> Result<ControlFlow<CachedNodeInfo, WakeComputeError>, WakeComputeError> {
     info!("compute node's state has likely changed; requesting a wake-up");
-    match creds.wake_compute(extra).await {
-        // retry wake if the compute was in an invalid state
-        Err(WakeComputeError::ApiError(ApiError::Console {
-            status: StatusCode::BAD_REQUEST,
-            ..
-        })) => Ok(Some(ControlFlow::Continue(()))),
-        // Update `node_info` and try again.
-        Ok(Some(mut new)) => {
-            new.config.reuse_password(config);
-            Ok(Some(ControlFlow::Break(new)))
-        }
-        Err(e) => Err(e),
-        Ok(None) => Ok(None),
+    match api.wake_compute(extra, creds).await {
+        Err(err) => match &err {
+            WakeComputeError::ApiError(api) if api.could_retry() => Ok(ControlFlow::Continue(err)),
+            _ => Err(err),
+        },
+        // Ready to try again.
+        Ok(new) => Ok(ControlFlow::Break(new)),
     }
 }
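Illustrative sketch, not part of the diff: the new try_wake contract returns Ok(ControlFlow::Break(node)) on success, Ok(ControlFlow::Continue(err)) when the failure looks retryable, and Err(err) otherwise; callers loop up to NUM_RETRIES_CONNECT attempts, as classic.rs now does. The standalone snippet below demonstrates only that loop shape; try_wake_stub, NodeInfo, and the simplified WakeComputeError are hypothetical stand-ins, not the proxy's real types.

// Illustrative only: simplified stand-ins for the proxy's real types.
use std::ops::ControlFlow;

const NUM_RETRIES_CONNECT: u32 = 10;

#[derive(Debug)]
struct WakeComputeError(&'static str);
struct NodeInfo;

// Stand-in for `try_wake`: Break(node) on success, Continue(err) for a
// retryable failure, Err(err) for a non-retryable one.
fn try_wake_stub(attempt: u32) -> Result<ControlFlow<NodeInfo, WakeComputeError>, WakeComputeError> {
    if attempt < 3 {
        Ok(ControlFlow::Continue(WakeComputeError("endpoint in transition")))
    } else {
        Ok(ControlFlow::Break(NodeInfo))
    }
}

fn main() -> Result<(), WakeComputeError> {
    let mut num_retries = 0;
    let _node = loop {
        num_retries += 1;
        match try_wake_stub(num_retries)? {
            // woke up successfully: leave the loop with the node info
            ControlFlow::Break(node) => break node,
            // retryable failure with retry budget left: try again
            ControlFlow::Continue(_) if num_retries < NUM_RETRIES_CONNECT => continue,
            // retryable failure but out of retries: surface the last error
            ControlFlow::Continue(err) => return Err(err),
        }
    };
    println!("woke compute after {num_retries} attempts");
    Ok(())
}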