feat: per file scan metrics (#7396)

* feat: collect per file metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: divide build_cost into build_part_cost and build_reader_cost

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: limit the number of file metrics to display

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: use sorted iter to get sorted files

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: output metrics in desc order

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Yingwen authored on 2025-12-15 20:52:03 +08:00; committed by GitHub
parent 913ac325e5
commit 5232a12a8c
2 changed files with 230 additions and 7 deletions

View File

@@ -1137,6 +1137,12 @@ impl ScanInput {
self.files.len()
}
/// Gets the file handle from a row group index.
pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
let file_index = index.index - self.num_memtables();
&self.files[file_index]
}
pub fn region_metadata(&self) -> &RegionMetadataRef {
self.mapper.metadata()
}
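
The helper above assumes the row group index space lists memtable ranges first and file ranges after them, so subtracting `num_memtables()` gives the position in `files`. A minimal standalone sketch of that layout, using hypothetical stand-in types (`SketchInput`, `String` file handles) in place of `ScanInput` and `FileHandle`:

// Sketch only: `SketchInput` and the String file handles are illustrative
// stand-ins, not the crate's own ScanInput/FileHandle.
struct RowGroupIndex {
    index: usize,
}

struct SketchInput {
    num_memtables: usize,
    files: Vec<String>,
}

impl SketchInput {
    // Mirrors `file_from_index`: indexes 0..num_memtables refer to memtables,
    // so file i is addressed by index num_memtables + i.
    fn file_from_index(&self, index: RowGroupIndex) -> &String {
        &self.files[index.index - self.num_memtables]
    }
}

fn main() {
    let input = SketchInput {
        num_memtables: 2,
        files: vec!["file-0".to_string(), "file-1".to_string()],
    };
    // Index 3 skips the two memtables and lands on the second file.
    assert_eq!(input.file_from_index(RowGroupIndex { index: 3 }).as_str(), "file-1");
}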

View File

@@ -14,7 +14,7 @@
//! Utilities for scanners.
- use std::collections::VecDeque;
+ use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
@@ -42,7 +42,7 @@ use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
- use crate::sst::file::FileTimeRange;
+ use crate::sst::file::{FileTimeRange, RegionFileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
@@ -52,6 +52,57 @@ use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
/// Per-file scan metrics.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
/// Number of ranges (row groups) read from this file.
pub num_ranges: usize,
/// Number of rows read from this file.
pub num_rows: usize,
/// Time spent building file ranges/parts (file-level preparation).
pub build_part_cost: Duration,
/// Time spent building readers for this file (accumulated across all ranges).
pub build_reader_cost: Duration,
/// Time spent scanning this file (accumulated across all ranges).
pub scan_cost: Duration,
}
impl fmt::Debug for FileScanMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
if self.num_ranges > 0 {
write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
}
if self.num_rows > 0 {
write!(f, ", \"num_rows\":{}", self.num_rows)?;
}
if !self.build_reader_cost.is_zero() {
write!(
f,
", \"build_reader_cost\":\"{:?}\"",
self.build_reader_cost
)?;
}
if !self.scan_cost.is_zero() {
write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
}
write!(f, "}}")
}
}
impl FileScanMetrics {
/// Merges another FileScanMetrics into this one.
pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
self.num_ranges += other.num_ranges;
self.num_rows += other.num_rows;
self.build_part_cost += other.build_part_cost;
self.build_reader_cost += other.build_reader_cost;
self.scan_cost += other.scan_cost;
}
}
/// Verbose scan metrics for a partition.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
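
For orientation, a hedged example of what the `Debug` output of `FileScanMetrics` from the hunk above renders to (the numbers are invented): fields with zero counts or zero durations are omitted, so a file whose ranges were all pruned would print only its `build_part_cost`.

use std::time::Duration;

// Assumes the FileScanMetrics type introduced above is in scope; the values
// are made up purely for illustration.
fn main() {
    let metrics = FileScanMetrics {
        num_ranges: 4,
        num_rows: 8192,
        build_part_cost: Duration::from_micros(1200),
        build_reader_cost: Duration::from_micros(300),
        scan_cost: Duration::from_micros(5400),
    };
    // Prints:
    // {"build_part_cost":"1.2ms", "num_ranges":4, "num_rows":8192, "build_reader_cost":"300µs", "scan_cost":"5.4ms"}
    println!("{:?}", metrics);
}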
@@ -151,6 +202,37 @@ pub(crate) struct ScanMetricsSet {
fetch_metrics: Option<ParquetFetchMetrics>,
/// Metadata cache metrics.
metadata_cache_metrics: Option<MetadataCacheMetrics>,
/// Per-file scan metrics, only populated when explain_verbose is true.
per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
}
/// Wrapper for file metrics that compares by total cost in reverse order.
/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
struct CompareCostReverse<'a> {
total_cost: Duration,
file_id: RegionFileId,
metrics: &'a FileScanMetrics,
}
impl Ord for CompareCostReverse<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Reverse comparison: smaller costs are "greater"
other.total_cost.cmp(&self.total_cost)
}
}
impl PartialOrd for CompareCostReverse<'_> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Eq for CompareCostReverse<'_> {}
impl PartialEq for CompareCostReverse<'_> {
fn eq(&self, other: &Self) -> bool {
self.total_cost == other.total_cost
}
}
impl fmt::Debug for ScanMetricsSet {
@@ -199,6 +281,7 @@ impl fmt::Debug for ScanMetricsSet {
fulltext_index_apply_metrics,
fetch_metrics,
metadata_cache_metrics,
per_file_metrics,
} = self;
// Write core metrics
@@ -326,6 +409,52 @@ impl fmt::Debug for ScanMetricsSet {
write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
}
// Write top file metrics if present and non-empty
if let Some(file_metrics) = per_file_metrics
&& !file_metrics.is_empty()
{
// Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
let mut heap = BinaryHeap::new();
for (file_id, metrics) in file_metrics.iter() {
let total_cost =
metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;
if heap.len() < 10 {
// Haven't reached 10 yet, just push
heap.push(CompareCostReverse {
total_cost,
file_id: *file_id,
metrics,
});
} else if let Some(min_entry) = heap.peek() {
// If current cost is higher than the minimum in our top-10, replace it
if total_cost > min_entry.total_cost {
heap.pop();
heap.push(CompareCostReverse {
total_cost,
file_id: *file_id,
metrics,
});
}
}
}
let top_files = heap.into_sorted_vec();
write!(f, ", \"top_file_metrics\": {{")?;
for (i, item) in top_files.iter().enumerate() {
let CompareCostReverse {
total_cost: _,
file_id,
metrics,
} = item;
if i > 0 {
write!(f, ", ")?;
}
write!(f, "\"{}\": {:?}", file_id, metrics)?;
}
write!(f, "}}")?;
}
write!(f, ", \"stream_eof\":{stream_eof}}}")
}
}
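
The `CompareCostReverse` wrapper and the loop above implement a standard top-K selection: reversing the comparison turns `BinaryHeap` (a max-heap) into a min-heap keyed by total cost, so the cheapest of the current top ten sits at the root and is evicted whenever a costlier file arrives, and `into_sorted_vec` then yields the survivors in descending cost order. A self-contained sketch of the same pattern, with illustrative names and durations:

use std::collections::BinaryHeap;
use std::time::Duration;

// Illustrative stand-in for CompareCostReverse: the ordering is reversed so
// the BinaryHeap behaves as a min-heap over total_cost.
struct ReverseByCost {
    total_cost: Duration,
    file: &'static str,
}

impl Ord for ReverseByCost {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reversed: the smallest cost compares as "greatest" and sits at the root.
        other.total_cost.cmp(&self.total_cost)
    }
}
impl PartialOrd for ReverseByCost {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl PartialEq for ReverseByCost {
    fn eq(&self, other: &Self) -> bool {
        self.total_cost == other.total_cost
    }
}
impl Eq for ReverseByCost {}

// Keeps the k costliest files and returns them in descending cost order.
fn top_k(files: Vec<(&'static str, Duration)>, k: usize) -> Vec<&'static str> {
    let mut heap = BinaryHeap::new();
    for (file, total_cost) in files {
        if heap.len() < k {
            heap.push(ReverseByCost { total_cost, file });
        } else if let Some(min) = heap.peek() {
            if total_cost > min.total_cost {
                // Evict the cheapest of the current top-k candidates.
                heap.pop();
                heap.push(ReverseByCost { total_cost, file });
            }
        }
    }
    // Ascending by the reversed ordering is descending by actual cost.
    heap.into_sorted_vec().into_iter().map(|e| e.file).collect()
}

fn main() {
    let files = vec![
        ("a.parquet", Duration::from_millis(30)),
        ("b.parquet", Duration::from_millis(120)),
        ("c.parquet", Duration::from_millis(80)),
    ];
    assert_eq!(top_k(files, 2), vec!["b.parquet", "c.parquet"]);
}

Compared with sorting the whole map, this keeps the per-partition output bounded no matter how many files the scan touched.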
@@ -432,6 +561,17 @@ impl ScanMetricsSet {
.merge_from(metadata_cache_metrics);
}
/// Merges per-file metrics.
fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
for (file_id, metrics) in other {
self_file_metrics
.entry(*file_id)
.or_default()
.merge_from(metrics);
}
}
/// Sets distributor metrics.
fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
let SeriesDistributorMetrics {
@@ -722,11 +862,20 @@ impl PartitionMetrics {
}
/// Merges [ReaderMetrics] and `build_reader_cost`.
- pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
+ pub fn merge_reader_metrics(
&self,
metrics: &ReaderMetrics,
per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
) {
self.0.build_parts_cost.add_duration(metrics.build_cost);
let mut metrics_set = self.0.metrics.lock().unwrap();
metrics_set.merge_reader_metrics(metrics);
// Merge per-file metrics if provided
if let Some(file_metrics) = per_file_metrics {
metrics_set.merge_per_file_metrics(file_metrics);
}
}
/// Finishes the query.
@@ -938,13 +1087,32 @@ pub(crate) async fn scan_file_ranges(
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?;
part_metrics.inc_num_file_ranges(ranges.len());
- part_metrics.merge_reader_metrics(&reader_metrics);
+ part_metrics.merge_reader_metrics(&reader_metrics, None);
// Creates initial per-file metrics with build_part_cost.
let init_per_file_metrics = if part_metrics.explain_verbose() {
let file = stream_ctx.input.file_from_index(index);
let file_id = file.file_id();
let mut map = HashMap::new();
map.insert(
file_id,
FileScanMetrics {
build_part_cost: reader_metrics.build_cost,
..Default::default()
},
);
Some(map)
} else {
None
};
Ok(build_file_range_scan_stream(
stream_ctx,
part_metrics,
read_type,
ranges,
init_per_file_metrics,
))
}
@@ -964,13 +1132,32 @@ pub(crate) async fn scan_flat_file_ranges(
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?;
part_metrics.inc_num_file_ranges(ranges.len());
- part_metrics.merge_reader_metrics(&reader_metrics);
+ part_metrics.merge_reader_metrics(&reader_metrics, None);
// Creates initial per-file metrics with build_part_cost.
let init_per_file_metrics = if part_metrics.explain_verbose() {
let file = stream_ctx.input.file_from_index(index);
let file_id = file.file_id();
let mut map = HashMap::new();
map.insert(
file_id,
FileScanMetrics {
build_part_cost: reader_metrics.build_cost,
..Default::default()
},
);
Some(map)
} else {
None
};
Ok(build_flat_file_range_scan_stream(
stream_ctx,
part_metrics,
read_type,
ranges,
init_per_file_metrics,
))
}
@@ -980,6 +1167,7 @@ pub fn build_file_range_scan_stream(
part_metrics: PartitionMetrics,
read_type: &'static str,
ranges: SmallVec<[FileRange; 2]>,
mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let fetch_metrics = if part_metrics.explain_verbose() {
@@ -1006,6 +1194,20 @@ pub fn build_file_range_scan_stream(
}
if let Source::PruneReader(reader) = source {
let prune_metrics = reader.metrics();
// Update per-file metrics if tracking is enabled
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
let file_id = range.file_handle().file_id();
let file_metrics = file_metrics_map
.entry(file_id)
.or_insert_with(FileScanMetrics::default);
file_metrics.num_ranges += 1;
file_metrics.num_rows += prune_metrics.num_rows;
file_metrics.build_reader_cost += build_cost;
file_metrics.scan_cost += prune_metrics.scan_cost;
}
reader_metrics.merge_from(&prune_metrics);
}
}
@@ -1013,7 +1215,7 @@ pub fn build_file_range_scan_stream(
// Reports metrics.
reader_metrics.observe_rows(read_type);
reader_metrics.filter_metrics.observe();
- part_metrics.merge_reader_metrics(reader_metrics);
+ part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
}
}
@@ -1023,6 +1225,7 @@ pub fn build_flat_file_range_scan_stream(
part_metrics: PartitionMetrics,
read_type: &'static str,
ranges: SmallVec<[FileRange; 2]>,
mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<RecordBatch>> {
try_stream! {
let fetch_metrics = if part_metrics.explain_verbose() {
@@ -1058,13 +1261,27 @@ pub fn build_flat_file_range_scan_stream(
}
let prune_metrics = reader.metrics();
// Update per-file metrics if tracking is enabled
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
let file_id = range.file_handle().file_id();
let file_metrics = file_metrics_map
.entry(file_id)
.or_insert_with(FileScanMetrics::default);
file_metrics.num_ranges += 1;
file_metrics.num_rows += prune_metrics.num_rows;
file_metrics.build_reader_cost += build_cost;
file_metrics.scan_cost += prune_metrics.scan_cost;
}
reader_metrics.merge_from(&prune_metrics);
}
// Reports metrics.
reader_metrics.observe_rows(read_type);
reader_metrics.filter_metrics.observe();
- part_metrics.merge_reader_metrics(reader_metrics);
+ part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
}
}