[proxy] Recognize more cplane errors, use retry_delay_ms as TTL (#12543)

## Problem

Not all cplane errors are properly recognized and cached/retried.

## Summary of changes

Add more cplane error reasons. Also, use retry_delay_ms as cache TTL if
present.

Related to https://github.com/neondatabase/cloud/issues/19353
This commit is contained in:
Krzysztof Szafrański
2025-07-15 09:42:48 +02:00
committed by GitHub
parent 9a2456bea5
commit ff526a1051
5 changed files with 107 additions and 73 deletions

View File

@@ -14,8 +14,8 @@ use std::time::{Duration, Instant};
use hashlink::{LruCache, linked_hash_map::RawEntryMut}; use hashlink::{LruCache, linked_hash_map::RawEntryMut};
use tracing::debug; use tracing::debug;
use super::Cache;
use super::common::Cached; use super::common::Cached;
use super::{Cache, timed_lru};
/// An implementation of timed LRU cache with fixed capacity. /// An implementation of timed LRU cache with fixed capacity.
/// Key properties: /// Key properties:
@@ -30,7 +30,7 @@ use super::{Cache, timed_lru};
/// ///
/// * There's an API for immediate invalidation (removal) of a cache entry; /// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct. /// It's useful in case we know for sure that the entry is no longer correct.
/// See [`timed_lru::Cached`] for more information. /// See [`Cached`] for more information.
/// ///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy, /// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet). /// or by a successful lookup (i.e. the entry hasn't expired yet).
@@ -217,15 +217,18 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
} }
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> { impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
/// Retrieve a cached entry in a convenient wrapper. /// Retrieve a cached entry in a convenient wrapper, alongside timing information.
pub(crate) fn get<Q>(&self, key: &Q) -> Option<timed_lru::Cached<&Self>> pub(crate) fn get_with_created_at<Q>(
&self,
key: &Q,
) -> Option<Cached<&Self, (<Self as Cache>::Value, Instant)>>
where where
K: Borrow<Q> + Clone, K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized, Q: Hash + Eq + ?Sized,
{ {
self.get_raw(key, |key, entry| Cached { self.get_raw(key, |key, entry| Cached {
token: Some((self, key.clone())), token: Some((self, key.clone())),
value: entry.value.clone(), value: (entry.value.clone(), entry.created_at),
}) })
} }
} }

View File

