From 4519607bc6b249fd86691e928aadde6adac747ed Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 20 Nov 2025 12:59:36 +0800 Subject: [PATCH] feat: add inverted applier metrics Signed-off-by: evenyag --- .../src/sst/index/inverted_index/applier.rs | 49 ++++++++++++++++--- .../src/sst/index/inverted_index/creator.rs | 2 +- src/mito2/src/sst/parquet/reader.rs | 3 +- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 350880cc9f..0cbb8386d9 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -16,6 +16,7 @@ pub mod builder; use std::collections::BTreeMap; use std::sync::Arc; +use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::warn; @@ -44,6 +45,17 @@ use crate::sst::index::TYPE_INVERTED_INDEX; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; +/// Metrics for tracking inverted index apply operations. +#[derive(Debug, Default, Clone)] +pub struct InvertedIndexApplyMetrics { + /// Total time spent applying the index. + pub apply_elapsed: std::time::Duration, + /// Number of blob cache misses (0 or 1). + pub blob_cache_miss: usize, + /// Total size of blobs read (in bytes). + pub blob_read_bytes: u64, +} + /// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. pub(crate) struct InvertedIndexApplier { @@ -124,24 +136,30 @@ impl InvertedIndexApplier { self } - /// Applies predicates to the provided SST file id and returns the relevant row group ids + /// Applies predicates to the provided SST file id and returns the relevant row group ids. 
+ /// + /// # Arguments + /// * `file_id` - The region file ID to apply predicates to + /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads + /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply( &self, file_id: RegionFileId, file_size_hint: Option<u64>, + metrics: Option<&mut InvertedIndexApplyMetrics>, ) -> Result<ApplyOutput> { - let _timer = INDEX_APPLY_ELAPSED - .with_label_values(&[TYPE_INVERTED_INDEX]) - .start_timer(); + let start = Instant::now(); let context = SearchContext { // Encountering a non-existing column indicates that it doesn't match predicates. index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, }; + let mut cache_miss = 0; let blob = match self.cached_blob_reader(file_id, file_size_hint).await { Ok(Some(puffin_reader)) => puffin_reader, other => { + cache_miss += 1; if let Err(err) = other { warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") } @@ -149,8 +167,9 @@ impl InvertedIndexApplier { } }; - if let Some(index_cache) = &self.inverted_index_cache { - let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; + let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; + + let result = if let Some(index_cache) = &self.inverted_index_cache { let mut index_reader = CachedInvertedIndexBlobReader::new( file_id.file_id(), blob_size, @@ -167,7 +186,21 @@ impl InvertedIndexApplier { .apply(context, &mut index_reader) .await .context(ApplyInvertedIndexSnafu) + }; + + // Record elapsed time to histogram and collect metrics if requested + let elapsed = start.elapsed(); + INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_INVERTED_INDEX]) + .observe(elapsed.as_secs_f64()); + + if let Some(metrics) = metrics { + metrics.apply_elapsed = elapsed; + metrics.blob_cache_miss = cache_miss; + metrics.blob_read_bytes = blob_size; } + + result } /// Creates a blob reader from the cached index file. 
@@ -297,7 +330,7 @@ mod tests { puffin_manager_factory, Default::default(), ); - let output = sst_index_applier.apply(file_id, None).await.unwrap(); + let output = sst_index_applier.apply(file_id, None, None).await.unwrap(); assert_eq!( output, ApplyOutput { @@ -345,7 +378,7 @@ mod tests { puffin_manager_factory, Default::default(), ); - let res = sst_index_applier.apply(file_id, None).await; + let res = sst_index_applier.apply(file_id, None, None).await; assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found")); } } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index f31cfaf1dc..a784d01c21 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -615,7 +615,7 @@ mod tests { .unwrap(); Box::pin(async move { applier - .apply(sst_file_id, None) + .apply(sst_file_id, None, None) .await .unwrap() .matched_segment_ids diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 2c77145e5b..4f8bc20715 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -594,8 +594,9 @@ impl ParquetReaderBuilder { // Slow path: apply the index from the file. let file_size_hint = self.file_handle.meta_ref().index_file_size(); + // TODO(yingwen): Collect applier metrics in verbose mode. let apply_res = index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint)) + .apply(self.file_handle.file_id(), Some(file_size_hint), None) .await; let selection = match apply_res { Ok(output) => RowGroupSelection::from_inverted_index_apply_output(