feat: collect read metrics in appliers

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-21 18:31:33 +08:00
committed by shuiyisong
parent fea2966dec
commit 50b5c90d53
5 changed files with 44 additions and 15 deletions

View File

@@ -36,7 +36,7 @@ const BLOOM_META_LEN_SIZE: u64 = 4;
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
/// Metrics for bloom filter read operations.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct BloomFilterReadMetrics {
/// Total byte size to read.
pub total_bytes: u64,

View File

@@ -31,7 +31,7 @@ mod blob;
mod footer;
/// Metrics for inverted index read operations.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct InvertedIndexReadMetrics {
/// Total byte size to read.
pub total_bytes: u64,

View File

@@ -22,7 +22,9 @@ use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use index::bloom_filter::reader::{
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
};
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
@@ -57,6 +59,8 @@ pub struct BloomFilterIndexApplyMetrics {
pub blob_cache_miss: usize,
/// Total size of blobs read (in bytes).
pub blob_read_bytes: u64,
/// Metrics for bloom filter read operations.
pub read_metrics: BloomFilterReadMetrics,
}
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
@@ -201,12 +205,12 @@ impl BloomFilterIndexApplier {
BloomFilterReaderImpl::new(blob),
bloom_filter_cache.clone(),
);
self.apply_predicates(reader, predicates, &mut output)
self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
.await
.context(ApplyBloomFilterIndexSnafu)?;
} else {
let reader = BloomFilterReaderImpl::new(blob);
self.apply_predicates(reader, predicates, &mut output)
self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
.await
.context(ApplyBloomFilterIndexSnafu)?;
}
@@ -354,6 +358,7 @@ impl BloomFilterIndexApplier {
reader: R,
predicates: &[InListPredicate],
output: &mut [(usize, Vec<Range<usize>>)],
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> std::result::Result<(), index::bloom_filter::error::Error> {
let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
@@ -363,7 +368,10 @@ impl BloomFilterIndexApplier {
continue;
}
*row_group_output = applier.search(predicates, row_group_output, None).await?;
let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
*row_group_output = applier
.search(predicates, row_group_output, read_metrics)
.await?;
}
Ok(())

View File

@@ -21,7 +21,7 @@ use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::BloomFilterReaderImpl;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
use index::fulltext_index::{Analyzer, Config};
@@ -61,6 +61,8 @@ pub struct FulltextIndexApplyMetrics {
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses.
pub blob_cache_miss: usize,
/// Metrics for bloom filter reads.
pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
@@ -330,7 +332,7 @@ impl FulltextIndexApplier {
column_id: ColumnId,
terms: &[FulltextTerm],
output: &mut [(usize, Vec<Range<usize>>)],
metrics: Option<&mut FulltextIndexApplyMetrics>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<bool> {
let blob_key = format!(
"{INDEX_BLOB_TYPE_BLOOM}-{}",
@@ -338,7 +340,7 @@ impl FulltextIndexApplier {
);
let Some(reader) = self
.index_source
.blob(file_id, &blob_key, file_size_hint, metrics)
.blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
.await?
else {
return Ok(false);
@@ -380,9 +382,14 @@ impl FulltextIndexApplier {
continue;
}
// TODO(yingwen): Update reader metrics.
*row_group_output = applier
.search(&predicates, row_group_output, None)
.search(
&predicates,
row_group_output,
metrics
.as_deref_mut()
.map(|m| &mut m.bloom_filter_read_metrics),
)
.await
.context(ApplyBloomFilterIndexSnafu)?;
}

View File

@@ -20,7 +20,7 @@ use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
@@ -54,6 +54,8 @@ pub struct InvertedIndexApplyMetrics {
pub blob_cache_miss: usize,
/// Total size of blobs read (in bytes).
pub blob_read_bytes: u64,
/// Metrics for inverted index reads.
pub inverted_index_read_metrics: InvertedIndexReadMetrics,
}
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
@@ -146,7 +148,7 @@ impl InvertedIndexApplier {
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
metrics: Option<&mut InvertedIndexApplyMetrics>,
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
) -> Result<ApplyOutput> {
let start = Instant::now();
@@ -177,13 +179,25 @@ impl InvertedIndexApplier {
index_cache.clone(),
);
self.index_applier
.apply(context, &mut index_reader, None)
.apply(
context,
&mut index_reader,
metrics
.as_deref_mut()
.map(|m| &mut m.inverted_index_read_metrics),
)
.await
.context(ApplyInvertedIndexSnafu)
} else {
let mut index_reader = InvertedIndexBlobReader::new(blob);
self.index_applier
.apply(context, &mut index_reader, None)
.apply(
context,
&mut index_reader,
metrics
.as_deref_mut()
.map(|m| &mut m.inverted_index_read_metrics),
)
.await
.context(ApplyInvertedIndexSnafu)
};