diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 9ec734a69d..8191dbcb7a 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -1137,6 +1137,12 @@ impl ScanInput { self.files.len() } + /// Gets the file handle from a row group index. + pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle { + let file_index = index.index - self.num_memtables(); + &self.files[file_index] + } + pub fn region_metadata(&self) -> &RegionMetadataRef { self.mapper.metadata() } diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 1230cf35f4..b6107d5af6 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -52,35 +52,53 @@ use crate::sst::parquet::flat_format::time_index_column_index; use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics}; use crate::sst::parquet::row_group::ParquetFetchMetrics; -/// Per-file scan metrics tracked when explain_verbose is enabled. +/// Per-file scan metrics. #[derive(Default, Clone)] -pub(crate) struct FileMetrics { +pub struct FileScanMetrics { /// Number of ranges (row groups) read from this file. - pub(crate) num_ranges: usize, + pub num_ranges: usize, /// Number of rows read from this file. - pub(crate) num_rows: usize, + pub num_rows: usize, + /// Time spent building file ranges/parts (file-level preparation). + pub build_part_cost: Duration, /// Time spent building readers for this file (accumulated across all ranges). - pub(crate) build_cost: Duration, + pub build_reader_cost: Duration, /// Time spent scanning this file (accumulated across all ranges). - pub(crate) scan_cost: Duration, + pub scan_cost: Duration, } -impl fmt::Debug for FileMetrics { +impl fmt::Debug for FileScanMetrics { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{{\"num_ranges\":{}, \"num_rows\":{}, \"build_cost\":\"{:?}\", \"scan_cost\":\"{:?}\"}}", - self.num_ranges, self.num_rows, self.build_cost, self.scan_cost - ) + write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?; + + if self.num_ranges > 0 { + write!(f, ", \"num_ranges\":{}", self.num_ranges)?; + } + if self.num_rows > 0 { + write!(f, ", \"num_rows\":{}", self.num_rows)?; + } + if !self.build_reader_cost.is_zero() { + write!( + f, + ", \"build_reader_cost\":\"{:?}\"", + self.build_reader_cost + )?; + } + if !self.scan_cost.is_zero() { + write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?; + } + + write!(f, "}}") } } -impl FileMetrics { +impl FileScanMetrics { /// Merges another FileMetrics into this one. - pub(crate) fn merge_from(&mut self, other: &FileMetrics) { + pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) { self.num_ranges += other.num_ranges; self.num_rows += other.num_rows; - self.build_cost += other.build_cost; + self.build_part_cost += other.build_part_cost; + self.build_reader_cost += other.build_reader_cost; self.scan_cost += other.scan_cost; } } @@ -185,7 +203,7 @@ pub(crate) struct ScanMetricsSet { /// Metadata cache metrics. metadata_cache_metrics: Option, /// Per-file scan metrics, only populated when explain_verbose is true. - per_file_metrics: Option>, + per_file_metrics: Option>, } impl fmt::Debug for ScanMetricsSet { @@ -483,12 +501,12 @@ impl ScanMetricsSet { } /// Merges per-file metrics. - fn merge_per_file_metrics(&mut self, other: &HashMap) { + fn merge_per_file_metrics(&mut self, other: &HashMap) { let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new); for (file_id, metrics) in other { self_file_metrics .entry(*file_id) - .or_insert_with(FileMetrics::default) + .or_default() .merge_from(metrics); } } @@ -786,7 +804,7 @@ impl PartitionMetrics { pub fn merge_reader_metrics( &self, metrics: &ReaderMetrics, - per_file_metrics: Option<&HashMap>, + per_file_metrics: Option<&HashMap>, ) { self.0.build_parts_cost.add_duration(metrics.build_cost); @@ -1010,11 +1028,30 @@ pub(crate) async fn scan_file_ranges( part_metrics.inc_num_file_ranges(ranges.len()); part_metrics.merge_reader_metrics(&reader_metrics, None); + // Creates initial per-file metrics with build_part_cost. + let init_per_file_metrics = if part_metrics.explain_verbose() { + let file = stream_ctx.input.file_from_index(index); + let file_id = file.file_id(); + + let mut map = HashMap::new(); + map.insert( + file_id, + FileScanMetrics { + build_part_cost: reader_metrics.build_cost, + ..Default::default() + }, + ); + Some(map) + } else { + None + }; + Ok(build_file_range_scan_stream( stream_ctx, part_metrics, read_type, ranges, + init_per_file_metrics, )) } @@ -1036,11 +1073,30 @@ pub(crate) async fn scan_flat_file_ranges( part_metrics.inc_num_file_ranges(ranges.len()); part_metrics.merge_reader_metrics(&reader_metrics, None); + // Creates initial per-file metrics with build_part_cost. + let init_per_file_metrics = if part_metrics.explain_verbose() { + let file = stream_ctx.input.file_from_index(index); + let file_id = file.file_id(); + + let mut map = HashMap::new(); + map.insert( + file_id, + FileScanMetrics { + build_part_cost: reader_metrics.build_cost, + ..Default::default() + }, + ); + Some(map) + } else { + None + }; + Ok(build_flat_file_range_scan_stream( stream_ctx, part_metrics, read_type, ranges, + init_per_file_metrics, )) } @@ -1050,6 +1106,7 @@ pub fn build_file_range_scan_stream( part_metrics: PartitionMetrics, read_type: &'static str, ranges: SmallVec<[FileRange; 2]>, + mut per_file_metrics: Option>, ) -> impl Stream> { try_stream! { let fetch_metrics = if part_metrics.explain_verbose() { @@ -1057,11 +1114,6 @@ pub fn build_file_range_scan_stream( } else { None }; - let mut per_file_metrics = if part_metrics.explain_verbose() { - Some(HashMap::::new()) - } else { - None - }; let reader_metrics = &mut ReaderMetrics { fetch_metrics: fetch_metrics.clone(), ..Default::default() @@ -1087,11 +1139,11 @@ pub fn build_file_range_scan_stream( let file_id = range.file_handle().file_id(); let file_metrics = file_metrics_map .entry(file_id) - .or_insert_with(FileMetrics::default); + .or_insert_with(FileScanMetrics::default); file_metrics.num_ranges += 1; file_metrics.num_rows += prune_metrics.num_rows; - file_metrics.build_cost += build_cost; + file_metrics.build_reader_cost += build_cost; file_metrics.scan_cost += prune_metrics.scan_cost; } @@ -1112,6 +1164,7 @@ pub fn build_flat_file_range_scan_stream( part_metrics: PartitionMetrics, read_type: &'static str, ranges: SmallVec<[FileRange; 2]>, + mut per_file_metrics: Option>, ) -> impl Stream> { try_stream! { let fetch_metrics = if part_metrics.explain_verbose() { @@ -1119,11 +1172,6 @@ pub fn build_flat_file_range_scan_stream( } else { None }; - let mut per_file_metrics = if part_metrics.explain_verbose() { - Some(HashMap::::new()) - } else { - None - }; let reader_metrics = &mut ReaderMetrics { fetch_metrics: fetch_metrics.clone(), ..Default::default() @@ -1158,11 +1206,11 @@ pub fn build_flat_file_range_scan_stream( let file_id = range.file_handle().file_id(); let file_metrics = file_metrics_map .entry(file_id) - .or_insert_with(FileMetrics::default); + .or_insert_with(FileScanMetrics::default); file_metrics.num_ranges += 1; file_metrics.num_rows += prune_metrics.num_rows; - file_metrics.build_cost += build_cost; + file_metrics.build_reader_cost += build_cost; file_metrics.scan_cost += prune_metrics.scan_cost; }