diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 9ec734a69d..8191dbcb7a 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -1137,6 +1137,12 @@ impl ScanInput {
         self.files.len()
     }
 
+    /// Gets the file handle from a row group index.
+    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
+        let file_index = index.index - self.num_memtables();
+        &self.files[file_index]
+    }
+
     pub fn region_metadata(&self) -> &RegionMetadataRef {
         self.mapper.metadata()
     }
diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index 7c69dee845..73df1e8fd8 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -14,7 +14,7 @@
 //! Utilities for scanners.
 
-use std::collections::VecDeque;
+use std::collections::{BinaryHeap, HashMap, VecDeque};
 use std::fmt;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
@@ -42,7 +42,7 @@ use crate::read::merge::{MergeMetrics, MergeMetricsReport};
 use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
 use crate::read::scan_region::StreamContext;
 use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
-use crate::sst::file::FileTimeRange;
+use crate::sst::file::{FileTimeRange, RegionFileId};
 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
@@ -52,6 +52,57 @@ use crate::sst::parquet::flat_format::time_index_column_index;
 use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
 use crate::sst::parquet::row_group::ParquetFetchMetrics;
 
+/// Per-file scan metrics.
+#[derive(Default, Clone)]
+pub struct FileScanMetrics {
+    /// Number of ranges (row groups) read from this file.
+    pub num_ranges: usize,
+    /// Number of rows read from this file.
+    pub num_rows: usize,
+    /// Time spent building file ranges/parts (file-level preparation).
+    pub build_part_cost: Duration,
+    /// Time spent building readers for this file (accumulated across all ranges).
+    pub build_reader_cost: Duration,
+    /// Time spent scanning this file (accumulated across all ranges).
+    pub scan_cost: Duration,
+}
+
+impl fmt::Debug for FileScanMetrics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
+
+        if self.num_ranges > 0 {
+            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
+        }
+        if self.num_rows > 0 {
+            write!(f, ", \"num_rows\":{}", self.num_rows)?;
+        }
+        if !self.build_reader_cost.is_zero() {
+            write!(
+                f,
+                ", \"build_reader_cost\":\"{:?}\"",
+                self.build_reader_cost
+            )?;
+        }
+        if !self.scan_cost.is_zero() {
+            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
+        }
+
+        write!(f, "}}")
+    }
+}
+
+impl FileScanMetrics {
+    /// Merges another FileScanMetrics into this one.
+    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
+        self.num_ranges += other.num_ranges;
+        self.num_rows += other.num_rows;
+        self.build_part_cost += other.build_part_cost;
+        self.build_reader_cost += other.build_reader_cost;
+        self.scan_cost += other.scan_cost;
+    }
+}
+
 /// Verbose scan metrics for a partition.
 #[derive(Default)]
 pub(crate) struct ScanMetricsSet {
@@ -151,6 +202,37 @@ pub(crate) struct ScanMetricsSet {
     fetch_metrics: Option<ParquetFetchMetrics>,
     /// Metadata cache metrics.
     metadata_cache_metrics: Option<MetadataCacheMetrics>,
+    /// Per-file scan metrics, only populated when explain_verbose is true.
+    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
+}
+
+/// Wrapper for file metrics that compares by total cost in reverse order.
+/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
+struct CompareCostReverse<'a> {
+    total_cost: Duration,
+    file_id: RegionFileId,
+    metrics: &'a FileScanMetrics,
+}
+
+impl Ord for CompareCostReverse<'_> {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        // Reverse comparison: smaller costs are "greater"
+        other.total_cost.cmp(&self.total_cost)
+    }
+}
+
+impl PartialOrd for CompareCostReverse<'_> {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Eq for CompareCostReverse<'_> {}
+
+impl PartialEq for CompareCostReverse<'_> {
+    fn eq(&self, other: &Self) -> bool {
+        self.total_cost == other.total_cost
+    }
 }
 
 impl fmt::Debug for ScanMetricsSet {
@@ -199,6 +281,7 @@ impl fmt::Debug for ScanMetricsSet {
             fulltext_index_apply_metrics,
             fetch_metrics,
             metadata_cache_metrics,
+            per_file_metrics,
         } = self;
 
         // Write core metrics
@@ -326,6 +409,52 @@ impl fmt::Debug for ScanMetricsSet {
             write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
         }
 
+        // Write top file metrics if present and non-empty
+        if let Some(file_metrics) = per_file_metrics
+            && !file_metrics.is_empty()
+        {
+            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
+            let mut heap = BinaryHeap::new();
+            for (file_id, metrics) in file_metrics.iter() {
+                let total_cost =
+                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;
+
+                if heap.len() < 10 {
+                    // Haven't reached 10 yet, just push
+                    heap.push(CompareCostReverse {
+                        total_cost,
+                        file_id: *file_id,
+                        metrics,
+                    });
+                } else if let Some(min_entry) = heap.peek() {
+                    // If current cost is higher than the minimum in our top-10, replace it
+                    if total_cost > min_entry.total_cost {
+                        heap.pop();
+                        heap.push(CompareCostReverse {
+                            total_cost,
+                            file_id: *file_id,
+                            metrics,
+                        });
+                    }
+                }
+            }
+
+            let top_files = heap.into_sorted_vec();
+            write!(f, ", \"top_file_metrics\": {{")?;
+            for (i, item) in top_files.iter().enumerate() {
+                let CompareCostReverse {
+                    total_cost: _,
+                    file_id,
+                    metrics,
+                } = item;
+                if i > 0 {
+                    write!(f, ", ")?;
+                }
+                write!(f, "\"{}\": {:?}", file_id, metrics)?;
+            }
+            write!(f, "}}")?;
+        }
+
         write!(f, ", \"stream_eof\":{stream_eof}}}")
     }
 }
@@ -432,6 +561,17 @@ impl ScanMetricsSet {
             .merge_from(metadata_cache_metrics);
     }
 
+    /// Merges per-file metrics.
+    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
+        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
+        for (file_id, metrics) in other {
+            self_file_metrics
+                .entry(*file_id)
+                .or_default()
+                .merge_from(metrics);
+        }
+    }
+
     /// Sets distributor metrics.
     fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
         let SeriesDistributorMetrics {
@@ -722,11 +862,20 @@ impl PartitionMetrics {
     }
 
     /// Merges [ReaderMetrics] and `build_reader_cost`.
-    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
+    pub fn merge_reader_metrics(
+        &self,
+        metrics: &ReaderMetrics,
+        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
+    ) {
         self.0.build_parts_cost.add_duration(metrics.build_cost);
 
         let mut metrics_set = self.0.metrics.lock().unwrap();
         metrics_set.merge_reader_metrics(metrics);
+
+        // Merge per-file metrics if provided
+        if let Some(file_metrics) = per_file_metrics {
+            metrics_set.merge_per_file_metrics(file_metrics);
+        }
     }
 
     /// Finishes the query.
@@ -938,13 +1087,32 @@ pub(crate) async fn scan_file_ranges(
         .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
         .await?;
     part_metrics.inc_num_file_ranges(ranges.len());
-    part_metrics.merge_reader_metrics(&reader_metrics);
+    part_metrics.merge_reader_metrics(&reader_metrics, None);
+
+    // Creates initial per-file metrics with build_part_cost.
+    let init_per_file_metrics = if part_metrics.explain_verbose() {
+        let file = stream_ctx.input.file_from_index(index);
+        let file_id = file.file_id();
+
+        let mut map = HashMap::new();
+        map.insert(
+            file_id,
+            FileScanMetrics {
+                build_part_cost: reader_metrics.build_cost,
+                ..Default::default()
+            },
+        );
+        Some(map)
+    } else {
+        None
+    };
 
     Ok(build_file_range_scan_stream(
         stream_ctx,
         part_metrics,
         read_type,
         ranges,
+        init_per_file_metrics,
     ))
 }
 
@@ -964,13 +1132,32 @@ pub(crate) async fn scan_flat_file_ranges(
         .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
         .await?;
     part_metrics.inc_num_file_ranges(ranges.len());
-    part_metrics.merge_reader_metrics(&reader_metrics);
+    part_metrics.merge_reader_metrics(&reader_metrics, None);
+
+    // Creates initial per-file metrics with build_part_cost.
+    let init_per_file_metrics = if part_metrics.explain_verbose() {
+        let file = stream_ctx.input.file_from_index(index);
+        let file_id = file.file_id();
+
+        let mut map = HashMap::new();
+        map.insert(
+            file_id,
+            FileScanMetrics {
+                build_part_cost: reader_metrics.build_cost,
+                ..Default::default()
+            },
+        );
+        Some(map)
+    } else {
+        None
+    };
 
     Ok(build_flat_file_range_scan_stream(
         stream_ctx,
         part_metrics,
         read_type,
         ranges,
+        init_per_file_metrics,
    ))
 }
 
@@ -980,6 +1167,7 @@ pub fn build_file_range_scan_stream(
     part_metrics: PartitionMetrics,
     read_type: &'static str,
     ranges: SmallVec<[FileRange; 2]>,
+    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
 ) -> impl Stream<Item = Result<Batch>> {
     try_stream! {
         let fetch_metrics = if part_metrics.explain_verbose() {
@@ -1006,6 +1194,20 @@ pub fn build_file_range_scan_stream(
             }
             if let Source::PruneReader(reader) = source {
                 let prune_metrics = reader.metrics();
+
+                // Update per-file metrics if tracking is enabled
+                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
+                    let file_id = range.file_handle().file_id();
+                    let file_metrics = file_metrics_map
+                        .entry(file_id)
+                        .or_insert_with(FileScanMetrics::default);
+
+                    file_metrics.num_ranges += 1;
+                    file_metrics.num_rows += prune_metrics.num_rows;
+                    file_metrics.build_reader_cost += build_cost;
+                    file_metrics.scan_cost += prune_metrics.scan_cost;
+                }
+
                 reader_metrics.merge_from(&prune_metrics);
             }
         }
@@ -1013,7 +1215,7 @@ pub fn build_file_range_scan_stream(
         // Reports metrics.
         reader_metrics.observe_rows(read_type);
         reader_metrics.filter_metrics.observe();
-        part_metrics.merge_reader_metrics(reader_metrics);
+        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
     }
 }
 
@@ -1023,6 +1225,7 @@ pub fn build_flat_file_range_scan_stream(
     part_metrics: PartitionMetrics,
     read_type: &'static str,
     ranges: SmallVec<[FileRange; 2]>,
+    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
 ) -> impl Stream<Item = Result<RecordBatch>> {
     try_stream! {
         let fetch_metrics = if part_metrics.explain_verbose() {
@@ -1058,13 +1261,27 @@ pub fn build_flat_file_range_scan_stream(
             }
 
             let prune_metrics = reader.metrics();
+
+            // Update per-file metrics if tracking is enabled
+            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
+                let file_id = range.file_handle().file_id();
+                let file_metrics = file_metrics_map
+                    .entry(file_id)
+                    .or_insert_with(FileScanMetrics::default);
+
+                file_metrics.num_ranges += 1;
+                file_metrics.num_rows += prune_metrics.num_rows;
+                file_metrics.build_reader_cost += build_cost;
+                file_metrics.scan_cost += prune_metrics.scan_cost;
+            }
+
             reader_metrics.merge_from(&prune_metrics);
         }
 
         // Reports metrics.
         reader_metrics.observe_rows(read_type);
         reader_metrics.filter_metrics.observe();
-        part_metrics.merge_reader_metrics(reader_metrics);
+        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
     }
 }
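Note on the `CompareCostReverse` wrapper in the patch: `std::collections::BinaryHeap` is a max-heap, so reversing the `Ord` implementation turns it into a min-heap whose root is the cheapest entry kept so far. While the heap holds fewer than 10 entries everything is pushed; once full, a new file only replaces the root if its total cost is higher, which retains the 10 most expensive files in O(n log 10) without sorting the whole per-file map. A minimal standalone sketch of the same pattern follows; the `Cost` struct and `top_k` helper are illustrative stand-ins, not the actual `FileScanMetrics`/`RegionFileId` types from the patch.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::Duration;

/// Illustrative stand-in for a per-file cost entry.
struct Cost {
    file: &'static str,
    total: Duration,
}

/// Orders by `total` in reverse so the max-heap root is the cheapest entry.
struct ReverseByCost<'a>(&'a Cost);

impl PartialEq for ReverseByCost<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.0.total == other.0.total
    }
}

impl Eq for ReverseByCost<'_> {}

impl PartialOrd for ReverseByCost<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ReverseByCost<'_> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Smaller costs compare as "greater", mirroring CompareCostReverse.
        other.0.total.cmp(&self.0.total)
    }
}

/// Returns the `k` most expensive entries, most expensive first.
fn top_k<'a>(costs: &'a [Cost], k: usize) -> Vec<&'a Cost> {
    let mut heap = BinaryHeap::with_capacity(k);
    for cost in costs {
        if heap.len() < k {
            heap.push(ReverseByCost(cost));
        } else if let Some(min_entry) = heap.peek() {
            // Only evict the cheapest of the current top-k if this entry beats it.
            if cost.total > min_entry.0.total {
                heap.pop();
                heap.push(ReverseByCost(cost));
            }
        }
    }
    // Ascending under the reversed order == descending by actual cost.
    heap.into_sorted_vec().into_iter().map(|r| r.0).collect()
}

fn main() {
    let costs = [
        Cost { file: "a.parquet", total: Duration::from_millis(12) },
        Cost { file: "b.parquet", total: Duration::from_millis(150) },
        Cost { file: "c.parquet", total: Duration::from_millis(3) },
        Cost { file: "d.parquet", total: Duration::from_millis(40) },
    ];
    for c in top_k(&costs, 2) {
        println!("{}: {:?}", c.file, c.total);
    }
}
```

Because `into_sorted_vec()` sorts ascending under the reversed comparison, the result comes out most expensive first, which is the order in which `top_file_metrics` is printed in the `Debug` output above.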