[proxy] Cache node info only for TTL, even if Redis is available (#12626)

This PR simplifies our node info cache. Now we'll store entries for at
most the TTL duration, even if Redis notifications are available. This
will allow us to cache intermittent errors later (e.g. due to rate
limits) with more predictable behavior.

Related to https://github.com/neondatabase/cloud/issues/19353
This commit is contained in:
Krzysztof Szafrański
2025-07-16 18:23:05 +02:00
committed by GitHub
parent 9e154a8130
commit e2982ed3ec
2 changed files with 16 additions and 98 deletions

View File

@@ -1,13 +1,11 @@
use std::collections::{HashMap, HashSet, hash_map};
use std::convert::Infallible;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use async_trait::async_trait;
use clashmap::ClashMap;
use clashmap::mapref::one::Ref;
use rand::{Rng, thread_rng};
use tokio::sync::Mutex;
use tokio::time::Instant;
use tracing::{debug, info};
@@ -22,31 +20,23 @@ pub(crate) trait ProjectInfoCache {
fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt);
fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt);
fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
async fn decrement_active_listeners(&self);
async fn increment_active_listeners(&self);
}
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
value: T,
}
impl<T> Entry<T> {
pub(crate) fn new(value: T) -> Self {
pub(crate) fn new(value: T, ttl: Duration) -> Self {
Self {
created_at: Instant::now(),
expires_at: Instant::now() + ttl,
value,
}
}
pub(crate) fn get(&self, valid_since: Instant) -> Option<&T> {
(valid_since < self.created_at).then_some(&self.value)
}
}
impl<T> From<T> for Entry<T> {
fn from(value: T) -> Self {
Self::new(value)
pub(crate) fn get(&self) -> Option<&T> {
(self.expires_at > Instant::now()).then_some(&self.value)
}
}
@@ -56,18 +46,12 @@ struct EndpointInfo {
}
impl EndpointInfo {
pub(crate) fn get_role_secret(
&self,
role_name: RoleNameInt,
valid_since: Instant,
) -> Option<RoleAccessControl> {
let controls = self.role_controls.get(&role_name)?;
controls.get(valid_since).cloned()
pub(crate) fn get_role_secret(&self, role_name: RoleNameInt) -> Option<RoleAccessControl> {
self.role_controls.get(&role_name)?.get().cloned()
}
pub(crate) fn get_controls(&self, valid_since: Instant) -> Option<EndpointAccessControl> {
let controls = self.controls.as_ref()?;
controls.get(valid_since).cloned()
pub(crate) fn get_controls(&self) -> Option<EndpointAccessControl> {
self.controls.as_ref()?.get().cloned()
}
pub(crate) fn invalidate_endpoint(&mut self) {
@@ -92,11 +76,8 @@ pub struct ProjectInfoCacheImpl {
project2ep: ClashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
// FIXME(stefan): we need a way to GC the account2ep map.
account2ep: ClashMap<AccountIdInt, HashSet<EndpointIdInt>>,
config: ProjectInfoCacheOptions,
start_time: Instant,
ttl_disabled_since_us: AtomicU64,
active_listeners_lock: Mutex<usize>,
config: ProjectInfoCacheOptions,
}
#[async_trait]
@@ -152,29 +133,6 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
}
}
}
async fn decrement_active_listeners(&self) {
let mut listeners_guard = self.active_listeners_lock.lock().await;
if *listeners_guard == 0 {
tracing::error!("active_listeners count is already 0, something is broken");
return;
}
*listeners_guard -= 1;
if *listeners_guard == 0 {
self.ttl_disabled_since_us
.store(u64::MAX, std::sync::atomic::Ordering::SeqCst);
}
}
async fn increment_active_listeners(&self) {
let mut listeners_guard = self.active_listeners_lock.lock().await;
*listeners_guard += 1;
if *listeners_guard == 1 {
let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64;
self.ttl_disabled_since_us
.store(new_ttl, std::sync::atomic::Ordering::SeqCst);
}
}
}
impl ProjectInfoCacheImpl {
@@ -184,9 +142,6 @@ impl ProjectInfoCacheImpl {
project2ep: ClashMap::new(),
account2ep: ClashMap::new(),
config,
ttl_disabled_since_us: AtomicU64::new(u64::MAX),
start_time: Instant::now(),
active_listeners_lock: Mutex::new(0),
}
}
@@ -203,19 +158,17 @@ impl ProjectInfoCacheImpl {
endpoint_id: &EndpointId,
role_name: &RoleName,
) -> Option<RoleAccessControl> {
let valid_since = self.get_cache_times();
let role_name = RoleNameInt::get(role_name)?;
let endpoint_info = self.get_endpoint_cache(endpoint_id)?;
endpoint_info.get_role_secret(role_name, valid_since)
endpoint_info.get_role_secret(role_name)
}
pub(crate) fn get_endpoint_access(
&self,
endpoint_id: &EndpointId,
) -> Option<EndpointAccessControl> {
let valid_since = self.get_cache_times();
let endpoint_info = self.get_endpoint_cache(endpoint_id)?;
endpoint_info.get_controls(valid_since)
endpoint_info.get_controls()
}
pub(crate) fn insert_endpoint_access(
@@ -237,8 +190,8 @@ impl ProjectInfoCacheImpl {
return;
}
let controls = Entry::from(controls);
let role_controls = Entry::from(role_controls);
let controls = Entry::new(controls, self.config.ttl);
let role_controls = Entry::new(role_controls, self.config.ttl);
match self.cache.entry(endpoint_id) {
clashmap::Entry::Vacant(e) => {
@@ -275,27 +228,6 @@ impl ProjectInfoCacheImpl {
}
}
fn ignore_ttl_since(&self) -> Option<Instant> {
let ttl_disabled_since_us = self
.ttl_disabled_since_us
.load(std::sync::atomic::Ordering::Relaxed);
if ttl_disabled_since_us == u64::MAX {
return None;
}
Some(self.start_time + Duration::from_micros(ttl_disabled_since_us))
}
fn get_cache_times(&self) -> Instant {
let mut valid_since = Instant::now() - self.config.ttl;
if let Some(ignore_ttl_since) = self.ignore_ttl_since() {
// We are fine if entry is not older than ttl or was added before we are getting notifications.
valid_since = valid_since.min(ignore_ttl_since);
}
valid_since
}
pub fn maybe_invalidate_role_secret(&self, endpoint_id: &EndpointId, role_name: &RoleName) {
let Some(endpoint_id) = EndpointIdInt::get(endpoint_id) else {
return;
@@ -313,16 +245,7 @@ impl ProjectInfoCacheImpl {
return;
};
let created_at = role_controls.get().created_at;
let expire = match self.ignore_ttl_since() {
// if ignoring TTL, we should still try and roll the password if it's old
// and we the client gave an incorrect password. There could be some lag on the redis channel.
Some(_) => created_at + self.config.ttl < Instant::now(),
// edge case: redis is down, let's be generous and invalidate the cache immediately.
None => true,
};
if expire {
if role_controls.get().expires_at <= Instant::now() {
role_controls.remove();
}
}