diff --git a/proxy/src/cache/timed_lru.rs b/proxy/src/cache/timed_lru.rs
index 7cfe5100ea..183e1ea449 100644
--- a/proxy/src/cache/timed_lru.rs
+++ b/proxy/src/cache/timed_lru.rs
@@ -30,7 +30,7 @@ use super::{Cache, timed_lru};
 ///
 /// * There's an API for immediate invalidation (removal) of a cache entry;
 ///   It's useful in case we know for sure that the entry is no longer correct.
-///   See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information.
+///   See [`timed_lru::Cached`] for more information.
 ///
 /// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
 ///   or by a successful lookup (i.e. the entry hasn't expired yet).
@@ -54,7 +54,7 @@ pub(crate) struct TimedLru {
 impl Cache for TimedLru {
     type Key = K;
     type Value = V;
-    type LookupInfo = LookupInfo;
+    type LookupInfo = Key;
     fn invalidate(&self, info: &Self::LookupInfo) {
         self.invalidate_raw(info);
     }
@@ -87,30 +87,24 @@ impl TimedLru {
 
     /// Drop an entry from the cache if it's outdated.
     #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
-    fn invalidate_raw(&self, info: &LookupInfo) {
-        let now = Instant::now();
-
+    fn invalidate_raw(&self, key: &K) {
         // Do costly things before taking the lock.
         let mut cache = self.cache.lock();
 
-        let raw_entry = match cache.raw_entry_mut().from_key(&info.key) {
+        let entry = match cache.raw_entry_mut().from_key(key) {
             RawEntryMut::Vacant(_) => return,
-            RawEntryMut::Occupied(x) => x,
+            RawEntryMut::Occupied(x) => x.remove(),
         };
-
-        // Remove the entry if it was created prior to lookup timestamp.
-        let entry = raw_entry.get();
-        let (created_at, expires_at) = (entry.created_at, entry.expires_at);
-        let should_remove = created_at <= info.created_at || expires_at <= now;
-
-        if should_remove {
-            raw_entry.remove();
-        }
-        drop(cache); // drop lock before logging
+
+        let Entry {
+            created_at,
+            expires_at,
+            ..
+        } = entry;
+
         debug!(
-            created_at = format_args!("{created_at:?}"),
-            expires_at = format_args!("{expires_at:?}"),
-            entry_removed = should_remove,
+            ?created_at,
+            ?expires_at,
             "processed a cache entry invalidation event"
         );
     }
@@ -211,10 +205,10 @@ impl TimedLru {
     }
 
     pub(crate) fn insert_unit(&self, key: K, value: V) -> (Option, Cached<&Self, ()>) {
-        let (created_at, old) = self.insert_raw(key.clone(), value);
+        let (_, old) = self.insert_raw(key.clone(), value);
 
         let cached = Cached {
-            token: Some((self, LookupInfo { created_at, key })),
+            token: Some((self, key)),
             value: (),
         };
 
@@ -229,28 +223,9 @@ impl TimedLru {
         K: Borrow + Clone,
         Q: Hash + Eq + ?Sized,
     {
-        self.get_raw(key, |key, entry| {
-            let info = LookupInfo {
-                created_at: entry.created_at,
-                key: key.clone(),
-            };
-
-            Cached {
-                token: Some((self, info)),
-                value: entry.value.clone(),
-            }
+        self.get_raw(key, |key, entry| Cached {
+            token: Some((self, key.clone())),
+            value: entry.value.clone(),
         })
     }
 }
-
-/// Lookup information for key invalidation.
-pub(crate) struct LookupInfo {
-    /// Time of creation of a cache [`Entry`].
-    /// We use this during invalidation lookups to prevent eviction of a newer
-    /// entry sharing the same key (it might've been inserted by a different
-    /// task after we got the entry we're trying to invalidate now).
-    created_at: Instant,
-
-    /// Search by this key.
-    key: K,
-}
diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs
index 7fb88e6a45..0a19090ce0 100644
--- a/proxy/src/compute/mod.rs
+++ b/proxy/src/compute/mod.rs
@@ -236,7 +236,7 @@ impl AuthInfo {
         &self,
         ctx: &RequestContext,
         compute: &mut ComputeConnection,
-        user_info: ComputeUserInfo,
+        user_info: &ComputeUserInfo,
     ) -> Result {
         // client config with stubbed connect info.
         // TODO(conrad): should we rewrite this to bypass tokio-postgres2 entirely,
@@ -272,7 +272,7 @@ impl AuthInfo {
                 secret_key,
             },
             compute.hostname.to_string(),
-            user_info,
+            user_info.clone(),
         );
 
         Ok(PostgresSettings {
diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs
index 112465a89b..d5903286a0 100644
--- a/proxy/src/console_redirect_proxy.rs
+++ b/proxy/src/console_redirect_proxy.rs
@@ -226,7 +226,7 @@ pub(crate) async fn handle_client(
         .await?;
 
     let pg_settings = auth_info
-        .authenticate(ctx, &mut node, user_info)
+        .authenticate(ctx, &mut node, &user_info)
         .or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
         .await?;
 
diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs
index aa675a439e..9f642f52ab 100644
--- a/proxy/src/proxy/connect_compute.rs
+++ b/proxy/src/proxy/connect_compute.rs
@@ -112,7 +112,7 @@ where
     let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
         // If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
         // Do not need to retrieve a new node_info, just return the old one.
-        if should_retry(&err, num_retries, compute.retry) {
+        if !should_retry(&err, num_retries, compute.retry) {
             Metrics::get().proxy.retries_metric.observe(
                 RetriesMetricGroup {
                     outcome: ConnectOutcome::Failed,
diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs
index 6b84e47982..d9c0585efb 100644
--- a/proxy/src/proxy/mod.rs
+++ b/proxy/src/proxy/mod.rs
@@ -18,9 +18,11 @@ use tokio::io::{AsyncRead, AsyncWrite};
 use tokio_util::sync::CancellationToken;
 use tracing::{Instrument, debug, error, info, warn};
 
+use crate::cache::Cache;
 use crate::cancellation::{self, CancellationHandler};
 use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
 use crate::context::RequestContext;
+use crate::control_plane::client::ControlPlaneClient;
 use crate::error::{ReportableError, UserFacingError};
 use crate::metrics::{Metrics, NumClientConnectionsGuard};
 pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
@@ -29,6 +31,7 @@ use crate::pglb::passthrough::ProxyPassthrough;
 use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
 use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
 use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
+use crate::proxy::retry::ShouldRetryWakeCompute;
 use crate::rate_limiter::EndpointRateLimiter;
 use crate::stream::{PqStream, Stream};
 use crate::types::EndpointCacheKey;
@@ -349,26 +352,56 @@ pub(crate) async fn handle_client(
     let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
     auth_info.set_startup_params(&params, params_compat);
 
-    let res = connect_to_compute(
-        ctx,
-        &TcpMechanism {
-            locks: &config.connect_compute_locks,
-        },
-        &auth::Backend::ControlPlane(cplane, creds.info.clone()),
-        config.wake_compute_retry_config,
-        &config.connect_to_compute,
-    )
-    .await;
-
-    let mut node = match res {
-        Ok(node) => node,
-        Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
+    let mut node;
+    let mut attempt = 0;
+    let connect = TcpMechanism {
+        locks: &config.connect_compute_locks,
     };
+    let backend = auth::Backend::ControlPlane(cplane, creds.info);
 
-    let pg_settings = auth_info.authenticate(ctx, &mut node, creds.info).await;
-    let pg_settings = match pg_settings {
-        Ok(pg_settings) => pg_settings,
-        Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
+    // NOTE: This is messy, but should hopefully be detangled with PGLB.
+    // We wanted to separate the concerns of **connect** to compute (a PGLB operation),
+    // from **authenticate** to compute (a NeonKeeper operation).
+    //
+    // This unfortunately removed retry handling for one error case where
+    // the compute was cached, and we connected, but the compute cache was actually stale
+    // and is associated with the wrong endpoint. We detect this when the **authentication** fails.
+    // As such, we retry once here if the `authenticate` function fails and the error is valid to retry.
+    let pg_settings = loop {
+        attempt += 1;
+
+        let res = connect_to_compute(
+            ctx,
+            &connect,
+            &backend,
+            config.wake_compute_retry_config,
+            &config.connect_to_compute,
+        )
+        .await;
+
+        match res {
+            Ok(n) => node = n,
+            Err(e) => return Err(stream.throw_error(e, Some(ctx)).await)?,
+        }
+
+        let auth::Backend::ControlPlane(cplane, user_info) = &backend else {
+            unreachable!("ensured above");
+        };
+
+        let res = auth_info.authenticate(ctx, &mut node, user_info).await;
+        match res {
+            Ok(pg_settings) => break pg_settings,
+            Err(e) if attempt < 2 && e.should_retry_wake_compute() => {
+                tracing::warn!(error = ?e, "retrying wake compute");
+
+                #[allow(irrefutable_let_patterns)]
+                if let ControlPlaneClient::ProxyV1(cplane_proxy_v1) = &**cplane {
+                    let key = user_info.endpoint_cache_key();
+                    cplane_proxy_v1.caches.node_info.invalidate(&key);
+                }
+            }
+            Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
+        }
     };
 
     let session = cancellation_handler.get_key();
diff --git a/proxy/src/proxy/retry.rs b/proxy/src/proxy/retry.rs
index e9eca95724..b06c3be72c 100644
--- a/proxy/src/proxy/retry.rs
+++ b/proxy/src/proxy/retry.rs
@@ -3,7 +3,7 @@ use std::io;
 
 use tokio::time;
 
-use crate::compute;
+use crate::compute::{self, PostgresError};
 use crate::config::RetryConfig;
 
 pub(crate) trait CouldRetry {
@@ -115,6 +115,14 @@ impl ShouldRetryWakeCompute for compute::ConnectionError {
     }
 }
 
+impl ShouldRetryWakeCompute for PostgresError {
+    fn should_retry_wake_compute(&self) -> bool {
+        match self {
+            PostgresError::Postgres(error) => error.should_retry_wake_compute(),
+        }
+    }
+}
+
 pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Duration {
     config
         .base_delay
diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs
index 29a269208a..4f27496019 100644
--- a/proxy/src/proxy/tests/mod.rs
+++ b/proxy/src/proxy/tests/mod.rs
@@ -374,6 +374,7 @@ fn connect_compute_total_wait() {
 #[derive(Clone, Copy, Debug)]
 enum ConnectAction {
     Wake,
+    WakeCold,
     WakeFail,
     WakeRetry,
     Connect,
@@ -504,6 +505,9 @@ impl TestControlPlaneClient for TestConnectMechanism {
         *counter += 1;
         match action {
             ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
+            ConnectAction::WakeCold => Ok(CachedNodeInfo::new_uncached(
+                helper_create_uncached_node_info(),
+            )),
             ConnectAction::WakeFail => {
                 let err = control_plane::errors::ControlPlaneError::Message(Box::new(
                     ControlPlaneErrorMessage {
@@ -551,8 +555,8 @@ impl TestControlPlaneClient for TestConnectMechanism {
     }
 }
 
-fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
-    let node = NodeInfo {
+fn helper_create_uncached_node_info() -> NodeInfo {
+    NodeInfo {
         conn_info: compute::ConnectInfo {
             host: "test".into(),
             port: 5432,
@@ -566,7 +570,11 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
             compute_id: "compute".into(),
             cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm,
         },
-    };
+    }
+}
+
+fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
+    let node = helper_create_uncached_node_info();
     let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone()));
     node2.map(|()| node)
 }
@@ -742,7 +750,7 @@ async fn fail_no_wake_skips_cache_invalidation() {
     let ctx = RequestContext::test();
     let mech = TestConnectMechanism::new(vec![
         ConnectAction::Wake,
-        ConnectAction::FailNoWake,
+        ConnectAction::RetryNoWake,
         ConnectAction::Connect,
     ]);
     let user = helper_create_connect_info(&mech);
@@ -788,7 +796,7 @@ async fn retry_no_wake_skips_invalidation() {
 
     let ctx = RequestContext::test();
     // Wake → RetryNoWake (retryable + NOT wakeable)
-    let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]);
+    let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake, Fail]);
     let user_info = helper_create_connect_info(&mechanism);
     let cfg = config();
 
@@ -802,3 +810,44 @@ assert!(logs_contain(
         "invalidating stalled compute node info cache entry"
     ));
 }
+
+#[tokio::test]
+#[traced_test]
+async fn retry_no_wake_error_fast() {
+    let _ = env_logger::try_init();
+    use ConnectAction::*;
+
+    let ctx = RequestContext::test();
+    // Wake → FailNoWake (not retryable + NOT wakeable)
+    let mechanism = TestConnectMechanism::new(vec![Wake, FailNoWake]);
+    let user_info = helper_create_connect_info(&mechanism);
+    let cfg = config();
+
+    connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
+        .await
+        .unwrap_err();
+    mechanism.verify();
+
+    // Because FailNoWake has wakeable=false, we must NOT see invalidate_cache
+    assert!(!logs_contain(
+        "invalidating stalled compute node info cache entry"
+    ));
+}
+
+#[tokio::test]
+#[traced_test]
+async fn retry_cold_wake_skips_invalidation() {
+    let _ = env_logger::try_init();
+    use ConnectAction::*;
+
+    let ctx = RequestContext::test();
+    // WakeCold → Retry → Connect (uncached wake, so the retry must not invalidate the cache)
+    let mechanism = TestConnectMechanism::new(vec![WakeCold, Retry, Connect]);
+    let user_info = helper_create_connect_info(&mechanism);
+    let cfg = config();
+
+    connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
+        .await
+        .unwrap();
+    mechanism.verify();
+}
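
Reviewer note on the retry flow introduced in proxy/src/proxy/mod.rs: connect and authenticate are now separate steps, so a stale node-info cache entry is only detected when authentication fails, at which point the loop above invalidates that cache entry and retries the connect+authenticate pair exactly once (attempt < 2). Below is a minimal, self-contained sketch of that control-flow shape only; NodeCache, connect, authenticate, and the endpoint/host strings are hypothetical stand-ins for illustration, not Neon's actual types or APIs.

use std::collections::HashMap;

#[derive(Debug)]
enum AuthError {
    // Stands in for errors where `should_retry_wake_compute()` returns true.
    StaleCompute,
}

#[derive(Clone)]
struct NodeInfo {
    host: String,
}

// Stand-in for the per-endpoint node-info cache.
struct NodeCache {
    nodes: HashMap<String, NodeInfo>,
}

impl NodeCache {
    fn invalidate(&mut self, key: &str) {
        self.nodes.remove(key);
    }
}

// "Connect": reuse the cached entry if present, otherwise pretend to wake a fresh compute.
fn connect(cache: &NodeCache, key: &str) -> NodeInfo {
    cache
        .nodes
        .get(key)
        .cloned()
        .unwrap_or(NodeInfo { host: "fresh-compute".into() })
}

// "Authenticate": a compute that was reassigned to another endpoint rejects us.
fn authenticate(node: &NodeInfo) -> Result<(), AuthError> {
    if node.host == "stale-compute" {
        Err(AuthError::StaleCompute)
    } else {
        Ok(())
    }
}

// Connect, then authenticate; on a stale-cache style failure, invalidate the
// cache entry and retry the whole connect+authenticate sequence exactly once.
fn connect_and_authenticate(cache: &mut NodeCache, key: &str) -> Result<NodeInfo, AuthError> {
    let mut attempt = 0;
    loop {
        attempt += 1;
        let node = connect(cache, key);
        match authenticate(&node) {
            Ok(()) => return Ok(node),
            Err(AuthError::StaleCompute) if attempt < 2 => cache.invalidate(key),
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut cache = NodeCache {
        nodes: HashMap::from([(
            "endpoint-a".to_string(),
            NodeInfo { host: "stale-compute".into() },
        )]),
    };

    // First attempt hits the stale entry and fails auth; the entry is dropped
    // and the second attempt connects to a fresh compute.
    let node = connect_and_authenticate(&mut cache, "endpoint-a").expect("retry should succeed");
    println!("connected to {}", node.host);
}

Capping the loop at a single extra attempt bounds the cost of a stale cache entry at one additional wake, while the cached fast path still serves the common case.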