proxy: cache certain non-retriable console errors for a short time (#8201)
## Problem
If there's a quota error, it makes sense to cache it for a short window
of time. Many clients do not handle database connection errors
gracefully; they just spam retries 🤡
## Summary of changes
Updates the node_info cache to support storing console errors, and
stores a console error whenever it cannot be retried (using our own
heuristic, which should only trigger for quota-exceeded errors).
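For illustration, here is the shape of the change as a minimal sketch with stub types (the real definitions are in the diff below):

```rust
use std::marker::PhantomData;

// Stub stand-ins so the sketch compiles; the real types live in the proxy crate.
struct EndpointCacheKey;
struct NodeInfo;
struct ConsoleError;
struct TimedLru<K, V>(PhantomData<(K, V)>);

// The cache value becomes a Result: Ok(NodeInfo) on success, Err for a
// non-retriable console error (e.g. quota exceeded), inserted with a 30s TTL.
type NodeInfoCache = TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ConsoleError>>>;
```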
proxy/src/cache/common.rs (7 changes)

```diff
@@ -53,6 +53,13 @@ impl<C: Cache, V> Cached<C, V> {
         )
     }
 
+    pub fn map<U>(self, f: impl FnOnce(V) -> U) -> Cached<C, U> {
+        Cached {
+            token: self.token,
+            value: f(self.value),
+        }
+    }
+
     /// Drop this entry from a cache if it's still there.
     pub fn invalidate(self) -> V {
         if let Some((cache, info)) = &self.token {
```
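Aside: the new `Cached::map` is what lets a cache handle change the value it carries while keeping its invalidation token. A simplified model (the real token pairs the cache with a `LookupInfo`):

```rust
// Simplified model of Cached: a value plus an optional token tying it
// back to its cache entry so it can still be invalidated later.
struct Cached<C, V = ()> {
    token: Option<C>, // the real token also carries a LookupInfo
    value: V,
}

impl<C, V> Cached<C, V> {
    // As in the diff: transform the carried value, keep the token.
    fn map<U>(self, f: impl FnOnce(V) -> U) -> Cached<C, U> {
        Cached {
            token: self.token,
            value: f(self.value),
        }
    }
}

// wake_compute uses this as `cached.map(|()| info)` to upgrade a
// Cached<_, ()> handle from insert_unit into a Cached<_, NodeInfo>.
```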
proxy/src/cache/timed_lru.rs (38 changes)

```diff
@@ -65,6 +65,8 @@ impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
 struct Entry<T> {
     created_at: Instant,
     expires_at: Instant,
+    ttl: Duration,
+    update_ttl_on_retrieval: bool,
     value: T,
 }
 
@@ -122,7 +124,6 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
         Q: Hash + Eq + ?Sized,
     {
         let now = Instant::now();
-        let deadline = now.checked_add(self.ttl).expect("time overflow");
 
         // Do costly things before taking the lock.
         let mut cache = self.cache.lock();
@@ -142,7 +143,8 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
         let (created_at, expires_at) = (entry.created_at, entry.expires_at);
 
         // Update the deadline and the entry's position in the LRU list.
-        if self.update_ttl_on_retrieval {
+        let deadline = now.checked_add(raw_entry.get().ttl).expect("time overflow");
+        if raw_entry.get().update_ttl_on_retrieval {
             raw_entry.get_mut().expires_at = deadline;
         }
         raw_entry.to_back();
@@ -162,12 +164,27 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
     /// existed, return the previous value and its creation timestamp.
     #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
     fn insert_raw(&self, key: K, value: V) -> (Instant, Option<V>) {
+        self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval)
+    }
+
+    /// Insert an entry to the cache. If an entry with the same key already
+    /// existed, return the previous value and its creation timestamp.
+    #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
+    fn insert_raw_ttl(
+        &self,
+        key: K,
+        value: V,
+        ttl: Duration,
+        update: bool,
+    ) -> (Instant, Option<V>) {
         let created_at = Instant::now();
-        let expires_at = created_at.checked_add(self.ttl).expect("time overflow");
+        let expires_at = created_at.checked_add(ttl).expect("time overflow");
 
         let entry = Entry {
             created_at,
             expires_at,
+            ttl,
+            update_ttl_on_retrieval: update,
             value,
         };
 
@@ -190,6 +207,21 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
 }
 
 impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
+    pub fn insert_ttl(&self, key: K, value: V, ttl: Duration) {
+        self.insert_raw_ttl(key, value, ttl, false);
+    }
+
+    pub fn insert_unit(&self, key: K, value: V) -> (Option<V>, Cached<&Self, ()>) {
+        let (created_at, old) = self.insert_raw(key.clone(), value);
+
+        let cached = Cached {
+            token: Some((self, LookupInfo { created_at, key })),
+            value: (),
+        };
+
+        (old, cached)
+    }
+
     pub fn insert(&self, key: K, value: V) -> (Option<V>, Cached<&Self>) {
         let (created_at, old) = self.insert_raw(key.clone(), value.clone());
```
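Aside: with `ttl` and `update_ttl_on_retrieval` now stored per entry, an `insert_ttl` entry expires on a fixed deadline while ordinary entries keep refreshing on reads. A sketch of the retrieval semantics:

```rust
use std::time::{Duration, Instant};

// Per-entry expiry state, as introduced in this diff.
struct Entry {
    expires_at: Instant,
    ttl: Duration,
    update_ttl_on_retrieval: bool,
}

// On a hit, only entries that opted in get their deadline pushed out.
// Error entries inserted via insert_ttl(.., false) keep their fixed 30s window.
fn bump_on_hit(entry: &mut Entry, now: Instant) {
    if entry.update_ttl_on_retrieval {
        entry.expires_at = now.checked_add(entry.ttl).expect("time overflow");
    }
}
```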
proxy/src/console/messages.rs

```diff
@@ -9,7 +9,7 @@ use crate::proxy::retry::CouldRetry;
 
 /// Generic error response with human-readable description.
 /// Note that we can't always present it to user as is.
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 pub struct ConsoleError {
     pub error: Box<str>,
     #[serde(skip)]
@@ -82,41 +82,19 @@ impl CouldRetry for ConsoleError {
             .details
             .error_info
             .map_or(Reason::Unknown, |e| e.reason);
-        match reason {
-            // not a transitive error
-            Reason::RoleProtected => false,
-            // on retry, it will still not be found
-            Reason::ResourceNotFound
-            | Reason::ProjectNotFound
-            | Reason::EndpointNotFound
-            | Reason::BranchNotFound => false,
-            // we were asked to go away
-            Reason::RateLimitExceeded
-            | Reason::NonDefaultBranchComputeTimeExceeded
-            | Reason::ActiveTimeQuotaExceeded
-            | Reason::ComputeTimeQuotaExceeded
-            | Reason::WrittenDataQuotaExceeded
-            | Reason::DataTransferQuotaExceeded
-            | Reason::LogicalSizeQuotaExceeded => false,
-            // transitive error. control plane is currently busy
-            // but might be ready soon
-            Reason::RunningOperations => true,
-            Reason::ConcurrencyLimitReached => true,
-            Reason::LockAlreadyTaken => true,
-            // unknown error. better not retry it.
-            Reason::Unknown => false,
-        }
+
+        reason.can_retry()
     }
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 pub struct Status {
     pub code: Box<str>,
     pub message: Box<str>,
     pub details: Details,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 pub struct Details {
     pub error_info: Option<ErrorInfo>,
     pub retry_info: Option<RetryInfo>,
@@ -199,6 +177,34 @@ impl Reason {
             | Reason::BranchNotFound
         )
     }
+
+    pub fn can_retry(&self) -> bool {
+        match self {
+            // do not retry role protected errors
+            // not a transitive error
+            Reason::RoleProtected => false,
+            // on retry, it will still not be found
+            Reason::ResourceNotFound
+            | Reason::ProjectNotFound
+            | Reason::EndpointNotFound
+            | Reason::BranchNotFound => false,
+            // we were asked to go away
+            Reason::RateLimitExceeded
+            | Reason::NonDefaultBranchComputeTimeExceeded
+            | Reason::ActiveTimeQuotaExceeded
+            | Reason::ComputeTimeQuotaExceeded
+            | Reason::WrittenDataQuotaExceeded
+            | Reason::DataTransferQuotaExceeded
+            | Reason::LogicalSizeQuotaExceeded => false,
+            // transitive error. control plane is currently busy
+            // but might be ready soon
+            Reason::RunningOperations
+            | Reason::ConcurrencyLimitReached
+            | Reason::LockAlreadyTaken => true,
+            // unknown error. better not retry it.
+            Reason::Unknown => false,
+        }
+    }
 }
 
 #[derive(Copy, Clone, Debug, Deserialize)]
@@ -206,7 +212,7 @@ pub struct RetryInfo {
     pub retry_delay_ms: u64,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 pub struct UserFacingMessage {
     pub message: Box<str>,
 }
```
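Aside: the retry decision now lives on `Reason`, so `CouldRetry for ConsoleError` and the new caching path in `wake_compute` share one heuristic. A trimmed-down illustration (the real enum has many more variants, all classified above):

```rust
// Trimmed-down Reason for illustration only.
enum Reason {
    ActiveTimeQuotaExceeded,
    ConcurrencyLimitReached,
}

impl Reason {
    fn can_retry(&self) -> bool {
        match self {
            // quota exhausted: retrying won't help, so the error is safe to cache
            Reason::ActiveTimeQuotaExceeded => false,
            // control plane busy: retry soon, never cache
            Reason::ConcurrencyLimitReached => true,
        }
    }
}
```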
proxy/src/console/provider.rs

```diff
@@ -2,7 +2,7 @@
 pub mod mock;
 pub mod neon;
 
-use super::messages::MetricsAuxInfo;
+use super::messages::{ConsoleError, MetricsAuxInfo};
 use crate::{
     auth::{
         backend::{ComputeCredentialKeys, ComputeUserInfo},
@@ -317,8 +317,8 @@ impl NodeInfo {
     }
 }
 
-pub type NodeInfoCache = TimedLru<EndpointCacheKey, NodeInfo>;
-pub type CachedNodeInfo = Cached<&'static NodeInfoCache>;
+pub type NodeInfoCache = TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ConsoleError>>>;
+pub type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
 pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option<AuthSecret>>;
 pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;
```
proxy/src/console/provider/neon.rs

```diff
@@ -9,7 +9,7 @@ use super::{
 use crate::{
     auth::backend::ComputeUserInfo,
     compute,
-    console::messages::ColdStartInfo,
+    console::messages::{ColdStartInfo, Reason},
     http,
     metrics::{CacheOutcome, Metrics},
     rate_limiter::EndpointRateLimiter,
@@ -17,10 +17,10 @@ use crate::{
 };
 use crate::{cache::Cached, context::RequestMonitoring};
 use futures::TryFutureExt;
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 use tokio::time::Instant;
 use tokio_postgres::config::SslMode;
-use tracing::{error, info, info_span, warn, Instrument};
+use tracing::{debug, error, info, info_span, warn, Instrument};
 
 pub struct Api {
     endpoint: http::Endpoint,
@@ -273,26 +273,34 @@ impl super::Api for Api {
     ) -> Result<CachedNodeInfo, WakeComputeError> {
         let key = user_info.endpoint_cache_key();
 
+        macro_rules! check_cache {
+            () => {
+                if let Some(cached) = self.caches.node_info.get(&key) {
+                    let (cached, info) = cached.take_value();
+                    let info = info.map_err(|c| {
+                        info!(key = &*key, "found cached wake_compute error");
+                        WakeComputeError::ApiError(ApiError::Console(*c))
+                    })?;
+
+                    debug!(key = &*key, "found cached compute node info");
+                    ctx.set_project(info.aux.clone());
+                    return Ok(cached.map(|()| info));
+                }
+            };
+        }
+
         // Every time we do a wakeup http request, the compute node will stay up
         // for some time (highly depends on the console's scale-to-zero policy);
         // The connection info remains the same during that period of time,
         // which means that we might cache it to reduce the load and latency.
-        if let Some(cached) = self.caches.node_info.get(&key) {
-            info!(key = &*key, "found cached compute node info");
-            ctx.set_project(cached.aux.clone());
-            return Ok(cached);
-        }
+        check_cache!();
 
         let permit = self.locks.get_permit(&key).await?;
 
         // after getting back a permit - it's possible the cache was filled
         // double check
         if permit.should_check_cache() {
-            if let Some(cached) = self.caches.node_info.get(&key) {
-                info!(key = &*key, "found cached compute node info");
-                ctx.set_project(cached.aux.clone());
-                return Ok(cached);
-            }
+            check_cache!();
         }
 
         // check rate limit
@@ -300,23 +308,56 @@ impl super::Api for Api {
             .wake_compute_endpoint_rate_limiter
             .check(user_info.endpoint.normalize_intern(), 1)
         {
             info!(key = &*key, "found cached compute node info");
             return Err(WakeComputeError::TooManyConnections);
         }
 
-        let mut node = permit.release_result(self.do_wake_compute(ctx, user_info).await)?;
-        ctx.set_project(node.aux.clone());
-        let cold_start_info = node.aux.cold_start_info;
-        info!("woken up a compute node");
-
-        // store the cached node as 'warm'
-        node.aux.cold_start_info = ColdStartInfo::WarmCached;
-        let (_, mut cached) = self.caches.node_info.insert(key.clone(), node);
-        cached.aux.cold_start_info = cold_start_info;
-
-        info!(key = &*key, "created a cache entry for compute node info");
-
-        Ok(cached)
+        let node = permit.release_result(self.do_wake_compute(ctx, user_info).await);
+        match node {
+            Ok(node) => {
+                ctx.set_project(node.aux.clone());
+                debug!(key = &*key, "created a cache entry for woken compute node");
+
+                let mut stored_node = node.clone();
+                // store the cached node as 'warm_cached'
+                stored_node.aux.cold_start_info = ColdStartInfo::WarmCached;
+
+                let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node));
+
+                Ok(cached.map(|()| node))
+            }
+            Err(err) => match err {
+                WakeComputeError::ApiError(ApiError::Console(err)) => {
+                    let Some(status) = &err.status else {
+                        return Err(WakeComputeError::ApiError(ApiError::Console(err)));
+                    };
+
+                    let reason = status
+                        .details
+                        .error_info
+                        .map_or(Reason::Unknown, |x| x.reason);
+
+                    // if we can retry this error, do not cache it.
+                    if reason.can_retry() {
+                        return Err(WakeComputeError::ApiError(ApiError::Console(err)));
+                    }
+
+                    // at this point, we should only have quota errors.
+                    debug!(
+                        key = &*key,
+                        "created a cache entry for the wake compute error"
+                    );
+
+                    self.caches.node_info.insert_ttl(
+                        key,
+                        Err(Box::new(err.clone())),
+                        Duration::from_secs(30),
+                    );
+
+                    Err(WakeComputeError::ApiError(ApiError::Console(err)))
+                }
+                err => return Err(err),
+            },
+        }
     }
 }
```
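To summarize the new behavior, a sketch with stub types (not the proxy's real signatures): a cache hit now either yields node info or replays the stored console error, mirroring the `map_err` in `check_cache!` above.

```rust
// Stub types standing in for the proxy's real ones.
struct NodeInfo;
struct ConsoleError;
enum WakeComputeError {
    Console(ConsoleError),
}

// On a cache hit, a stored Err is turned back into a wake_compute error;
// a stored Ok is handed out as a warm cached node.
fn replay(entry: Result<NodeInfo, Box<ConsoleError>>) -> Result<NodeInfo, WakeComputeError> {
    entry.map_err(|e| WakeComputeError::Console(*e))
}
```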
```diff
@@ -540,8 +540,8 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
         },
         allow_self_signed_compute: false,
     };
-    let (_, node) = cache.insert("key".into(), node);
-    node
+    let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone()));
+    node2.map(|()| node)
 }
 
 fn helper_create_connect_info(
```