feat: add metrics to bloom applier

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-20 13:52:41 +08:00
committed by shuiyisong
parent 4519607bc6
commit b68286e8af
2 changed files with 41 additions and 6 deletions

View File

@@ -17,6 +17,7 @@ mod builder;
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
@@ -47,6 +48,17 @@ use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking bloom filter index apply operations.
#[derive(Debug, Default, Clone)]
pub struct BloomFilterIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses.
pub blob_cache_miss: usize,
/// Total size of blobs read (in bytes).
pub blob_read_bytes: u64,
}
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
@@ -133,15 +145,20 @@ impl BloomFilterIndexApplier {
///
/// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.start_timer();
let apply_start = Instant::now();
// Calculates row groups' ranges based on start of the file.
let mut input = Vec::with_capacity(row_groups.size_hint().0);
@@ -163,7 +180,7 @@ impl BloomFilterIndexApplier {
for (column_id, predicates) in self.predicates.iter() {
let blob = match self
.blob_reader(file_id, *column_id, file_size_hint)
.blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
.await?
{
Some(blob) => blob,
@@ -173,6 +190,9 @@ impl BloomFilterIndexApplier {
// Create appropriate reader based on whether we have caching enabled
if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
if let Some(m) = &mut metrics {
m.blob_read_bytes += blob_size;
}
let reader = CachedBloomFilterIndexBlobReader::new(
file_id.file_id(),
*column_id,
@@ -201,6 +221,16 @@ impl BloomFilterIndexApplier {
}
}
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(output)
}
@@ -212,6 +242,7 @@ impl BloomFilterIndexApplier {
file_id: RegionFileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Option<BlobReader>> {
let reader = match self
.cached_blob_reader(file_id, column_id, file_size_hint)
@@ -219,6 +250,9 @@ impl BloomFilterIndexApplier {
{
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Some(m) = metrics {
m.blob_cache_miss += 1;
}
if let Err(err) = other {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
@@ -393,7 +427,7 @@ mod tests {
let applier = builder.build(&exprs).unwrap().unwrap();
applier
.apply(file_id, None, row_groups.into_iter())
.apply(file_id, None, row_groups.into_iter(), None)
.await
.unwrap()
.into_iter()

View File

@@ -670,8 +670,9 @@ impl ParquetReaderBuilder {
.unwrap_or(true),
)
});
// TODO(yingwen): Collect metrics for applier
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs, None)
.await;
let mut selection = match apply_res {
Ok(apply_output) => {