diff --git a/src/index/src/fulltext_index/tests.rs b/src/index/src/fulltext_index/tests.rs index abdf20e22d..2198ea67b3 100644 --- a/src/index/src/fulltext_index/tests.rs +++ b/src/index/src/fulltext_index/tests.rs @@ -74,7 +74,7 @@ async fn test_search( writer.finish().await.unwrap(); let reader = puffin_manager.reader(&file_name).await.unwrap(); - let index_dir = reader.dir(&blob_key).await.unwrap(); + let (index_dir, _metrics) = reader.dir(&blob_key).await.unwrap(); let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap(); for (query, expected) in query_expected { let results = searcher.search(query).await.unwrap(); diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index a8fcf88220..6fa4b337af 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -61,10 +61,32 @@ pub struct FulltextIndexApplyMetrics { pub apply_elapsed: std::time::Duration, /// Number of blob cache misses. pub blob_cache_miss: usize, + /// Number of directory cache hits. + pub dir_cache_hit: usize, + /// Number of directory cache misses. + pub dir_cache_miss: usize, + /// Elapsed time to initialize directory data. + pub dir_init_elapsed: std::time::Duration, /// Metrics for bloom filter reads. pub bloom_filter_read_metrics: BloomFilterReadMetrics, } +impl FulltextIndexApplyMetrics { + /// Collects metrics from a directory read operation. + pub fn collect_dir_metrics( + &mut self, + elapsed: std::time::Duration, + dir_metrics: puffin::puffin_manager::DirMetrics, + ) { + self.dir_init_elapsed += elapsed; + if dir_metrics.cache_hit { + self.dir_cache_hit += 1; + } else { + self.dir_cache_miss += 1; + } + } +} + /// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files pub struct FulltextIndexApplier { /// Requests to be applied. @@ -576,20 +598,27 @@ impl IndexSource { file_id: RegionFileId, key: &str, file_size_hint: Option, - metrics: Option<&mut FulltextIndexApplyMetrics>, + mut metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result>> { let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?; // Track cache miss if fallbacked to remote if fallbacked { - if let Some(m) = metrics { + if let Some(m) = &mut metrics { m.blob_cache_miss += 1; } } + let start = metrics.as_ref().map(|_| Instant::now()); let res = reader.dir(key).await; match res { - Ok(dir) => Ok(Some(dir)), + Ok((dir, dir_metrics)) => { + if let Some(m) = metrics { + // Safety: start is Some when metrics is Some + m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics); + } + Ok(Some(dir)) + } Err(err) if err.is_blob_not_found() => Ok(None), Err(err) => { if fallbacked { @@ -597,9 +626,16 @@ impl IndexSource { } else { warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file."); let reader = self.build_remote(file_id, file_size_hint).await?; + let start = metrics.as_ref().map(|_| Instant::now()); let res = reader.dir(key).await; match res { - Ok(dir) => Ok(Some(dir)), + Ok((dir, dir_metrics)) => { + if let Some(m) = metrics { + // Safety: start is Some when metrics is Some + m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics); + } + Ok(Some(dir)) + } Err(err) if err.is_blob_not_found() => Ok(None), Err(err) => Err(err).context(PuffinReadBlobSnafu), } diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index 3f8d3f8819..0a9d25e1fe 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -245,7 +245,7 @@ mod tests { let bs = blob_reader.read(0..meta.content_length).await.unwrap(); assert_eq!(&*bs, raw_data); - let dir_guard = reader.dir(dir_key).await.unwrap(); + let (dir_guard, _metrics) = reader.dir(dir_key).await.unwrap(); let file = dir_guard.path().join("hello"); let data = tokio::fs::read(file).await.unwrap(); assert_eq!(data, raw_data); diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 9f287128c1..060bd4237b 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -32,6 +32,15 @@ use crate::blob_metadata::{BlobMetadata, CompressionCodec}; use crate::error::Result; use crate::file_metadata::FileMetadata; +/// Metrics returned by `PuffinReader::dir` operations. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct DirMetrics { + /// Whether this was a cache hit (true) or cache miss (false). + pub cache_hit: bool, + /// Size of the directory in bytes. + pub dir_size: u64, +} + /// The `PuffinManager` trait provides a unified interface for creating `PuffinReader` and `PuffinWriter`. #[async_trait] pub trait PuffinManager { @@ -106,9 +115,10 @@ pub trait PuffinReader { /// Reads a directory from the Puffin file. /// - /// The returned `GuardWithMetadata` is used to access the directory data and its metadata. + /// The returned tuple contains `GuardWithMetadata` and `DirMetrics`. + /// The `GuardWithMetadata` is used to access the directory data and its metadata. /// Users should hold the `GuardWithMetadata` until they are done with the directory data. - async fn dir(&self, key: &str) -> Result>; + async fn dir(&self, key: &str) -> Result<(GuardWithMetadata, DirMetrics)>; } /// `BlobGuard` is provided by the `PuffinReader` to access the blob data. diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 8339d32c95..c660d1e19a 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef; use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata; use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager}; -use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader}; +use crate::puffin_manager::{BlobGuard, DirMetrics, GuardWithMetadata, PuffinReader}; /// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files. pub struct FsPuffinReader @@ -130,10 +130,10 @@ where Ok(GuardWithMetadata::new(blob, blob_metadata)) } - async fn dir(&self, key: &str) -> Result> { + async fn dir(&self, key: &str) -> Result<(GuardWithMetadata, DirMetrics)> { let mut file = self.puffin_reader().await?; let blob_metadata = self.get_blob_metadata(key, &mut file).await?; - let dir = self + let (dir, metrics) = self .stager .get_dir( &self.handle, @@ -153,7 +153,7 @@ where ) .await?; - Ok(GuardWithMetadata::new(dir, blob_metadata)) + Ok((GuardWithMetadata::new(dir, blob_metadata), metrics)) } } diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs index 708053bb27..512e94f4e8 100644 --- a/src/puffin/src/puffin_manager/stager.rs +++ b/src/puffin/src/puffin_manager/stager.rs @@ -23,7 +23,7 @@ use futures::AsyncWrite; use futures::future::BoxFuture; use crate::error::Result; -use crate::puffin_manager::{BlobGuard, DirGuard}; +use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics}; pub type BoxWriter = Box; @@ -72,14 +72,15 @@ pub trait Stager: Send + Sync { /// Retrieves a directory, initializing it if necessary using the provided `init_fn`. /// - /// The returned `DirGuard` is used to access the directory in the filesystem. + /// The returned tuple contains the `DirGuard` and `DirMetrics`. + /// The `DirGuard` is used to access the directory in the filesystem. /// The caller is responsible for holding the `DirGuard` until they are done with the directory. async fn get_dir<'a>( &self, handle: &Self::FileHandle, key: &str, init_fn: Box, - ) -> Result; + ) -> Result<(Self::Dir, DirMetrics)>; /// Stores a directory in the staging area. async fn put_dir( diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index 380cce7930..dfb9285452 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -41,7 +41,7 @@ use crate::error::{ use crate::puffin_manager::stager::{ BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier, }; -use crate::puffin_manager::{BlobGuard, DirGuard}; +use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics}; const DELETE_QUEUE_SIZE: usize = 10240; const TMP_EXTENSION: &str = "tmp"; @@ -203,7 +203,7 @@ impl Stager for BoundedStager { handle: &Self::FileHandle, key: &str, init_fn: Box, - ) -> Result { + ) -> Result<(Self::Dir, DirMetrics)> { let handle_str = handle.to_string(); let cache_key = Self::encode_cache_key(&handle_str, key); @@ -242,15 +242,22 @@ impl Stager for BoundedStager { .await .context(CacheGetSnafu)?; + let dir_size = v.size(); if let Some(notifier) = self.notifier.as_ref() { if miss { - notifier.on_cache_miss(v.size()); + notifier.on_cache_miss(dir_size); } else { - notifier.on_cache_hit(v.size()); + notifier.on_cache_hit(dir_size); } } + + let metrics = DirMetrics { + cache_hit: !miss, + dir_size, + }; + match v { - CacheValue::Dir(guard) => Ok(guard), + CacheValue::Dir(guard) => Ok((guard, metrics)), _ => unreachable!(), } } @@ -882,7 +889,7 @@ mod tests { let puffin_file_name = "test_get_dir".to_string(); let key = "key"; - let dir_path = stager + let (dir_path, metrics) = stager .get_dir( &puffin_file_name, key, @@ -901,6 +908,9 @@ mod tests { .await .unwrap(); + assert!(!metrics.cache_hit); + assert!(metrics.dir_size > 0); + for (rel_path, content) in &files_in_dir { let file_path = dir_path.path().join(rel_path); let mut file = tokio::fs::File::open(&file_path).await.unwrap(); @@ -974,7 +984,7 @@ mod tests { ]; let dir_key = "dir_key"; - let guard = stager + let (guard, _metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1016,7 +1026,7 @@ mod tests { let buf = reader.read(0..m.content_length).await.unwrap(); assert_eq!(&*buf, b"hello world"); - let dir_path = stager + let (dir_path, metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1024,6 +1034,9 @@ mod tests { ) .await .unwrap(); + + assert!(metrics.cache_hit); + assert!(metrics.dir_size > 0); for (rel_path, content) in &files_in_dir { let file_path = dir_path.path().join(rel_path); let mut file = tokio::fs::File::open(&file_path).await.unwrap(); @@ -1151,7 +1164,7 @@ mod tests { ]; // First time to get the directory - let guard_0 = stager + let (guard_0, _metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1198,7 +1211,7 @@ mod tests { ); // Second time to get the directory - let guard_1 = stager + let (guard_1, _metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1237,7 +1250,7 @@ mod tests { // Third time to get the directory and all guards are dropped drop(guard_0); drop(guard_1); - let guard_2 = stager + let (guard_2, _metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1390,7 +1403,7 @@ mod tests { ]; let dir_key = "dir_key"; - let guard = stager + let (guard, _metrics) = stager .get_dir( &puffin_file_name, dir_key, diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index 715668e40e..f1ee9fabd7 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -356,7 +356,7 @@ async fn check_dir( stager: &BoundedStager, puffin_reader: &impl PuffinReader, ) { - let res_dir = puffin_reader.dir(key).await.unwrap(); + let (res_dir, _metrics) = puffin_reader.dir(key).await.unwrap(); let metadata = res_dir.metadata(); assert_eq!( metadata.properties,