[proxy] fix connect_to_compute retry handling (#12351)

# Problem

In #12335 I moved the `authenticate` method outside of the
`connect_to_compute` loop. This caused [e2e tests to become
flaky](https://github.com/neondatabase/cloud/pull/30533), which
highlighted an edge case we forgot to consider when making that change.

When we connect to a compute, its IP might already be cached. That cache
entry might, however, be stale. Because we can't validate that the IP is
associated with a specific compute-id☨, the `connect_to_compute`
operation will succeed and we only fail later, at password
authentication☨☨. Before the change, we invalidated the cache and
triggered `wake_compute` if authentication failed.

Additionally, I noticed some faulty logic I introduced a year ago:
https://github.com/neondatabase/neon/pull/8141/files#diff-5491e3afe62d8c5c77178149c665603b29d88d3ec2e47fc1b3bb119a0a970afaL145-R147

☨ We will be able to once we roll out TLS, as the certificate common name
includes the compute-id.

☨☨ Technically password authentication could pass for the wrong compute,
but I think this would only happen in the very very rare event that the
IP got reused **and** the compute's endpoint happened to be a
branch/replica.

# Solution

1. Fix the broken logic
2. Simplify cache invalidation (I don't know why it was so convoluted)
3. Add a loop around `connect_to_compute` + `authenticate` to re-introduce
the `wake_compute` invalidation we accidentally removed.

I went with this approach to try to avoid interfering with
https://github.com/neondatabase/neon/compare/main...cloneable/proxy-pglb-connect-compute-split.
I suspect the changes made in commit 3 will move into `handle_client_request`.
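
Commit 3 is easiest to see as code. Below is a self-contained toy model of the new loop: every type and helper here is a stand-in for the proxy's real ones (`ControlPlaneClient`, `AuthInfo`, `TcpMechanism`, `NodeInfo`), and the node-info cache is reduced to an `Option`, but the control flow mirrors the diff further down: connect, authenticate, and on a wake-retryable auth failure invalidate the cached node and go around at most once more.

```rust
// Toy model of the commit-3 retry loop. Everything here is a stand-in for the
// proxy's real types; the node-info cache is reduced to an Option<&str>.

#[derive(Debug)]
#[allow(dead_code)]
enum AuthError {
    /// Auth failed because the cached IP now points at a different compute.
    StaleCompute,
    /// A genuine authentication failure that retrying won't fix.
    Fatal(&'static str),
}

impl AuthError {
    fn should_retry_wake_compute(&self) -> bool {
        matches!(self, AuthError::StaleCompute)
    }
}

struct Compute {
    addr: &'static str,
}

/// Stand-in for connect_to_compute: use the cached IP if present,
/// otherwise "wake" the compute and cache its fresh address.
fn connect_to_compute(cached_ip: &mut Option<&'static str>) -> Compute {
    let addr = *cached_ip.get_or_insert("10.0.0.2 (freshly woken)");
    Compute { addr }
}

/// Stand-in for authenticate: succeeds only against the compute we actually own.
fn authenticate(node: &Compute) -> Result<&'static str, AuthError> {
    if node.addr.starts_with("10.0.0.1") {
        Err(AuthError::StaleCompute) // connected fine, but to the wrong compute
    } else {
        Ok("pg settings")
    }
}

fn main() -> Result<(), AuthError> {
    // Pretend the cache still holds the IP of a compute that has been reassigned.
    let mut cached_ip = Some("10.0.0.1 (stale)");

    let mut attempt = 0;
    let pg_settings = loop {
        attempt += 1;
        let node = connect_to_compute(&mut cached_ip); // connect itself succeeds
        match authenticate(&node) {
            Ok(settings) => break settings,
            Err(e) if attempt < 2 && e.should_retry_wake_compute() => {
                // The behaviour #12335 accidentally dropped: invalidate the cache
                // so the next iteration goes through wake_compute again.
                cached_ip = None;
            }
            Err(e) => return Err(e),
        }
    };
    println!("authenticated after {attempt} attempt(s): {pg_settings}");
    Ok(())
}
```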
Author: Conrad Ludgate
Date: 2025-06-27 11:36:27 +01:00
Committed by: GitHub
Parent: 6fa1562b57
Commit: abc1efd5a6
7 changed files with 137 additions and 72 deletions

View File

@@ -30,7 +30,7 @@ use super::{Cache, timed_lru};
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information.
/// See [`timed_lru::Cached`] for more information.
///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet).
@@ -54,7 +54,7 @@ pub(crate) struct TimedLru<K, V> {
impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
type Key = K;
type Value = V;
type LookupInfo<Key> = LookupInfo<Key>;
type LookupInfo<Key> = Key;
fn invalidate(&self, info: &Self::LookupInfo<K>) {
self.invalidate_raw(info);
@@ -87,30 +87,24 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
/// Drop an entry from the cache if it's outdated.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn invalidate_raw(&self, info: &LookupInfo<K>) {
let now = Instant::now();
fn invalidate_raw(&self, key: &K) {
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let raw_entry = match cache.raw_entry_mut().from_key(&info.key) {
let entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return,
RawEntryMut::Occupied(x) => x,
RawEntryMut::Occupied(x) => x.remove(),
};
// Remove the entry if it was created prior to lookup timestamp.
let entry = raw_entry.get();
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
let should_remove = created_at <= info.created_at || expires_at <= now;
if should_remove {
raw_entry.remove();
}
drop(cache); // drop lock before logging
let Entry {
created_at,
expires_at,
..
} = entry;
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
entry_removed = should_remove,
?created_at,
?expires_at,
"processed a cache entry invalidation event"
);
}
@@ -211,10 +205,10 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
}
pub(crate) fn insert_unit(&self, key: K, value: V) -> (Option<V>, Cached<&Self, ()>) {
let (created_at, old) = self.insert_raw(key.clone(), value);
let (_, old) = self.insert_raw(key.clone(), value);
let cached = Cached {
token: Some((self, LookupInfo { created_at, key })),
token: Some((self, key)),
value: (),
};
@@ -229,28 +223,9 @@ impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| {
let info = LookupInfo {
created_at: entry.created_at,
key: key.clone(),
};
Cached {
token: Some((self, info)),
value: entry.value.clone(),
}
self.get_raw(key, |key, entry| Cached {
token: Some((self, key.clone())),
value: entry.value.clone(),
})
}
}
/// Lookup information for key invalidation.
pub(crate) struct LookupInfo<K> {
/// Time of creation of a cache [`Entry`].
/// We use this during invalidation lookups to prevent eviction of a newer
/// entry sharing the same key (it might've been inserted by a different
/// task after we got the entry we're trying to invalidate now).
created_at: Instant,
/// Search by this key.
key: K,
}
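
For context on solution item 2: the removed `LookupInfo` carried a creation timestamp so that invalidation would not evict a newer entry another task had inserted under the same key; the new code simply removes whatever sits under the key. Here is a toy illustration of that behavioural difference, using std types rather than the proxy's `TimedLru` (it models only the timestamp guard, not expiry):

```rust
use std::collections::HashMap;
use std::time::Instant;

#[allow(dead_code)]
struct Entry<V> {
    created_at: Instant,
    value: V,
}

struct Cache<V> {
    map: HashMap<String, Entry<V>>,
}

impl<V> Cache<V> {
    fn insert(&mut self, key: &str, value: V) -> Instant {
        let created_at = Instant::now();
        self.map.insert(key.to_string(), Entry { created_at, value });
        created_at
    }

    /// Old-style invalidation: guarded by the creation timestamp captured at
    /// lookup time, so a newer entry inserted under the same key by another
    /// task survives (this is what LookupInfo::created_at was for).
    fn invalidate_if_not_newer(&mut self, key: &str, seen_at: Instant) {
        if let Some(entry) = self.map.get(key) {
            if entry.created_at <= seen_at {
                self.map.remove(key);
            }
        }
    }

    /// New-style invalidation: we know the entry is wrong, remove it by key.
    fn invalidate(&mut self, key: &str) {
        self.map.remove(key);
    }
}

fn main() {
    let mut cache = Cache { map: HashMap::new() };
    let seen_at = cache.insert("endpoint-a", "10.0.0.1");
    // Another task refreshes the entry after we captured `seen_at`...
    // (tiny sleep so the second insert gets a strictly newer timestamp)
    std::thread::sleep(std::time::Duration::from_millis(1));
    cache.insert("endpoint-a", "10.0.0.2");

    // ...so the old, timestamp-guarded invalidation keeps the newer value:
    cache.invalidate_if_not_newer("endpoint-a", seen_at);
    assert!(cache.map.contains_key("endpoint-a"));

    // The simplified, key-only invalidation removes it unconditionally.
    cache.invalidate("endpoint-a");
    assert!(!cache.map.contains_key("endpoint-a"));
    println!("key-only invalidation removed the entry");
}
```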

View File

@@ -236,7 +236,7 @@ impl AuthInfo {
&self,
ctx: &RequestContext,
compute: &mut ComputeConnection,
user_info: ComputeUserInfo,
user_info: &ComputeUserInfo,
) -> Result<PostgresSettings, PostgresError> {
// client config with stubbed connect info.
// TODO(conrad): should we rewrite this to bypass tokio-postgres2 entirely,
@@ -272,7 +272,7 @@ impl AuthInfo {
secret_key,
},
compute.hostname.to_string(),
user_info,
user_info.clone(),
);
Ok(PostgresSettings {

View File

@@ -226,7 +226,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.await?;
let pg_settings = auth_info
.authenticate(ctx, &mut node, user_info)
.authenticate(ctx, &mut node, &user_info)
.or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
.await?;

View File

@@ -112,7 +112,7 @@ where
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if should_retry(&err, num_retries, compute.retry) {
if !should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
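
The hunk above is the fix for the faulty logic from #8141: we should record a failed outcome and return the error only when `should_retry` says we must not retry; the old code had the check inverted. Here is a self-contained toy of the intended decision table (the names are stand-ins, and the invalidate-and-wake branch is inferred from the PR description rather than copied from the real function):

```rust
// Toy model of the corrected retry decision in connect_to_compute.
// `cached`: did node_info come from the cache (vs. fresh from cplane)?
// `wake_retryable`: does the error allow invalidating the cache and waking again?
// `retry_allowed`: does should_retry() say we still have retry budget?
#[derive(Debug, PartialEq)]
enum Next {
    /// Record a failed outcome and return the error.
    GiveUp,
    /// Retry the connection against the same node_info.
    RetrySameNode,
    /// Invalidate the cached entry and ask cplane to wake the compute again.
    WakeAndRetry,
}

fn next_step(cached: bool, wake_retryable: bool, retry_allowed: bool) -> Next {
    if !cached || !wake_retryable {
        // The fix: give up when we should NOT retry. The old code had this
        // check inverted, giving up precisely when a retry was allowed.
        if !retry_allowed { Next::GiveUp } else { Next::RetrySameNode }
    } else {
        Next::WakeAndRetry
    }
}

fn main() {
    // Cached node + wake-retryable error: go back through wake_compute.
    assert_eq!(next_step(true, true, true), Next::WakeAndRetry);
    // Fresh from cplane, retry budget left: retry against the same node.
    assert_eq!(next_step(false, true, true), Next::RetrySameNode);
    // Fresh from cplane, no retry budget: fail fast.
    assert_eq!(next_step(false, true, false), Next::GiveUp);
    println!("retry decision table behaves as expected");
}
```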

View File

@@ -18,9 +18,11 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, warn};
use crate::cache::Cache;
use crate::cancellation::{self, CancellationHandler};
use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
use crate::context::RequestContext;
use crate::control_plane::client::ControlPlaneClient;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumClientConnectionsGuard};
pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
@@ -29,6 +31,7 @@ use crate::pglb::passthrough::ProxyPassthrough;
use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
use crate::proxy::retry::ShouldRetryWakeCompute;
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::{PqStream, Stream};
use crate::types::EndpointCacheKey;
@@ -349,26 +352,56 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
auth_info.set_startup_params(&params, params_compat);
let res = connect_to_compute(
ctx,
&TcpMechanism {
locks: &config.connect_compute_locks,
},
&auth::Backend::ControlPlane(cplane, creds.info.clone()),
config.wake_compute_retry_config,
&config.connect_to_compute,
)
.await;
let mut node = match res {
Ok(node) => node,
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
let mut node;
let mut attempt = 0;
let connect = TcpMechanism {
locks: &config.connect_compute_locks,
};
let backend = auth::Backend::ControlPlane(cplane, creds.info);
let pg_settings = auth_info.authenticate(ctx, &mut node, creds.info).await;
let pg_settings = match pg_settings {
Ok(pg_settings) => pg_settings,
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
// NOTE: This is messy, but should hopefully be detangled with PGLB.
// We wanted to separate the concerns of **connect** to compute (a PGLB operation),
// from **authenticate** to compute (a NeonKeeper operation).
//
// This unfortunately removed retry handling for one error case where
// the compute was cached, and we connected, but the compute cache was actually stale
// and is associated with the wrong endpoint. We detect this when the **authentication** fails.
// As such, we retry once here if the `authenticate` function fails and the error is valid to retry.
let pg_settings = loop {
attempt += 1;
let res = connect_to_compute(
ctx,
&connect,
&backend,
config.wake_compute_retry_config,
&config.connect_to_compute,
)
.await;
match res {
Ok(n) => node = n,
Err(e) => return Err(stream.throw_error(e, Some(ctx)).await)?,
}
let auth::Backend::ControlPlane(cplane, user_info) = &backend else {
unreachable!("ensured above");
};
let res = auth_info.authenticate(ctx, &mut node, user_info).await;
match res {
Ok(pg_settings) => break pg_settings,
Err(e) if attempt < 2 && e.should_retry_wake_compute() => {
tracing::warn!(error = ?e, "retrying wake compute");
#[allow(irrefutable_let_patterns)]
if let ControlPlaneClient::ProxyV1(cplane_proxy_v1) = &**cplane {
let key = user_info.endpoint_cache_key();
cplane_proxy_v1.caches.node_info.invalidate(&key);
}
}
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
}
};
let session = cancellation_handler.get_key();

View File

@@ -3,7 +3,7 @@ use std::io;
use tokio::time;
use crate::compute;
use crate::compute::{self, PostgresError};
use crate::config::RetryConfig;
pub(crate) trait CouldRetry {
@@ -115,6 +115,14 @@ impl ShouldRetryWakeCompute for compute::ConnectionError {
}
}
impl ShouldRetryWakeCompute for PostgresError {
fn should_retry_wake_compute(&self) -> bool {
match self {
PostgresError::Postgres(error) => error.should_retry_wake_compute(),
}
}
}
pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Duration {
config
.base_delay

View File

@@ -374,6 +374,7 @@ fn connect_compute_total_wait() {
#[derive(Clone, Copy, Debug)]
enum ConnectAction {
Wake,
WakeCold,
WakeFail,
WakeRetry,
Connect,
@@ -504,6 +505,9 @@ impl TestControlPlaneClient for TestConnectMechanism {
*counter += 1;
match action {
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
ConnectAction::WakeCold => Ok(CachedNodeInfo::new_uncached(
helper_create_uncached_node_info(),
)),
ConnectAction::WakeFail => {
let err = control_plane::errors::ControlPlaneError::Message(Box::new(
ControlPlaneErrorMessage {
@@ -551,8 +555,8 @@ impl TestControlPlaneClient for TestConnectMechanism {
}
}
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = NodeInfo {
fn helper_create_uncached_node_info() -> NodeInfo {
NodeInfo {
conn_info: compute::ConnectInfo {
host: "test".into(),
port: 5432,
@@ -566,7 +570,11 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
compute_id: "compute".into(),
cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm,
},
};
}
}
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = helper_create_uncached_node_info();
let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone()));
node2.map(|()| node)
}
@@ -742,7 +750,7 @@ async fn fail_no_wake_skips_cache_invalidation() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::FailNoWake,
ConnectAction::RetryNoWake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
@@ -788,7 +796,7 @@ async fn retry_no_wake_skips_invalidation() {
let ctx = RequestContext::test();
// Wake → RetryNoWake (retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]);
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake, Fail]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
@@ -802,3 +810,44 @@ async fn retry_no_wake_skips_invalidation() {
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_no_wake_error_fast() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → FailNoWake (not retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![Wake, FailNoWake]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap_err();
mechanism.verify();
// Because FailNoWake has wakeable=false, we must NOT see invalidate_cache
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_cold_wake_skips_invalidation() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// WakeCold → FailNoWake (not retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![WakeCold, Retry, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap();
mechanism.verify();
}