From ff526a1051b42443ad0cb6e81aff27a314b3482a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Szafra=C5=84ski?= Date: Tue, 15 Jul 2025 09:42:48 +0200 Subject: [PATCH] [proxy] Recognize more cplane errors, use retry_delay_ms as TTL (#12543) ## Problem Not all cplane errors are properly recognized and cached/retried. ## Summary of changes Add more cplane error reasons. Also, use retry_delay_ms as cache TTL if present. Related to https://github.com/neondatabase/cloud/issues/19353 --- proxy/src/cache/timed_lru.rs | 13 ++-- .../control_plane/client/cplane_proxy_v1.rs | 75 ++++++++++--------- proxy/src/control_plane/errors.rs | 49 ++++++------ proxy/src/control_plane/messages.rs | 33 ++++++-- proxy/src/proxy/mod.rs | 10 ++- 5 files changed, 107 insertions(+), 73 deletions(-) diff --git a/proxy/src/cache/timed_lru.rs b/proxy/src/cache/timed_lru.rs index 183e1ea449..e87cf53ab9 100644 --- a/proxy/src/cache/timed_lru.rs +++ b/proxy/src/cache/timed_lru.rs @@ -14,8 +14,8 @@ use std::time::{Duration, Instant}; use hashlink::{LruCache, linked_hash_map::RawEntryMut}; use tracing::debug; +use super::Cache; use super::common::Cached; -use super::{Cache, timed_lru}; /// An implementation of timed LRU cache with fixed capacity. /// Key properties: @@ -30,7 +30,7 @@ use super::{Cache, timed_lru}; /// /// * There's an API for immediate invalidation (removal) of a cache entry; /// It's useful in case we know for sure that the entry is no longer correct. -/// See [`timed_lru::Cached`] for more information. +/// See [`Cached`] for more information. /// /// * Expired entries are kept in the cache, until they are evicted by the LRU policy, /// or by a successful lookup (i.e. the entry hasn't expired yet). @@ -217,15 +217,18 @@ impl TimedLru { } impl TimedLru { - /// Retrieve a cached entry in convenient wrapper. - pub(crate) fn get(&self, key: &Q) -> Option> + /// Retrieve a cached entry in convenient wrapper, alongside timing information. + pub(crate) fn get_with_created_at( + &self, + key: &Q, + ) -> Option::Value, Instant)>> where K: Borrow + Clone, Q: Hash + Eq + ?Sized, { self.get_raw(key, |key, entry| Cached { token: Some((self, key.clone())), - value: entry.value.clone(), + value: (entry.value.clone(), entry.created_at), }) } } diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index fc263b73b1..bb785b8b0c 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -23,12 +23,13 @@ use crate::control_plane::errors::{ ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError, }; use crate::control_plane::locks::ApiLocks; -use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason}; +use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse}; use crate::control_plane::{ AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo, RoleAccessControl, }; use crate::metrics::Metrics; +use crate::proxy::retry::CouldRetry; use crate::rate_limiter::WakeComputeRateLimiter; use crate::types::{EndpointCacheKey, EndpointId, RoleName}; use crate::{compute, http, scram}; @@ -382,16 +383,31 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { macro_rules! check_cache { () => { - if let Some(cached) = self.caches.node_info.get(&key) { - let (cached, info) = cached.take_value(); - let info = info.map_err(|c| { - info!(key = &*key, "found cached wake_compute error"); - WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c))) - })?; + if let Some(cached) = self.caches.node_info.get_with_created_at(&key) { + let (cached, (info, created_at)) = cached.take_value(); + return match info { + Err(mut msg) => { + info!(key = &*key, "found cached wake_compute error"); - debug!(key = &*key, "found cached compute node info"); - ctx.set_project(info.aux.clone()); - return Ok(cached.map(|()| info)); + // if retry_delay_ms is set, reduce it by the amount of time it spent in cache + if let Some(status) = &mut msg.status { + if let Some(retry_info) = &mut status.details.retry_info { + retry_info.retry_delay_ms = retry_info + .retry_delay_ms + .saturating_sub(created_at.elapsed().as_millis() as u64) + } + } + + Err(WakeComputeError::ControlPlane(ControlPlaneError::Message( + msg, + ))) + } + Ok(info) => { + debug!(key = &*key, "found cached compute node info"); + ctx.set_project(info.aux.clone()); + Ok(cached.map(|()| info)) + } + }; } }; } @@ -434,42 +450,29 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { Ok(cached.map(|()| node)) } Err(err) => match err { - WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => { - let Some(status) = &err.status else { - return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message( - err, - ))); - }; + WakeComputeError::ControlPlane(ControlPlaneError::Message(ref msg)) => { + let retry_info = msg.status.as_ref().and_then(|s| s.details.retry_info); - let reason = status - .details - .error_info - .map_or(Reason::Unknown, |x| x.reason); - - // if we can retry this error, do not cache it. - if reason.can_retry() { - return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message( - err, - ))); + // If we can retry this error, do not cache it, + // unless we were given a retry delay. + if msg.could_retry() && retry_info.is_none() { + return Err(err); } - // at this point, we should only have quota errors. debug!( key = &*key, "created a cache entry for the wake compute error" ); - self.caches.node_info.insert_ttl( - key, - Err(err.clone()), - Duration::from_secs(30), - ); + let ttl = retry_info.map_or(Duration::from_secs(30), |r| { + Duration::from_millis(r.retry_delay_ms) + }); - Err(WakeComputeError::ControlPlane(ControlPlaneError::Message( - err, - ))) + self.caches.node_info.insert_ttl(key, Err(msg.clone()), ttl); + + Err(err) } - err => return Err(err), + err => Err(err), }, } } diff --git a/proxy/src/control_plane/errors.rs b/proxy/src/control_plane/errors.rs index f640657d90..12843e48c7 100644 --- a/proxy/src/control_plane/errors.rs +++ b/proxy/src/control_plane/errors.rs @@ -43,28 +43,35 @@ impl UserFacingError for ControlPlaneError { } impl ReportableError for ControlPlaneError { - fn get_error_kind(&self) -> crate::error::ErrorKind { + fn get_error_kind(&self) -> ErrorKind { match self { ControlPlaneError::Message(e) => match e.get_reason() { - Reason::RoleProtected => ErrorKind::User, - Reason::ResourceNotFound => ErrorKind::User, - Reason::ProjectNotFound => ErrorKind::User, - Reason::EndpointNotFound => ErrorKind::User, - Reason::BranchNotFound => ErrorKind::User, + Reason::RoleProtected + | Reason::ResourceNotFound + | Reason::ProjectNotFound + | Reason::EndpointNotFound + | Reason::EndpointDisabled + | Reason::BranchNotFound + | Reason::InvalidEphemeralEndpointOptions => ErrorKind::User, + Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit, - Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::Quota, - Reason::ActiveTimeQuotaExceeded => ErrorKind::Quota, - Reason::ComputeTimeQuotaExceeded => ErrorKind::Quota, - Reason::WrittenDataQuotaExceeded => ErrorKind::Quota, - Reason::DataTransferQuotaExceeded => ErrorKind::Quota, - Reason::LogicalSizeQuotaExceeded => ErrorKind::Quota, - Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane, - Reason::LockAlreadyTaken => ErrorKind::ControlPlane, - Reason::RunningOperations => ErrorKind::ControlPlane, - Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane, - Reason::Unknown => ErrorKind::ControlPlane, + + Reason::NonDefaultBranchComputeTimeExceeded + | Reason::ActiveTimeQuotaExceeded + | Reason::ComputeTimeQuotaExceeded + | Reason::WrittenDataQuotaExceeded + | Reason::DataTransferQuotaExceeded + | Reason::LogicalSizeQuotaExceeded + | Reason::ActiveEndpointsLimitExceeded => ErrorKind::Quota, + + Reason::ConcurrencyLimitReached + | Reason::LockAlreadyTaken + | Reason::RunningOperations + | Reason::EndpointIdle + | Reason::ProjectUnderMaintenance + | Reason::Unknown => ErrorKind::ControlPlane, }, - ControlPlaneError::Transport(_) => crate::error::ErrorKind::ControlPlane, + ControlPlaneError::Transport(_) => ErrorKind::ControlPlane, } } } @@ -120,10 +127,10 @@ impl UserFacingError for GetAuthInfoError { } impl ReportableError for GetAuthInfoError { - fn get_error_kind(&self) -> crate::error::ErrorKind { + fn get_error_kind(&self) -> ErrorKind { match self { - Self::BadSecret => crate::error::ErrorKind::ControlPlane, - Self::ApiError(_) => crate::error::ErrorKind::ControlPlane, + Self::BadSecret => ErrorKind::ControlPlane, + Self::ApiError(_) => ErrorKind::ControlPlane, } } } diff --git a/proxy/src/control_plane/messages.rs b/proxy/src/control_plane/messages.rs index f0314f91f0..cf193ed268 100644 --- a/proxy/src/control_plane/messages.rs +++ b/proxy/src/control_plane/messages.rs @@ -126,10 +126,16 @@ pub(crate) enum Reason { /// or that the subject doesn't have enough permissions to access the requested endpoint. #[serde(rename = "ENDPOINT_NOT_FOUND")] EndpointNotFound, + /// EndpointDisabled indicates that the endpoint has been disabled and does not accept connections. + #[serde(rename = "ENDPOINT_DISABLED")] + EndpointDisabled, /// BranchNotFound indicates that the branch wasn't found, usually due to the provided ID not being correct, /// or that the subject doesn't have enough permissions to access the requested branch. #[serde(rename = "BRANCH_NOT_FOUND")] BranchNotFound, + /// InvalidEphemeralEndpointOptions indicates that the specified LSN or timestamp are wrong. + #[serde(rename = "INVALID_EPHEMERAL_OPTIONS")] + InvalidEphemeralEndpointOptions, /// RateLimitExceeded indicates that the rate limit for the operation has been exceeded. #[serde(rename = "RATE_LIMIT_EXCEEDED")] RateLimitExceeded, @@ -152,6 +158,9 @@ pub(crate) enum Reason { /// LogicalSizeQuotaExceeded indicates that the logical size quota was exceeded. #[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")] LogicalSizeQuotaExceeded, + /// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded. + #[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")] + ActiveEndpointsLimitExceeded, /// RunningOperations indicates that the project already has some running operations /// and scheduling of new ones is prohibited. #[serde(rename = "RUNNING_OPERATIONS")] @@ -162,9 +171,13 @@ pub(crate) enum Reason { /// LockAlreadyTaken indicates that the we attempted to take a lock that was already taken. #[serde(rename = "LOCK_ALREADY_TAKEN")] LockAlreadyTaken, - /// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded. - #[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")] - ActiveEndpointsLimitExceeded, + /// EndpointIdle indicates that the endpoint cannot become active, because it's idle. + #[serde(rename = "ENDPOINT_IDLE")] + EndpointIdle, + /// ProjectUnderMaintenance indicates that the project is currently ongoing maintenance, + /// and thus cannot accept connections. + #[serde(rename = "PROJECT_UNDER_MAINTENANCE")] + ProjectUnderMaintenance, #[default] #[serde(other)] Unknown, @@ -184,13 +197,15 @@ impl Reason { pub(crate) fn can_retry(self) -> bool { match self { // do not retry role protected errors - // not a transitive error + // not a transient error Reason::RoleProtected => false, - // on retry, it will still not be found + // on retry, it will still not be found or valid Reason::ResourceNotFound | Reason::ProjectNotFound | Reason::EndpointNotFound - | Reason::BranchNotFound => false, + | Reason::EndpointDisabled + | Reason::BranchNotFound + | Reason::InvalidEphemeralEndpointOptions => false, // we were asked to go away Reason::RateLimitExceeded | Reason::NonDefaultBranchComputeTimeExceeded @@ -200,11 +215,13 @@ impl Reason { | Reason::DataTransferQuotaExceeded | Reason::LogicalSizeQuotaExceeded | Reason::ActiveEndpointsLimitExceeded => false, - // transitive error. control plane is currently busy + // transient error. control plane is currently busy // but might be ready soon Reason::RunningOperations | Reason::ConcurrencyLimitReached - | Reason::LockAlreadyTaken => true, + | Reason::LockAlreadyTaken + | Reason::EndpointIdle + | Reason::ProjectUnderMaintenance => true, // unknown error. better not retry it. Reason::Unknown => false, } diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 08c81afa04..02651109e0 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -195,15 +195,18 @@ impl NeonOptions { // proxy options: /// `PARAMS_COMPAT` allows opting in to forwarding all startup parameters from client to compute. - pub const PARAMS_COMPAT: &str = "proxy_params_compat"; + pub const PARAMS_COMPAT: &'static str = "proxy_params_compat"; // cplane options: /// `LSN` allows provisioning an ephemeral compute with time-travel to the provided LSN. - const LSN: &str = "lsn"; + const LSN: &'static str = "lsn"; + + /// `TIMESTAMP` allows provisioning an ephemeral compute with time-travel to the provided timestamp. + const TIMESTAMP: &'static str = "timestamp"; /// `ENDPOINT_TYPE` allows configuring an ephemeral compute to be read_only or read_write. - const ENDPOINT_TYPE: &str = "endpoint_type"; + const ENDPOINT_TYPE: &'static str = "endpoint_type"; pub(crate) fn parse_params(params: &StartupMessageParams) -> Self { params @@ -228,6 +231,7 @@ impl NeonOptions { // This is not a cplane option, we know it does not create ephemeral computes. Self::PARAMS_COMPAT => false, Self::LSN => true, + Self::TIMESTAMP => true, Self::ENDPOINT_TYPE => true, // err on the side of caution. any cplane options we don't know about // might lead to ephemeral computes.