From b68286e8af4d8e3c0f4d86f0fec2e0a1fa58a652 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 20 Nov 2025 13:52:41 +0800 Subject: [PATCH] feat: add metrics to bloom applier Signed-off-by: evenyag --- .../src/sst/index/bloom_filter/applier.rs | 44 ++++++++++++++++--- src/mito2/src/sst/parquet/reader.rs | 3 +- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 3fa387c8dc..3a8df9801d 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -17,6 +17,7 @@ mod builder; use std::collections::BTreeMap; use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::warn; @@ -47,6 +48,17 @@ use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; +/// Metrics for tracking bloom filter index apply operations. +#[derive(Debug, Default, Clone)] +pub struct BloomFilterIndexApplyMetrics { + /// Total time spent applying the index. + pub apply_elapsed: std::time::Duration, + /// Number of blob cache misses. + pub blob_cache_miss: usize, + /// Total size of blobs read (in bytes). + pub blob_read_bytes: u64, +} + pub(crate) type BloomFilterIndexApplierRef = Arc; /// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file. @@ -133,15 +145,20 @@ impl BloomFilterIndexApplier { /// /// Row group id existing in the returned result means that the row group is searched. /// Empty ranges means that the row group is searched but no rows are found. + /// + /// # Arguments + /// * `file_id` - The region file ID to apply predicates to + /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads + /// * `row_groups` - Iterator of row group lengths and whether to search in the row group + /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply( &self, file_id: RegionFileId, file_size_hint: Option, row_groups: impl Iterator, + mut metrics: Option<&mut BloomFilterIndexApplyMetrics>, ) -> Result>)>> { - let _timer = INDEX_APPLY_ELAPSED - .with_label_values(&[TYPE_BLOOM_FILTER_INDEX]) - .start_timer(); + let apply_start = Instant::now(); // Calculates row groups' ranges based on start of the file. let mut input = Vec::with_capacity(row_groups.size_hint().0); @@ -163,7 +180,7 @@ impl BloomFilterIndexApplier { for (column_id, predicates) in self.predicates.iter() { let blob = match self - .blob_reader(file_id, *column_id, file_size_hint) + .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut()) .await? { Some(blob) => blob, @@ -173,6 +190,9 @@ impl BloomFilterIndexApplier { // Create appropriate reader based on whether we have caching enabled if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache { let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; + if let Some(m) = &mut metrics { + m.blob_read_bytes += blob_size; + } let reader = CachedBloomFilterIndexBlobReader::new( file_id.file_id(), *column_id, @@ -201,6 +221,16 @@ impl BloomFilterIndexApplier { } } + // Record elapsed time to histogram and collect metrics if requested + let elapsed = apply_start.elapsed(); + INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_BLOOM_FILTER_INDEX]) + .observe(elapsed.as_secs_f64()); + + if let Some(m) = metrics { + m.apply_elapsed += elapsed; + } + Ok(output) } @@ -212,6 +242,7 @@ impl BloomFilterIndexApplier { file_id: RegionFileId, column_id: ColumnId, file_size_hint: Option, + metrics: Option<&mut BloomFilterIndexApplyMetrics>, ) -> Result> { let reader = match self .cached_blob_reader(file_id, column_id, file_size_hint) @@ -219,6 +250,9 @@ impl BloomFilterIndexApplier { { Ok(Some(puffin_reader)) => puffin_reader, other => { + if let Some(m) = metrics { + m.blob_cache_miss += 1; + } if let Err(err) = other { // Blob not found means no index for this column if is_blob_not_found(&err) { @@ -393,7 +427,7 @@ mod tests { let applier = builder.build(&exprs).unwrap().unwrap(); applier - .apply(file_id, None, row_groups.into_iter()) + .apply(file_id, None, row_groups.into_iter(), None) .await .unwrap() .into_iter() diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 4f8bc20715..8f1ef08d7e 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -670,8 +670,9 @@ impl ParquetReaderBuilder { .unwrap_or(true), ) }); + // TODO(yingwen): Collect metrics for applier let apply_res = index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint), rgs) + .apply(self.file_handle.file_id(), Some(file_size_hint), rgs, None) .await; let mut selection = match apply_res { Ok(apply_output) => {