diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 32f62a5775..b88d32ebf4 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -36,7 +36,7 @@ const BLOOM_META_LEN_SIZE: u64 = 4; pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB /// Metrics for bloom filter read operations. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct BloomFilterReadMetrics { /// Total byte size to read. pub total_bytes: u64, diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index b15554fcb7..182df92e86 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -31,7 +31,7 @@ mod blob; mod footer; /// Metrics for inverted index read operations. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct InvertedIndexReadMetrics { /// Total byte size to read. pub total_bytes: u64, diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index a6c4bd524f..31106013d3 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -22,7 +22,9 @@ use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::warn; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; -use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; +use index::bloom_filter::reader::{ + BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl, +}; use index::target::IndexTarget; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; @@ -57,6 +59,8 @@ pub struct BloomFilterIndexApplyMetrics { pub blob_cache_miss: usize, /// Total size of blobs read (in bytes). pub blob_read_bytes: u64, + /// Metrics for bloom filter read operations. + pub read_metrics: BloomFilterReadMetrics, } pub(crate) type BloomFilterIndexApplierRef = Arc; @@ -201,12 +205,12 @@ impl BloomFilterIndexApplier { BloomFilterReaderImpl::new(blob), bloom_filter_cache.clone(), ); - self.apply_predicates(reader, predicates, &mut output) + self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut()) .await .context(ApplyBloomFilterIndexSnafu)?; } else { let reader = BloomFilterReaderImpl::new(blob); - self.apply_predicates(reader, predicates, &mut output) + self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut()) .await .context(ApplyBloomFilterIndexSnafu)?; } @@ -354,6 +358,7 @@ impl BloomFilterIndexApplier { reader: R, predicates: &[InListPredicate], output: &mut [(usize, Vec>)], + mut metrics: Option<&mut BloomFilterIndexApplyMetrics>, ) -> std::result::Result<(), index::bloom_filter::error::Error> { let mut applier = BloomFilterApplier::new(Box::new(reader)).await?; @@ -363,7 +368,10 @@ impl BloomFilterIndexApplier { continue; } - *row_group_output = applier.search(predicates, row_group_output, None).await?; + let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics); + *row_group_output = applier + .search(predicates, row_group_output, read_metrics) + .await?; } Ok(()) diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 3c50673505..a8fcf88220 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -21,7 +21,7 @@ use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::warn; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; -use index::bloom_filter::reader::BloomFilterReaderImpl; +use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl}; use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer}; use index::fulltext_index::{Analyzer, Config}; @@ -61,6 +61,8 @@ pub struct FulltextIndexApplyMetrics { pub apply_elapsed: std::time::Duration, /// Number of blob cache misses. pub blob_cache_miss: usize, + /// Metrics for bloom filter reads. + pub bloom_filter_read_metrics: BloomFilterReadMetrics, } /// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files @@ -330,7 +332,7 @@ impl FulltextIndexApplier { column_id: ColumnId, terms: &[FulltextTerm], output: &mut [(usize, Vec>)], - metrics: Option<&mut FulltextIndexApplyMetrics>, + mut metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result { let blob_key = format!( "{INDEX_BLOB_TYPE_BLOOM}-{}", @@ -338,7 +340,7 @@ impl FulltextIndexApplier { ); let Some(reader) = self .index_source - .blob(file_id, &blob_key, file_size_hint, metrics) + .blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut()) .await? else { return Ok(false); @@ -380,9 +382,14 @@ impl FulltextIndexApplier { continue; } - // TODO(yingwen): Update reader metrics. *row_group_output = applier - .search(&predicates, row_group_output, None) + .search( + &predicates, + row_group_output, + metrics + .as_deref_mut() + .map(|m| &mut m.bloom_filter_read_metrics), + ) .await .context(ApplyBloomFilterIndexSnafu)?; } diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 5e324616ff..b4a8034c3a 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -20,7 +20,7 @@ use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::warn; -use index::inverted_index::format::reader::InvertedIndexBlobReader; +use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics}; use index::inverted_index::search::index_apply::{ ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, }; @@ -54,6 +54,8 @@ pub struct InvertedIndexApplyMetrics { pub blob_cache_miss: usize, /// Total size of blobs read (in bytes). pub blob_read_bytes: u64, + /// Metrics for inverted index reads. + pub inverted_index_read_metrics: InvertedIndexReadMetrics, } /// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files @@ -146,7 +148,7 @@ impl InvertedIndexApplier { &self, file_id: RegionFileId, file_size_hint: Option, - metrics: Option<&mut InvertedIndexApplyMetrics>, + mut metrics: Option<&mut InvertedIndexApplyMetrics>, ) -> Result { let start = Instant::now(); @@ -177,13 +179,25 @@ impl InvertedIndexApplier { index_cache.clone(), ); self.index_applier - .apply(context, &mut index_reader, None) + .apply( + context, + &mut index_reader, + metrics + .as_deref_mut() + .map(|m| &mut m.inverted_index_read_metrics), + ) .await .context(ApplyInvertedIndexSnafu) } else { let mut index_reader = InvertedIndexBlobReader::new(blob); self.index_applier - .apply(context, &mut index_reader, None) + .apply( + context, + &mut index_reader, + metrics + .as_deref_mut() + .map(|m| &mut m.inverted_index_read_metrics), + ) .await .context(ApplyInvertedIndexSnafu) };