diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs
index d55f9fa57b..e8ede07674 100644
--- a/src/datanode/src/store.rs
+++ b/src/datanode/src/store.rs
@@ -153,9 +153,8 @@ async fn build_cache_layer(
         .context(error::InitBackendSnafu)?;
 
     let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
-        .await
         .context(error::InitBackendSnafu)?;
-
+    cache_layer.recover_cache(false).await;
     info!(
         "Enabled local object storage cache, path: {}, capacity: {}.",
         path, cache_capacity
diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index 9aebcc48e9..9e5742ca04 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -20,7 +20,7 @@ use std::time::{Duration, Instant};
 
 use bytes::Bytes;
 use common_base::readable_size::ReadableSize;
-use common_telemetry::{info, warn};
+use common_telemetry::{error, info, warn};
 use futures::{FutureExt, TryStreamExt};
 use moka::future::Cache;
 use moka::notification::RemovalCause;
@@ -188,10 +188,8 @@ impl FileCache {
         }
     }
 
-    /// Recovers the index from local store.
-    pub(crate) async fn recover(&self) -> Result<()> {
+    async fn recover_inner(&self) -> Result<()> {
         let now = Instant::now();
-
         let mut lister = self
             .local_store
             .lister_with(FILE_DIR)
@@ -225,10 +223,23 @@ impl FileCache {
             total_size,
             now.elapsed()
         );
-
         Ok(())
     }
 
+    /// Recovers the index from local store.
+    pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
+        let moved_self = self.clone();
+        let handle = tokio::spawn(async move {
+            if let Err(err) = moved_self.recover_inner().await {
+                error!(err; "Failed to recover file cache.")
+            }
+        });
+
+        if sync {
+            let _ = handle.await;
+        }
+    }
+
     /// Returns the cache file path for the key.
     pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
         cache_file_path(FILE_DIR, key)
@@ -536,13 +547,17 @@ mod tests {
         }
 
         // Recover the cache.
-        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
+        let cache = Arc::new(FileCache::new(
+            local_store.clone(),
+            ReadableSize::mb(10),
+            None,
+        ));
         // No entry before recovery.
         assert!(cache
             .reader(IndexKey::new(region_id, file_ids[0], file_type))
             .await
             .is_none());
-        cache.recover().await.unwrap();
+        cache.recover(true).await;
 
         // Check size.
         cache.memory_index.run_pending_tasks().await;
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 2bea36ad3d..4e2fe357fd 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -67,11 +67,11 @@ impl WriteCache {
         puffin_manager_factory: PuffinManagerFactory,
         intermediate_manager: IntermediateManager,
     ) -> Result<Self> {
-        let file_cache = FileCache::new(local_store, cache_capacity, ttl);
-        file_cache.recover().await?;
+        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
+        file_cache.recover(false).await;
 
         Ok(Self {
-            file_cache: Arc::new(file_cache),
+            file_cache,
             object_store_manager,
             puffin_manager_factory,
             intermediate_manager,
diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml
index 53c65d7d20..72e0e2bfbe 100644
--- a/src/object-store/Cargo.toml
+++ b/src/object-store/Cargo.toml
@@ -27,6 +27,7 @@ opendal = { version = "0.49", features = [
     "services-s3",
 ] }
 prometheus.workspace = true
+tokio.workspace = true
 uuid.workspace = true
 
 [dev-dependencies]
diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs
index 3fea6945e7..197a222162 100644
--- a/src/object-store/src/layers/lru_cache.rs
+++ b/src/object-store/src/layers/lru_cache.rs
@@ -21,7 +21,9 @@ use opendal::raw::{
 };
 use opendal::Result;
 mod read_cache;
-use common_telemetry::info;
+use std::time::Instant;
+
+use common_telemetry::{error, info};
 use read_cache::ReadCache;
 
 /// An opendal layer with local LRU file cache supporting.
@@ -39,19 +41,32 @@ impl<C> Clone for LruCacheLayer<C> {
     }
 }
 impl<C: Access> LruCacheLayer<C> {
-    /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
-    pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
+    /// Create a [`LruCacheLayer`] with local file cache and capacity in bytes.
+    pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
         let read_cache = ReadCache::new(file_cache, capacity);
-        let (entries, bytes) = read_cache.recover_cache().await?;
-
-        info!(
-            "Recovered {} entries and total size {} in bytes for LruCacheLayer",
-            entries, bytes
-        );
-
         Ok(Self { read_cache })
     }
 
+    /// Recovers cache
+    pub async fn recover_cache(&self, sync: bool) {
+        let now = Instant::now();
+        let moved_read_cache = self.read_cache.clone();
+        let handle = tokio::spawn(async move {
+            match moved_read_cache.recover_cache().await {
+                Ok((entries, bytes)) => info!(
+                    "Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
+                    entries,
+                    bytes,
+                    now.elapsed()
+                ),
+                Err(err) => error!(err; "Failed to recover file cache."),
+            }
+        });
+        if sync {
+            let _ = handle.await;
+        }
+    }
+
     /// Returns true when the local cache contains the specific file
     pub async fn contains_file(&self, path: &str) -> bool {
         self.read_cache.contains_file(path).await
diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs
index 2528dd10a9..497decffab 100644
--- a/src/object-store/tests/object_store_test.rs
+++ b/src/object-store/tests/object_store_test.rs
@@ -233,7 +233,9 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
         .atomic_write_dir(&cache_dir.path().to_string_lossy());
         let file_cache = Arc::new(builder.build().unwrap());
 
-        LruCacheLayer::new(file_cache, 32).await.unwrap()
+        let cache_layer = LruCacheLayer::new(file_cache, 32).unwrap();
+        cache_layer.recover_cache(true).await;
+        cache_layer
     };
 
     let store = OperatorBuilder::new(store)
@@ -308,7 +310,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
     let cache_store = file_cache.clone();
 
     // create operator for cache dir to verify cache file
-    let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();
+    let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).unwrap();
+    cache_layer.recover_cache(true).await;
     let store = store.layer(cache_layer.clone());
 
     // create several object handler.
@@ -436,7 +439,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
     drop(cache_layer);
 
     // Test recover
-    let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
+    let cache_layer = LruCacheLayer::new(cache_store, 38).unwrap();
+    cache_layer.recover_cache(true).await;
 
     // The p2 `NotFound` cache will not be recovered
     assert_eq!(cache_layer.read_cache_stat().await, (3, 34));