feat: collect fulltext dir metrics for applier

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-21 20:24:58 +08:00
committed by shuiyisong
parent 50b5c90d53
commit 5329efcdba
8 changed files with 88 additions and 28 deletions

View File

@@ -74,7 +74,7 @@ async fn test_search(
writer.finish().await.unwrap();
let reader = puffin_manager.reader(&file_name).await.unwrap();
let index_dir = reader.dir(&blob_key).await.unwrap();
let (index_dir, _metrics) = reader.dir(&blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap();
for (query, expected) in query_expected {
let results = searcher.search(query).await.unwrap();

View File

@@ -61,10 +61,32 @@ pub struct FulltextIndexApplyMetrics {
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses.
pub blob_cache_miss: usize,
/// Number of directory cache hits.
pub dir_cache_hit: usize,
/// Number of directory cache misses.
pub dir_cache_miss: usize,
/// Elapsed time to initialize directory data.
pub dir_init_elapsed: std::time::Duration,
/// Metrics for bloom filter reads.
pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}
impl FulltextIndexApplyMetrics {
    /// Folds the outcome of a single directory read into these metrics.
    ///
    /// `elapsed` is accumulated into `dir_init_elapsed`, and exactly one of
    /// `dir_cache_hit` / `dir_cache_miss` is incremented according to
    /// `dir_metrics.cache_hit`. No other field of `dir_metrics` is read here.
    pub fn collect_dir_metrics(
        &mut self,
        elapsed: std::time::Duration,
        dir_metrics: puffin::puffin_manager::DirMetrics,
    ) {
        self.dir_init_elapsed += elapsed;

        // Select the counter that matches the cache outcome, then bump it once.
        let counter = if dir_metrics.cache_hit {
            &mut self.dir_cache_hit
        } else {
            &mut self.dir_cache_miss
        };
        *counter += 1;
    }
}
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
pub struct FulltextIndexApplier {
/// Requests to be applied.
@@ -576,20 +598,27 @@ impl IndexSource {
file_id: RegionFileId,
key: &str,
file_size_hint: Option<u64>,
metrics: Option<&mut FulltextIndexApplyMetrics>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
// Track a cache miss if the reader fell back to remote
if fallbacked {
if let Some(m) = metrics {
if let Some(m) = &mut metrics {
m.blob_cache_miss += 1;
}
}
let start = metrics.as_ref().map(|_| Instant::now());
let res = reader.dir(key).await;
match res {
Ok(dir) => Ok(Some(dir)),
Ok((dir, dir_metrics)) => {
if let Some(m) = metrics {
// Safety: start is Some when metrics is Some
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
}
Ok(Some(dir))
}
Err(err) if err.is_blob_not_found() => Ok(None),
Err(err) => {
if fallbacked {
@@ -597,9 +626,16 @@ impl IndexSource {
} else {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
let reader = self.build_remote(file_id, file_size_hint).await?;
let start = metrics.as_ref().map(|_| Instant::now());
let res = reader.dir(key).await;
match res {
Ok(dir) => Ok(Some(dir)),
Ok((dir, dir_metrics)) => {
if let Some(m) = metrics {
// Safety: start is Some when metrics is Some
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
}
Ok(Some(dir))
}
Err(err) if err.is_blob_not_found() => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}

View File

@@ -245,7 +245,7 @@ mod tests {
let bs = blob_reader.read(0..meta.content_length).await.unwrap();
assert_eq!(&*bs, raw_data);
let dir_guard = reader.dir(dir_key).await.unwrap();
let (dir_guard, _metrics) = reader.dir(dir_key).await.unwrap();
let file = dir_guard.path().join("hello");
let data = tokio::fs::read(file).await.unwrap();
assert_eq!(data, raw_data);

View File

@@ -32,6 +32,15 @@ use crate::blob_metadata::{BlobMetadata, CompressionCodec};
use crate::error::Result;
use crate::file_metadata::FileMetadata;
/// Metrics reported alongside the directory guard by `PuffinReader::dir`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DirMetrics {
    /// `true` when the directory was served from the cache, `false` on a cache miss.
    pub cache_hit: bool,
    /// Directory size, in bytes.
    pub dir_size: u64,
}
/// The `PuffinManager` trait provides a unified interface for creating `PuffinReader` and `PuffinWriter`.
#[async_trait]
pub trait PuffinManager {
@@ -106,9 +115,10 @@ pub trait PuffinReader {
/// Reads a directory from the Puffin file.
///
/// The returned `GuardWithMetadata` is used to access the directory data and its metadata.
/// The returned tuple contains `GuardWithMetadata` and `DirMetrics`.
/// The `GuardWithMetadata` is used to access the directory data and its metadata.
/// Users should hold the `GuardWithMetadata` until they are done with the directory data.
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>>;
async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)>;
}
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.

View File

@@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader};
use crate::puffin_manager::{BlobGuard, DirMetrics, GuardWithMetadata, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F>
@@ -130,10 +130,10 @@ where
Ok(GuardWithMetadata::new(blob, blob_metadata))
}
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> {
async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)> {
let mut file = self.puffin_reader().await?;
let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
let dir = self
let (dir, metrics) = self
.stager
.get_dir(
&self.handle,
@@ -153,7 +153,7 @@ where
)
.await?;
Ok(GuardWithMetadata::new(dir, blob_metadata))
Ok((GuardWithMetadata::new(dir, blob_metadata), metrics))
}
}

View File

@@ -23,7 +23,7 @@ use futures::AsyncWrite;
use futures::future::BoxFuture;
use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard};
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
@@ -72,14 +72,15 @@ pub trait Stager: Send + Sync {
/// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The returned tuple contains the `DirGuard` and `DirMetrics`.
/// The `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>(
&self,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;
) -> Result<(Self::Dir, DirMetrics)>;
/// Stores a directory in the staging area.
async fn put_dir(

View File

@@ -41,7 +41,7 @@ use crate::error::{
use crate::puffin_manager::stager::{
BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
};
use crate::puffin_manager::{BlobGuard, DirGuard};
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
const DELETE_QUEUE_SIZE: usize = 10240;
const TMP_EXTENSION: &str = "tmp";
@@ -203,7 +203,7 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir> {
) -> Result<(Self::Dir, DirMetrics)> {
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
@@ -242,15 +242,22 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
.await
.context(CacheGetSnafu)?;
let dir_size = v.size();
if let Some(notifier) = self.notifier.as_ref() {
if miss {
notifier.on_cache_miss(v.size());
notifier.on_cache_miss(dir_size);
} else {
notifier.on_cache_hit(v.size());
notifier.on_cache_hit(dir_size);
}
}
let metrics = DirMetrics {
cache_hit: !miss,
dir_size,
};
match v {
CacheValue::Dir(guard) => Ok(guard),
CacheValue::Dir(guard) => Ok((guard, metrics)),
_ => unreachable!(),
}
}
@@ -882,7 +889,7 @@ mod tests {
let puffin_file_name = "test_get_dir".to_string();
let key = "key";
let dir_path = stager
let (dir_path, metrics) = stager
.get_dir(
&puffin_file_name,
key,
@@ -901,6 +908,9 @@ mod tests {
.await
.unwrap();
assert!(!metrics.cache_hit);
assert!(metrics.dir_size > 0);
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -974,7 +984,7 @@ mod tests {
];
let dir_key = "dir_key";
let guard = stager
let (guard, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1016,7 +1026,7 @@ mod tests {
let buf = reader.read(0..m.content_length).await.unwrap();
assert_eq!(&*buf, b"hello world");
let dir_path = stager
let (dir_path, metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1024,6 +1034,9 @@ mod tests {
)
.await
.unwrap();
assert!(metrics.cache_hit);
assert!(metrics.dir_size > 0);
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -1151,7 +1164,7 @@ mod tests {
];
// First time to get the directory
let guard_0 = stager
let (guard_0, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1198,7 +1211,7 @@ mod tests {
);
// Second time to get the directory
let guard_1 = stager
let (guard_1, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1237,7 +1250,7 @@ mod tests {
// Third time to get the directory and all guards are dropped
drop(guard_0);
drop(guard_1);
let guard_2 = stager
let (guard_2, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1390,7 +1403,7 @@ mod tests {
];
let dir_key = "dir_key";
let guard = stager
let (guard, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,

View File

@@ -356,7 +356,7 @@ async fn check_dir(
stager: &BoundedStager<String>,
puffin_reader: &impl PuffinReader,
) {
let res_dir = puffin_reader.dir(key).await.unwrap();
let (res_dir, _metrics) = puffin_reader.dir(key).await.unwrap();
let metadata = res_dir.metadata();
assert_eq!(
metadata.properties,