From ef3a9dfafae1f75c76d1697351e04b4a7b6daf5e Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 7 May 2024 07:59:23 +0100 Subject: [PATCH] proxy: moka cache --- Cargo.lock | 80 ++++++++- proxy/Cargo.toml | 1 + proxy/src/auth/backend.rs | 6 +- proxy/src/cache.rs | 2 - proxy/src/cache/common.rs | 26 ++- proxy/src/cache/project_info.rs | 4 +- proxy/src/cache/timed_lru.rs | 258 ----------------------------- proxy/src/config.rs | 2 +- proxy/src/console/provider.rs | 17 +- proxy/src/console/provider/neon.rs | 26 ++- proxy/src/proxy/connect_compute.rs | 6 +- proxy/src/proxy/tests.rs | 36 ++-- 12 files changed, 157 insertions(+), 307 deletions(-) delete mode 100644 proxy/src/cache/timed_lru.rs diff --git a/Cargo.lock b/Cargo.lock index 8438dad41b..4e0845f5de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,9 +213,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "3.2.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c" +checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" dependencies = [ "event-listener 4.0.0", "event-listener-strategy", @@ -1239,9 +1239,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.3.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ "crossbeam-utils", ] @@ -1875,6 +1875,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d9944b8ca13534cdfb2800775f8dd4902ff3fc75a50101466decadfdf322a24" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + [[package]] name = "event-listener-strategy" version = "0.4.0" @@ -3121,6 +3132,30 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.12.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e0d88686dc561d743b40de8269b26eaf0dc58781bde087b0984646602021d08" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "event-listener 5.3.0", + "futures-util", + "once_cell", + "parking_lot 0.12.1", + "quanta", + "rustc_version", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.8.3" @@ -4377,6 +4412,7 @@ dependencies = [ "md5", "measured", "metrics", + "moka", "native-tls", "once_cell", "opentelemetry", @@ -4438,6 +4474,21 @@ dependencies = [ "x509-parser", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.31.0" @@ -4549,6 +4600,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "raw-cpuid" +version = "11.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e29830cbb1290e404f24c73af91c5d8d631ce7e128691e9477556b540cd01ecd" +dependencies = [ + "bitflags 2.4.1", +] + [[package]] name = "rayon" version = "1.7.0" @@ -5990,6 +6050,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tar" version = "0.4.40" @@ -6641,6 +6707,12 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "try-lock" version = "0.2.4" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 0e8d03906b..dbdefd0da4 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -101,6 +101,7 @@ postgres-protocol.workspace = true redis.workspace = true workspace_hack.workspace = true +moka = { version = "0.12.7", features = ["future"] } [dev-dependencies] camino-tempfile.workspace = true diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index 3795e3b608..dd80f502eb 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -69,8 +69,10 @@ pub enum BackendType<'a, T, D> { Link(MaybeOwned<'a, url::ApiUrl>, D), } +#[cfg(test)] +#[async_trait::async_trait] pub trait TestBackend: Send + Sync + 'static { - fn wake_compute(&self) -> Result; + async fn wake_compute(&self) -> Result; fn get_allowed_ips_and_secret( &self, ) -> Result<(CachedAllowedIps, Option), console::errors::GetAuthInfoError>; @@ -343,7 +345,7 @@ async fn auth_quirks( Err(e) => { if e.is_auth_failed() { // The password could have been changed, so we invalidate the cache. - cached_entry.invalidate(); + cached_entry.invalidate().await; } Err(e) } diff --git a/proxy/src/cache.rs b/proxy/src/cache.rs index d1d4087241..28e66c0a6a 100644 --- a/proxy/src/cache.rs +++ b/proxy/src/cache.rs @@ -1,7 +1,5 @@ pub mod common; pub mod endpoints; pub mod project_info; -mod timed_lru; pub use common::{Cache, Cached}; -pub use timed_lru::TimedLru; diff --git a/proxy/src/cache/common.rs b/proxy/src/cache/common.rs index bc1c37512b..f228e7c93f 100644 --- a/proxy/src/cache/common.rs +++ b/proxy/src/cache/common.rs @@ -3,6 +3,7 @@ use std::ops::{Deref, DerefMut}; /// A generic trait which exposes types of cache's key and value, /// as well as the notion of cache entry invalidation. /// This is useful for [`Cached`]. +#[allow(async_fn_in_trait)] pub trait Cache { /// Entry's key. type Key; @@ -15,7 +16,7 @@ pub trait Cache { /// Invalidate an entry using a lookup info. /// We don't have an empty default impl because it's error-prone. - fn invalidate(&self, _: &Self::LookupInfo); + async fn invalidate(&self, _: &Self::LookupInfo); } impl Cache for &C { @@ -23,8 +24,8 @@ impl Cache for &C { type Value = C::Value; type LookupInfo = C::LookupInfo; - fn invalidate(&self, info: &Self::LookupInfo) { - C::invalidate(self, info) + async fn invalidate(&self, info: &Self::LookupInfo) { + C::invalidate(self, info).await } } @@ -54,9 +55,9 @@ impl Cached { } /// Drop this entry from a cache if it's still there. - pub fn invalidate(self) -> V { + pub async fn invalidate(self) -> V { if let Some((cache, info)) = &self.token { - cache.invalidate(info); + cache.invalidate(info).await; } self.value } @@ -80,3 +81,18 @@ impl DerefMut for Cached { &mut self.value } } + +impl Cache for moka::future::Cache +where + K: std::hash::Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, + S: std::hash::BuildHasher + Clone + Send + Sync + 'static, +{ + type Key = K; + type Value = V; + type LookupInfo = Key; + + async fn invalidate(&self, key: &Self::LookupInfo) { + moka::future::Cache::invalidate(self, key).await + } +} diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 10cc4ceee1..4820c7ec74 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -352,7 +352,7 @@ impl Cache for ProjectInfoCacheImpl { type LookupInfo = CachedLookupInfo; - fn invalidate(&self, key: &Self::LookupInfo) { + async fn invalidate(&self, key: &Self::LookupInfo) { match &key.lookup_type { LookupType::RoleSecret(role_name) => { if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) { @@ -489,7 +489,7 @@ mod tests { assert!(!cached.cached()); assert_eq!(cached.value, secret1); - cached.invalidate(); // Shouldn't do anything. + cached.invalidate().await; // Shouldn't do anything. let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap(); assert_eq!(cached.value, secret1); diff --git a/proxy/src/cache/timed_lru.rs b/proxy/src/cache/timed_lru.rs deleted file mode 100644 index 3b21381bb9..0000000000 --- a/proxy/src/cache/timed_lru.rs +++ /dev/null @@ -1,258 +0,0 @@ -use std::{ - borrow::Borrow, - hash::Hash, - time::{Duration, Instant}, -}; -use tracing::debug; - -// This seems to make more sense than `lru` or `cached`: -// -// * `near/nearcore` ditched `cached` in favor of `lru` -// (https://github.com/near/nearcore/issues?q=is%3Aissue+lru+is%3Aclosed). -// -// * `lru` methods use an obscure `KeyRef` type in their contraints (which is deliberately excluded from docs). -// This severely hinders its usage both in terms of creating wrappers and supported key types. -// -// On the other hand, `hashlink` has good download stats and appears to be maintained. -use hashlink::{linked_hash_map::RawEntryMut, LruCache}; - -use super::{common::Cached, *}; - -/// An implementation of timed LRU cache with fixed capacity. -/// Key properties: -/// -/// * Whenever a new entry is inserted, the least recently accessed one is evicted. -/// The cache also keeps track of entry's insertion time (`created_at`) and TTL (`expires_at`). -/// -/// * If `update_ttl_on_retrieval` is `true`. When the entry is about to be retrieved, we check its expiration timestamp. -/// If the entry has expired, we remove it from the cache; Otherwise we bump the -/// expiration timestamp (e.g. +5mins) and change its place in LRU list to prolong -/// its existence. -/// -/// * There's an API for immediate invalidation (removal) of a cache entry; -/// It's useful in case we know for sure that the entry is no longer correct. -/// See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information. -/// -/// * Expired entries are kept in the cache, until they are evicted by the LRU policy, -/// or by a successful lookup (i.e. the entry hasn't expired yet). -/// There is no background job to reap the expired records. -/// -/// * It's possible for an entry that has not yet expired entry to be evicted -/// before expired items. That's a bit wasteful, but probably fine in practice. -pub struct TimedLru { - /// Cache's name for tracing. - name: &'static str, - - /// The underlying cache implementation. - cache: parking_lot::Mutex>>, - - /// Default time-to-live of a single entry. - ttl: Duration, - - update_ttl_on_retrieval: bool, -} - -impl Cache for TimedLru { - type Key = K; - type Value = V; - type LookupInfo = LookupInfo; - - fn invalidate(&self, info: &Self::LookupInfo) { - self.invalidate_raw(info) - } -} - -struct Entry { - created_at: Instant, - expires_at: Instant, - value: T, -} - -impl TimedLru { - /// Construct a new LRU cache with timed entries. - pub fn new( - name: &'static str, - capacity: usize, - ttl: Duration, - update_ttl_on_retrieval: bool, - ) -> Self { - Self { - name, - cache: LruCache::new(capacity).into(), - ttl, - update_ttl_on_retrieval, - } - } - - /// Drop an entry from the cache if it's outdated. - #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)] - fn invalidate_raw(&self, info: &LookupInfo) { - let now = Instant::now(); - - // Do costly things before taking the lock. - let mut cache = self.cache.lock(); - let raw_entry = match cache.raw_entry_mut().from_key(&info.key) { - RawEntryMut::Vacant(_) => return, - RawEntryMut::Occupied(x) => x, - }; - - // Remove the entry if it was created prior to lookup timestamp. - let entry = raw_entry.get(); - let (created_at, expires_at) = (entry.created_at, entry.expires_at); - let should_remove = created_at <= info.created_at || expires_at <= now; - - if should_remove { - raw_entry.remove(); - } - - drop(cache); // drop lock before logging - debug!( - created_at = format_args!("{created_at:?}"), - expires_at = format_args!("{expires_at:?}"), - entry_removed = should_remove, - "processed a cache entry invalidation event" - ); - } - - /// Try retrieving an entry by its key, then execute `extract` if it exists. - #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)] - fn get_raw(&self, key: &Q, extract: impl FnOnce(&K, &Entry) -> R) -> Option - where - K: Borrow, - Q: Hash + Eq + ?Sized, - { - let now = Instant::now(); - let deadline = now.checked_add(self.ttl).expect("time overflow"); - - // Do costly things before taking the lock. - let mut cache = self.cache.lock(); - let mut raw_entry = match cache.raw_entry_mut().from_key(key) { - RawEntryMut::Vacant(_) => return None, - RawEntryMut::Occupied(x) => x, - }; - - // Immeditely drop the entry if it has expired. - let entry = raw_entry.get(); - if entry.expires_at <= now { - raw_entry.remove(); - return None; - } - - let value = extract(raw_entry.key(), entry); - let (created_at, expires_at) = (entry.created_at, entry.expires_at); - - // Update the deadline and the entry's position in the LRU list. - if self.update_ttl_on_retrieval { - raw_entry.get_mut().expires_at = deadline; - } - raw_entry.to_back(); - - drop(cache); // drop lock before logging - debug!( - created_at = format_args!("{created_at:?}"), - old_expires_at = format_args!("{expires_at:?}"), - new_expires_at = format_args!("{deadline:?}"), - "accessed a cache entry" - ); - - Some(value) - } - - /// Insert an entry to the cache. If an entry with the same key already - /// existed, return the previous value and its creation timestamp. - #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)] - fn insert_raw(&self, key: K, value: V) -> (Instant, Option) { - let created_at = Instant::now(); - let expires_at = created_at.checked_add(self.ttl).expect("time overflow"); - - let entry = Entry { - created_at, - expires_at, - value, - }; - - // Do costly things before taking the lock. - let old = self - .cache - .lock() - .insert(key, entry) - .map(|entry| entry.value); - - debug!( - created_at = format_args!("{created_at:?}"), - expires_at = format_args!("{expires_at:?}"), - replaced = old.is_some(), - "created a cache entry" - ); - - (created_at, old) - } -} - -impl TimedLru { - pub fn insert(&self, key: K, value: V) -> (Option, Cached<&Self>) { - let (created_at, old) = self.insert_raw(key.clone(), value.clone()); - - let cached = Cached { - token: Some((self, LookupInfo { created_at, key })), - value, - }; - - (old, cached) - } -} - -impl TimedLru { - /// Retrieve a cached entry in convenient wrapper. - pub fn get(&self, key: &Q) -> Option> - where - K: Borrow + Clone, - Q: Hash + Eq + ?Sized, - { - self.get_raw(key, |key, entry| { - let info = LookupInfo { - created_at: entry.created_at, - key: key.clone(), - }; - - Cached { - token: Some((self, info)), - value: entry.value.clone(), - } - }) - } - - /// Retrieve a cached entry in convenient wrapper, ignoring its TTL. - pub fn get_ignoring_ttl(&self, key: &Q) -> Option> - where - K: Borrow, - Q: Hash + Eq + ?Sized, - { - let mut cache = self.cache.lock(); - cache - .get(key) - .map(|entry| Cached::new_uncached(entry.value.clone())) - } - - /// Remove an entry from the cache. - pub fn remove(&self, key: &Q) -> Option - where - K: Borrow + Clone, - Q: Hash + Eq + ?Sized, - { - let mut cache = self.cache.lock(); - cache.remove(key).map(|entry| entry.value) - } -} - -/// Lookup information for key invalidation. -pub struct LookupInfo { - /// Time of creation of a cache [`Entry`]. - /// We use this during invalidation lookups to prevent eviction of a newer - /// entry sharing the same key (it might've been inserted by a different - /// task after we got the entry we're trying to invalidate now). - created_at: Instant, - - /// Search by this key. - key: K, -} diff --git a/proxy/src/config.rs b/proxy/src/config.rs index e090407756..92ac71f853 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -411,7 +411,7 @@ pub fn remote_storage_from_toml(s: &str) -> anyhow::Result; +pub type NodeInfoCache = moka::future::Cache; pub type CachedNodeInfo = Cached<&'static NodeInfoCache>; pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option>; pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc>>; @@ -412,7 +412,7 @@ impl Api for ConsoleBackend { #[cfg(any(test, feature = "testing"))] Postgres(api) => api.wake_compute(ctx, user_info).await, #[cfg(test)] - Test(api) => api.wake_compute(), + Test(api) => api.wake_compute().await, } } } @@ -434,12 +434,11 @@ impl ApiCaches { endpoint_cache_config: EndpointCacheConfig, ) -> Self { Self { - node_info: NodeInfoCache::new( - "node_info_cache", - wake_compute_cache_config.size, - wake_compute_cache_config.ttl, - true, - ), + node_info: moka::future::Cache::builder() + .max_capacity(wake_compute_cache_config.size) + .time_to_idle(wake_compute_cache_config.ttl) + .name("node_info_cache") + .build(), project_info: Arc::new(ProjectInfoCacheImpl::new(project_info_cache_config)), endpoints_cache: Arc::new(EndpointsCache::new(endpoint_cache_config)), } diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index ec66641d01..5ebea76728 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -275,10 +275,13 @@ impl super::Api for Api { // for some time (highly depends on the console's scale-to-zero policy); // The connection info remains the same during that period of time, // which means that we might cache it to reduce the load and latency. - if let Some(cached) = self.caches.node_info.get(&key) { + if let Some(cached) = self.caches.node_info.get(&key).await { info!(key = &*key, "found cached compute node info"); ctx.set_project(cached.aux.clone()); - return Ok(cached); + return Ok(CachedNodeInfo { + token: Some((&self.caches.node_info, key)), + value: cached, + }); } // check rate limit @@ -294,10 +297,13 @@ impl super::Api for Api { // after getting back a permit - it's possible the cache was filled // double check if permit.should_check_cache() { - if let Some(cached) = self.caches.node_info.get(&key) { + if let Some(cached) = self.caches.node_info.get(&key).await { info!(key = &*key, "found cached compute node info"); ctx.set_project(cached.aux.clone()); - return Ok(cached); + return Ok(CachedNodeInfo { + token: Some((&self.caches.node_info, key)), + value: cached, + }); } } @@ -308,12 +314,18 @@ impl super::Api for Api { // store the cached node as 'warm' node.aux.cold_start_info = ColdStartInfo::WarmCached; - let (_, mut cached) = self.caches.node_info.insert(key.clone(), node); - cached.aux.cold_start_info = cold_start_info; + self.caches + .node_info + .insert(key.clone(), node.clone()) + .await; + node.aux.cold_start_info = cold_start_info; info!(key = &*key, "created a cache entry for compute node info"); - Ok(cached) + Ok(CachedNodeInfo { + token: Some((&self.caches.node_info, key)), + value: node, + }) } } diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index c8528d0296..a4a4e2e2ba 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -23,7 +23,7 @@ const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2); /// (e.g. the compute node's address might've changed at the wrong time). /// Invalidate the cache entry (if any) to prevent subsequent errors. #[tracing::instrument(name = "invalidate_cache", skip_all)] -pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo { +pub async fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo { let is_cached = node_info.cached(); if is_cached { warn!("invalidating stalled compute node info cache entry"); @@ -34,7 +34,7 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo { }; Metrics::get().proxy.connection_failures_total.inc(label); - node_info.invalidate() + node_info.invalidate().await } #[async_trait] @@ -156,7 +156,7 @@ where } else { // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node info!("compute node's state has likely changed; requesting a wake-up"); - let old_node_info = invalidate_cache(node_info); + let old_node_info = invalidate_cache(node_info).await; let mut node_info = wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?; node_info.reuse_settings(old_node_info); diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index ad48af0093..0424dceb27 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -405,12 +405,13 @@ impl TestConnectMechanism { Self { counter: Arc::new(std::sync::Mutex::new(0)), sequence, - cache: Box::leak(Box::new(NodeInfoCache::new( - "test", - 1, - Duration::from_secs(100), - false, - ))), + cache: Box::leak(Box::new( + NodeInfoCache::builder() + .name("test") + .max_capacity(1) + .time_to_live(Duration::from_secs(100)) + .build(), + )), } } } @@ -476,13 +477,17 @@ impl ConnectMechanism for TestConnectMechanism { fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {} } +#[async_trait] impl TestBackend for TestConnectMechanism { - fn wake_compute(&self) -> Result { - let mut counter = self.counter.lock().unwrap(); - let action = self.sequence[*counter]; - *counter += 1; + async fn wake_compute(&self) -> Result { + let action = { + let mut counter = self.counter.lock().unwrap(); + let action = self.sequence[*counter]; + *counter += 1; + action + }; match action { - ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)), + ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache).await), ConnectAction::WakeFail => { let err = console::errors::ApiError::Console { status: http::StatusCode::FORBIDDEN, @@ -514,7 +519,7 @@ impl TestBackend for TestConnectMechanism { } } -fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo { +async fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo { let node = NodeInfo { config: compute::ConnCfg::new(), aux: MetricsAuxInfo { @@ -525,8 +530,11 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn }, allow_self_signed_compute: false, }; - let (_, node) = cache.insert("key".into(), node); - node + cache.insert("key".into(), node.clone()).await; + CachedNodeInfo { + token: Some((cache, "key".into())), + value: node, + } } fn helper_create_connect_info(