mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 14:22:58 +00:00
feat: recover file cache index asynchronously (#5087)
* feat: recover file cache index asynchronously * chore: apply suggestions from CR
This commit is contained in:
@@ -153,9 +153,8 @@ async fn build_cache_layer(
|
||||
.context(error::InitBackendSnafu)?;
|
||||
|
||||
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
|
||||
.await
|
||||
.context(error::InitBackendSnafu)?;
|
||||
|
||||
cache_layer.recover_cache(false).await;
|
||||
info!(
|
||||
"Enabled local object storage cache, path: {}, capacity: {}.",
|
||||
path, cache_capacity
|
||||
|
||||
29
src/mito2/src/cache/file_cache.rs
vendored
29
src/mito2/src/cache/file_cache.rs
vendored
@@ -20,7 +20,7 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use bytes::Bytes;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::{info, warn};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use futures::{FutureExt, TryStreamExt};
|
||||
use moka::future::Cache;
|
||||
use moka::notification::RemovalCause;
|
||||
@@ -188,10 +188,8 @@ impl FileCache {
|
||||
}
|
||||
}
|
||||
|
||||
/// Recovers the index from local store.
|
||||
pub(crate) async fn recover(&self) -> Result<()> {
|
||||
async fn recover_inner(&self) -> Result<()> {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut lister = self
|
||||
.local_store
|
||||
.lister_with(FILE_DIR)
|
||||
@@ -225,10 +223,23 @@ impl FileCache {
|
||||
total_size,
|
||||
now.elapsed()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Recovers the index from local store.
|
||||
pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
|
||||
let moved_self = self.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
if let Err(err) = moved_self.recover_inner().await {
|
||||
error!(err; "Failed to recover file cache.")
|
||||
}
|
||||
});
|
||||
|
||||
if sync {
|
||||
let _ = handle.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the cache file path for the key.
|
||||
pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
|
||||
cache_file_path(FILE_DIR, key)
|
||||
@@ -536,13 +547,17 @@ mod tests {
|
||||
}
|
||||
|
||||
// Recover the cache.
|
||||
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
|
||||
let cache = Arc::new(FileCache::new(
|
||||
local_store.clone(),
|
||||
ReadableSize::mb(10),
|
||||
None,
|
||||
));
|
||||
// No entry before recovery.
|
||||
assert!(cache
|
||||
.reader(IndexKey::new(region_id, file_ids[0], file_type))
|
||||
.await
|
||||
.is_none());
|
||||
cache.recover().await.unwrap();
|
||||
cache.recover(true).await;
|
||||
|
||||
// Check size.
|
||||
cache.memory_index.run_pending_tasks().await;
|
||||
|
||||
6
src/mito2/src/cache/write_cache.rs
vendored
6
src/mito2/src/cache/write_cache.rs
vendored
@@ -67,11 +67,11 @@ impl WriteCache {
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
intermediate_manager: IntermediateManager,
|
||||
) -> Result<Self> {
|
||||
let file_cache = FileCache::new(local_store, cache_capacity, ttl);
|
||||
file_cache.recover().await?;
|
||||
let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
|
||||
file_cache.recover(false).await;
|
||||
|
||||
Ok(Self {
|
||||
file_cache: Arc::new(file_cache),
|
||||
file_cache,
|
||||
object_store_manager,
|
||||
puffin_manager_factory,
|
||||
intermediate_manager,
|
||||
|
||||
@@ -27,6 +27,7 @@ opendal = { version = "0.49", features = [
|
||||
"services-s3",
|
||||
] }
|
||||
prometheus.workspace = true
|
||||
tokio.workspace = true
|
||||
uuid.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -21,7 +21,9 @@ use opendal::raw::{
|
||||
};
|
||||
use opendal::Result;
|
||||
mod read_cache;
|
||||
use common_telemetry::info;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_telemetry::{error, info};
|
||||
use read_cache::ReadCache;
|
||||
|
||||
/// An opendal layer with local LRU file cache supporting.
|
||||
@@ -39,19 +41,32 @@ impl<C: Access> Clone for LruCacheLayer<C> {
|
||||
}
|
||||
|
||||
impl<C: Access> LruCacheLayer<C> {
|
||||
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
|
||||
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
|
||||
/// Create a [`LruCacheLayer`] with local file cache and capacity in bytes.
|
||||
pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
|
||||
let read_cache = ReadCache::new(file_cache, capacity);
|
||||
let (entries, bytes) = read_cache.recover_cache().await?;
|
||||
|
||||
info!(
|
||||
"Recovered {} entries and total size {} in bytes for LruCacheLayer",
|
||||
entries, bytes
|
||||
);
|
||||
|
||||
Ok(Self { read_cache })
|
||||
}
|
||||
|
||||
/// Recovers cache
|
||||
pub async fn recover_cache(&self, sync: bool) {
|
||||
let now = Instant::now();
|
||||
let moved_read_cache = self.read_cache.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
match moved_read_cache.recover_cache().await {
|
||||
Ok((entries, bytes)) => info!(
|
||||
"Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
|
||||
entries,
|
||||
bytes,
|
||||
now.elapsed()
|
||||
),
|
||||
Err(err) => error!(err; "Failed to recover file cache."),
|
||||
}
|
||||
});
|
||||
if sync {
|
||||
let _ = handle.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true when the local cache contains the specific file
|
||||
pub async fn contains_file(&self, path: &str) -> bool {
|
||||
self.read_cache.contains_file(path).await
|
||||
|
||||
@@ -233,7 +233,9 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
|
||||
.atomic_write_dir(&cache_dir.path().to_string_lossy());
|
||||
let file_cache = Arc::new(builder.build().unwrap());
|
||||
|
||||
LruCacheLayer::new(file_cache, 32).await.unwrap()
|
||||
let cache_layer = LruCacheLayer::new(file_cache, 32).unwrap();
|
||||
cache_layer.recover_cache(true).await;
|
||||
cache_layer
|
||||
};
|
||||
|
||||
let store = OperatorBuilder::new(store)
|
||||
@@ -308,7 +310,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
|
||||
let cache_store = file_cache.clone();
|
||||
|
||||
// create operator for cache dir to verify cache file
|
||||
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();
|
||||
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).unwrap();
|
||||
cache_layer.recover_cache(true).await;
|
||||
let store = store.layer(cache_layer.clone());
|
||||
|
||||
// create several object handler.
|
||||
@@ -436,7 +439,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
|
||||
|
||||
drop(cache_layer);
|
||||
// Test recover
|
||||
let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
|
||||
let cache_layer = LruCacheLayer::new(cache_store, 38).unwrap();
|
||||
cache_layer.recover_cache(true).await;
|
||||
|
||||
// The p2 `NotFound` cache will not be recovered
|
||||
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
|
||||
|
||||
Reference in New Issue
Block a user