feat: add apply metrics

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-24 21:06:07 +08:00
committed by shuiyisong
parent 69cf13b33a
commit cce4d56e00
7 changed files with 116 additions and 14 deletions

View File

@@ -50,6 +50,17 @@ pub struct BloomFilterReadMetrics {
pub cache_miss: usize,
}
impl BloomFilterReadMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.total_bytes += other.total_bytes;
self.total_ranges += other.total_ranges;
self.fetch_elapsed += other.fetch_elapsed;
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
}
}
/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.
/// Faster than chunking and converting each piece individually.
///

View File

@@ -45,6 +45,17 @@ pub struct InvertedIndexReadMetrics {
pub cache_miss: usize,
}
impl InvertedIndexReadMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.total_bytes += other.total_bytes;
self.total_ranges += other.total_ranges;
self.fetch_elapsed += other.fetch_elapsed;
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
}
}
/// InvertedIndexReader defines an asynchronous reader of inverted index data
#[mockall::automock]
#[async_trait]

View File

@@ -305,6 +305,10 @@ impl ScanMetricsSet {
rows_inverted_filtered,
rows_bloom_filtered,
rows_precise_filtered,
// Optional applier metrics are not collected here.
inverted_index_apply_metrics: _,
bloom_filter_apply_metrics: _,
fulltext_index_apply_metrics: _,
},
num_record_batches,
num_batches,

View File

@@ -63,6 +63,16 @@ pub struct BloomFilterIndexApplyMetrics {
pub read_metrics: BloomFilterReadMetrics,
}
impl BloomFilterIndexApplyMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.blob_read_bytes += other.blob_read_bytes;
self.read_metrics.merge_from(&other.read_metrics);
}
}
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.

View File

@@ -85,6 +85,17 @@ impl FulltextIndexApplyMetrics {
self.dir_cache_miss += 1;
}
}
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.dir_cache_hit += other.dir_cache_hit;
self.dir_cache_miss += other.dir_cache_miss;
self.dir_init_elapsed += other.dir_init_elapsed;
self.bloom_filter_read_metrics
.merge_from(&other.bloom_filter_read_metrics);
}
}
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files

View File

@@ -58,6 +58,17 @@ pub struct InvertedIndexApplyMetrics {
pub inverted_index_read_metrics: InvertedIndexReadMetrics,
}
impl InvertedIndexApplyMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.blob_read_bytes += other.blob_read_bytes;
self.inverted_index_read_metrics
.merge_from(&other.inverted_index_read_metrics);
}
}
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
pub(crate) struct InvertedIndexApplier {

View File

@@ -52,9 +52,15 @@ use crate::metrics::{
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::bloom_filter::applier::{
BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
};
@@ -537,9 +543,12 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
// TODO(yingwen): Collect applier metrics in verbose mode.
let apply_res = index_applier
.apply_fine(self.file_handle.file_id(), Some(file_size_hint), None)
.apply_fine(
self.file_handle.file_id(),
Some(file_size_hint),
metrics.fulltext_index_apply_metrics.as_mut(),
)
.await;
let selection = match apply_res {
Ok(Some(res)) => {
@@ -606,15 +615,18 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
// TODO(yingwen): Collect applier metrics in verbose mode.
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), None)
.apply(
self.file_handle.file_id(),
Some(file_size_hint),
metrics.inverted_index_apply_metrics.as_mut(),
)
.await;
let selection = match apply_res {
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size,
num_row_groups,
output,
apply_output,
),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
@@ -682,9 +694,13 @@ impl ParquetReaderBuilder {
.unwrap_or(true),
)
});
// TODO(yingwen): Collect metrics for applier
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs, None)
.apply(
self.file_handle.file_id(),
Some(file_size_hint),
rgs,
metrics.bloom_filter_apply_metrics.as_mut(),
)
.await;
let mut selection = match apply_res {
Ok(apply_output) => {
@@ -761,9 +777,13 @@ impl ParquetReaderBuilder {
.unwrap_or(true),
)
});
// TODO(yingwen): Collect applier metrics in verbose mode.
let apply_res = index_applier
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs, None)
.apply_coarse(
self.file_handle.file_id(),
Some(file_size_hint),
rgs,
metrics.fulltext_index_apply_metrics.as_mut(),
)
.await;
let mut selection = match apply_res {
Ok(Some(apply_output)) => {
@@ -907,7 +927,7 @@ fn all_required_row_groups_searched(
}
/// Metrics of filtering rows groups and rows.
#[derive(Debug, Default, Clone, Copy)]
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
/// Number of row groups before filtering.
pub(crate) rg_total: usize,
@@ -930,6 +950,13 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) rows_bloom_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) rows_precise_filtered: usize,
/// Optional metrics for inverted index applier.
pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
/// Optional metrics for bloom filter index applier.
pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
/// Optional metrics for fulltext index applier.
pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}
impl ReaderFilterMetrics {
@@ -946,6 +973,23 @@ impl ReaderFilterMetrics {
self.rows_inverted_filtered += other.rows_inverted_filtered;
self.rows_bloom_filtered += other.rows_bloom_filtered;
self.rows_precise_filtered += other.rows_precise_filtered;
// Merge optional applier metrics
if let Some(other_metrics) = &other.inverted_index_apply_metrics {
self.inverted_index_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
self.bloom_filter_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
self.fulltext_index_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
}
/// Reports metrics.