proxy: moka cache

This commit is contained in:
Conrad Ludgate
2024-05-07 07:59:23 +01:00
parent ac7dc82103
commit ef3a9dfafa
12 changed files with 157 additions and 307 deletions

80
Cargo.lock generated
View File

@@ -213,9 +213,9 @@ dependencies = [
[[package]]
name = "async-lock"
version = "3.2.0"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c"
checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b"
dependencies = [
"event-listener 4.0.0",
"event-listener-strategy",
@@ -1239,9 +1239,9 @@ dependencies = [
[[package]]
name = "concurrent-queue"
version = "2.3.0"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
@@ -1875,6 +1875,17 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "event-listener"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d9944b8ca13534cdfb2800775f8dd4902ff3fc75a50101466decadfdf322a24"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.4.0"
@@ -3121,6 +3132,30 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "moka"
version = "0.12.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e0d88686dc561d743b40de8269b26eaf0dc58781bde087b0984646602021d08"
dependencies = [
"async-lock",
"async-trait",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"event-listener 5.3.0",
"futures-util",
"once_cell",
"parking_lot 0.12.1",
"quanta",
"rustc_version",
"smallvec",
"tagptr",
"thiserror",
"triomphe",
"uuid",
]
[[package]]
name = "multimap"
version = "0.8.3"
@@ -4377,6 +4412,7 @@ dependencies = [
"md5",
"measured",
"metrics",
"moka",
"native-tls",
"once_cell",
"opentelemetry",
@@ -4438,6 +4474,21 @@ dependencies = [
"x509-parser",
]
[[package]]
name = "quanta"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quick-xml"
version = "0.31.0"
@@ -4549,6 +4600,15 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "raw-cpuid"
version = "11.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e29830cbb1290e404f24c73af91c5d8d631ce7e128691e9477556b540cd01ecd"
dependencies = [
"bitflags 2.4.1",
]
[[package]]
name = "rayon"
version = "1.7.0"
@@ -5990,6 +6050,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "tar"
version = "0.4.40"
@@ -6641,6 +6707,12 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "triomphe"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3"
[[package]]
name = "try-lock"
version = "0.2.4"

View File

@@ -101,6 +101,7 @@ postgres-protocol.workspace = true
redis.workspace = true
workspace_hack.workspace = true
moka = { version = "0.12.7", features = ["future"] }
[dev-dependencies]
camino-tempfile.workspace = true

View File

@@ -69,8 +69,10 @@ pub enum BackendType<'a, T, D> {
Link(MaybeOwned<'a, url::ApiUrl>, D),
}
#[cfg(test)]
#[async_trait::async_trait]
pub trait TestBackend: Send + Sync + 'static {
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
async fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
fn get_allowed_ips_and_secret(
&self,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), console::errors::GetAuthInfoError>;
@@ -343,7 +345,7 @@ async fn auth_quirks(
Err(e) => {
if e.is_auth_failed() {
// The password could have been changed, so we invalidate the cache.
cached_entry.invalidate();
cached_entry.invalidate().await;
}
Err(e)
}

View File

@@ -1,7 +1,5 @@
pub mod common;
pub mod endpoints;
pub mod project_info;
mod timed_lru;
pub use common::{Cache, Cached};
pub use timed_lru::TimedLru;

View File

@@ -3,6 +3,7 @@ use std::ops::{Deref, DerefMut};
/// A generic trait which exposes types of cache's key and value,
/// as well as the notion of cache entry invalidation.
/// This is useful for [`Cached`].
#[allow(async_fn_in_trait)]
pub trait Cache {
/// Entry's key.
type Key;
@@ -15,7 +16,7 @@ pub trait Cache {
/// Invalidate an entry using a lookup info.
/// We don't have an empty default impl because it's error-prone.
fn invalidate(&self, _: &Self::LookupInfo<Self::Key>);
async fn invalidate(&self, _: &Self::LookupInfo<Self::Key>);
}
impl<C: Cache> Cache for &C {
@@ -23,8 +24,8 @@ impl<C: Cache> Cache for &C {
type Value = C::Value;
type LookupInfo<Key> = C::LookupInfo<Key>;
fn invalidate(&self, info: &Self::LookupInfo<Self::Key>) {
C::invalidate(self, info)
async fn invalidate(&self, info: &Self::LookupInfo<Self::Key>) {
C::invalidate(self, info).await
}
}
@@ -54,9 +55,9 @@ impl<C: Cache, V> Cached<C, V> {
}
/// Drop this entry from a cache if it's still there.
pub fn invalidate(self) -> V {
pub async fn invalidate(self) -> V {
if let Some((cache, info)) = &self.token {
cache.invalidate(info);
cache.invalidate(info).await;
}
self.value
}
@@ -80,3 +81,18 @@ impl<C: Cache, V> DerefMut for Cached<C, V> {
&mut self.value
}
}
impl<K, V, S> Cache for moka::future::Cache<K, V, S>
where
K: std::hash::Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
{
type Key = K;
type Value = V;
type LookupInfo<Key> = Key;
async fn invalidate(&self, key: &Self::LookupInfo<Self::Key>) {
moka::future::Cache::invalidate(self, key).await
}
}

View File

@@ -352,7 +352,7 @@ impl Cache for ProjectInfoCacheImpl {
type LookupInfo<Key> = CachedLookupInfo;
fn invalidate(&self, key: &Self::LookupInfo<SmolStr>) {
async fn invalidate(&self, key: &Self::LookupInfo<SmolStr>) {
match &key.lookup_type {
LookupType::RoleSecret(role_name) => {
if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
@@ -489,7 +489,7 @@ mod tests {
assert!(!cached.cached());
assert_eq!(cached.value, secret1);
cached.invalidate(); // Shouldn't do anything.
cached.invalidate().await; // Shouldn't do anything.
let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
assert_eq!(cached.value, secret1);

View File

@@ -1,258 +0,0 @@
use std::{
borrow::Borrow,
hash::Hash,
time::{Duration, Instant},
};
use tracing::debug;
// This seems to make more sense than `lru` or `cached`:
//
// * `near/nearcore` ditched `cached` in favor of `lru`
// (https://github.com/near/nearcore/issues?q=is%3Aissue+lru+is%3Aclosed).
//
// * `lru` methods use an obscure `KeyRef` type in their contraints (which is deliberately excluded from docs).
// This severely hinders its usage both in terms of creating wrappers and supported key types.
//
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
use super::{common::Cached, *};
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:
///
/// * Whenever a new entry is inserted, the least recently accessed one is evicted.
/// The cache also keeps track of entry's insertion time (`created_at`) and TTL (`expires_at`).
///
/// * If `update_ttl_on_retrieval` is `true`. When the entry is about to be retrieved, we check its expiration timestamp.
/// If the entry has expired, we remove it from the cache; Otherwise we bump the
/// expiration timestamp (e.g. +5mins) and change its place in LRU list to prolong
/// its existence.
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information.
///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet).
/// There is no background job to reap the expired records.
///
/// * It's possible for an entry that has not yet expired entry to be evicted
/// before expired items. That's a bit wasteful, but probably fine in practice.
pub struct TimedLru<K, V> {
/// Cache's name for tracing.
name: &'static str,
/// The underlying cache implementation.
cache: parking_lot::Mutex<LruCache<K, Entry<V>>>,
/// Default time-to-live of a single entry.
ttl: Duration,
update_ttl_on_retrieval: bool,
}
impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
type Key = K;
type Value = V;
type LookupInfo<Key> = LookupInfo<Key>;
fn invalidate(&self, info: &Self::LookupInfo<K>) {
self.invalidate_raw(info)
}
}
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
value: T,
}
impl<K: Hash + Eq, V> TimedLru<K, V> {
/// Construct a new LRU cache with timed entries.
pub fn new(
name: &'static str,
capacity: usize,
ttl: Duration,
update_ttl_on_retrieval: bool,
) -> Self {
Self {
name,
cache: LruCache::new(capacity).into(),
ttl,
update_ttl_on_retrieval,
}
}
/// Drop an entry from the cache if it's outdated.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn invalidate_raw(&self, info: &LookupInfo<K>) {
let now = Instant::now();
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let raw_entry = match cache.raw_entry_mut().from_key(&info.key) {
RawEntryMut::Vacant(_) => return,
RawEntryMut::Occupied(x) => x,
};
// Remove the entry if it was created prior to lookup timestamp.
let entry = raw_entry.get();
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
let should_remove = created_at <= info.created_at || expires_at <= now;
if should_remove {
raw_entry.remove();
}
drop(cache); // drop lock before logging
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
entry_removed = should_remove,
"processed a cache entry invalidation event"
);
}
/// Try retrieving an entry by its key, then execute `extract` if it exists.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn get_raw<Q, R>(&self, key: &Q, extract: impl FnOnce(&K, &Entry<V>) -> R) -> Option<R>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let now = Instant::now();
let deadline = now.checked_add(self.ttl).expect("time overflow");
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let mut raw_entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return None,
RawEntryMut::Occupied(x) => x,
};
// Immeditely drop the entry if it has expired.
let entry = raw_entry.get();
if entry.expires_at <= now {
raw_entry.remove();
return None;
}
let value = extract(raw_entry.key(), entry);
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
// Update the deadline and the entry's position in the LRU list.
if self.update_ttl_on_retrieval {
raw_entry.get_mut().expires_at = deadline;
}
raw_entry.to_back();
drop(cache); // drop lock before logging
debug!(
created_at = format_args!("{created_at:?}"),
old_expires_at = format_args!("{expires_at:?}"),
new_expires_at = format_args!("{deadline:?}"),
"accessed a cache entry"
);
Some(value)
}
/// Insert an entry to the cache. If an entry with the same key already
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw(&self, key: K, value: V) -> (Instant, Option<V>) {
let created_at = Instant::now();
let expires_at = created_at.checked_add(self.ttl).expect("time overflow");
let entry = Entry {
created_at,
expires_at,
value,
};
// Do costly things before taking the lock.
let old = self
.cache
.lock()
.insert(key, entry)
.map(|entry| entry.value);
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
replaced = old.is_some(),
"created a cache entry"
);
(created_at, old)
}
}
impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
pub fn insert(&self, key: K, value: V) -> (Option<V>, Cached<&Self>) {
let (created_at, old) = self.insert_raw(key.clone(), value.clone());
let cached = Cached {
token: Some((self, LookupInfo { created_at, key })),
value,
};
(old, cached)
}
}
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
/// Retrieve a cached entry in convenient wrapper.
pub fn get<Q>(&self, key: &Q) -> Option<timed_lru::Cached<&Self>>
where
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| {
let info = LookupInfo {
created_at: entry.created_at,
key: key.clone(),
};
Cached {
token: Some((self, info)),
value: entry.value.clone(),
}
})
}
/// Retrieve a cached entry in convenient wrapper, ignoring its TTL.
pub fn get_ignoring_ttl<Q>(&self, key: &Q) -> Option<timed_lru::Cached<&Self>>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let mut cache = self.cache.lock();
cache
.get(key)
.map(|entry| Cached::new_uncached(entry.value.clone()))
}
/// Remove an entry from the cache.
pub fn remove<Q>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
let mut cache = self.cache.lock();
cache.remove(key).map(|entry| entry.value)
}
}
/// Lookup information for key invalidation.
pub struct LookupInfo<K> {
/// Time of creation of a cache [`Entry`].
/// We use this during invalidation lookups to prevent eviction of a newer
/// entry sharing the same key (it might've been inserted by a different
/// task after we got the entry we're trying to invalidate now).
created_at: Instant,
/// Search by this key.
key: K,
}

View File

@@ -411,7 +411,7 @@ pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfi
#[derive(Debug)]
pub struct CacheOptions {
/// Max number of entries.
pub size: usize,
pub size: u64,
/// Entry's time-to-live.
pub ttl: Duration,
}

View File

@@ -8,7 +8,7 @@ use crate::{
backend::{ComputeCredentialKeys, ComputeUserInfo},
IpPattern,
},
cache::{endpoints::EndpointsCache, project_info::ProjectInfoCacheImpl, Cached, TimedLru},
cache::{endpoints::EndpointsCache, project_info::ProjectInfoCacheImpl, Cached},
compute,
config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions},
context::RequestMonitoring,
@@ -326,7 +326,7 @@ impl NodeInfo {
}
}
pub type NodeInfoCache = TimedLru<EndpointCacheKey, NodeInfo>;
pub type NodeInfoCache = moka::future::Cache<EndpointCacheKey, NodeInfo>;
pub type CachedNodeInfo = Cached<&'static NodeInfoCache>;
pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option<AuthSecret>>;
pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;
@@ -412,7 +412,7 @@ impl Api for ConsoleBackend {
#[cfg(any(test, feature = "testing"))]
Postgres(api) => api.wake_compute(ctx, user_info).await,
#[cfg(test)]
Test(api) => api.wake_compute(),
Test(api) => api.wake_compute().await,
}
}
}
@@ -434,12 +434,11 @@ impl ApiCaches {
endpoint_cache_config: EndpointCacheConfig,
) -> Self {
Self {
node_info: NodeInfoCache::new(
"node_info_cache",
wake_compute_cache_config.size,
wake_compute_cache_config.ttl,
true,
),
node_info: moka::future::Cache::builder()
.max_capacity(wake_compute_cache_config.size)
.time_to_idle(wake_compute_cache_config.ttl)
.name("node_info_cache")
.build(),
project_info: Arc::new(ProjectInfoCacheImpl::new(project_info_cache_config)),
endpoints_cache: Arc::new(EndpointsCache::new(endpoint_cache_config)),
}

View File

@@ -275,10 +275,13 @@ impl super::Api for Api {
// for some time (highly depends on the console's scale-to-zero policy);
// The connection info remains the same during that period of time,
// which means that we might cache it to reduce the load and latency.
if let Some(cached) = self.caches.node_info.get(&key) {
if let Some(cached) = self.caches.node_info.get(&key).await {
info!(key = &*key, "found cached compute node info");
ctx.set_project(cached.aux.clone());
return Ok(cached);
return Ok(CachedNodeInfo {
token: Some((&self.caches.node_info, key)),
value: cached,
});
}
// check rate limit
@@ -294,10 +297,13 @@ impl super::Api for Api {
// after getting back a permit - it's possible the cache was filled
// double check
if permit.should_check_cache() {
if let Some(cached) = self.caches.node_info.get(&key) {
if let Some(cached) = self.caches.node_info.get(&key).await {
info!(key = &*key, "found cached compute node info");
ctx.set_project(cached.aux.clone());
return Ok(cached);
return Ok(CachedNodeInfo {
token: Some((&self.caches.node_info, key)),
value: cached,
});
}
}
@@ -308,12 +314,18 @@ impl super::Api for Api {
// store the cached node as 'warm'
node.aux.cold_start_info = ColdStartInfo::WarmCached;
let (_, mut cached) = self.caches.node_info.insert(key.clone(), node);
cached.aux.cold_start_info = cold_start_info;
self.caches
.node_info
.insert(key.clone(), node.clone())
.await;
node.aux.cold_start_info = cold_start_info;
info!(key = &*key, "created a cache entry for compute node info");
Ok(cached)
Ok(CachedNodeInfo {
token: Some((&self.caches.node_info, key)),
value: node,
})
}
}

View File

@@ -23,7 +23,7 @@ const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
#[tracing::instrument(name = "invalidate_cache", skip_all)]
pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo {
pub async fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
@@ -34,7 +34,7 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo {
};
Metrics::get().proxy.connection_failures_total.inc(label);
node_info.invalidate()
node_info.invalidate().await
}
#[async_trait]
@@ -156,7 +156,7 @@ where
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
let old_node_info = invalidate_cache(node_info);
let old_node_info = invalidate_cache(node_info).await;
let mut node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
node_info.reuse_settings(old_node_info);

View File

@@ -405,12 +405,13 @@ impl TestConnectMechanism {
Self {
counter: Arc::new(std::sync::Mutex::new(0)),
sequence,
cache: Box::leak(Box::new(NodeInfoCache::new(
"test",
1,
Duration::from_secs(100),
false,
))),
cache: Box::leak(Box::new(
NodeInfoCache::builder()
.name("test")
.max_capacity(1)
.time_to_live(Duration::from_secs(100))
.build(),
)),
}
}
}
@@ -476,13 +477,17 @@ impl ConnectMechanism for TestConnectMechanism {
fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
}
#[async_trait]
impl TestBackend for TestConnectMechanism {
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
let mut counter = self.counter.lock().unwrap();
let action = self.sequence[*counter];
*counter += 1;
async fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
let action = {
let mut counter = self.counter.lock().unwrap();
let action = self.sequence[*counter];
*counter += 1;
action
};
match action {
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache).await),
ConnectAction::WakeFail => {
let err = console::errors::ApiError::Console {
status: http::StatusCode::FORBIDDEN,
@@ -514,7 +519,7 @@ impl TestBackend for TestConnectMechanism {
}
}
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
async fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = NodeInfo {
config: compute::ConnCfg::new(),
aux: MetricsAuxInfo {
@@ -525,8 +530,11 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
},
allow_self_signed_compute: false,
};
let (_, node) = cache.insert("key".into(), node);
node
cache.insert("key".into(), node.clone()).await;
CachedNodeInfo {
token: Some((cache, "key".into())),
value: node,
}
}
fn helper_create_connect_info(