From 7a6befcad3ffbae2c58a0ab627cca06a746bff5a Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 20 Nov 2025 23:03:49 +0800 Subject: [PATCH] feat: collect read metrics for inverted index Signed-off-by: evenyag --- src/index/src/bloom_filter/applier.rs | 10 +- src/index/src/inverted_index/format/reader.rs | 72 ++++++++++++--- .../src/inverted_index/format/reader/blob.rs | 47 ++++++++-- .../src/inverted_index/format/writer/blob.rs | 44 +++++++-- .../search/fst_values_mapper.rs | 71 +++++++++------ .../src/inverted_index/search/index_apply.rs | 5 +- .../search/index_apply/predicates_apply.rs | 87 ++++++++++-------- src/mito2/src/cache/index/inverted_index.rs | 91 ++++++++++++++++--- .../src/sst/index/fulltext_index/applier.rs | 9 +- .../src/sst/index/fulltext_index/creator.rs | 7 +- .../src/sst/index/inverted_index/applier.rs | 6 +- 11 files changed, 329 insertions(+), 120 deletions(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 6217b642bb..ec1ee26113 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -110,7 +110,10 @@ impl BloomFilterApplier { .map(|i| self.meta.bloom_filter_locs[i as usize]) .collect::>(); - let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs, metrics).await?; + let bloom_filters = self + .reader + .bloom_filter_vec(&bloom_filter_locs, metrics) + .await?; Ok((segment_locations, bloom_filters)) } @@ -424,7 +427,10 @@ mod tests { ]; for (predicates, search_range, expected) in cases { - let result = applier.search(&predicates, &[search_range], None).await.unwrap(); + let result = applier + .search(&predicates, &[search_range], None) + .await + .unwrap(); assert_eq!( result, expected, "Expected {:?}, got {:?}", diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 40fa22130a..d8b492a8e5 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; @@ -29,19 +30,44 @@ pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader; mod blob; mod footer; +/// Metrics for inverted index read operations. +#[derive(Debug, Default)] +pub struct InvertedIndexReadMetrics { + /// Total byte size to read. + pub total_bytes: u64, + /// Total number of ranges to read. + pub total_ranges: usize, + /// Elapsed time of the read_vec operation. + pub elapsed: Duration, +} + /// InvertedIndexReader defines an asynchronous reader of inverted index data #[mockall::automock] #[async_trait] pub trait InvertedIndexReader: Send + Sync { /// Seeks to given offset and reads data with exact size as provided. - async fn range_read(&self, offset: u64, size: u32) -> Result>; + async fn range_read<'a>( + &self, + offset: u64, + size: u32, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result>; /// Reads the bytes in the given ranges. - async fn read_vec(&self, ranges: &[Range]) -> Result> { + async fn read_vec<'a>( + &self, + ranges: &[Range], + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let mut metrics = metrics; let mut result = Vec::with_capacity(ranges.len()); for range in ranges { let data = self - .range_read(range.start, (range.end - range.start) as u32) + .range_read( + range.start, + (range.end - range.start) as u32, + metrics.as_deref_mut(), + ) .await?; result.push(Bytes::from(data)); } @@ -52,14 +78,24 @@ pub trait InvertedIndexReader: Send + Sync { async fn metadata(&self) -> Result>; /// Retrieves the finite state transducer (FST) map from the given offset and size. - async fn fst(&self, offset: u64, size: u32) -> Result { - let fst_data = self.range_read(offset, size).await?; + async fn fst<'a>( + &self, + offset: u64, + size: u32, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result { + let fst_data = self.range_read(offset, size, metrics).await?; FstMap::new(fst_data).context(DecodeFstSnafu) } /// Retrieves the multiple finite state transducer (FST) maps from the given ranges. - async fn fst_vec(&mut self, ranges: &[Range]) -> Result> { - self.read_vec(ranges) + async fn fst_vec<'a>( + &mut self, + ranges: &[Range], + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let mut metrics = metrics; + self.read_vec(ranges, metrics.as_deref_mut()) .await? .into_iter() .map(|bytes| FstMap::new(bytes.to_vec()).context(DecodeFstSnafu)) @@ -67,19 +103,29 @@ pub trait InvertedIndexReader: Send + Sync { } /// Retrieves the bitmap from the given offset and size. - async fn bitmap(&self, offset: u64, size: u32, bitmap_type: BitmapType) -> Result { - self.range_read(offset, size).await.and_then(|bytes| { - Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu) - }) + async fn bitmap<'a>( + &self, + offset: u64, + size: u32, + bitmap_type: BitmapType, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result { + self.range_read(offset, size, metrics) + .await + .and_then(|bytes| { + Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu) + }) } /// Retrieves the multiple bitmaps from the given ranges. - async fn bitmap_deque( + async fn bitmap_deque<'a>( &mut self, ranges: &[(Range, BitmapType)], + metrics: Option<&'a mut InvertedIndexReadMetrics>, ) -> Result> { + let mut metrics = metrics; let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip(); - let bytes = self.read_vec(&ranges).await?; + let bytes = self.read_vec(&ranges, metrics.as_deref_mut()).await?; bytes .into_iter() .zip(types) diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index f48791e8f4..1acdb74d8c 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -14,6 +14,7 @@ use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; @@ -23,10 +24,10 @@ use snafu::{ResultExt, ensure}; use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu}; use crate::inverted_index::format::MIN_BLOB_SIZE; -use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::format::reader::footer::{ DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader, }; +use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; /// Inverted index blob reader, implements [`InvertedIndexReader`] pub struct InvertedIndexBlobReader { @@ -53,17 +54,45 @@ impl InvertedIndexBlobReader { #[async_trait] impl InvertedIndexReader for InvertedIndexBlobReader { - async fn range_read(&self, offset: u64, size: u32) -> Result> { + async fn range_read<'a>( + &self, + offset: u64, + size: u32, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let buf = self .source .read(offset..offset + size as u64) .await .context(CommonIoSnafu)?; + + if let Some(m) = metrics { + m.total_bytes += size as u64; + m.total_ranges += 1; + m.elapsed += start.unwrap().elapsed(); + } + Ok(buf.into()) } - async fn read_vec(&self, ranges: &[Range]) -> Result> { - self.source.read_vec(ranges).await.context(CommonIoSnafu) + async fn read_vec<'a>( + &self, + ranges: &[Range], + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + + let result = self.source.read_vec(ranges).await.context(CommonIoSnafu)?; + + if let Some(m) = metrics { + m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); + m.total_ranges += ranges.len(); + m.elapsed += start.unwrap().elapsed(); + } + + Ok(result) } async fn metadata(&self) -> Result> { @@ -207,6 +236,7 @@ mod tests { .fst( meta.base_offset + meta.relative_fst_offset as u64, meta.fst_size, + None, ) .await .unwrap(); @@ -219,6 +249,7 @@ mod tests { .fst( meta.base_offset + meta.relative_fst_offset as u64, meta.fst_size, + None, ) .await .unwrap(); @@ -236,12 +267,12 @@ mod tests { let meta = metas.metas.get("tag0").unwrap(); let bitmap = blob_reader - .bitmap(meta.base_offset, 26, BitmapType::Roaring) + .bitmap(meta.base_offset, 26, BitmapType::Roaring, None) .await .unwrap(); assert_eq!(bitmap, mock_bitmap()); let bitmap = blob_reader - .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring) + .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None) .await .unwrap(); assert_eq!(bitmap, mock_bitmap()); @@ -250,12 +281,12 @@ mod tests { let meta = metas.metas.get("tag1").unwrap(); let bitmap = blob_reader - .bitmap(meta.base_offset, 26, BitmapType::Roaring) + .bitmap(meta.base_offset, 26, BitmapType::Roaring, None) .await .unwrap(); assert_eq!(bitmap, mock_bitmap()); let bitmap = blob_reader - .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring) + .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None) .await .unwrap(); assert_eq!(bitmap, mock_bitmap()); diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index 5991284869..32a4219179 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -198,13 +198,19 @@ mod tests { .fst( tag0.base_offset + tag0.relative_fst_offset as u64, tag0.fst_size, + None, ) .await .unwrap(); assert_eq!(fst0.len(), 3); let [offset, size] = unpack(fst0.get(b"a").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -213,7 +219,12 @@ mod tests { ); let [offset, size] = unpack(fst0.get(b"b").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -222,7 +233,12 @@ mod tests { ); let [offset, size] = unpack(fst0.get(b"c").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -241,13 +257,19 @@ mod tests { .fst( tag1.base_offset + tag1.relative_fst_offset as u64, tag1.fst_size, + None, ) .await .unwrap(); assert_eq!(fst1.len(), 3); let [offset, size] = unpack(fst1.get(b"x").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -256,7 +278,12 @@ mod tests { ); let [offset, size] = unpack(fst1.get(b"y").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -265,7 +292,12 @@ mod tests { ); let [offset, size] = unpack(fst1.get(b"z").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( diff --git a/src/index/src/inverted_index/search/fst_values_mapper.rs b/src/index/src/inverted_index/search/fst_values_mapper.rs index f9c15c40d8..91078623cb 100644 --- a/src/index/src/inverted_index/search/fst_values_mapper.rs +++ b/src/index/src/inverted_index/search/fst_values_mapper.rs @@ -16,7 +16,7 @@ use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta}; use crate::bitmap::Bitmap; use crate::inverted_index::error::Result; -use crate::inverted_index::format::reader::InvertedIndexReader; +use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; /// `ParallelFstValuesMapper` enables parallel mapping of multiple FST value groups to their /// corresponding bitmaps within an inverted index. @@ -33,10 +33,12 @@ impl<'a> ParallelFstValuesMapper<'a> { Self { reader } } - pub async fn map_values_vec( + pub async fn map_values_vec<'b>( &mut self, - value_and_meta_vec: &[(Vec, &'a InvertedIndexMeta)], + value_and_meta_vec: &[(Vec, &'b InvertedIndexMeta)], + metrics: Option<&mut InvertedIndexReadMetrics>, ) -> Result> { + let mut metrics = metrics; let groups = value_and_meta_vec .iter() .map(|(values, _)| values.len()) @@ -64,7 +66,10 @@ impl<'a> ParallelFstValuesMapper<'a> { } common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges); - let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges).await?; + let mut bitmaps = self + .reader + .bitmap_deque(&fetch_ranges, metrics.as_deref_mut()) + .await?; let mut output = Vec::with_capacity(groups.len()); for counter in groups { @@ -95,23 +100,25 @@ mod tests { #[tokio::test] async fn test_map_values_vec() { let mut mock_reader = MockInvertedIndexReader::new(); - mock_reader.expect_bitmap_deque().returning(|ranges| { - let mut output = VecDeque::new(); - for (range, bitmap_type) in ranges { - let offset = range.start; - let size = range.end - range.start; - match (offset, size, bitmap_type) { - (1, 1, BitmapType::Roaring) => { - output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + mock_reader + .expect_bitmap_deque() + .returning(|ranges, _metrics| { + let mut output = VecDeque::new(); + for (range, bitmap_type) in ranges { + let offset = range.start; + let size = range.end - range.start; + match (offset, size, bitmap_type) { + (1, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + } + (2, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type)) + } + _ => unreachable!(), } - (2, 1, BitmapType::Roaring) => { - output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type)) - } - _ => unreachable!(), } - } - Ok(output) - }); + Ok(output) + }); let meta = InvertedIndexMeta { bitmap_type: BitmapType::Roaring.into(), @@ -120,13 +127,13 @@ mod tests { let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader); let result = values_mapper - .map_values_vec(&[(vec![], &meta)]) + .map_values_vec(&[(vec![], &meta)], None) .await .unwrap(); assert_eq!(result[0].count_ones(), 0); let result = values_mapper - .map_values_vec(&[(vec![value(1, 1)], &meta)]) + .map_values_vec(&[(vec![value(1, 1)], &meta)], None) .await .unwrap(); assert_eq!( @@ -135,7 +142,7 @@ mod tests { ); let result = values_mapper - .map_values_vec(&[(vec![value(2, 1)], &meta)]) + .map_values_vec(&[(vec![value(2, 1)], &meta)], None) .await .unwrap(); assert_eq!( @@ -144,7 +151,7 @@ mod tests { ); let result = values_mapper - .map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)]) + .map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)], None) .await .unwrap(); assert_eq!( @@ -153,7 +160,7 @@ mod tests { ); let result = values_mapper - .map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)]) + .map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)], None) .await .unwrap(); assert_eq!( @@ -162,7 +169,10 @@ mod tests { ); let result = values_mapper - .map_values_vec(&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)]) + .map_values_vec( + &[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)], + None, + ) .await .unwrap(); assert_eq!( @@ -174,10 +184,13 @@ mod tests { Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring) ); let result = values_mapper - .map_values_vec(&[ - (vec![value(2, 1), value(1, 1)], &meta), - (vec![value(1, 1)], &meta), - ]) + .map_values_vec( + &[ + (vec![value(2, 1), value(1, 1)], &meta), + (vec![value(1, 1)], &meta), + ], + None, + ) .await .unwrap(); assert_eq!( diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index a80f102e02..02a1f96450 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -19,7 +19,7 @@ pub use predicates_apply::PredicatesIndexApplier; use crate::bitmap::Bitmap; use crate::inverted_index::error::Result; -use crate::inverted_index::format::reader::InvertedIndexReader; +use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; /// The output of an apply operation. #[derive(Clone, Debug, PartialEq)] @@ -44,10 +44,11 @@ pub trait IndexApplier: Send + Sync { /// Applies the predefined predicates to the data read by the given index reader, returning /// a list of relevant indices (e.g., post IDs, group IDs, row IDs). #[allow(unused_parens)] - async fn apply<'a>( + async fn apply<'a, 'b>( &self, context: SearchContext, reader: &mut (dyn InvertedIndexReader + 'a), + metrics: Option<&'b mut InvertedIndexReadMetrics>, ) -> Result; /// Returns the memory usage of the applier. diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index ae22e79c74..0b5297b8b0 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -19,7 +19,7 @@ use greptime_proto::v1::index::InvertedIndexMetas; use crate::bitmap::Bitmap; use crate::inverted_index::error::{IndexNotFoundSnafu, Result}; -use crate::inverted_index::format::reader::InvertedIndexReader; +use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; use crate::inverted_index::search::fst_apply::{ FstApplier, IntersectionFstApplier, KeysFstApplier, }; @@ -43,11 +43,13 @@ pub struct PredicatesIndexApplier { impl IndexApplier for PredicatesIndexApplier { /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual /// bitmaps obtained for each index to result in a final set of indices. - async fn apply<'a>( + async fn apply<'a, 'b>( &self, context: SearchContext, reader: &mut (dyn InvertedIndexReader + 'a), + metrics: Option<&'b mut InvertedIndexReadMetrics>, ) -> Result { + let mut metrics = metrics; let metadata = reader.metadata().await?; let mut output = ApplyOutput { matched_segment_ids: Bitmap::new_bitvec(), @@ -84,7 +86,7 @@ impl IndexApplier for PredicatesIndexApplier { return Ok(output); } - let fsts = reader.fst_vec(&fst_ranges).await?; + let fsts = reader.fst_vec(&fst_ranges, metrics.as_deref_mut()).await?; let value_and_meta_vec = fsts .into_iter() .zip(appliers) @@ -92,7 +94,9 @@ impl IndexApplier for PredicatesIndexApplier { .collect::>(); let mut mapper = ParallelFstValuesMapper::new(reader); - let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?; + let mut bm_vec = mapper + .map_values_vec(&value_and_meta_vec, metrics.as_deref_mut()) + .await?; let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty for bm in bm_vec { @@ -222,25 +226,27 @@ mod tests { mock_reader .expect_metadata() .returning(|| Ok(mock_metas([("tag-0", 0)]))); - mock_reader.expect_fst_vec().returning(|_ranges| { + mock_reader.expect_fst_vec().returning(|_ranges, _metrics| { Ok(vec![ FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(), ]) }); - mock_reader.expect_bitmap_deque().returning(|arg| { - assert_eq!(arg.len(), 1); - let range = &arg[0].0; - let bitmap_type = arg[0].1; - assert_eq!(*range, 2..3); - assert_eq!(bitmap_type, BitmapType::Roaring); - Ok(VecDeque::from([Bitmap::from_lsb0_bytes( - &[0b10101010], - bitmap_type, - )])) - }); + mock_reader + .expect_bitmap_deque() + .returning(|arg, _metrics| { + assert_eq!(arg.len(), 1); + let range = &arg[0].0; + let bitmap_type = arg[0].1; + assert_eq!(*range, 2..3); + assert_eq!(bitmap_type, BitmapType::Roaring); + Ok(VecDeque::from([Bitmap::from_lsb0_bytes( + &[0b10101010], + bitmap_type, + )])) + }); let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert_eq!( @@ -253,13 +259,13 @@ mod tests { mock_reader .expect_metadata() .returning(|| Ok(mock_metas([("tag-0", 0)]))); - mock_reader.expect_fst_vec().returning(|_range| { + mock_reader.expect_fst_vec().returning(|_range, _metrics| { Ok(vec![ FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(), ]) }); let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert_eq!(output.matched_segment_ids.count_ones(), 0); @@ -280,7 +286,7 @@ mod tests { mock_reader .expect_metadata() .returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)]))); - mock_reader.expect_fst_vec().returning(|ranges| { + mock_reader.expect_fst_vec().returning(|ranges, _metrics| { let mut output = vec![]; for range in ranges { match range.start { @@ -293,27 +299,29 @@ mod tests { } Ok(output) }); - mock_reader.expect_bitmap_deque().returning(|ranges| { - let mut output = VecDeque::new(); - for (range, bitmap_type) in ranges { - let offset = range.start; - let size = range.end - range.start; - match (offset, size, bitmap_type) { - (1, 1, BitmapType::Roaring) => { - output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + mock_reader + .expect_bitmap_deque() + .returning(|ranges, _metrics| { + let mut output = VecDeque::new(); + for (range, bitmap_type) in ranges { + let offset = range.start; + let size = range.end - range.start; + match (offset, size, bitmap_type) { + (1, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + } + (2, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type)) + } + _ => unreachable!(), } - (2, 1, BitmapType::Roaring) => { - output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type)) - } - _ => unreachable!(), } - } - Ok(output) - }); + Ok(output) + }); let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert_eq!( @@ -334,7 +342,7 @@ mod tests { .returning(|| Ok(mock_metas([("tag-0", 0)]))); let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan @@ -359,7 +367,7 @@ mod tests { }; let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert!(output.matched_segment_ids.is_empty()); @@ -385,6 +393,7 @@ mod tests { index_not_found_strategy: IndexNotFoundStrategy::ThrowError, }, &mut mock_reader, + None, ) .await; assert!(matches!(result, Err(Error::IndexNotFound { .. }))); @@ -395,6 +404,7 @@ mod tests { index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, }, &mut mock_reader, + None, ) .await .unwrap(); @@ -406,6 +416,7 @@ mod tests { index_not_found_strategy: IndexNotFoundStrategy::Ignore, }, &mut mock_reader, + None, ) .await .unwrap(); diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 06a7a3f6d4..c85aa2548a 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -14,12 +14,13 @@ use core::ops::Range; use std::sync::Arc; +use std::time::Instant; use api::v1::index::InvertedIndexMetas; use async_trait::async_trait; use bytes::Bytes; use index::inverted_index::error::Result; -use index::inverted_index::format::reader::InvertedIndexReader; +use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; use prost::Message; use store_api::storage::FileId; @@ -83,20 +84,42 @@ impl CachedInvertedIndexBlobReader { #[async_trait] impl InvertedIndexReader for CachedInvertedIndexBlobReader { - async fn range_read(&self, offset: u64, size: u32) -> Result> { + async fn range_read<'a>( + &self, + offset: u64, + size: u32, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let inner = &self.inner; - self.cache + let result = self + .cache .get_or_load( self.file_id, self.blob_size, offset, size, - move |ranges| async move { inner.read_vec(&ranges).await }, + move |ranges| async move { inner.read_vec(&ranges, None).await }, ) - .await + .await?; + + if let Some(m) = metrics { + m.total_bytes += size as u64; + m.total_ranges += 1; + m.elapsed += start.unwrap().elapsed(); + } + + Ok(result) } - async fn read_vec(&self, ranges: &[Range]) -> Result> { + async fn read_vec<'a>( + &self, + ranges: &[Range], + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let mut pages = Vec::with_capacity(ranges.len()); for range in ranges { let inner = &self.inner; @@ -107,13 +130,19 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead self.blob_size, range.start, (range.end - range.start) as u32, - move |ranges| async move { inner.read_vec(&ranges).await }, + move |ranges| async move { inner.read_vec(&ranges, None).await }, ) .await?; pages.push(Bytes::from(page)); } + if let Some(m) = metrics { + m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); + m.total_ranges += ranges.len(); + m.elapsed += start.unwrap().elapsed(); + } + Ok(pages) } @@ -292,13 +321,19 @@ mod test { .fst( tag0.base_offset + tag0.relative_fst_offset as u64, tag0.fst_size, + None, ) .await .unwrap(); assert_eq!(fst0.len(), 3); let [offset, size] = unpack(fst0.get(b"a").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -307,7 +342,12 @@ mod test { ); let [offset, size] = unpack(fst0.get(b"b").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -316,7 +356,12 @@ mod test { ); let [offset, size] = unpack(fst0.get(b"c").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -335,13 +380,19 @@ mod test { .fst( tag1.base_offset + tag1.relative_fst_offset as u64, tag1.fst_size, + None, ) .await .unwrap(); assert_eq!(fst1.len(), 3); let [offset, size] = unpack(fst1.get(b"x").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -350,7 +401,12 @@ mod test { ); let [offset, size] = unpack(fst1.get(b"y").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -359,7 +415,12 @@ mod test { ); let [offset, size] = unpack(fst1.get(b"z").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -372,7 +433,7 @@ mod test { for _ in 0..FUZZ_REPEAT_TIMES { let offset = rng.random_range(0..file_size); let size = rng.random_range(0..file_size as u32 - offset as u32); - let expected = cached_reader.range_read(offset, size).await.unwrap(); + let expected = cached_reader.range_read(offset, size, None).await.unwrap(); let inner = &cached_reader.inner; let read = cached_reader .cache @@ -381,7 +442,7 @@ mod test { file_size, offset, size, - |ranges| async move { inner.read_vec(&ranges).await }, + |ranges| async move { inner.read_vec(&ranges, None).await }, ) .await .unwrap(); diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 72e2c528a0..3c50673505 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -154,7 +154,13 @@ impl FulltextIndexApplier { } let Some(result) = self - .apply_fine_one_column(file_size_hint, file_id, *column_id, request, metrics.as_deref_mut()) + .apply_fine_one_column( + file_size_hint, + file_id, + *column_id, + request, + metrics.as_deref_mut(), + ) .await? else { continue; @@ -374,6 +380,7 @@ impl FulltextIndexApplier { continue; } + // TODO(yingwen): Update reader metrics. *row_group_output = applier .search(&predicates, row_group_output, None) .await diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index a26147856c..32ad178d3b 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -723,9 +723,10 @@ mod tests { let backend = backend.clone(); async move { match backend { - FulltextBackend::Tantivy => { - applier.apply_fine(region_file_id, None, None).await.unwrap() - } + FulltextBackend::Tantivy => applier + .apply_fine(region_file_id, None, None) + .await + .unwrap(), FulltextBackend::Bloom => { let coarse_mask = coarse_mask.unwrap_or_default(); let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i])); diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 0cbb8386d9..5e324616ff 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -177,13 +177,13 @@ impl InvertedIndexApplier { index_cache.clone(), ); self.index_applier - .apply(context, &mut index_reader) + .apply(context, &mut index_reader, None) .await .context(ApplyInvertedIndexSnafu) } else { let mut index_reader = InvertedIndexBlobReader::new(blob); self.index_applier - .apply(context, &mut index_reader) + .apply(context, &mut index_reader, None) .await .context(ApplyInvertedIndexSnafu) }; @@ -314,7 +314,7 @@ mod tests { let mut mock_index_applier = MockIndexApplier::new(); mock_index_applier.expect_memory_usage().returning(|| 100); - mock_index_applier.expect_apply().returning(|_, _| { + mock_index_applier.expect_apply().returning(|_, _, _| { Ok(ApplyOutput { matched_segment_ids: Bitmap::new_bitvec(), total_row_count: 100,