feat: implement BloomFilterReadMetrics for BloomFilterReader

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-20 21:00:29 +08:00
committed by shuiyisong
parent 5b8f1d819f
commit d6c75ec55f
5 changed files with 78 additions and 15 deletions

View File

@@ -21,7 +21,7 @@ use itertools::Itertools;
use crate::Bytes;
use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
@@ -50,6 +50,7 @@ impl BloomFilterApplier {
&mut self,
predicates: &[InListPredicate],
search_ranges: &[Range<usize>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Range<usize>>> {
if predicates.is_empty() {
// If no predicates, return empty result
@@ -57,7 +58,7 @@ impl BloomFilterApplier {
}
let segments = self.row_ranges_to_segments(search_ranges);
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?;
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?;
let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
Ok(intersect_ranges(search_ranges, &matching_row_ranges))
}
@@ -95,6 +96,7 @@ impl BloomFilterApplier {
async fn load_bloom_filters(
&mut self,
segments: &[usize],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
let segment_locations = segments
.iter()
@@ -108,7 +110,7 @@ impl BloomFilterApplier {
.map(|i| self.meta.bloom_filter_locs[i as usize])
.collect::<Vec<_>>();
let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?;
let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs, metrics).await?;
Ok((segment_locations, bloom_filters))
}
@@ -422,7 +424,7 @@ mod tests {
];
for (predicates, search_range, expected) in cases {
let result = applier.search(&predicates, &[search_range]).await.unwrap();
let result = applier.search(&predicates, &[search_range], None).await.unwrap();
assert_eq!(
result, expected,
"Expected {:?}, got {:?}",

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::ops::{Range, Rem};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use bytemuck::try_cast_slice;
@@ -34,6 +35,17 @@ const BLOOM_META_LEN_SIZE: u64 = 4;
/// Default prefetch size of bloom filter meta.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
/// Metrics for bloom filter read operations.
///
/// An instance is passed as `Option<&mut BloomFilterReadMetrics>` to
/// `BloomFilterReader::read_vec`. Implementations add to the fields (`+=`)
/// instead of overwriting them, so a single instance can aggregate several
/// reads. The fields are only updated after a read has completed successfully.
#[derive(Debug, Default)]
pub struct BloomFilterReadMetrics {
/// Total byte size to read: the sum of `end - start` over all requested
/// ranges. This is the requested length, not the length of the returned data.
pub total_bytes: u64,
/// Total number of ranges to read (the length of the `ranges` slice of
/// every recorded `read_vec` call).
pub total_ranges: usize,
/// Accumulated elapsed time of the recorded `read_vec` operations, measured
/// with `Instant` from the start of the call until the metrics are updated.
pub elapsed: Duration,
}
/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.
/// Faster than chunking and converting each piece individually.
///
@@ -82,13 +94,28 @@ pub trait BloomFilterReader: Sync {
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
/// Reads a bunch of ranges from the file.
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
/// Default implementation: fetches every range one after another through
/// `range_read` and returns the buffers in the same order as `ranges`.
///
/// When `metrics` is provided and every read succeeds, the number of ranges,
/// the total requested length and the elapsed time are added to it. If a read
/// fails, that error is returned immediately and `metrics` is left untouched.
async fn read_vec(
    &self,
    ranges: &[Range<u64>],
    metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
    // Only read the clock when somebody is actually collecting metrics.
    let timer = metrics.is_some().then(Instant::now);

    let mut buffers = Vec::with_capacity(ranges.len());
    for Range { start, end } in ranges {
        // `range_read` takes a 32-bit size, hence the narrowing cast.
        let len = (end - start) as u32;
        buffers.push(self.range_read(*start, len).await?);
    }

    // `timer` is `Some` exactly when `metrics` is, so both arms match together.
    if let (Some(m), Some(timer)) = (metrics, timer) {
        let requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        m.total_ranges += ranges.len();
        m.total_bytes += requested;
        m.elapsed += timer.elapsed();
    }

    Ok(buffers)
}
@@ -105,12 +132,16 @@ pub trait BloomFilterReader: Sync {
Ok(bm)
}
async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
async fn bloom_filter_vec(
&self,
locs: &[BloomFilterLoc],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<BloomFilter>> {
let ranges = locs
.iter()
.map(|l| l.offset..l.offset + l.size)
.collect::<Vec<_>>();
let bss = self.read_vec(&ranges).await?;
let bss = self.read_vec(&ranges, metrics).await?;
let mut result = Vec::with_capacity(bss.len());
for (bs, loc) in bss.into_iter().zip(locs.iter()) {
@@ -147,8 +178,23 @@ impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
.context(IoSnafu)
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.reader.read_vec(ranges).await.context(IoSnafu)
/// Reads all `ranges` with a single `read_vec` call on the wrapped reader.
///
/// A failure of the underlying reader is returned with `IoSnafu` context. When
/// `metrics` is provided and the read succeeds, the number of ranges, the
/// total requested length and the elapsed time are added to it.
async fn read_vec(
    &self,
    ranges: &[Range<u64>],
    metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
    // Only read the clock when somebody is actually collecting metrics.
    let timer = metrics.is_some().then(Instant::now);

    let buffers = self.reader.read_vec(ranges).await.context(IoSnafu)?;

    // `timer` is `Some` exactly when `metrics` is, so both arms match together.
    if let (Some(m), Some(timer)) = (metrics, timer) {
        let requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        m.total_ranges += ranges.len();
        m.total_bytes += requested;
        m.elapsed += timer.elapsed();
    }

    Ok(buffers)
}
async fn metadata(&self) -> Result<BloomFilterMeta> {

View File

@@ -14,12 +14,13 @@
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
@@ -122,13 +123,19 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await
.map(|b| b.into())
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len());
for range in ranges {
let inner = &self.inner;
@@ -139,13 +146,21 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
self.blob_size,
range.start,
(range.end - range.start) as u32,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await?;
pages.push(Bytes::from(page));
}
if let Some(m) = metrics {
m.total_ranges += ranges.len();
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
if let Some(start) = start {
m.elapsed += start.elapsed();
}
}
Ok(pages)
}

View File

@@ -363,7 +363,7 @@ impl BloomFilterIndexApplier {
continue;
}
*row_group_output = applier.search(predicates, row_group_output).await?;
*row_group_output = applier.search(predicates, row_group_output, None).await?;
}
Ok(())

View File

@@ -375,7 +375,7 @@ impl FulltextIndexApplier {
}
*row_group_output = applier
.search(&predicates, row_group_output)
.search(&predicates, row_group_output, None)
.await
.context(ApplyBloomFilterIndexSnafu)?;
}