From cce4d56e00f97c0be1ad275898b4ced7114ae824 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 24 Nov 2025 21:06:07 +0800 Subject: [PATCH] feat: add apply metrics Signed-off-by: evenyag --- src/index/src/bloom_filter/reader.rs | 11 +++ src/index/src/inverted_index/format/reader.rs | 11 +++ src/mito2/src/read/scan_util.rs | 4 ++ .../src/sst/index/bloom_filter/applier.rs | 10 +++ .../src/sst/index/fulltext_index/applier.rs | 11 +++ .../src/sst/index/inverted_index/applier.rs | 11 +++ src/mito2/src/sst/parquet/reader.rs | 72 +++++++++++++++---- 7 files changed, 116 insertions(+), 14 deletions(-) diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index b88d32ebf4..7c58fc9de5 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -50,6 +50,17 @@ pub struct BloomFilterReadMetrics { pub cache_miss: usize, } +impl BloomFilterReadMetrics { + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.total_bytes += other.total_bytes; + self.total_ranges += other.total_ranges; + self.fetch_elapsed += other.fetch_elapsed; + self.cache_hit += other.cache_hit; + self.cache_miss += other.cache_miss; + } +} + /// Safely converts bytes to Vec using bytemuck for optimal performance. /// Faster than chunking and converting each piece individually. /// diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 182df92e86..eb78afbfe1 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -45,6 +45,17 @@ pub struct InvertedIndexReadMetrics { pub cache_miss: usize, } +impl InvertedIndexReadMetrics { + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.total_bytes += other.total_bytes; + self.total_ranges += other.total_ranges; + self.fetch_elapsed += other.fetch_elapsed; + self.cache_hit += other.cache_hit; + self.cache_miss += other.cache_miss; + } +} + /// InvertedIndexReader defines an asynchronous reader of inverted index data #[mockall::automock] #[async_trait] diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index b62d8b5229..cc2f567a21 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -305,6 +305,10 @@ impl ScanMetricsSet { rows_inverted_filtered, rows_bloom_filtered, rows_precise_filtered, + // Optional applier metrics are not collected here. + inverted_index_apply_metrics: _, + bloom_filter_apply_metrics: _, + fulltext_index_apply_metrics: _, }, num_record_batches, num_batches, diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 31106013d3..39e9f9bb32 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -63,6 +63,16 @@ pub struct BloomFilterIndexApplyMetrics { pub read_metrics: BloomFilterReadMetrics, } +impl BloomFilterIndexApplyMetrics { + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.apply_elapsed += other.apply_elapsed; + self.blob_cache_miss += other.blob_cache_miss; + self.blob_read_bytes += other.blob_read_bytes; + self.read_metrics.merge_from(&other.read_metrics); + } +} + pub(crate) type BloomFilterIndexApplierRef = Arc; /// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file. diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 6fa4b337af..7eb6b71b32 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -85,6 +85,17 @@ impl FulltextIndexApplyMetrics { self.dir_cache_miss += 1; } } + + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.apply_elapsed += other.apply_elapsed; + self.blob_cache_miss += other.blob_cache_miss; + self.dir_cache_hit += other.dir_cache_hit; + self.dir_cache_miss += other.dir_cache_miss; + self.dir_init_elapsed += other.dir_init_elapsed; + self.bloom_filter_read_metrics + .merge_from(&other.bloom_filter_read_metrics); + } } /// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index b4a8034c3a..aede343df9 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -58,6 +58,17 @@ pub struct InvertedIndexApplyMetrics { pub inverted_index_read_metrics: InvertedIndexReadMetrics, } +impl InvertedIndexApplyMetrics { + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.apply_elapsed += other.apply_elapsed; + self.blob_cache_miss += other.blob_cache_miss; + self.blob_read_bytes += other.blob_read_bytes; + self.inverted_index_read_metrics + .merge_from(&other.inverted_index_read_metrics); + } +} + /// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. pub(crate) struct InvertedIndexApplier { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index aacfd53cdf..2f4d204837 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -52,9 +52,15 @@ use crate::metrics::{ use crate::read::prune::{PruneReader, Source}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileHandle; -use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef; -use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; -use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; +use crate::sst::index::bloom_filter::applier::{ + BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics, +}; +use crate::sst::index::fulltext_index::applier::{ + FulltextIndexApplierRef, FulltextIndexApplyMetrics, +}; +use crate::sst::index::inverted_index::applier::{ + InvertedIndexApplierRef, InvertedIndexApplyMetrics, +}; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete, }; @@ -537,9 +543,12 @@ impl ParquetReaderBuilder { // Slow path: apply the index from the file. let file_size_hint = self.file_handle.meta_ref().index_file_size(); - // TODO(yingwen): Collect applier metrics in verbose mode. let apply_res = index_applier - .apply_fine(self.file_handle.file_id(), Some(file_size_hint), None) + .apply_fine( + self.file_handle.file_id(), + Some(file_size_hint), + metrics.fulltext_index_apply_metrics.as_mut(), + ) .await; let selection = match apply_res { Ok(Some(res)) => { @@ -606,15 +615,18 @@ impl ParquetReaderBuilder { // Slow path: apply the index from the file. let file_size_hint = self.file_handle.meta_ref().index_file_size(); - // TODO(yingwen): Collect applier metrics in verbose mode. let apply_res = index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint), None) + .apply( + self.file_handle.file_id(), + Some(file_size_hint), + metrics.inverted_index_apply_metrics.as_mut(), + ) .await; let selection = match apply_res { - Ok(output) => RowGroupSelection::from_inverted_index_apply_output( + Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output( row_group_size, num_row_groups, - output, + apply_output, ), Err(err) => { handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED); @@ -682,9 +694,13 @@ impl ParquetReaderBuilder { .unwrap_or(true), ) }); - // TODO(yingwen): Collect metrics for applier let apply_res = index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint), rgs, None) + .apply( + self.file_handle.file_id(), + Some(file_size_hint), + rgs, + metrics.bloom_filter_apply_metrics.as_mut(), + ) .await; let mut selection = match apply_res { Ok(apply_output) => { @@ -761,9 +777,13 @@ impl ParquetReaderBuilder { .unwrap_or(true), ) }); - // TODO(yingwen): Collect applier metrics in verbose mode. let apply_res = index_applier - .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs, None) + .apply_coarse( + self.file_handle.file_id(), + Some(file_size_hint), + rgs, + metrics.fulltext_index_apply_metrics.as_mut(), + ) .await; let mut selection = match apply_res { Ok(Some(apply_output)) => { @@ -907,7 +927,7 @@ fn all_required_row_groups_searched( } /// Metrics of filtering rows groups and rows. -#[derive(Debug, Default, Clone, Copy)] +#[derive(Debug, Default, Clone)] pub(crate) struct ReaderFilterMetrics { /// Number of row groups before filtering. pub(crate) rg_total: usize, @@ -930,6 +950,13 @@ pub(crate) struct ReaderFilterMetrics { pub(crate) rows_bloom_filtered: usize, /// Number of rows filtered by precise filter. pub(crate) rows_precise_filtered: usize, + + /// Optional metrics for inverted index applier. + pub(crate) inverted_index_apply_metrics: Option, + /// Optional metrics for bloom filter index applier. + pub(crate) bloom_filter_apply_metrics: Option, + /// Optional metrics for fulltext index applier. + pub(crate) fulltext_index_apply_metrics: Option, } impl ReaderFilterMetrics { @@ -946,6 +973,23 @@ impl ReaderFilterMetrics { self.rows_inverted_filtered += other.rows_inverted_filtered; self.rows_bloom_filtered += other.rows_bloom_filtered; self.rows_precise_filtered += other.rows_precise_filtered; + + // Merge optional applier metrics + if let Some(other_metrics) = &other.inverted_index_apply_metrics { + self.inverted_index_apply_metrics + .get_or_insert_with(Default::default) + .merge_from(other_metrics); + } + if let Some(other_metrics) = &other.bloom_filter_apply_metrics { + self.bloom_filter_apply_metrics + .get_or_insert_with(Default::default) + .merge_from(other_metrics); + } + if let Some(other_metrics) = &other.fulltext_index_apply_metrics { + self.fulltext_index_apply_metrics + .get_or_insert_with(Default::default) + .merge_from(other_metrics); + } } /// Reports metrics.