feat: add inverted applier metrics

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-20 12:59:36 +08:00
committed by shuiyisong
parent 5472bdfc0f
commit 4519607bc6
3 changed files with 44 additions and 10 deletions

View File

@@ -16,6 +16,7 @@ pub mod builder;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
@@ -44,6 +45,17 @@ use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking inverted index apply operations.
#[derive(Debug, Default, Clone)]
pub struct InvertedIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses (0 or 1).
pub blob_cache_miss: usize,
/// Total size of blobs read (in bytes).
pub blob_read_bytes: u64,
}
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
pub(crate) struct InvertedIndexApplier {
@@ -124,24 +136,30 @@ impl InvertedIndexApplier {
self
}
/// Applies predicates to the provided SST file id and returns the relevant row group ids
/// Applies predicates to the provided SST file id and returns the relevant row group ids.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
metrics: Option<&mut InvertedIndexApplyMetrics>,
) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.start_timer();
let start = Instant::now();
let context = SearchContext {
// Encountering a non-existing column indicates that it doesn't match predicates.
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
};
let mut cache_miss = 0;
let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
cache_miss += 1;
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
@@ -149,8 +167,9 @@ impl InvertedIndexApplier {
}
};
if let Some(index_cache) = &self.inverted_index_cache {
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let result = if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id.file_id(),
blob_size,
@@ -167,7 +186,21 @@ impl InvertedIndexApplier {
.apply(context, &mut index_reader)
.await
.context(ApplyInvertedIndexSnafu)
};
// Record elapsed time to histogram and collect metrics if requested
let elapsed = start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(metrics) = metrics {
metrics.apply_elapsed = elapsed;
metrics.blob_cache_miss = cache_miss;
metrics.blob_read_bytes = blob_size;
}
result
}
/// Creates a blob reader from the cached index file.
@@ -297,7 +330,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let output = sst_index_applier.apply(file_id, None).await.unwrap();
let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
@@ -345,7 +378,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let res = sst_index_applier.apply(file_id, None).await;
let res = sst_index_applier.apply(file_id, None, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}

View File

@@ -615,7 +615,7 @@ mod tests {
.unwrap();
Box::pin(async move {
applier
.apply(sst_file_id, None)
.apply(sst_file_id, None, None)
.await
.unwrap()
.matched_segment_ids

View File

@@ -594,8 +594,9 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
// TODO(yingwen): Collect applier metrics in verbose mode.
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint))
.apply(self.file_handle.file_id(), Some(file_size_hint), None)
.await;
let selection = match apply_res {
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(