diff --git a/Cargo.lock b/Cargo.lock index 084f867c57..b43f4fdea0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5431,7 +5431,6 @@ dependencies = [ "futures", "gettid", "hashbrown 0.14.5", - "hashlink", "hex", "hmac", "hostname", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 5fd280c5b0..eb8c0ed037 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -33,7 +33,6 @@ env_logger.workspace = true framed-websockets.workspace = true futures.workspace = true hashbrown.workspace = true -hashlink.workspace = true hex.workspace = true hmac.workspace = true hostname.workspace = true diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index b06ed3a0ae..2a02748a10 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -8,11 +8,12 @@ use tracing::{info, info_span}; use crate::auth::backend::ComputeUserInfo; use crate::cache::Cached; +use crate::cache::node_info::CachedNodeInfo; use crate::compute::AuthInfo; use crate::config::AuthenticationConfig; use crate::context::RequestContext; use crate::control_plane::client::cplane_proxy_v1; -use crate::control_plane::{self, CachedNodeInfo, NodeInfo}; +use crate::control_plane::{self, NodeInfo}; use crate::error::{ReportableError, UserFacingError}; use crate::pqproto::BeMessage; use crate::proxy::NeonOptions; diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index e7805d8bfe..30229cec23 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -16,14 +16,14 @@ use tracing::{debug, info}; use crate::auth::{self, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange}; use crate::cache::Cached; +use crate::cache::node_info::CachedNodeInfo; use crate::config::AuthenticationConfig; use crate::context::RequestContext; use crate::control_plane::client::ControlPlaneClient; use crate::control_plane::errors::GetAuthInfoError; use crate::control_plane::messages::EndpointRateLimitConfig; use crate::control_plane::{ - self, AccessBlockerFlags, AuthSecret, CachedNodeInfo, ControlPlaneApi, EndpointAccessControl, - RoleAccessControl, + self, AccessBlockerFlags, AuthSecret, ControlPlaneApi, EndpointAccessControl, RoleAccessControl, }; use crate::intern::EndpointIdInt; use crate::pqproto::BeMessage; @@ -433,11 +433,12 @@ mod tests { use super::auth_quirks; use super::jwt::JwkCache; use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern}; + use crate::cache::node_info::CachedNodeInfo; use crate::config::AuthenticationConfig; use crate::context::RequestContext; use crate::control_plane::messages::EndpointRateLimitConfig; use crate::control_plane::{ - self, AccessBlockerFlags, CachedNodeInfo, EndpointAccessControl, RoleAccessControl, + self, AccessBlockerFlags, EndpointAccessControl, RoleAccessControl, }; use crate::proxy::NeonOptions; use crate::rate_limiter::EndpointRateLimiter; diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 4148f4bc62..aa0e1b8f98 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -538,7 +538,7 @@ pub async fn run() -> anyhow::Result<()> { maintenance_tasks.spawn(async move { loop { tokio::time::sleep(Duration::from_secs(600)).await; - db_schema_cache.flush(); + db_schema_cache.0.run_pending_tasks(); } }); } @@ -711,12 +711,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { info!("Using DbSchemaCache with options={db_schema_cache_config:?}"); let db_schema_cache = if args.is_rest_broker { - Some(DbSchemaCache::new( - "db_schema_cache", - db_schema_cache_config.size, - db_schema_cache_config.ttl, - true, - )) + Some(DbSchemaCache::new(db_schema_cache_config)) } else { None }; diff --git a/proxy/src/cache/common.rs b/proxy/src/cache/common.rs index 939bf960b0..bced5be972 100644 --- a/proxy/src/cache/common.rs +++ b/proxy/src/cache/common.rs @@ -1,7 +1,5 @@ -use std::{ - ops::{Deref, DerefMut}, - time::{Duration, Instant}, -}; +use std::ops::{Deref, DerefMut}; +use std::time::{Duration, Instant}; use moka::Expiry; @@ -20,20 +18,16 @@ pub(crate) trait Cache { /// Entry's value. type Value; - /// Used for entry invalidation. - type LookupInfo; - /// Invalidate an entry using a lookup info. /// We don't have an empty default impl because it's error-prone. - fn invalidate(&self, _: &Self::LookupInfo); + fn invalidate(&self, _: &Self::Key); } impl Cache for &C { type Key = C::Key; type Value = C::Value; - type LookupInfo = C::LookupInfo; - fn invalidate(&self, info: &Self::LookupInfo) { + fn invalidate(&self, info: &Self::Key) { C::invalidate(self, info); } } @@ -41,7 +35,7 @@ impl Cache for &C { /// Wrapper for convenient entry invalidation. pub(crate) struct Cached::Value> { /// Cache + lookup info. - pub(crate) token: Option<(C, C::LookupInfo)>, + pub(crate) token: Option<(C, C::Key)>, /// The value itself. pub(crate) value: V, @@ -53,23 +47,6 @@ impl Cached { Self { token: None, value } } - pub(crate) fn take_value(self) -> (Cached, V) { - ( - Cached { - token: self.token, - value: (), - }, - self.value, - ) - } - - pub(crate) fn map(self, f: impl FnOnce(V) -> U) -> Cached { - Cached { - token: self.token, - value: f(self.value), - } - } - /// Drop this entry from a cache if it's still there. pub(crate) fn invalidate(self) -> V { if let Some((cache, info)) = &self.token { diff --git a/proxy/src/cache/mod.rs b/proxy/src/cache/mod.rs index ce7f781213..0a607a1409 100644 --- a/proxy/src/cache/mod.rs +++ b/proxy/src/cache/mod.rs @@ -1,6 +1,5 @@ pub(crate) mod common; +pub(crate) mod node_info; pub(crate) mod project_info; -mod timed_lru; -pub(crate) use common::{Cache, Cached}; -pub(crate) use timed_lru::TimedLru; +pub(crate) use common::{Cached, ControlPlaneResult, CplaneExpiry}; diff --git a/proxy/src/cache/node_info.rs b/proxy/src/cache/node_info.rs new file mode 100644 index 0000000000..22f4d9dc04 --- /dev/null +++ b/proxy/src/cache/node_info.rs @@ -0,0 +1,47 @@ +use crate::cache::common::Cache; +use crate::cache::{Cached, ControlPlaneResult, CplaneExpiry}; +use crate::config::CacheOptions; +use crate::control_plane::NodeInfo; +use crate::types::EndpointCacheKey; + +pub(crate) struct NodeInfoCache(moka::sync::Cache>); +pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>; + +impl Cache for NodeInfoCache { + type Key = EndpointCacheKey; + type Value = ControlPlaneResult; + + fn invalidate(&self, info: &EndpointCacheKey) { + self.0.invalidate(info); + } +} + +impl NodeInfoCache { + pub fn new(config: CacheOptions) -> Self { + let builder = moka::sync::Cache::builder() + .name("node_info_cache") + .expire_after(CplaneExpiry::default()); + let builder = config.moka(builder); + Self(builder.build()) + } + + pub fn insert(&self, key: EndpointCacheKey, value: ControlPlaneResult) { + self.0.insert(key, value); + } + + pub fn get(&'static self, key: &EndpointCacheKey) -> Option> { + self.0.get(key) + } + + pub fn get_entry( + &'static self, + key: &EndpointCacheKey, + ) -> Option> { + self.get(key).map(|res| { + res.map(|value| Cached { + token: Some((self, key.clone())), + value, + }) + }) + } +} diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 3c7ed1ae8d..a6af9c158b 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -215,12 +215,13 @@ impl ProjectInfoCache { #[cfg(test)] mod tests { + use std::sync::Arc; + use std::time::Duration; + use super::*; use crate::control_plane::messages::{Details, EndpointRateLimitConfig, ErrorInfo, Status}; use crate::control_plane::{AccessBlockerFlags, AuthSecret}; use crate::scram::ServerSecret; - use std::sync::Arc; - use std::time::Duration; #[tokio::test] async fn test_project_info_cache_settings() { diff --git a/proxy/src/cache/timed_lru.rs b/proxy/src/cache/timed_lru.rs deleted file mode 100644 index 6518978120..0000000000 --- a/proxy/src/cache/timed_lru.rs +++ /dev/null @@ -1,259 +0,0 @@ -use std::borrow::Borrow; -use std::hash::Hash; -use std::time::{Duration, Instant}; - -// This seems to make more sense than `lru` or `cached`: -// -// * `near/nearcore` ditched `cached` in favor of `lru` -// (https://github.com/near/nearcore/issues?q=is%3Aissue+lru+is%3Aclosed). -// -// * `lru` methods use an obscure `KeyRef` type in their contraints (which is deliberately excluded from docs). -// This severely hinders its usage both in terms of creating wrappers and supported key types. -// -// On the other hand, `hashlink` has good download stats and appears to be maintained. -use hashlink::{LruCache, linked_hash_map::RawEntryMut}; -use tracing::debug; - -use super::Cache; -use super::common::Cached; - -/// An implementation of timed LRU cache with fixed capacity. -/// Key properties: -/// -/// * Whenever a new entry is inserted, the least recently accessed one is evicted. -/// The cache also keeps track of entry's insertion time (`created_at`) and TTL (`expires_at`). -/// -/// * If `update_ttl_on_retrieval` is `true`. When the entry is about to be retrieved, we check its expiration timestamp. -/// If the entry has expired, we remove it from the cache; Otherwise we bump the -/// expiration timestamp (e.g. +5mins) and change its place in LRU list to prolong -/// its existence. -/// -/// * There's an API for immediate invalidation (removal) of a cache entry; -/// It's useful in case we know for sure that the entry is no longer correct. -/// See [`Cached`] for more information. -/// -/// * Expired entries are kept in the cache, until they are evicted by the LRU policy, -/// or by a successful lookup (i.e. the entry hasn't expired yet). -/// There is no background job to reap the expired records. -/// -/// * It's possible for an entry that has not yet expired entry to be evicted -/// before expired items. That's a bit wasteful, but probably fine in practice. -pub(crate) struct TimedLru { - /// Cache's name for tracing. - name: &'static str, - - /// The underlying cache implementation. - cache: parking_lot::Mutex>>, - - /// Default time-to-live of a single entry. - ttl: Duration, - - update_ttl_on_retrieval: bool, -} - -impl Cache for TimedLru { - type Key = K; - type Value = V; - type LookupInfo = Key; - - fn invalidate(&self, info: &Self::LookupInfo) { - self.invalidate_raw(info); - } -} - -struct Entry { - created_at: Instant, - expires_at: Instant, - ttl: Duration, - update_ttl_on_retrieval: bool, - value: T, -} - -impl TimedLru { - /// Construct a new LRU cache with timed entries. - pub(crate) fn new( - name: &'static str, - capacity: usize, - ttl: Duration, - update_ttl_on_retrieval: bool, - ) -> Self { - Self { - name, - cache: LruCache::new(capacity).into(), - ttl, - update_ttl_on_retrieval, - } - } - - /// Drop an entry from the cache if it's outdated. - #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)] - fn invalidate_raw(&self, key: &K) { - // Do costly things before taking the lock. - let mut cache = self.cache.lock(); - let entry = match cache.raw_entry_mut().from_key(key) { - RawEntryMut::Vacant(_) => return, - RawEntryMut::Occupied(x) => x.remove(), - }; - drop(cache); // drop lock before logging - - let Entry { - created_at, - expires_at, - .. - } = entry; - - debug!( - ?created_at, - ?expires_at, - "processed a cache entry invalidation event" - ); - } - - /// Try retrieving an entry by its key, then execute `extract` if it exists. - #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)] - fn get_raw(&self, key: &Q, extract: impl FnOnce(&K, &Entry) -> R) -> Option - where - K: Borrow, - Q: Hash + Eq + ?Sized, - { - let now = Instant::now(); - - // Do costly things before taking the lock. - let mut cache = self.cache.lock(); - let mut raw_entry = match cache.raw_entry_mut().from_key(key) { - RawEntryMut::Vacant(_) => return None, - RawEntryMut::Occupied(x) => x, - }; - - // Immeditely drop the entry if it has expired. - let entry = raw_entry.get(); - if entry.expires_at <= now { - raw_entry.remove(); - return None; - } - - let value = extract(raw_entry.key(), entry); - let (created_at, expires_at) = (entry.created_at, entry.expires_at); - - // Update the deadline and the entry's position in the LRU list. - let deadline = now.checked_add(raw_entry.get().ttl).expect("time overflow"); - if raw_entry.get().update_ttl_on_retrieval { - raw_entry.get_mut().expires_at = deadline; - } - raw_entry.to_back(); - - drop(cache); // drop lock before logging - debug!( - created_at = format_args!("{created_at:?}"), - old_expires_at = format_args!("{expires_at:?}"), - new_expires_at = format_args!("{deadline:?}"), - "accessed a cache entry" - ); - - Some(value) - } - - /// Insert an entry to the cache. If an entry with the same key already - /// existed, return the previous value and its creation timestamp. - #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)] - fn insert_raw(&self, key: K, value: V) -> (Instant, Option) { - self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval) - } - - /// Insert an entry to the cache. If an entry with the same key already - /// existed, return the previous value and its creation timestamp. - #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)] - fn insert_raw_ttl( - &self, - key: K, - value: V, - ttl: Duration, - update: bool, - ) -> (Instant, Option) { - let created_at = Instant::now(); - let expires_at = created_at.checked_add(ttl).expect("time overflow"); - - let entry = Entry { - created_at, - expires_at, - ttl, - update_ttl_on_retrieval: update, - value, - }; - - // Do costly things before taking the lock. - let old = self - .cache - .lock() - .insert(key, entry) - .map(|entry| entry.value); - - debug!( - created_at = format_args!("{created_at:?}"), - expires_at = format_args!("{expires_at:?}"), - replaced = old.is_some(), - "created a cache entry" - ); - - (created_at, old) - } -} - -impl TimedLru { - pub(crate) fn insert_ttl(&self, key: K, value: V, ttl: Duration) { - self.insert_raw_ttl(key, value, ttl, false); - } - - #[cfg(feature = "rest_broker")] - pub(crate) fn insert(&self, key: K, value: V) { - self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval); - } - - pub(crate) fn insert_unit(&self, key: K, value: V) -> (Option, Cached<&Self, ()>) { - let (_, old) = self.insert_raw(key.clone(), value); - - let cached = Cached { - token: Some((self, key)), - value: (), - }; - - (old, cached) - } - - #[cfg(feature = "rest_broker")] - pub(crate) fn flush(&self) { - let now = Instant::now(); - let mut cache = self.cache.lock(); - - // Collect keys of expired entries first - let expired_keys: Vec<_> = cache - .iter() - .filter_map(|(key, entry)| { - if entry.expires_at <= now { - Some(key.clone()) - } else { - None - } - }) - .collect(); - - // Remove expired entries - for key in expired_keys { - cache.remove(&key); - } - } -} - -impl TimedLru { - /// Retrieve a cached entry in convenient wrapper, alongside timing information. - pub(crate) fn get(&self, key: &Q) -> Option::Value>> - where - K: Borrow + Clone, - Q: Hash + Eq + ?Sized, - { - self.get_raw(key, |key, entry| Cached { - token: Some((self, key.clone())), - value: entry.value.clone(), - }) - } -} diff --git a/proxy/src/config.rs b/proxy/src/config.rs index f7f681f18b..04080efcca 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -107,20 +107,23 @@ pub fn remote_storage_from_toml(s: &str) -> anyhow::Result #[derive(Debug)] pub struct CacheOptions { /// Max number of entries. - pub size: usize, + pub size: Option, /// Entry's time-to-live. - pub ttl: Duration, + pub absolute_ttl: Option, + /// Entry's time-to-idle. + pub idle_ttl: Option, } impl CacheOptions { - /// Default options for [`crate::control_plane::NodeInfoCache`]. - pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,ttl=4m"; + /// Default options for [`crate::cache::node_info::NodeInfoCache`]. + pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,idle_ttl=4m"; /// Parse cache options passed via cmdline. /// Example: [`Self::CACHE_DEFAULT_OPTIONS`]. fn parse(options: &str) -> anyhow::Result { let mut size = None; - let mut ttl = None; + let mut absolute_ttl = None; + let mut idle_ttl = None; for option in options.split(',') { let (key, value) = option @@ -129,21 +132,34 @@ impl CacheOptions { match key { "size" => size = Some(value.parse()?), - "ttl" => ttl = Some(humantime::parse_duration(value)?), + "absolute_ttl" | "ttl" => absolute_ttl = Some(humantime::parse_duration(value)?), + "idle_ttl" | "tti" => idle_ttl = Some(humantime::parse_duration(value)?), unknown => bail!("unknown key: {unknown}"), } } - // TTL doesn't matter if cache is always empty. - if let Some(0) = size { - ttl.get_or_insert(Duration::default()); - } - Ok(Self { - size: size.context("missing `size`")?, - ttl: ttl.context("missing `ttl`")?, + size, + absolute_ttl, + idle_ttl, }) } + + pub fn moka( + &self, + mut builder: moka::sync::CacheBuilder, + ) -> moka::sync::CacheBuilder { + if let Some(size) = self.size { + builder = builder.max_capacity(size); + } + if let Some(ttl) = self.absolute_ttl { + builder = builder.time_to_live(ttl); + } + if let Some(tti) = self.idle_ttl { + builder = builder.time_to_idle(tti); + } + builder + } } impl FromStr for CacheOptions { @@ -169,7 +185,7 @@ pub struct ProjectInfoCacheOptions { } impl ProjectInfoCacheOptions { - /// Default options for [`crate::control_plane::NodeInfoCache`]. + /// Default options for [`crate::cache::project_info::ProjectInfoCache`]. pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=10000,ttl=4m,max_roles=10,gc_interval=60m"; @@ -496,21 +512,37 @@ mod tests { #[test] fn test_parse_cache_options() -> anyhow::Result<()> { - let CacheOptions { size, ttl } = "size=4096,ttl=5min".parse()?; - assert_eq!(size, 4096); - assert_eq!(ttl, Duration::from_secs(5 * 60)); + let CacheOptions { + size, + absolute_ttl, + idle_ttl: _, + } = "size=4096,ttl=5min".parse()?; + assert_eq!(size, Some(4096)); + assert_eq!(absolute_ttl, Some(Duration::from_secs(5 * 60))); - let CacheOptions { size, ttl } = "ttl=4m,size=2".parse()?; - assert_eq!(size, 2); - assert_eq!(ttl, Duration::from_secs(4 * 60)); + let CacheOptions { + size, + absolute_ttl, + idle_ttl: _, + } = "ttl=4m,size=2".parse()?; + assert_eq!(size, Some(2)); + assert_eq!(absolute_ttl, Some(Duration::from_secs(4 * 60))); - let CacheOptions { size, ttl } = "size=0,ttl=1s".parse()?; - assert_eq!(size, 0); - assert_eq!(ttl, Duration::from_secs(1)); + let CacheOptions { + size, + absolute_ttl, + idle_ttl: _, + } = "size=0,ttl=1s".parse()?; + assert_eq!(size, Some(0)); + assert_eq!(absolute_ttl, Some(Duration::from_secs(1))); - let CacheOptions { size, ttl } = "size=0".parse()?; - assert_eq!(size, 0); - assert_eq!(ttl, Duration::default()); + let CacheOptions { + size, + absolute_ttl, + idle_ttl: _, + } = "size=0".parse()?; + assert_eq!(size, Some(0)); + assert_eq!(absolute_ttl, None); Ok(()) } diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index 1a9f22e250..b76b13e2c2 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -16,7 +16,8 @@ use tracing::{Instrument, debug, info, info_span, warn}; use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute}; use crate::auth::backend::ComputeUserInfo; use crate::auth::backend::jwt::AuthRule; -use crate::cache::common::DEFAULT_ERROR_TTL; +use crate::cache::Cached; +use crate::cache::node_info::CachedNodeInfo; use crate::context::RequestContext; use crate::control_plane::caches::ApiCaches; use crate::control_plane::errors::{ @@ -25,8 +26,7 @@ use crate::control_plane::errors::{ use crate::control_plane::locks::ApiLocks; use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse}; use crate::control_plane::{ - AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo, - RoleAccessControl, + AccessBlockerFlags, AuthInfo, AuthSecret, EndpointAccessControl, NodeInfo, RoleAccessControl, }; use crate::metrics::Metrics; use crate::proxy::retry::CouldRetry; @@ -415,8 +415,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { macro_rules! check_cache { () => { - if let Some(cached) = self.caches.node_info.get(&key) { - let (cached, info) = cached.take_value(); + if let Some(info) = self.caches.node_info.get_entry(&key) { return match info { Err(msg) => { info!(key = &*key, "found cached wake_compute error"); @@ -428,7 +427,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { Ok(info) => { debug!(key = &*key, "found cached compute node info"); ctx.set_project(info.aux.clone()); - Ok(cached.map(|()| info)) + Ok(info) } }; } @@ -467,10 +466,12 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { let mut stored_node = node.clone(); // store the cached node as 'warm_cached' stored_node.aux.cold_start_info = ColdStartInfo::WarmCached; + self.caches.node_info.insert(key.clone(), Ok(stored_node)); - let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node)); - - Ok(cached.map(|()| node)) + Ok(Cached { + token: Some((&self.caches.node_info, key)), + value: node, + }) } Err(err) => match err { WakeComputeError::ControlPlane(ControlPlaneError::Message(ref msg)) => { @@ -487,9 +488,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { "created a cache entry for the wake compute error" ); - let ttl = retry_info.map_or(DEFAULT_ERROR_TTL, |r| r.retry_at - Instant::now()); - - self.caches.node_info.insert_ttl(key, Err(msg.clone()), ttl); + self.caches.node_info.insert(key, Err(msg.clone())); Err(err) } diff --git a/proxy/src/control_plane/client/mock.rs b/proxy/src/control_plane/client/mock.rs index b84dba6b09..9e48d91340 100644 --- a/proxy/src/control_plane/client/mock.rs +++ b/proxy/src/control_plane/client/mock.rs @@ -15,6 +15,7 @@ use crate::auth::IpPattern; use crate::auth::backend::ComputeUserInfo; use crate::auth::backend::jwt::AuthRule; use crate::cache::Cached; +use crate::cache::node_info::CachedNodeInfo; use crate::compute::ConnectInfo; use crate::context::RequestContext; use crate::control_plane::errors::{ @@ -22,8 +23,7 @@ use crate::control_plane::errors::{ }; use crate::control_plane::messages::{EndpointRateLimitConfig, MetricsAuxInfo}; use crate::control_plane::{ - AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo, - RoleAccessControl, + AccessBlockerFlags, AuthInfo, AuthSecret, EndpointAccessControl, NodeInfo, RoleAccessControl, }; use crate::intern::RoleNameInt; use crate::scram; diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index 445b44770f..ec26746873 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -13,10 +13,11 @@ use tracing::{debug, info}; use super::{EndpointAccessControl, RoleAccessControl}; use crate::auth::backend::ComputeUserInfo; use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError}; +use crate::cache::node_info::{CachedNodeInfo, NodeInfoCache}; use crate::cache::project_info::ProjectInfoCache; use crate::config::{CacheOptions, ProjectInfoCacheOptions}; use crate::context::RequestContext; -use crate::control_plane::{CachedNodeInfo, ControlPlaneApi, NodeInfoCache, errors}; +use crate::control_plane::{ControlPlaneApi, errors}; use crate::error::ReportableError; use crate::metrics::ApiLockMetrics; use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token}; @@ -128,12 +129,7 @@ impl ApiCaches { project_info_cache_config: ProjectInfoCacheOptions, ) -> Self { Self { - node_info: NodeInfoCache::new( - "node_info_cache", - wake_compute_cache_config.size, - wake_compute_cache_config.ttl, - true, - ), + node_info: NodeInfoCache::new(wake_compute_cache_config), project_info: Arc::new(ProjectInfoCache::new(project_info_cache_config)), } } diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index 5bfa24c92d..6f326d789a 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -16,13 +16,13 @@ use messages::EndpointRateLimitConfig; use crate::auth::backend::ComputeUserInfo; use crate::auth::backend::jwt::AuthRule; use crate::auth::{AuthError, IpPattern, check_peer_addr_is_in_list}; -use crate::cache::{Cached, TimedLru}; +use crate::cache::node_info::CachedNodeInfo; use crate::context::RequestContext; -use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo}; +use crate::control_plane::messages::MetricsAuxInfo; use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt}; use crate::protocol2::ConnectionInfoExtra; use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig}; -use crate::types::{EndpointCacheKey, EndpointId, RoleName}; +use crate::types::{EndpointId, RoleName}; use crate::{compute, scram}; /// Various cache-related types. @@ -77,10 +77,6 @@ pub(crate) struct AccessBlockerFlags { pub vpc_access_blocked: bool, } -pub(crate) type NodeInfoCache = - TimedLru>>; -pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>; - #[derive(Clone, Debug)] pub struct RoleAccessControl { pub secret: Option, diff --git a/proxy/src/proxy/connect_auth.rs b/proxy/src/proxy/connect_auth.rs index 5a1d1ae314..77578c71b1 100644 --- a/proxy/src/proxy/connect_auth.rs +++ b/proxy/src/proxy/connect_auth.rs @@ -2,7 +2,7 @@ use thiserror::Error; use crate::auth::Backend; use crate::auth::backend::ComputeUserInfo; -use crate::cache::Cache; +use crate::cache::common::Cache; use crate::compute::{AuthInfo, ComputeConnection, ConnectionError, PostgresError}; use crate::config::ProxyConfig; use crate::context::RequestContext; diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 1a4e5f77d2..515f925236 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -1,11 +1,12 @@ use tokio::time; use tracing::{debug, info, warn}; +use crate::cache::node_info::CachedNodeInfo; use crate::compute::{self, COULD_NOT_CONNECT, ComputeConnection}; use crate::config::{ComputeConfig, ProxyConfig, RetryConfig}; use crate::context::RequestContext; +use crate::control_plane::NodeInfo; use crate::control_plane::locks::ApiLocks; -use crate::control_plane::{self, NodeInfo}; use crate::metrics::{ ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType, }; @@ -17,7 +18,7 @@ use crate::types::Host; /// (e.g. the compute node's address might've changed at the wrong time). /// Invalidate the cache entry (if any) to prevent subsequent errors. #[tracing::instrument(skip_all)] -pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo { +pub(crate) fn invalidate_cache(node_info: CachedNodeInfo) -> NodeInfo { let is_cached = node_info.cached(); if is_cached { warn!("invalidating stalled compute node info cache entry"); @@ -37,7 +38,7 @@ pub(crate) trait ConnectMechanism { async fn connect_once( &self, ctx: &RequestContext, - node_info: &control_plane::CachedNodeInfo, + node_info: &CachedNodeInfo, config: &ComputeConfig, ) -> Result; } @@ -66,7 +67,7 @@ impl ConnectMechanism for TcpMechanism<'_> { async fn connect_once( &self, ctx: &RequestContext, - node_info: &control_plane::CachedNodeInfo, + node_info: &CachedNodeInfo, config: &ComputeConfig, ) -> Result { let permit = self.locks.get_permit(&node_info.conn_info.host).await?; diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index 0efedb1487..7e0710749e 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -20,11 +20,12 @@ use tracing_test::traced_test; use super::retry::CouldRetry; use crate::auth::backend::{ComputeUserInfo, MaybeOwned}; -use crate::config::{ComputeConfig, RetryConfig, TlsConfig}; +use crate::cache::node_info::{CachedNodeInfo, NodeInfoCache}; +use crate::config::{CacheOptions, ComputeConfig, RetryConfig, TlsConfig}; use crate::context::RequestContext; use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient}; use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status}; -use crate::control_plane::{self, CachedNodeInfo, NodeInfo, NodeInfoCache}; +use crate::control_plane::{self, NodeInfo}; use crate::error::ErrorKind; use crate::pglb::ERR_INSECURE_CONNECTION; use crate::pglb::handshake::{HandshakeData, handshake}; @@ -418,12 +419,11 @@ impl TestConnectMechanism { Self { counter: Arc::new(std::sync::Mutex::new(0)), sequence, - cache: Box::leak(Box::new(NodeInfoCache::new( - "test", - 1, - Duration::from_secs(100), - false, - ))), + cache: Box::leak(Box::new(NodeInfoCache::new(CacheOptions { + size: Some(1), + absolute_ttl: Some(Duration::from_secs(100)), + idle_ttl: None, + }))), } } } @@ -437,7 +437,7 @@ impl ConnectMechanism for TestConnectMechanism { async fn connect_once( &self, _ctx: &RequestContext, - _node_info: &control_plane::CachedNodeInfo, + _node_info: &CachedNodeInfo, _config: &ComputeConfig, ) -> Result { let mut counter = self.counter.lock().unwrap(); @@ -547,8 +547,11 @@ fn helper_create_uncached_node_info() -> NodeInfo { fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo { let node = helper_create_uncached_node_info(); - let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone())); - node2.map(|()| node) + cache.insert("key".into(), Ok(node.clone())); + CachedNodeInfo { + token: Some((cache, "key".into())), + value: node, + } } fn helper_create_connect_info( diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index b8edf9fd5c..66f2df2af4 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -1,9 +1,9 @@ use async_trait::async_trait; use tracing::{error, info}; +use crate::cache::node_info::CachedNodeInfo; use crate::config::RetryConfig; use crate::context::RequestContext; -use crate::control_plane::CachedNodeInfo; use crate::control_plane::errors::{ControlPlaneError, WakeComputeError}; use crate::error::ReportableError; use crate::metrics::{ diff --git a/proxy/src/serverless/rest.rs b/proxy/src/serverless/rest.rs index 7f85fb53c5..632b308e5d 100644 --- a/proxy/src/serverless/rest.rs +++ b/proxy/src/serverless/rest.rs @@ -12,6 +12,7 @@ use hyper::body::Incoming; use hyper::http::{HeaderName, HeaderValue}; use hyper::{Request, Response, StatusCode}; use indexmap::IndexMap; +use moka::sync::Cache; use ouroboros::self_referencing; use serde::de::DeserializeOwned; use serde::{Deserialize, Deserializer}; @@ -53,7 +54,6 @@ use super::http_util::{ }; use super::json::JsonConversionError; use crate::auth::backend::ComputeCredentialKeys; -use crate::cache::{Cached, TimedLru}; use crate::config::ProxyConfig; use crate::context::RequestContext; use crate::error::{ErrorKind, ReportableError, UserFacingError}; @@ -138,8 +138,15 @@ pub struct ApiConfig { } // The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint -pub(crate) type DbSchemaCache = TimedLru>; +pub(crate) struct DbSchemaCache(pub Cache>); impl DbSchemaCache { + pub fn new(config: crate::config::CacheOptions) -> Self { + let builder = Cache::builder().name("db_schema_cache"); + let builder = config.moka(builder); + + Self(builder.build()) + } + pub async fn get_cached_or_remote( &self, endpoint_id: &EndpointCacheKey, @@ -149,8 +156,8 @@ impl DbSchemaCache { ctx: &RequestContext, config: &'static ProxyConfig, ) -> Result, RestError> { - match self.get(endpoint_id) { - Some(Cached { value: v, .. }) => Ok(v), + match self.0.get(endpoint_id) { + Some(v) => Ok(v), None => { info!("db_schema cache miss for endpoint: {:?}", endpoint_id); let remote_value = self @@ -173,7 +180,7 @@ impl DbSchemaCache { db_extra_search_path: None, }; let value = Arc::new((api_config, schema_owned)); - self.insert(endpoint_id.clone(), value); + self.0.insert(endpoint_id.clone(), value); return Err(e); } Err(e) => { @@ -181,7 +188,7 @@ impl DbSchemaCache { } }; let value = Arc::new((api_config, schema_owned)); - self.insert(endpoint_id.clone(), value.clone()); + self.0.insert(endpoint_id.clone(), value.clone()); Ok(value) } }