@@ -23,12 +23,13 @@ use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError, ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
}; };
use crate::control_plane::locks::ApiLocks; use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason}; use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse};
use crate::control_plane::{ use crate::control_plane::{
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo, AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
RoleAccessControl, RoleAccessControl,
}; };
use crate::metrics::Metrics; use crate::metrics::Metrics;
use crate::proxy::retry::CouldRetry;
use crate::rate_limiter::WakeComputeRateLimiter; use crate::rate_limiter::WakeComputeRateLimiter;
use crate::types::{EndpointCacheKey, EndpointId, RoleName}; use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::{compute, http, scram}; use crate::{compute, http, scram};
@@ -382,16 +383,31 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
macro_rules! check_cache { macro_rules! check_cache {
() => { () => {
if let Some(cached) = self.caches.node_info.get(&key) { if let Some(cached) = self.caches.node_info.get_with_created_at(&key) {
let (cached, info) = cached.take_value(); let (cached, (info, created_at)) = cached.take_value();
let info = info.map_err(|c| { return match info {
Err(mut msg) => {
info!(key = &*key, "found cached wake_compute error"); info!(key = &*key, "found cached wake_compute error");
WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c)))
})?;
// if retry_delay_ms is set, reduce it by the amount of time it spent in cache
if let Some(status) = &mut msg.status {
if let Some(retry_info) = &mut status.details.retry_info {
retry_info.retry_delay_ms = retry_info
.retry_delay_ms
.saturating_sub(created_at.elapsed().as_millis() as u64)
}
}
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
msg,
)))
}
Ok(info) => {
debug!(key = &*key, "found cached compute node info"); debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone()); ctx.set_project(info.aux.clone());
return Ok(cached.map(|()| info)); Ok(cached.map(|()| info))
}
};
} }
}; };
} }
@@ -434,42 +450,29 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
Ok(cached.map(|()| node)) Ok(cached.map(|()| node))
} }
Err(err) => match err { Err(err) => match err {
WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => { WakeComputeError::ControlPlane(ControlPlaneError::Message(ref msg)) => {
let Some(status) = &err.status else { let retry_info = msg.status.as_ref().and_then(|s| s.details.retry_info);
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
};
let reason = status // If we can retry this error, do not cache it,
.details // unless we were given a retry delay.
.error_info if msg.could_retry() && retry_info.is_none() {
.map_or(Reason::Unknown, |x| x.reason); return Err(err);
// if we can retry this error, do not cache it.
if reason.can_retry() {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
} }
// at this point, we should only have quota errors.
debug!( debug!(
key = &*key, key = &*key,
"created a cache entry for the wake compute error" "created a cache entry for the wake compute error"
); );
self.caches.node_info.insert_ttl( let ttl = retry_info.map_or(Duration::from_secs(30), |r| {
key, Duration::from_millis(r.retry_delay_ms)
Err(err.clone()), });
Duration::from_secs(30),
);
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message( self.caches.node_info.insert_ttl(key, Err(msg.clone()), ttl);
err,
))) Err(err)
} }
err => return Err(err), err => Err(err),
}, },
} }
} }

View File

@@ -43,28 +43,35 @@ impl UserFacingError for ControlPlaneError {
} }
impl ReportableError for ControlPlaneError { impl ReportableError for ControlPlaneError {
fn get_error_kind(&self) -> crate::error::ErrorKind { fn get_error_kind(&self) -> ErrorKind {
match self { match self {
ControlPlaneError::Message(e) => match e.get_reason() { ControlPlaneError::Message(e) => match e.get_reason() {
Reason::RoleProtected => ErrorKind::User, Reason::RoleProtected
Reason::ResourceNotFound => ErrorKind::User, | Reason::ResourceNotFound
Reason::ProjectNotFound => ErrorKind::User, | Reason::ProjectNotFound
Reason::EndpointNotFound => ErrorKind::User, | Reason::EndpointNotFound
Reason::BranchNotFound => ErrorKind::User, | Reason::EndpointDisabled
| Reason::BranchNotFound
| Reason::InvalidEphemeralEndpointOptions => ErrorKind::User,
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit, Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::Quota,
Reason::ActiveTimeQuotaExceeded => ErrorKind::Quota, Reason::NonDefaultBranchComputeTimeExceeded
Reason::ComputeTimeQuotaExceeded => ErrorKind::Quota, | Reason::ActiveTimeQuotaExceeded
Reason::WrittenDataQuotaExceeded => ErrorKind::Quota, | Reason::ComputeTimeQuotaExceeded
Reason::DataTransferQuotaExceeded => ErrorKind::Quota, | Reason::WrittenDataQuotaExceeded
Reason::LogicalSizeQuotaExceeded => ErrorKind::Quota, | Reason::DataTransferQuotaExceeded
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane, | Reason::LogicalSizeQuotaExceeded
Reason::LockAlreadyTaken => ErrorKind::ControlPlane, | Reason::ActiveEndpointsLimitExceeded => ErrorKind::Quota,
Reason::RunningOperations => ErrorKind::ControlPlane,
Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane, Reason::ConcurrencyLimitReached
Reason::Unknown => ErrorKind::ControlPlane, | Reason::LockAlreadyTaken
| Reason::RunningOperations
| Reason::EndpointIdle
| Reason::ProjectUnderMaintenance
| Reason::Unknown => ErrorKind::ControlPlane,
}, },
ControlPlaneError::Transport(_) => crate::error::ErrorKind::ControlPlane, ControlPlaneError::Transport(_) => ErrorKind::ControlPlane,
} }
} }
} }
@@ -120,10 +127,10 @@ impl UserFacingError for GetAuthInfoError {
} }
impl ReportableError for GetAuthInfoError { impl ReportableError for GetAuthInfoError {
fn get_error_kind(&self) -> crate::error::ErrorKind { fn get_error_kind(&self) -> ErrorKind {
match self { match self {
Self::BadSecret => crate::error::ErrorKind::ControlPlane, Self::BadSecret => ErrorKind::ControlPlane,
Self::ApiError(_) => crate::error::ErrorKind::ControlPlane, Self::ApiError(_) => ErrorKind::ControlPlane,
} }
} }
} }

