From 858dae7b23e08a67cd7a260ecb9c203e536a1c44 Mon Sep 17 00:00:00 2001
From: Zhenchi
Date: Fri, 14 Feb 2025 15:49:26 +0800
Subject: [PATCH] feat: add stager notifier to collect metrics (#5530)

* feat: add stager notifier to collect metrics

Signed-off-by: Zhenchi

* apply prev commit

Signed-off-by: Zhenchi

* remove dup size

Signed-off-by: Zhenchi

* add load cost

Signed-off-by: Zhenchi

---------

Signed-off-by: Zhenchi
---
 src/index/src/fulltext_index/tests.rs         |   2 +-
 src/mito2/src/sst/index/puffin_manager.rs     |   2 +-
 src/puffin/src/puffin_manager/stager.rs       |  38 ++
 .../puffin_manager/stager/bounded_stager.rs   | 434 +++++++++++++++---
 src/puffin/src/puffin_manager/tests.rs        |   2 +-
 5 files changed, 422 insertions(+), 56 deletions(-)

diff --git a/src/index/src/fulltext_index/tests.rs b/src/index/src/fulltext_index/tests.rs
index 90449f9dde..f0c0649575 100644
--- a/src/index/src/fulltext_index/tests.rs
+++ b/src/index/src/fulltext_index/tests.rs
@@ -30,7 +30,7 @@ async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager>) {
     let path = staging_dir.path().to_path_buf();
     (
         staging_dir,
-        Arc::new(BoundedStager::new(path, 102400).await.unwrap()),
+        Arc::new(BoundedStager::new(path, 102400, None).await.unwrap()),
     )
 }

diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs
index 498ed254f2..4b8b6a3dcb 100644
--- a/src/mito2/src/sst/index/puffin_manager.rs
+++ b/src/mito2/src/sst/index/puffin_manager.rs
@@ -63,7 +63,7 @@ impl PuffinManagerFactory {
         write_buffer_size: Option,
     ) -> Result {
         let staging_dir = aux_path.as_ref().join(STAGING_DIR);
-        let stager = BoundedStager::new(staging_dir, staging_capacity)
+        let stager = BoundedStager::new(staging_dir, staging_capacity, None)
             .await
             .context(PuffinInitStagerSnafu)?;
         Ok(Self {

diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs
index 6e1581cddb..5dc2cb31fc 100644
--- a/src/puffin/src/puffin_manager/stager.rs
+++ b/src/puffin/src/puffin_manager/stager.rs
@@ -15,6 +15,7 @@
 mod bounded_stager;

 use std::path::PathBuf;
+use std::time::Duration;

 use async_trait::async_trait;
 pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
@@ -88,3 +89,40 @@ pub trait Stager: Send + Sync {
         dir_size: u64,
     ) -> Result<()>;
 }
+
+/// `StagerNotifier` provides a way to notify the caller of staging events.
+pub trait StagerNotifier: Send + Sync {
+    /// Notifies the caller that a cache hit occurred.
+    /// `size` is the size of the content that was hit in the cache.
+    fn on_cache_hit(&self, size: u64);
+
+    /// Notifies the caller that a cache miss occurred.
+    /// `size` is the size of the content that was missed in the cache.
+    fn on_cache_miss(&self, size: u64);
+
+    /// Notifies the caller that a blob or directory was inserted into the cache.
+    /// `size` is the size of the content that was inserted into the cache.
+    ///
+    /// Note: this event is triggered not only by cache misses, but also by recoveries and recycles.
+    fn on_cache_insert(&self, size: u64);
+
+    /// Notifies the caller that a directory was loaded into the cache.
+    /// `duration` is the time it took to load the directory.
+    fn on_load_dir(&self, duration: Duration);
+
+    /// Notifies the caller that a blob was loaded into the cache.
+    /// `duration` is the time it took to load the blob.
+    fn on_load_blob(&self, duration: Duration);
+
+    /// Notifies the caller that a blob or directory was evicted from the cache.
+    /// `size` is the size of the content that was evicted from the cache.
+ fn on_cache_evict(&self, size: u64); + + /// Notifies the caller that a blob or directory was dropped to the recycle bin. + /// `size` is the size of the content that was dropped to the recycle bin. + fn on_recycle_insert(&self, size: u64); + + /// Notifies the caller that the recycle bin was cleared. + /// `size` is the size of the content that was cleared from the recycle bin. + fn on_recycle_clear(&self, size: u64); +} diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index fd5fc74876..46ea2548ad 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use async_walkdir::{Filtering, WalkDir}; @@ -34,6 +34,7 @@ use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{Receiver, Sender}; use tokio_util::compat::TokioAsyncWriteCompatExt; +use super::StagerNotifier; use crate::error::{ CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu, Result, WalkDirSnafu, @@ -67,10 +68,17 @@ pub struct BoundedStager { /// 3. sent the delete task to the delete queue on drop /// 4. background routine removes the file or directory delete_queue: Sender, + + /// Notifier for the stager. + notifier: Option>, } impl BoundedStager { - pub async fn new(base_dir: PathBuf, capacity: u64) -> Result { + pub async fn new( + base_dir: PathBuf, + capacity: u64, + notifier: Option>, + ) -> Result { tokio::fs::create_dir_all(&base_dir) .await .context(CreateSnafu)?; @@ -78,12 +86,17 @@ impl BoundedStager { let recycle_bin = Cache::builder().time_to_live(RECYCLE_BIN_TTL).build(); let recycle_bin_cloned = recycle_bin.clone(); + let notifier_cloned = notifier.clone(); let cache = Cache::builder() .max_capacity(capacity) .weigher(|_: &String, v: &CacheValue| v.weight()) .eviction_policy(EvictionPolicy::lru()) .async_eviction_listener(move |k, v, _| { let recycle_bin = recycle_bin_cloned.clone(); + if let Some(notifier) = notifier_cloned.as_ref() { + notifier.on_cache_evict(v.size()); + notifier.on_recycle_insert(v.size()); + } async move { recycle_bin.insert(k.as_str().to_string(), v).await; } @@ -92,13 +105,18 @@ impl BoundedStager { .build(); let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE); - common_runtime::global_runtime().spawn(Self::delete_routine(rx, recycle_bin.clone())); - + let notifier_cloned = notifier.clone(); + common_runtime::global_runtime().spawn(Self::delete_routine( + rx, + recycle_bin.clone(), + notifier_cloned, + )); let stager = Self { cache, base_dir, delete_queue, recycle_bin, + notifier, }; stager.recover().await?; @@ -120,29 +138,48 @@ impl Stager for BoundedStager { ) -> Result { let cache_key = Self::encode_cache_key(puffin_file_name, key); + let mut miss = false; let v = self .cache - .try_get_with(cache_key.clone(), async { + .try_get_with_by_ref(&cache_key, async { if let Some(v) = self.recycle_bin.remove(&cache_key).await { + if let Some(notifier) = self.notifier.as_ref() { + let size = v.size(); + notifier.on_cache_insert(size); + notifier.on_recycle_clear(size); + } return Ok(v); } + miss = true; + let timer = Instant::now(); let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4()); let path = self.base_dir.join(&file_name); let size = Self::write_blob(&path, init_fn).await?; - + if 
let Some(notifier) = self.notifier.as_ref() { + notifier.on_cache_insert(size); + notifier.on_load_blob(timer.elapsed()); + } let guard = Arc::new(FsBlobGuard { path, delete_queue: self.delete_queue.clone(), + size, }); - Ok(CacheValue::File { guard, size }) + Ok(CacheValue::File(guard)) }) .await .context(CacheGetSnafu)?; + if let Some(notifier) = self.notifier.as_ref() { + if miss { + notifier.on_cache_miss(v.size()); + } else { + notifier.on_cache_hit(v.size()); + } + } match v { - CacheValue::File { guard, .. } => Ok(guard), + CacheValue::File(guard) => Ok(guard), _ => unreachable!(), } } @@ -155,29 +192,48 @@ impl Stager for BoundedStager { ) -> Result { let cache_key = Self::encode_cache_key(puffin_file_name, key); + let mut miss = false; let v = self .cache - .try_get_with(cache_key.clone(), async { + .try_get_with_by_ref(&cache_key, async { if let Some(v) = self.recycle_bin.remove(&cache_key).await { + if let Some(notifier) = self.notifier.as_ref() { + let size = v.size(); + notifier.on_cache_insert(size); + notifier.on_recycle_clear(size); + } return Ok(v); } + miss = true; + let timer = Instant::now(); let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4()); let path = self.base_dir.join(&dir_name); let size = Self::write_dir(&path, init_fn).await?; - + if let Some(notifier) = self.notifier.as_ref() { + notifier.on_cache_insert(size); + notifier.on_load_dir(timer.elapsed()); + } let guard = Arc::new(FsDirGuard { path, + size, delete_queue: self.delete_queue.clone(), }); - Ok(CacheValue::Dir { guard, size }) + Ok(CacheValue::Dir(guard)) }) .await .context(CacheGetSnafu)?; + if let Some(notifier) = self.notifier.as_ref() { + if miss { + notifier.on_cache_miss(v.size()); + } else { + notifier.on_cache_hit(v.size()); + } + } match v { - CacheValue::Dir { guard, .. } => Ok(guard), + CacheValue::Dir(guard) => Ok(guard), _ => unreachable!(), } } @@ -194,6 +250,11 @@ impl Stager for BoundedStager { self.cache .try_get_with(cache_key.clone(), async move { if let Some(v) = self.recycle_bin.remove(&cache_key).await { + if let Some(notifier) = self.notifier.as_ref() { + let size = v.size(); + notifier.on_cache_insert(size); + notifier.on_recycle_clear(size); + } return Ok(v); } @@ -201,12 +262,15 @@ impl Stager for BoundedStager { let path = self.base_dir.join(&dir_name); fs::rename(&dir_path, &path).await.context(RenameSnafu)?; - + if let Some(notifier) = self.notifier.as_ref() { + notifier.on_cache_insert(size); + } let guard = Arc::new(FsDirGuard { path, + size, delete_queue: self.delete_queue.clone(), }); - Ok(CacheValue::Dir { guard, size }) + Ok(CacheValue::Dir(guard)) }) .await .map(|_| ()) @@ -308,32 +372,35 @@ impl BoundedStager { if meta.is_dir() { let size = Self::get_dir_size(&path).await?; - let v = CacheValue::Dir { - guard: Arc::new(FsDirGuard { - path, - delete_queue: self.delete_queue.clone(), - }), + let v = CacheValue::Dir(Arc::new(FsDirGuard { + path, size, - }; + delete_queue: self.delete_queue.clone(), + })); // A duplicate dir will be moved to the delete queue. let _dup_dir = elems.insert(key, v); } else { - let v = CacheValue::File { - guard: Arc::new(FsBlobGuard { - path, - delete_queue: self.delete_queue.clone(), - }), - size: meta.len(), - }; + let size = meta.len(); + let v = CacheValue::File(Arc::new(FsBlobGuard { + path, + size, + delete_queue: self.delete_queue.clone(), + })); // A duplicate file will be moved to the delete queue. 
let _dup_file = elems.insert(key, v); } } } + let mut size = 0; for (key, value) in elems { + size += value.size(); self.cache.insert(key, value).await; } + if let Some(notifier) = self.notifier.as_ref() { + notifier.on_cache_insert(size); + } + self.cache.run_pending_tasks().await; Ok(()) @@ -360,11 +427,12 @@ impl BoundedStager { async fn delete_routine( mut receiver: Receiver, recycle_bin: Cache, + notifier: Option>, ) { loop { match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await { Ok(Some(task)) => match task { - DeleteTask::File(path) => { + DeleteTask::File(path, size) => { if let Err(err) = fs::remove_file(&path).await { if err.kind() == std::io::ErrorKind::NotFound { continue; @@ -372,8 +440,13 @@ impl BoundedStager { warn!(err; "Failed to remove the file."); } + + if let Some(notifier) = notifier.as_ref() { + notifier.on_recycle_clear(size); + } } - DeleteTask::Dir(path) => { + + DeleteTask::Dir(path, size) => { let deleted_path = path.with_extension(DELETED_EXTENSION); if let Err(err) = fs::rename(&path, &deleted_path).await { if err.kind() == std::io::ErrorKind::NotFound { @@ -390,6 +463,9 @@ impl BoundedStager { if let Err(err) = fs::remove_dir_all(&deleted_path).await { warn!(err; "Failed to remove the dangling directory."); } + if let Some(notifier) = notifier.as_ref() { + notifier.on_recycle_clear(size); + } } DeleteTask::Terminate => { break; @@ -415,15 +491,15 @@ impl Drop for BoundedStager { #[derive(Debug, Clone)] enum CacheValue { - File { guard: Arc, size: u64 }, - Dir { guard: Arc, size: u64 }, + File(Arc), + Dir(Arc), } impl CacheValue { fn size(&self) -> u64 { match self { - CacheValue::File { size, .. } => *size, - CacheValue::Dir { size, .. } => *size, + CacheValue::File(guard) => guard.size, + CacheValue::Dir(guard) => guard.size, } } @@ -433,8 +509,8 @@ impl CacheValue { } enum DeleteTask { - File(PathBuf), - Dir(PathBuf), + File(PathBuf, u64), + Dir(PathBuf, u64), Terminate, } @@ -443,6 +519,7 @@ enum DeleteTask { #[derive(Debug)] pub struct FsBlobGuard { path: PathBuf, + size: u64, delete_queue: Sender, } @@ -459,7 +536,7 @@ impl Drop for FsBlobGuard { fn drop(&mut self) { if let Err(err) = self .delete_queue - .try_send(DeleteTask::File(self.path.clone())) + .try_send(DeleteTask::File(self.path.clone(), self.size)) { if matches!(err, TrySendError::Closed(_)) { return; @@ -474,6 +551,7 @@ impl Drop for FsBlobGuard { #[derive(Debug)] pub struct FsDirGuard { path: PathBuf, + size: u64, delete_queue: Sender, } @@ -487,7 +565,7 @@ impl Drop for FsDirGuard { fn drop(&mut self) { if let Err(err) = self .delete_queue - .try_send(DeleteTask::Dir(self.path.clone())) + .try_send(DeleteTask::Dir(self.path.clone(), self.size)) { if matches!(err, TrySendError::Closed(_)) { return; @@ -526,7 +604,7 @@ impl BoundedStager { let cache_key = Self::encode_cache_key(puffin_file_name, key); let value = self.cache.get(&cache_key).await.unwrap(); let path = match &value { - CacheValue::File { guard, .. } => &guard.path, + CacheValue::File(guard) => &guard.path, _ => panic!("Expected a file, but got a directory."), }; fs::File::open(path).await.unwrap() @@ -536,7 +614,7 @@ impl BoundedStager { let cache_key = Self::encode_cache_key(puffin_file_name, key); let value = self.cache.get(&cache_key).await.unwrap(); let path = match &value { - CacheValue::Dir { guard, .. 
} => &guard.path, + CacheValue::Dir(guard) => &guard.path, _ => panic!("Expected a directory, but got a file."), }; path.clone() @@ -550,6 +628,8 @@ impl BoundedStager { #[cfg(test)] mod tests { + use std::sync::atomic::AtomicU64; + use common_base::range_read::RangeReader; use common_test_util::temp_dir::create_temp_dir; use futures::AsyncWriteExt; @@ -559,12 +639,124 @@ mod tests { use crate::error::BlobNotFoundSnafu; use crate::puffin_manager::stager::Stager; + struct MockNotifier { + cache_insert_size: AtomicU64, + cache_evict_size: AtomicU64, + cache_hit_count: AtomicU64, + cache_hit_size: AtomicU64, + cache_miss_count: AtomicU64, + cache_miss_size: AtomicU64, + recycle_insert_size: AtomicU64, + recycle_clear_size: AtomicU64, + } + + #[derive(Debug, PartialEq, Eq)] + struct Stats { + cache_insert_size: u64, + cache_evict_size: u64, + cache_hit_count: u64, + cache_hit_size: u64, + cache_miss_count: u64, + cache_miss_size: u64, + recycle_insert_size: u64, + recycle_clear_size: u64, + } + + impl MockNotifier { + fn build() -> Arc { + Arc::new(Self { + cache_insert_size: AtomicU64::new(0), + cache_evict_size: AtomicU64::new(0), + cache_hit_count: AtomicU64::new(0), + cache_hit_size: AtomicU64::new(0), + cache_miss_count: AtomicU64::new(0), + cache_miss_size: AtomicU64::new(0), + recycle_insert_size: AtomicU64::new(0), + recycle_clear_size: AtomicU64::new(0), + }) + } + + fn stats(&self) -> Stats { + Stats { + cache_insert_size: self + .cache_insert_size + .load(std::sync::atomic::Ordering::Relaxed), + cache_evict_size: self + .cache_evict_size + .load(std::sync::atomic::Ordering::Relaxed), + cache_hit_count: self + .cache_hit_count + .load(std::sync::atomic::Ordering::Relaxed), + cache_hit_size: self + .cache_hit_size + .load(std::sync::atomic::Ordering::Relaxed), + cache_miss_count: self + .cache_miss_count + .load(std::sync::atomic::Ordering::Relaxed), + cache_miss_size: self + .cache_miss_size + .load(std::sync::atomic::Ordering::Relaxed), + recycle_insert_size: self + .recycle_insert_size + .load(std::sync::atomic::Ordering::Relaxed), + recycle_clear_size: self + .recycle_clear_size + .load(std::sync::atomic::Ordering::Relaxed), + } + } + } + + impl StagerNotifier for MockNotifier { + fn on_cache_insert(&self, size: u64) { + self.cache_insert_size + .fetch_add(size, std::sync::atomic::Ordering::Relaxed); + } + + fn on_cache_evict(&self, size: u64) { + self.cache_evict_size + .fetch_add(size, std::sync::atomic::Ordering::Relaxed); + } + + fn on_cache_hit(&self, size: u64) { + self.cache_hit_count + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.cache_hit_size + .fetch_add(size, std::sync::atomic::Ordering::Relaxed); + } + + fn on_cache_miss(&self, size: u64) { + self.cache_miss_count + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.cache_miss_size + .fetch_add(size, std::sync::atomic::Ordering::Relaxed); + } + + fn on_recycle_insert(&self, size: u64) { + self.recycle_insert_size + .fetch_add(size, std::sync::atomic::Ordering::Relaxed); + } + + fn on_recycle_clear(&self, size: u64) { + self.recycle_clear_size + .fetch_add(size, std::sync::atomic::Ordering::Relaxed); + } + + fn on_load_blob(&self, _duration: Duration) {} + + fn on_load_dir(&self, _duration: Duration) {} + } + #[tokio::test] async fn test_get_blob() { let tempdir = create_temp_dir("test_get_blob_"); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) - .await - .unwrap(); + let notifier = MockNotifier::build(); + let stager = BoundedStager::new( + 
tempdir.path().to_path_buf(), + u64::MAX, + Some(notifier.clone()), + ) + .await + .unwrap(); let puffin_file_name = "test_get_blob"; let key = "key"; @@ -593,14 +785,34 @@ mod tests { let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, b"hello world"); + + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 11, + cache_evict_size: 0, + cache_hit_count: 0, + cache_hit_size: 0, + cache_miss_count: 1, + cache_miss_size: 11, + recycle_insert_size: 0, + recycle_clear_size: 0, + } + ); } #[tokio::test] async fn test_get_dir() { let tempdir = create_temp_dir("test_get_dir_"); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) - .await - .unwrap(); + let notifier = MockNotifier::build(); + let stager = BoundedStager::new( + tempdir.path().to_path_buf(), + u64::MAX, + Some(notifier.clone()), + ) + .await + .unwrap(); let files_in_dir = [ ("file_a", "Hello, world!".as_bytes()), @@ -618,11 +830,13 @@ mod tests { key, Box::new(|writer_provider| { Box::pin(async move { + let mut size = 0; for (rel_path, content) in &files_in_dir { + size += content.len(); let mut writer = writer_provider.writer(rel_path).await.unwrap(); writer.write_all(content).await.unwrap(); } - Ok(0) + Ok(size as _) }) }), ) @@ -645,14 +859,34 @@ mod tests { file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, *content); } + + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 70, + cache_evict_size: 0, + cache_hit_count: 0, + cache_hit_size: 0, + cache_miss_count: 1, + cache_miss_size: 70, + recycle_insert_size: 0, + recycle_clear_size: 0 + } + ); } #[tokio::test] async fn test_recover() { let tempdir = create_temp_dir("test_recover_"); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) - .await - .unwrap(); + let notifier = MockNotifier::build(); + let stager = BoundedStager::new( + tempdir.path().to_path_buf(), + u64::MAX, + Some(notifier.clone()), + ) + .await + .unwrap(); // initialize stager let puffin_file_name = "test_recover"; @@ -687,11 +921,13 @@ mod tests { dir_key, Box::new(|writer_provider| { Box::pin(async move { + let mut size = 0; for (rel_path, content) in &files_in_dir { + size += content.len(); let mut writer = writer_provider.writer(rel_path).await.unwrap(); writer.write_all(content).await.unwrap(); } - Ok(0) + Ok(size as _) }) }), ) @@ -701,7 +937,7 @@ mod tests { // recover stager drop(stager); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None) .await .unwrap(); @@ -736,14 +972,31 @@ mod tests { file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, *content); } + + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 81, + cache_evict_size: 0, + cache_hit_count: 0, + cache_hit_size: 0, + cache_miss_count: 2, + cache_miss_size: 81, + recycle_insert_size: 0, + recycle_clear_size: 0 + } + ); } #[tokio::test] async fn test_eviction() { let tempdir = create_temp_dir("test_eviction_"); + let notifier = MockNotifier::build(); let stager = BoundedStager::new( tempdir.path().to_path_buf(), 1, /* extremely small size */ + Some(notifier.clone()), ) .await .unwrap(); @@ -773,6 +1026,21 @@ mod tests { stager.cache.run_pending_tasks().await; assert!(!stager.in_cache(puffin_file_name, blob_key)); + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 11, + cache_evict_size: 11, + cache_hit_count: 0, + 
cache_hit_size: 0, + cache_miss_count: 1, + cache_miss_size: 11, + recycle_insert_size: 11, + recycle_clear_size: 0 + } + ); + let m = reader.metadata().await.unwrap(); let buf = reader.read(0..m.content_length).await.unwrap(); assert_eq!(&*buf, b"Hello world"); @@ -794,6 +1062,21 @@ mod tests { stager.cache.run_pending_tasks().await; assert!(!stager.in_cache(puffin_file_name, blob_key)); + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 22, + cache_evict_size: 22, + cache_hit_count: 1, + cache_hit_size: 11, + cache_miss_count: 1, + cache_miss_size: 11, + recycle_insert_size: 22, + recycle_clear_size: 11 + } + ); + let m = reader.metadata().await.unwrap(); let buf = reader.read(0..m.content_length).await.unwrap(); assert_eq!(&*buf, b"Hello world"); @@ -839,6 +1122,21 @@ mod tests { stager.cache.run_pending_tasks().await; assert!(!stager.in_cache(puffin_file_name, dir_key)); + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 92, + cache_evict_size: 92, + cache_hit_count: 1, + cache_hit_size: 11, + cache_miss_count: 2, + cache_miss_size: 81, + recycle_insert_size: 92, + recycle_clear_size: 11 + } + ); + // Second time to get the directory let guard_1 = stager .get_dir( @@ -861,6 +1159,21 @@ mod tests { stager.cache.run_pending_tasks().await; assert!(!stager.in_cache(puffin_file_name, dir_key)); + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 162, + cache_evict_size: 162, + cache_hit_count: 2, + cache_hit_size: 81, + cache_miss_count: 2, + cache_miss_size: 81, + recycle_insert_size: 162, + recycle_clear_size: 81 + } + ); + // Third time to get the directory and all guards are dropped drop(guard_0); drop(guard_1); @@ -884,12 +1197,27 @@ mod tests { file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, *content); } + + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 232, + cache_evict_size: 232, + cache_hit_count: 3, + cache_hit_size: 151, + cache_miss_count: 2, + cache_miss_size: 81, + recycle_insert_size: 232, + recycle_clear_size: 151 + } + ); } #[tokio::test] async fn test_get_blob_concurrency_on_fail() { let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_"); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None) .await .unwrap(); @@ -926,7 +1254,7 @@ mod tests { #[tokio::test] async fn test_get_dir_concurrency_on_fail() { let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_"); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None) .await .unwrap(); diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index 23756aec64..b4d3450fd5 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -32,7 +32,7 @@ async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc
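
Below is a minimal sketch of how an embedder might implement the new `StagerNotifier` trait and wire it into `BoundedStager::new`, which now takes the notifier as a third argument. It mirrors the atomic-counter approach used by the `MockNotifier` in the tests. The struct, its field names, the capacity value, and the `puffin::puffin_manager::stager` import path are illustrative assumptions, not part of this patch.

use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

// Assumed module path, based on the layout shown in the diff.
use puffin::puffin_manager::stager::{BoundedStager, StagerNotifier};

/// Accumulates staging events; all counters start at zero via `Default`.
#[derive(Default)]
struct CounterNotifier {
    hit_bytes: AtomicU64,
    miss_bytes: AtomicU64,
    inserted_bytes: AtomicU64,
    evicted_bytes: AtomicU64,
    recycle_inserted_bytes: AtomicU64,
    recycle_cleared_bytes: AtomicU64,
    load_blob_nanos: AtomicU64,
    load_dir_nanos: AtomicU64,
}

impl StagerNotifier for CounterNotifier {
    fn on_cache_hit(&self, size: u64) {
        self.hit_bytes.fetch_add(size, Ordering::Relaxed);
    }

    fn on_cache_miss(&self, size: u64) {
        self.miss_bytes.fetch_add(size, Ordering::Relaxed);
    }

    fn on_cache_insert(&self, size: u64) {
        self.inserted_bytes.fetch_add(size, Ordering::Relaxed);
    }

    fn on_load_dir(&self, duration: Duration) {
        self.load_dir_nanos
            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
    }

    fn on_load_blob(&self, duration: Duration) {
        self.load_blob_nanos
            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
    }

    fn on_cache_evict(&self, size: u64) {
        self.evicted_bytes.fetch_add(size, Ordering::Relaxed);
    }

    fn on_recycle_insert(&self, size: u64) {
        self.recycle_inserted_bytes.fetch_add(size, Ordering::Relaxed);
    }

    fn on_recycle_clear(&self, size: u64) {
        self.recycle_cleared_bytes.fetch_add(size, Ordering::Relaxed);
    }
}

/// Builds a stager that reports staging events to the counters above.
/// The staging directory and the 512 MiB capacity are placeholders.
async fn build_stager(staging_dir: PathBuf) -> (Arc<CounterNotifier>, BoundedStager) {
    let notifier = Arc::new(CounterNotifier::default());
    let stager = BoundedStager::new(staging_dir, 512 * 1024 * 1024, Some(notifier.clone()))
        .await
        .expect("failed to initialize BoundedStager");
    (notifier, stager)
}

From these counters the embedder can derive, for example, a byte-level hit ratio (hit_bytes / (hit_bytes + miss_bytes)) or average load latency, and export the values to whatever metrics registry it already uses.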