From e2517dec807dd4366258a45304fa6b4595cbab3a Mon Sep 17 00:00:00 2001
From: evenyag
Date: Thu, 11 Dec 2025 15:02:39 +0800
Subject: [PATCH] feat: collect per file metrics

Signed-off-by: evenyag
---
 src/mito2/src/read/scan_util.rs | 122 ++++++++++++++++++++++++++++++--
 1 file changed, 115 insertions(+), 7 deletions(-)

diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index 7c69dee845..1230cf35f4 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -14,7 +14,7 @@
 //! Utilities for scanners.
 
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
@@ -42,7 +42,7 @@ use crate::read::merge::{MergeMetrics, MergeMetricsReport};
 use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
 use crate::read::scan_region::StreamContext;
 use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
-use crate::sst::file::FileTimeRange;
+use crate::sst::file::{FileTimeRange, RegionFileId};
 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
@@ -52,6 +52,39 @@ use crate::sst::parquet::flat_format::time_index_column_index;
 use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
 use crate::sst::parquet::row_group::ParquetFetchMetrics;
 
+/// Per-file scan metrics tracked when explain_verbose is enabled.
+#[derive(Default, Clone)]
+pub(crate) struct FileMetrics {
+    /// Number of ranges (row groups) read from this file.
+    pub(crate) num_ranges: usize,
+    /// Number of rows read from this file.
+    pub(crate) num_rows: usize,
+    /// Time spent building readers for this file (accumulated across all ranges).
+    pub(crate) build_cost: Duration,
+    /// Time spent scanning this file (accumulated across all ranges).
+    pub(crate) scan_cost: Duration,
+}
+
+impl fmt::Debug for FileMetrics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "{{\"num_ranges\":{}, \"num_rows\":{}, \"build_cost\":\"{:?}\", \"scan_cost\":\"{:?}\"}}",
+            self.num_ranges, self.num_rows, self.build_cost, self.scan_cost
+        )
+    }
+}
+
+impl FileMetrics {
+    /// Merges another FileMetrics into this one.
+    pub(crate) fn merge_from(&mut self, other: &FileMetrics) {
+        self.num_ranges += other.num_ranges;
+        self.num_rows += other.num_rows;
+        self.build_cost += other.build_cost;
+        self.scan_cost += other.scan_cost;
+    }
+}
+
 /// Verbose scan metrics for a partition.
 #[derive(Default)]
 pub(crate) struct ScanMetricsSet {
@@ -151,6 +184,8 @@
     fetch_metrics: Option<ParquetFetchMetrics>,
     /// Metadata cache metrics.
     metadata_cache_metrics: Option<MetadataCacheMetrics>,
+    /// Per-file scan metrics, only populated when explain_verbose is true.
+    per_file_metrics: Option<HashMap<RegionFileId, FileMetrics>>,
 }
 
 impl fmt::Debug for ScanMetricsSet {
@@ -199,6 +234,7 @@ impl fmt::Debug for ScanMetricsSet {
             fulltext_index_apply_metrics,
             fetch_metrics,
             metadata_cache_metrics,
+            per_file_metrics,
         } = self;
 
         // Write core metrics
@@ -326,6 +362,20 @@ impl fmt::Debug for ScanMetricsSet {
             write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
         }
 
+        // Write per-file metrics if present and non-empty
+        if let Some(file_metrics) = per_file_metrics
+            && !file_metrics.is_empty()
+        {
+            write!(f, ", \"per_file_metrics\": {{")?;
+            for (i, (file_id, metrics)) in file_metrics.iter().enumerate() {
+                if i > 0 {
+                    write!(f, ", ")?;
+                }
+                write!(f, "\"{}\": {:?}", file_id, metrics)?;
+            }
+            write!(f, "}}")?;
+        }
+
         write!(f, ", \"stream_eof\":{stream_eof}}}")
     }
 }
@@ -432,6 +482,17 @@ impl ScanMetricsSet {
             .merge_from(metadata_cache_metrics);
     }
 
+    /// Merges per-file metrics.
+    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileMetrics>) {
+        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
+        for (file_id, metrics) in other {
+            self_file_metrics
+                .entry(*file_id)
+                .or_insert_with(FileMetrics::default)
+                .merge_from(metrics);
+        }
+    }
+
     /// Sets distributor metrics.
     fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
         let SeriesDistributorMetrics {
@@ -722,11 +783,20 @@ impl PartitionMetrics {
     }
 
     /// Merges [ReaderMetrics] and `build_reader_cost`.
-    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
+    pub fn merge_reader_metrics(
+        &self,
+        metrics: &ReaderMetrics,
+        per_file_metrics: Option<&HashMap<RegionFileId, FileMetrics>>,
+    ) {
         self.0.build_parts_cost.add_duration(metrics.build_cost);
 
         let mut metrics_set = self.0.metrics.lock().unwrap();
         metrics_set.merge_reader_metrics(metrics);
+
+        // Merge per-file metrics if provided
+        if let Some(file_metrics) = per_file_metrics {
+            metrics_set.merge_per_file_metrics(file_metrics);
+        }
     }
 
     /// Finishes the query.
@@ -938,7 +1008,7 @@ pub(crate) async fn scan_file_ranges(
         .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
         .await?;
     part_metrics.inc_num_file_ranges(ranges.len());
-    part_metrics.merge_reader_metrics(&reader_metrics);
+    part_metrics.merge_reader_metrics(&reader_metrics, None);
 
     Ok(build_file_range_scan_stream(
         stream_ctx,
@@ -964,7 +1034,7 @@ pub(crate) async fn scan_flat_file_ranges(
         .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
         .await?;
     part_metrics.inc_num_file_ranges(ranges.len());
-    part_metrics.merge_reader_metrics(&reader_metrics);
+    part_metrics.merge_reader_metrics(&reader_metrics, None);
 
     Ok(build_flat_file_range_scan_stream(
         stream_ctx,
@@ -987,6 +1057,11 @@ pub fn build_file_range_scan_stream(
     } else {
         None
     };
+    let mut per_file_metrics = if part_metrics.explain_verbose() {
+        Some(HashMap::<RegionFileId, FileMetrics>::new())
+    } else {
+        None
+    };
     let reader_metrics = &mut ReaderMetrics {
         fetch_metrics: fetch_metrics.clone(),
         ..Default::default()
     };
@@ -1006,6 +1081,20 @@ pub fn build_file_range_scan_stream(
             }
             if let Source::PruneReader(reader) = source {
                 let prune_metrics = reader.metrics();
+
+                // Update per-file metrics if tracking is enabled
+                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
+                    let file_id = range.file_handle().file_id();
+                    let file_metrics = file_metrics_map
+                        .entry(file_id)
+                        .or_insert_with(FileMetrics::default);
+
+                    file_metrics.num_ranges += 1;
+                    file_metrics.num_rows += prune_metrics.num_rows;
+                    file_metrics.build_cost += build_cost;
+                    file_metrics.scan_cost += prune_metrics.scan_cost;
+                }
+
                 reader_metrics.merge_from(&prune_metrics);
             }
         }
@@ -1013,7 +1102,7 @@ pub fn build_file_range_scan_stream(
         // Reports metrics.
         reader_metrics.observe_rows(read_type);
         reader_metrics.filter_metrics.observe();
-        part_metrics.merge_reader_metrics(reader_metrics);
+        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
     }
 }
@@ -1030,6 +1119,11 @@ pub fn build_flat_file_range_scan_stream(
     } else {
         None
     };
+    let mut per_file_metrics = if part_metrics.explain_verbose() {
+        Some(HashMap::<RegionFileId, FileMetrics>::new())
+    } else {
+        None
+    };
     let reader_metrics = &mut ReaderMetrics {
         fetch_metrics: fetch_metrics.clone(),
         ..Default::default()
     };
@@ -1058,13 +1152,27 @@ pub fn build_flat_file_range_scan_stream(
             }
 
             let prune_metrics = reader.metrics();
+
+            // Update per-file metrics if tracking is enabled
+            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
+                let file_id = range.file_handle().file_id();
+                let file_metrics = file_metrics_map
+                    .entry(file_id)
+                    .or_insert_with(FileMetrics::default);
+
+                file_metrics.num_ranges += 1;
+                file_metrics.num_rows += prune_metrics.num_rows;
+                file_metrics.build_cost += build_cost;
+                file_metrics.scan_cost += prune_metrics.scan_cost;
+            }
+
             reader_metrics.merge_from(&prune_metrics);
         }
 
         // Reports metrics.
         reader_metrics.observe_rows(read_type);
         reader_metrics.filter_metrics.observe();
-        part_metrics.merge_reader_metrics(reader_metrics);
+        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
     }
 }
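
Note (illustration, not part of the patch): the per-file accumulation above has two layers. Each range read folds into a per-stream HashMap<RegionFileId, FileMetrics>, and merge_per_file_metrics then folds that map into the partition-level ScanMetricsSet using the same entry(..).or_insert_with(..).merge_from(..) pattern. Below is a minimal standalone sketch of those merge semantics; a plain u32 stands in for the crate-private RegionFileId key, and all numbers are made up.

use std::collections::HashMap;
use std::time::Duration;

// Mirrors the `FileMetrics` struct added by the patch.
#[derive(Default, Clone, Debug)]
struct FileMetrics {
    num_ranges: usize,
    num_rows: usize,
    build_cost: Duration,
    scan_cost: Duration,
}

impl FileMetrics {
    // Field-wise accumulation, as in `FileMetrics::merge_from`.
    fn merge_from(&mut self, other: &FileMetrics) {
        self.num_ranges += other.num_ranges;
        self.num_rows += other.num_rows;
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
    }
}

fn main() {
    // `u32` stands in for `RegionFileId`.
    let mut per_file: HashMap<u32, FileMetrics> = HashMap::new();

    // Two row-group reads of the same file, as recorded once per range in
    // build_file_range_scan_stream.
    let reports = [
        (7u32, FileMetrics { num_ranges: 1, num_rows: 4096,
                             build_cost: Duration::from_millis(3),
                             scan_cost: Duration::from_millis(12) }),
        (7u32, FileMetrics { num_ranges: 1, num_rows: 1024,
                             build_cost: Duration::from_millis(1),
                             scan_cost: Duration::from_millis(5) }),
    ];

    // Same pattern as ScanMetricsSet::merge_per_file_metrics: entries for the
    // same file id are summed, new file ids start from the default.
    for (file_id, metrics) in &reports {
        per_file
            .entry(*file_id)
            .or_insert_with(FileMetrics::default)
            .merge_from(metrics);
    }

    let total = &per_file[&7];
    assert_eq!(total.num_ranges, 2);
    assert_eq!(total.num_rows, 5120);
    assert_eq!(total.build_cost, Duration::from_millis(4));
    assert_eq!(total.scan_cost, Duration::from_millis(17));
}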
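
For reference, the Debug impls above render the collected map as a JSON-like fragment inside the partition's verbose scan metrics. With the numbers from the sketch it would look roughly like:

, "per_file_metrics": {"<region_file_id>": {"num_ranges":2, "num_rows":5120, "build_cost":"4ms", "scan_cost":"17ms"}}

Here "<region_file_id>" is a placeholder for whatever RegionFileId's Display impl produces, and the durations use std Duration's Debug formatting.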