View File

@@ -126,10 +126,16 @@ pub(crate) enum Reason {
/// or that the subject doesn't have enough permissions to access the requested endpoint. /// or that the subject doesn't have enough permissions to access the requested endpoint.
#[serde(rename = "ENDPOINT_NOT_FOUND")] #[serde(rename = "ENDPOINT_NOT_FOUND")]
EndpointNotFound, EndpointNotFound,
/// EndpointDisabled indicates that the endpoint has been disabled and does not accept connections.
#[serde(rename = "ENDPOINT_DISABLED")]
EndpointDisabled,
/// BranchNotFound indicates that the branch wasn't found, usually due to the provided ID not being correct, /// BranchNotFound indicates that the branch wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested branch. /// or that the subject doesn't have enough permissions to access the requested branch.
#[serde(rename = "BRANCH_NOT_FOUND")] #[serde(rename = "BRANCH_NOT_FOUND")]
BranchNotFound, BranchNotFound,
/// InvalidEphemeralEndpointOptions indicates that the specified LSN or timestamp is wrong.
#[serde(rename = "INVALID_EPHEMERAL_OPTIONS")]
InvalidEphemeralEndpointOptions,
/// RateLimitExceeded indicates that the rate limit for the operation has been exceeded. /// RateLimitExceeded indicates that the rate limit for the operation has been exceeded.
#[serde(rename = "RATE_LIMIT_EXCEEDED")] #[serde(rename = "RATE_LIMIT_EXCEEDED")]
RateLimitExceeded, RateLimitExceeded,
@@ -152,6 +158,9 @@ pub(crate) enum Reason {
/// LogicalSizeQuotaExceeded indicates that the logical size quota was exceeded. /// LogicalSizeQuotaExceeded indicates that the logical size quota was exceeded.
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")] #[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
LogicalSizeQuotaExceeded, LogicalSizeQuotaExceeded,
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
ActiveEndpointsLimitExceeded,
/// RunningOperations indicates that the project already has some running operations /// RunningOperations indicates that the project already has some running operations
/// and scheduling of new ones is prohibited. /// and scheduling of new ones is prohibited.
#[serde(rename = "RUNNING_OPERATIONS")] #[serde(rename = "RUNNING_OPERATIONS")]
@@ -162,9 +171,13 @@ pub(crate) enum Reason {
/// LockAlreadyTaken indicates that we attempted to take a lock that was already taken. /// LockAlreadyTaken indicates that we attempted to take a lock that was already taken.
#[serde(rename = "LOCK_ALREADY_TAKEN")] #[serde(rename = "LOCK_ALREADY_TAKEN")]
LockAlreadyTaken, LockAlreadyTaken,
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded. /// EndpointIdle indicates that the endpoint cannot become active, because it's idle.
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")] #[serde(rename = "ENDPOINT_IDLE")]
ActiveEndpointsLimitExceeded, EndpointIdle,
/// ProjectUnderMaintenance indicates that the project is currently undergoing maintenance,
/// and thus cannot accept connections.
#[serde(rename = "PROJECT_UNDER_MAINTENANCE")]
ProjectUnderMaintenance,
#[default] #[default]
#[serde(other)] #[serde(other)]
Unknown, Unknown,
@@ -184,13 +197,15 @@ impl Reason {
pub(crate) fn can_retry(self) -> bool { pub(crate) fn can_retry(self) -> bool {
match self { match self {
// do not retry role protected errors // do not retry role protected errors
// not a transitive error // not a transient error
Reason::RoleProtected => false, Reason::RoleProtected => false,
// on retry, it will still not be found // on retry, it will still not be found or valid
Reason::ResourceNotFound Reason::ResourceNotFound
| Reason::ProjectNotFound | Reason::ProjectNotFound
| Reason::EndpointNotFound | Reason::EndpointNotFound
| Reason::BranchNotFound => false, | Reason::EndpointDisabled
| Reason::BranchNotFound
| Reason::InvalidEphemeralEndpointOptions => false,
// we were asked to go away // we were asked to go away
Reason::RateLimitExceeded Reason::RateLimitExceeded
| Reason::NonDefaultBranchComputeTimeExceeded | Reason::NonDefaultBranchComputeTimeExceeded
@@ -200,11 +215,13 @@ impl Reason {
| Reason::DataTransferQuotaExceeded | Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded | Reason::LogicalSizeQuotaExceeded
| Reason::ActiveEndpointsLimitExceeded => false, | Reason::ActiveEndpointsLimitExceeded => false,
// transitive error. control plane is currently busy // transient error. control plane is currently busy
// but might be ready soon // but might be ready soon
Reason::RunningOperations Reason::RunningOperations
| Reason::ConcurrencyLimitReached | Reason::ConcurrencyLimitReached
| Reason::LockAlreadyTaken => true, | Reason::LockAlreadyTaken
| Reason::EndpointIdle
| Reason::ProjectUnderMaintenance => true,
// unknown error. better not retry it. // unknown error. better not retry it.
Reason::Unknown => false, Reason::Unknown => false,
} }

View File

@@ -195,15 +195,18 @@ impl NeonOptions {
// proxy options: // proxy options:
/// `PARAMS_COMPAT` allows opting in to forwarding all startup parameters from client to compute. /// `PARAMS_COMPAT` allows opting in to forwarding all startup parameters from client to compute.
pub const PARAMS_COMPAT: &str = "proxy_params_compat"; pub const PARAMS_COMPAT: &'static str = "proxy_params_compat";
// cplane options: // cplane options:
/// `LSN` allows provisioning an ephemeral compute with time-travel to the provided LSN. /// `LSN` allows provisioning an ephemeral compute with time-travel to the provided LSN.
const LSN: &str = "lsn"; const LSN: &'static str = "lsn";
/// `TIMESTAMP` allows provisioning an ephemeral compute with time-travel to the provided timestamp.
const TIMESTAMP: &'static str = "timestamp";
/// `ENDPOINT_TYPE` allows configuring an ephemeral compute to be read_only or read_write. /// `ENDPOINT_TYPE` allows configuring an ephemeral compute to be read_only or read_write.
const ENDPOINT_TYPE: &str = "endpoint_type"; const ENDPOINT_TYPE: &'static str = "endpoint_type";
pub(crate) fn parse_params(params: &StartupMessageParams) -> Self { pub(crate) fn parse_params(params: &StartupMessageParams) -> Self {
params params
@@ -228,6 +231,7 @@ impl NeonOptions {
// This is not a cplane option, we know it does not create ephemeral computes. // This is not a cplane option, we know it does not create ephemeral computes.
Self::PARAMS_COMPAT => false, Self::PARAMS_COMPAT => false,
Self::LSN => true, Self::LSN => true,
Self::TIMESTAMP => true,
Self::ENDPOINT_TYPE => true, Self::ENDPOINT_TYPE => true,
// err on the side of caution. any cplane options we don't know about // err on the side of caution. any cplane options we don't know about
// might lead to ephemeral computes. // might lead to ephemeral computes.