From d6c75ec55f1b56e174c75887a5e2f586e2b5ce5d Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 20 Nov 2025 21:00:29 +0800 Subject: [PATCH] feat: implement BloomFilterReadMetrics for BloomFilterReader Signed-off-by: evenyag --- src/index/src/bloom_filter/applier.rs | 10 ++-- src/index/src/bloom_filter/reader.rs | 56 +++++++++++++++++-- .../src/cache/index/bloom_filter_index.rs | 23 ++++++-- .../src/sst/index/bloom_filter/applier.rs | 2 +- .../src/sst/index/fulltext_index/applier.rs | 2 +- 5 files changed, 78 insertions(+), 15 deletions(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 18332d4815..6217b642bb 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use crate::Bytes; use crate::bloom_filter::error::Result; -use crate::bloom_filter::reader::BloomFilterReader; +use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader}; /// `InListPredicate` contains a list of acceptable values. A value needs to match at least /// one of the elements (logical OR semantic) for the predicate to be satisfied. @@ -50,6 +50,7 @@ impl BloomFilterApplier { &mut self, predicates: &[InListPredicate], search_ranges: &[Range], + metrics: Option<&mut BloomFilterReadMetrics>, ) -> Result>> { if predicates.is_empty() { // If no predicates, return empty result @@ -57,7 +58,7 @@ impl BloomFilterApplier { } let segments = self.row_ranges_to_segments(search_ranges); - let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?; + let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?; let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates); Ok(intersect_ranges(search_ranges, &matching_row_ranges)) } @@ -95,6 +96,7 @@ impl BloomFilterApplier { async fn load_bloom_filters( &mut self, segments: &[usize], + metrics: Option<&mut BloomFilterReadMetrics>, ) -> Result<(Vec<(u64, usize)>, Vec)> { let segment_locations = segments .iter() @@ -108,7 +110,7 @@ impl BloomFilterApplier { .map(|i| self.meta.bloom_filter_locs[i as usize]) .collect::>(); - let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?; + let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs, metrics).await?; Ok((segment_locations, bloom_filters)) } @@ -422,7 +424,7 @@ mod tests { ]; for (predicates, search_range, expected) in cases { - let result = applier.search(&predicates, &[search_range]).await.unwrap(); + let result = applier.search(&predicates, &[search_range], None).await.unwrap(); assert_eq!( result, expected, "Expected {:?}, got {:?}", diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 466024f0d7..9df1ad6bad 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::ops::{Range, Rem}; +use std::time::{Duration, Instant}; use async_trait::async_trait; use bytemuck::try_cast_slice; @@ -34,6 +35,17 @@ const BLOOM_META_LEN_SIZE: u64 = 4; /// Default prefetch size of bloom filter meta. pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB +/// Metrics for bloom filter read operations. +#[derive(Debug, Default)] +pub struct BloomFilterReadMetrics { + /// Total byte size to read. + pub total_bytes: u64, + /// Total number of ranges to read. + pub total_ranges: usize, + /// Elapsed time of the read_vec operation. + pub elapsed: Duration, +} + /// Safely converts bytes to Vec using bytemuck for optimal performance. /// Faster than chunking and converting each piece individually. /// @@ -82,13 +94,28 @@ pub trait BloomFilterReader: Sync { async fn range_read(&self, offset: u64, size: u32) -> Result; /// Reads bunch of ranges from the file. - async fn read_vec(&self, ranges: &[Range]) -> Result> { + async fn read_vec( + &self, + ranges: &[Range], + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let mut results = Vec::with_capacity(ranges.len()); for range in ranges { let size = (range.end - range.start) as u32; let data = self.range_read(range.start, size).await?; results.push(data); } + + if let Some(m) = metrics { + m.total_ranges += ranges.len(); + m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); + if let Some(start) = start { + m.elapsed += start.elapsed(); + } + } + Ok(results) } @@ -105,12 +132,16 @@ pub trait BloomFilterReader: Sync { Ok(bm) } - async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result> { + async fn bloom_filter_vec( + &self, + locs: &[BloomFilterLoc], + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result> { let ranges = locs .iter() .map(|l| l.offset..l.offset + l.size) .collect::>(); - let bss = self.read_vec(&ranges).await?; + let bss = self.read_vec(&ranges, metrics).await?; let mut result = Vec::with_capacity(bss.len()); for (bs, loc) in bss.into_iter().zip(locs.iter()) { @@ -147,8 +178,23 @@ impl BloomFilterReader for BloomFilterReaderImpl { .context(IoSnafu) } - async fn read_vec(&self, ranges: &[Range]) -> Result> { - self.reader.read_vec(ranges).await.context(IoSnafu) + async fn read_vec( + &self, + ranges: &[Range], + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let result = self.reader.read_vec(ranges).await.context(IoSnafu)?; + + if let Some(m) = metrics { + m.total_ranges += ranges.len(); + m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); + if let Some(start) = start { + m.elapsed += start.elapsed(); + } + } + + Ok(result) } async fn metadata(&self) -> Result { diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index b4e7804b93..e25e324ab2 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -14,12 +14,13 @@ use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use api::v1::index::{BloomFilterLoc, BloomFilterMeta}; use async_trait::async_trait; use bytes::Bytes; use index::bloom_filter::error::Result; -use index::bloom_filter::reader::BloomFilterReader; +use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader}; use store_api::storage::{ColumnId, FileId}; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; @@ -122,13 +123,19 @@ impl BloomFilterReader for CachedBloomFilterIndexBl self.blob_size, offset, size, - move |ranges| async move { inner.read_vec(&ranges).await }, + move |ranges| async move { inner.read_vec(&ranges, None).await }, ) .await .map(|b| b.into()) } - async fn read_vec(&self, ranges: &[Range]) -> Result> { + async fn read_vec( + &self, + ranges: &[Range], + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let mut pages = Vec::with_capacity(ranges.len()); for range in ranges { let inner = &self.inner; @@ -139,13 +146,21 @@ impl BloomFilterReader for CachedBloomFilterIndexBl self.blob_size, range.start, (range.end - range.start) as u32, - move |ranges| async move { inner.read_vec(&ranges).await }, + move |ranges| async move { inner.read_vec(&ranges, None).await }, ) .await?; pages.push(Bytes::from(page)); } + if let Some(m) = metrics { + m.total_ranges += ranges.len(); + m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); + if let Some(start) = start { + m.elapsed += start.elapsed(); + } + } + Ok(pages) } diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 3a8df9801d..a6c4bd524f 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -363,7 +363,7 @@ impl BloomFilterIndexApplier { continue; } - *row_group_output = applier.search(predicates, row_group_output).await?; + *row_group_output = applier.search(predicates, row_group_output, None).await?; } Ok(()) diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 48d1d4e2a6..72e2c528a0 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -375,7 +375,7 @@ impl FulltextIndexApplier { } *row_group_output = applier - .search(&predicates, row_group_output) + .search(&predicates, row_group_output, None) .await .context(ApplyBloomFilterIndexSnafu)?; }