From b4d5393080342a240237de04225d749989e9b9d7 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 14 Feb 2025 01:13:24 +0800 Subject: [PATCH] feat: speed up read/write cache and stager eviction (#5531) * feat: change cache policy for file cache * feat: file cache run pending task after put * feat: run pending task in put_dir * feat: run pending task after stager recovered * feat: purge recycle bin periodically * feat: use lru policy for read cache --- src/mito2/src/cache/file_cache.rs | 12 ++- .../src/layers/lru_cache/read_cache.rs | 2 + .../puffin_manager/stager/bounded_stager.rs | 80 ++++++++++++------- 3 files changed, 62 insertions(+), 32 deletions(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index b64968ee54..c0ea9629fe 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -24,6 +24,7 @@ use common_telemetry::{error, info, warn}; use futures::{FutureExt, TryStreamExt}; use moka::future::Cache; use moka::notification::RemovalCause; +use moka::policy::EvictionPolicy; use object_store::util::join_path; use object_store::{ErrorKind, ObjectStore, Reader}; use parquet::file::metadata::ParquetMetaData; @@ -65,6 +66,7 @@ impl FileCache { ) -> FileCache { let cache_store = local_store.clone(); let mut builder = Cache::builder() + .eviction_policy(EvictionPolicy::lru()) .weigher(|_key, value: &IndexValue| -> u32 { // We only measure space on local store. value.file_size @@ -112,6 +114,9 @@ impl FileCache { .with_label_values(&[FILE_TYPE]) .add(value.file_size.into()); self.memory_index.insert(key, value).await; + + // Since files are large items, we run the pending tasks immediately. + self.memory_index.run_pending_tasks().await; } pub(crate) async fn get(&self, key: IndexKey) -> Option { @@ -224,10 +229,15 @@ impl FileCache { // The metrics is a signed int gauge so we can updates it finally. CACHE_BYTES.with_label_values(&[FILE_TYPE]).add(total_size); + // Run all pending tasks of the moka cache so that the cache size is updated + // and the eviction policy is applied. + self.memory_index.run_pending_tasks().await; + info!( - "Recovered file cache, num_keys: {}, num_bytes: {}, cost: {:?}", + "Recovered file cache, num_keys: {}, num_bytes: {}, total weight: {}, cost: {:?}", total_keys, total_size, + self.memory_index.weighted_size(), now.elapsed() ); Ok(()) diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index 0f427c97ef..741cd5cc74 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -18,6 +18,7 @@ use common_telemetry::debug; use futures::{FutureExt, TryStreamExt}; use moka::future::Cache; use moka::notification::ListenerFuture; +use moka::policy::EvictionPolicy; use opendal::raw::oio::{Read, Reader, Write}; use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead}; use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result}; @@ -120,6 +121,7 @@ impl ReadCache { file_cache, mem_cache: Cache::builder() .max_capacity(capacity as u64) + .eviction_policy(EvictionPolicy::lru()) .weigher(|_key, value: &ReadResult| -> u32 { // TODO(dennis): add key's length to weight? value.size_bytes() diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index d74c9f8213..fd5fc74876 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -44,6 +44,7 @@ use crate::puffin_manager::{BlobGuard, DirGuard}; const DELETE_QUEUE_SIZE: usize = 10240; const TMP_EXTENSION: &str = "tmp"; const DELETED_EXTENSION: &str = "deleted"; +const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60); /// `BoundedStager` is a `Stager` that uses `moka` to manage staging area. pub struct BoundedStager { @@ -74,9 +75,7 @@ impl BoundedStager { .await .context(CreateSnafu)?; - let recycle_bin = Cache::builder() - .time_to_live(Duration::from_secs(60)) - .build(); + let recycle_bin = Cache::builder().time_to_live(RECYCLE_BIN_TTL).build(); let recycle_bin_cloned = recycle_bin.clone(); let cache = Cache::builder() @@ -93,7 +92,7 @@ impl BoundedStager { .build(); let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE); - common_runtime::global_runtime().spawn(Self::delete_routine(rx)); + common_runtime::global_runtime().spawn(Self::delete_routine(rx, recycle_bin.clone())); let stager = Self { cache, @@ -211,7 +210,15 @@ impl Stager for BoundedStager { }) .await .map(|_| ()) - .context(CacheGetSnafu) + .context(CacheGetSnafu)?; + + // Dir is usually large. + // Runs pending tasks of the cache and recycle bin to free up space + // more quickly. + self.cache.run_pending_tasks().await; + self.recycle_bin.run_pending_tasks().await; + + Ok(()) } } @@ -327,6 +334,7 @@ impl BoundedStager { for (key, value) in elems { self.cache.insert(key, value).await; } + self.cache.run_pending_tasks().await; Ok(()) } @@ -349,38 +357,48 @@ impl BoundedStager { Ok(size) } - async fn delete_routine(mut receiver: Receiver) { - while let Some(task) = receiver.recv().await { - match task { - DeleteTask::File(path) => { - if let Err(err) = fs::remove_file(&path).await { - if err.kind() == std::io::ErrorKind::NotFound { - continue; - } + async fn delete_routine( + mut receiver: Receiver, + recycle_bin: Cache, + ) { + loop { + match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await { + Ok(Some(task)) => match task { + DeleteTask::File(path) => { + if let Err(err) = fs::remove_file(&path).await { + if err.kind() == std::io::ErrorKind::NotFound { + continue; + } - warn!(err; "Failed to remove the file."); + warn!(err; "Failed to remove the file."); + } } - } - DeleteTask::Dir(path) => { - let deleted_path = path.with_extension(DELETED_EXTENSION); - if let Err(err) = fs::rename(&path, &deleted_path).await { - if err.kind() == std::io::ErrorKind::NotFound { - continue; - } - - // Remove the deleted directory if the rename fails and retry - let _ = fs::remove_dir_all(&deleted_path).await; + DeleteTask::Dir(path) => { + let deleted_path = path.with_extension(DELETED_EXTENSION); if let Err(err) = fs::rename(&path, &deleted_path).await { - warn!(err; "Failed to rename the dangling directory to deleted path."); - continue; + if err.kind() == std::io::ErrorKind::NotFound { + continue; + } + + // Remove the deleted directory if the rename fails and retry + let _ = fs::remove_dir_all(&deleted_path).await; + if let Err(err) = fs::rename(&path, &deleted_path).await { + warn!(err; "Failed to rename the dangling directory to deleted path."); + continue; + } + } + if let Err(err) = fs::remove_dir_all(&deleted_path).await { + warn!(err; "Failed to remove the dangling directory."); } } - if let Err(err) = fs::remove_dir_all(&deleted_path).await { - warn!(err; "Failed to remove the dangling directory."); + DeleteTask::Terminate => { + break; } - } - DeleteTask::Terminate => { - break; + }, + Ok(None) => break, + Err(_) => { + // Purge recycle bin periodically to reclaim the space quickly. + recycle_bin.run_pending_tasks().await; } } }