diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 2b54fc3d87..09c59693cc 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -56,7 +56,6 @@ use crate::error::{
     ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
 };
 use crate::memtable::BoxedBatchIterator;
-use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
 use crate::read::prune::PruneReader;
 use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
@@ -991,13 +990,11 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
     }
 }
 
-/// Metrics for scanners.
+/// Local metrics for scanners.
 #[derive(Debug, Default)]
 pub(crate) struct ScannerMetrics {
     /// Duration to prepare the scan task.
     prepare_scan_cost: Duration,
-    /// Duration to build file ranges.
-    build_parts_cost: Duration,
     /// Duration to build the (merge) reader.
     build_reader_cost: Duration,
     /// Duration to scan data.
@@ -1006,8 +1003,6 @@ pub(crate) struct ScannerMetrics {
     convert_cost: Duration,
     /// Duration while waiting for `yield`.
     yield_cost: Duration,
-    /// Duration of the scan.
-    total_cost: Duration,
     /// Number of batches returned.
     num_batches: usize,
     /// Number of rows returned.
@@ -1018,50 +1013,6 @@ pub(crate) struct ScannerMetrics {
     num_file_ranges: usize,
 }
 
-impl ScannerMetrics {
-    /// Observes metrics.
-    fn observe_metrics(&self) {
-        READ_STAGE_ELAPSED
-            .with_label_values(&["prepare_scan"])
-            .observe(self.prepare_scan_cost.as_secs_f64());
-        READ_STAGE_ELAPSED
-            .with_label_values(&["build_parts"])
-            .observe(self.build_parts_cost.as_secs_f64());
-        READ_STAGE_ELAPSED
-            .with_label_values(&["build_reader"])
-            .observe(self.build_reader_cost.as_secs_f64());
-        READ_STAGE_ELAPSED
-            .with_label_values(&["convert_rb"])
-            .observe(self.convert_cost.as_secs_f64());
-        READ_STAGE_ELAPSED
-            .with_label_values(&["scan"])
-            .observe(self.scan_cost.as_secs_f64());
-        READ_STAGE_ELAPSED
-            .with_label_values(&["yield"])
-            .observe(self.yield_cost.as_secs_f64());
-        READ_STAGE_ELAPSED
-            .with_label_values(&["total"])
-            .observe(self.total_cost.as_secs_f64());
-        READ_ROWS_RETURN.observe(self.num_rows as f64);
-        READ_BATCHES_RETURN.observe(self.num_batches as f64);
-    }
-
-    /// Merges metrics from another [ScannerMetrics].
-    fn merge_from(&mut self, other: &ScannerMetrics) {
-        self.prepare_scan_cost += other.prepare_scan_cost;
-        self.build_parts_cost += other.build_parts_cost;
-        self.build_reader_cost += other.build_reader_cost;
-        self.scan_cost += other.scan_cost;
-        self.convert_cost += other.convert_cost;
-        self.yield_cost += other.yield_cost;
-        self.total_cost += other.total_cost;
-        self.num_batches += other.num_batches;
-        self.num_rows += other.num_rows;
-        self.num_mem_ranges += other.num_mem_ranges;
-        self.num_file_ranges += other.num_file_ranges;
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use store_api::codec::PrimaryKeyEncoding;
diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index 8fa48d6c62..4a211f7117 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -14,22 +14,280 @@
 //! Utilities for scanners.
 
+use std::fmt;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
 use async_stream::try_stream;
 use common_telemetry::debug;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
 use futures::Stream;
 use prometheus::IntGauge;
 use store_api::storage::RegionId;
 
 use crate::error::Result;
-use crate::metrics::IN_PROGRESS_SCAN;
+use crate::metrics::{
+    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
+    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
+};
 use crate::read::range::{RangeBuilderList, RowGroupIndex};
 use crate::read::scan_region::StreamContext;
 use crate::read::{Batch, ScannerMetrics, Source};
 use crate::sst::file::FileTimeRange;
-use crate::sst::parquet::reader::ReaderMetrics;
+use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
+
+/// Verbose scan metrics for a partition.
+#[derive(Default)]
+struct ScanMetricsSet {
+    /// Duration to prepare the scan task.
+    prepare_scan_cost: Duration,
+    /// Duration to build the (merge) reader.
+    build_reader_cost: Duration,
+    /// Duration to scan data.
+    scan_cost: Duration,
+    /// Duration to convert batches.
+    convert_cost: Duration,
+    /// Duration while waiting for `yield`.
+    yield_cost: Duration,
+    /// Duration of the scan.
+    total_cost: Duration,
+    /// Number of rows returned.
+    num_rows: usize,
+    /// Number of batches returned.
+    num_batches: usize,
+    /// Number of mem ranges scanned.
+    num_mem_ranges: usize,
+    /// Number of file ranges scanned.
+    num_file_ranges: usize,
+
+    // SST related metrics:
+    /// Duration to build file ranges.
+    build_parts_cost: Duration,
+    /// Number of row groups before filtering.
+    rg_total: usize,
+    /// Number of row groups filtered by fulltext index.
+    rg_fulltext_filtered: usize,
+    /// Number of row groups filtered by inverted index.
+    rg_inverted_filtered: usize,
+    /// Number of row groups filtered by min-max index.
+    rg_minmax_filtered: usize,
+    /// Number of row groups filtered by bloom filter index.
+    rg_bloom_filtered: usize,
+    /// Number of rows in row group before filtering.
+    rows_before_filter: usize,
+    /// Number of rows in row group filtered by fulltext index.
+    rows_fulltext_filtered: usize,
+    /// Number of rows in row group filtered by inverted index.
+    rows_inverted_filtered: usize,
+    /// Number of rows in row group filtered by bloom filter index.
+    rows_bloom_filtered: usize,
+    /// Number of rows filtered by precise filter.
+    rows_precise_filtered: usize,
+    /// Number of record batches read from SST.
+    num_sst_record_batches: usize,
+    /// Number of batches decoded from SST.
+    num_sst_batches: usize,
+    /// Number of rows read from SST.
+    num_sst_rows: usize,
+
+    /// Elapsed time before the first poll operation.
+    first_poll: Duration,
+}
+
+impl fmt::Debug for ScanMetricsSet {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let ScanMetricsSet {
+            prepare_scan_cost,
+            build_reader_cost,
+            scan_cost,
+            convert_cost,
+            yield_cost,
+            total_cost,
+            num_rows,
+            num_batches,
+            num_mem_ranges,
+            num_file_ranges,
+            build_parts_cost,
+            rg_total,
+            rg_fulltext_filtered,
+            rg_inverted_filtered,
+            rg_minmax_filtered,
+            rg_bloom_filtered,
+            rows_before_filter,
+            rows_fulltext_filtered,
+            rows_inverted_filtered,
+            rows_bloom_filtered,
+            rows_precise_filtered,
+            num_sst_record_batches,
+            num_sst_batches,
+            num_sst_rows,
+            first_poll,
+        } = self;
+
+        write!(
+            f,
+            "{{prepare_scan_cost={prepare_scan_cost:?}, \
+            build_reader_cost={build_reader_cost:?}, \
+            scan_cost={scan_cost:?}, \
+            convert_cost={convert_cost:?}, \
+            yield_cost={yield_cost:?}, \
+            total_cost={total_cost:?}, \
+            num_rows={num_rows}, \
+            num_batches={num_batches}, \
+            num_mem_ranges={num_mem_ranges}, \
+            num_file_ranges={num_file_ranges}, \
+            build_parts_cost={build_parts_cost:?}, \
+            rg_total={rg_total}, \
+            rg_fulltext_filtered={rg_fulltext_filtered}, \
+            rg_inverted_filtered={rg_inverted_filtered}, \
+            rg_minmax_filtered={rg_minmax_filtered}, \
+            rg_bloom_filtered={rg_bloom_filtered}, \
+            rows_before_filter={rows_before_filter}, \
+            rows_fulltext_filtered={rows_fulltext_filtered}, \
+            rows_inverted_filtered={rows_inverted_filtered}, \
+            rows_bloom_filtered={rows_bloom_filtered}, \
+            rows_precise_filtered={rows_precise_filtered}, \
+            num_sst_record_batches={num_sst_record_batches}, \
+            num_sst_batches={num_sst_batches}, \
+            num_sst_rows={num_sst_rows}, \
+            first_poll={first_poll:?}}}"
+        )
+    }
+}
+
+impl ScanMetricsSet {
+    /// Attaches the `prepare_scan_cost` to the metrics set.
+    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
+        self.prepare_scan_cost += cost;
+        self
+    }
+
+    /// Merges the local scanner metrics.
+    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
+        let ScannerMetrics {
+            prepare_scan_cost,
+            build_reader_cost,
+            scan_cost,
+            convert_cost,
+            yield_cost,
+            num_batches,
+            num_rows,
+            num_mem_ranges,
+            num_file_ranges,
+        } = other;
+
+        self.prepare_scan_cost += *prepare_scan_cost;
+        self.build_reader_cost += *build_reader_cost;
+        self.scan_cost += *scan_cost;
+        self.convert_cost += *convert_cost;
+        self.yield_cost += *yield_cost;
+        self.num_rows += *num_rows;
+        self.num_batches += *num_batches;
+        self.num_mem_ranges += *num_mem_ranges;
+        self.num_file_ranges += *num_file_ranges;
+    }
+
+    /// Merges the local reader metrics.
+    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
+        let ReaderMetrics {
+            build_cost,
+            filter_metrics:
+                ReaderFilterMetrics {
+                    rg_total,
+                    rg_fulltext_filtered,
+                    rg_inverted_filtered,
+                    rg_minmax_filtered,
+                    rg_bloom_filtered,
+                    rows_total,
+                    rows_fulltext_filtered,
+                    rows_inverted_filtered,
+                    rows_bloom_filtered,
+                    rows_precise_filtered,
+                },
+            num_record_batches,
+            num_batches,
+            num_rows,
+            scan_cost: _,
+        } = other;
+
+        self.build_parts_cost += *build_cost;
+
+        self.rg_total += *rg_total;
+        self.rg_fulltext_filtered += *rg_fulltext_filtered;
+        self.rg_inverted_filtered += *rg_inverted_filtered;
+        self.rg_minmax_filtered += *rg_minmax_filtered;
+        self.rg_bloom_filtered += *rg_bloom_filtered;
+
+        self.rows_before_filter += *rows_total;
+        self.rows_fulltext_filtered += *rows_fulltext_filtered;
+        self.rows_inverted_filtered += *rows_inverted_filtered;
+        self.rows_bloom_filtered += *rows_bloom_filtered;
+        self.rows_precise_filtered += *rows_precise_filtered;
+
+        self.num_sst_record_batches += *num_record_batches;
+        self.num_sst_batches += *num_batches;
+        self.num_sst_rows += *num_rows;
+    }
+
+    /// Observes metrics.
+    fn observe_metrics(&self) {
+        READ_STAGE_ELAPSED
+            .with_label_values(&["prepare_scan"])
+            .observe(self.prepare_scan_cost.as_secs_f64());
+        READ_STAGE_ELAPSED
+            .with_label_values(&["build_reader"])
+            .observe(self.build_reader_cost.as_secs_f64());
+        READ_STAGE_ELAPSED
+            .with_label_values(&["convert_rb"])
+            .observe(self.convert_cost.as_secs_f64());
+        READ_STAGE_ELAPSED
+            .with_label_values(&["scan"])
+            .observe(self.scan_cost.as_secs_f64());
+        READ_STAGE_ELAPSED
+            .with_label_values(&["yield"])
+            .observe(self.yield_cost.as_secs_f64());
+        READ_STAGE_ELAPSED
+            .with_label_values(&["total"])
+            .observe(self.total_cost.as_secs_f64());
+        READ_ROWS_RETURN.observe(self.num_rows as f64);
+        READ_BATCHES_RETURN.observe(self.num_batches as f64);
+
+        READ_STAGE_ELAPSED
+            .with_label_values(&["build_parts"])
+            .observe(self.build_parts_cost.as_secs_f64());
+
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["before_filtering"])
+            .inc_by(self.rg_total as u64);
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["fulltext_index_filtered"])
+            .inc_by(self.rg_fulltext_filtered as u64);
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["inverted_index_filtered"])
+            .inc_by(self.rg_inverted_filtered as u64);
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["minmax_index_filtered"])
+            .inc_by(self.rg_minmax_filtered as u64);
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["bloom_filter_index_filtered"])
+            .inc_by(self.rg_bloom_filtered as u64);
+
+        PRECISE_FILTER_ROWS_TOTAL
+            .with_label_values(&["parquet"])
+            .inc_by(self.rows_precise_filtered as u64);
+        READ_ROWS_IN_ROW_GROUP_TOTAL
+            .with_label_values(&["before_filtering"])
+            .inc_by(self.rows_before_filter as u64);
+        READ_ROWS_IN_ROW_GROUP_TOTAL
+            .with_label_values(&["fulltext_index_filtered"])
+            .inc_by(self.rows_fulltext_filtered as u64);
+        READ_ROWS_IN_ROW_GROUP_TOTAL
+            .with_label_values(&["inverted_index_filtered"])
+            .inc_by(self.rows_inverted_filtered as u64);
+        READ_ROWS_IN_ROW_GROUP_TOTAL
+            .with_label_values(&["bloom_filter_index_filtered"])
+            .inc_by(self.rows_bloom_filtered as u64);
+    }
+}
 
 struct PartitionMetricsInner {
     region_id: RegionId,
@@ -39,38 +297,71 @@ struct PartitionMetricsInner {
     scanner_type: &'static str,
     /// Query start time.
     query_start: Instant,
-    /// Elapsed time before the first poll operation.
-    first_poll: Duration,
-    metrics: ScannerMetrics,
-    reader_metrics: ReaderMetrics,
+    /// Verbose scan metrics that are only logged at debug level by default.
+    metrics: Mutex<ScanMetricsSet>,
     in_progress_scan: IntGauge,
+
+    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
+    /// Duration to build file ranges.
+    build_parts_cost: Time,
+    /// Duration to build the (merge) reader.
+    build_reader_cost: Time,
+    /// Duration to scan data.
+    scan_cost: Time,
+    /// Duration while waiting for `yield`.
+    yield_cost: Time,
 }
 
 impl PartitionMetricsInner {
-    fn on_finish(&mut self) {
-        if self.metrics.total_cost.is_zero() {
-            self.metrics.total_cost = self.query_start.elapsed();
+    fn on_finish(&self) {
+        let mut metrics = self.metrics.lock().unwrap();
+        if metrics.total_cost.is_zero() {
+            metrics.total_cost = self.query_start.elapsed();
         }
-        self.metrics.build_parts_cost = self.reader_metrics.build_cost;
     }
 }
 
 impl Drop for PartitionMetricsInner {
     fn drop(&mut self) {
         self.on_finish();
-        self.metrics.observe_metrics();
+        let metrics = self.metrics.lock().unwrap();
+        metrics.observe_metrics();
         self.in_progress_scan.dec();
 
         debug!(
-            "{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}",
-            self.scanner_type, self.region_id, self.partition, self.first_poll, self.metrics, self.reader_metrics
+            "{} finished, region_id: {}, partition: {}, metrics: {:?}",
+            self.scanner_type, self.region_id, self.partition, metrics
        );
     }
 }
 
+/// List of [PartitionMetrics].
+#[derive(Default)]
+pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
+
+impl PartitionMetricsList {
+    /// Sets a new [PartitionMetrics] at the specified partition.
+    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
+        let mut list = self.0.lock().unwrap();
+        if list.len() <= partition {
+            list.resize(partition + 1, None);
+        }
+        list[partition] = Some(metrics);
+    }
+
+    /// Formats verbose metrics for each partition for explain output.
+    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let list = self.0.lock().unwrap();
+        write!(f, ", metrics_per_partition: ")?;
+        f.debug_list()
+            .entries(list.iter().filter_map(|p| p.as_ref()))
+            .finish()
+    }
+}
+
 /// Metrics while reading a partition.
 #[derive(Clone)]
-pub(crate) struct PartitionMetrics(Arc<Mutex<PartitionMetricsInner>>);
+pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);
 
 impl PartitionMetrics {
     pub(crate) fn new(
@@ -78,57 +369,82 @@
         partition: usize,
         scanner_type: &'static str,
         query_start: Instant,
-        metrics: ScannerMetrics,
+        metrics_set: &ExecutionPlanMetricsSet,
     ) -> Self {
         let partition_str = partition.to_string();
         let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
         in_progress_scan.inc();
 
+        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
         let inner = PartitionMetricsInner {
             region_id,
             partition,
             scanner_type,
             query_start,
-            first_poll: Duration::default(),
-            metrics,
-            reader_metrics: ReaderMetrics::default(),
+            metrics: Mutex::new(metrics),
             in_progress_scan,
+            build_parts_cost: MetricBuilder::new(metrics_set)
+                .subset_time("build_parts_cost", partition),
+            build_reader_cost: MetricBuilder::new(metrics_set)
+                .subset_time("build_reader_cost", partition),
+            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
+            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
         };
-        Self(Arc::new(Mutex::new(inner)))
+        Self(Arc::new(inner))
     }
 
     pub(crate) fn on_first_poll(&self) {
-        let mut inner = self.0.lock().unwrap();
-        inner.first_poll = inner.query_start.elapsed();
+        let mut metrics = self.0.metrics.lock().unwrap();
+        metrics.first_poll = self.0.query_start.elapsed();
     }
 
     pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
-        let mut inner = self.0.lock().unwrap();
-        inner.metrics.num_mem_ranges += num;
+        let mut metrics = self.0.metrics.lock().unwrap();
+        metrics.num_mem_ranges += num;
     }
 
     pub(crate) fn inc_num_file_ranges(&self, num: usize) {
-        let mut inner = self.0.lock().unwrap();
-        inner.metrics.num_file_ranges += num;
+        let mut metrics = self.0.metrics.lock().unwrap();
+        metrics.num_file_ranges += num;
     }
 
+    /// Merges `build_reader_cost`.
     pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
-        let mut inner = self.0.lock().unwrap();
-        inner.metrics.build_reader_cost += cost;
+        self.0.build_reader_cost.add_duration(cost);
+
+        let mut metrics = self.0.metrics.lock().unwrap();
+        metrics.build_reader_cost += cost;
     }
 
+    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
     pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
-        let mut inner = self.0.lock().unwrap();
-        inner.metrics.merge_from(metrics);
+        self.0
+            .build_reader_cost
+            .add_duration(metrics.build_reader_cost);
+        self.0.scan_cost.add_duration(metrics.scan_cost);
+        self.0.yield_cost.add_duration(metrics.yield_cost);
+
+        let mut metrics_set = self.0.metrics.lock().unwrap();
+        metrics_set.merge_scanner_metrics(metrics);
    }
 
+    /// Merges [ReaderMetrics] and `build_parts_cost`.
     pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
-        let mut inner = self.0.lock().unwrap();
-        inner.reader_metrics.merge_from(metrics);
+        self.0.build_parts_cost.add_duration(metrics.build_cost);
+
+        let mut metrics_set = self.0.metrics.lock().unwrap();
+        metrics_set.merge_reader_metrics(metrics);
     }
 
+    /// Finishes the query.
     pub(crate) fn on_finish(&self) {
-        let mut inner = self.0.lock().unwrap();
-        inner.on_finish();
+        self.0.on_finish();
+    }
+}
+
+impl fmt::Debug for PartitionMetrics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let metrics = self.0.metrics.lock().unwrap();
+        write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
     }
 }
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 31733dc2f1..ee56fd9d40 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -24,6 +24,7 @@ use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::util::ChainedRecordBatchStream;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
 use common_telemetry::tracing;
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use snafu::ResultExt;
@@ -38,7 +39,9 @@ use crate::read::last_row::LastRowReader;
 use crate::read::merge::MergeReaderBuilder;
 use crate::read::range::RangeBuilderList;
 use crate::read::scan_region::{ScanInput, StreamContext};
-use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
+use crate::read::scan_util::{
+    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
+};
 use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
 use crate::region::options::MergeMode;
@@ -53,6 +56,9 @@ pub struct SeqScan {
     stream_ctx: Arc<StreamContext>,
     /// The scanner is used for compaction.
     compaction: bool,
+    /// Metrics for each partition.
+    /// The scanner only sets it for queries and keeps it empty during compaction.
+    metrics_list: PartitionMetricsList,
 }
 
 impl SeqScan {
@@ -69,6 +75,7 @@
             properties,
             stream_ctx,
             compaction,
+            metrics_list: PartitionMetricsList::default(),
         }
     }
@@ -77,8 +84,9 @@
     /// The returned stream is not partitioned and will contain all the data. If a
     /// partitioned scan is needed, use [`RegionScanner::scan_partition`].
     pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
+        let metrics_set = ExecutionPlanMetricsSet::new();
         let streams = (0..self.properties.partitions.len())
-            .map(|partition: usize| self.scan_partition(partition))
+            .map(|partition: usize| self.scan_partition(&metrics_set, partition))
             .collect::<Result<Vec<_>, _>>()?;
         let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
@@ -92,16 +100,8 @@
     pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
         assert!(self.compaction);
 
-        let part_metrics = PartitionMetrics::new(
-            self.stream_ctx.input.mapper.metadata().region_id,
-            0,
-            get_scanner_type(self.compaction),
-            self.stream_ctx.query_start,
-            ScannerMetrics {
-                prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
-                ..Default::default()
-            },
-        );
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let part_metrics = self.new_partition_metrics(&metrics_set, 0);
         debug_assert_eq!(1, self.properties.partitions.len());
         let partition_ranges = &self.properties.partitions[0];
@@ -194,6 +194,7 @@
     /// Otherwise the returned stream might not contain any data.
     fn scan_partition_impl(
         &self,
+        metrics_set: &ExecutionPlanMetricsSet,
         partition: usize,
     ) -> Result<SendableRecordBatchStream, BoxedError> {
         if partition >= self.properties.partitions.len() {
@@ -207,7 +208,7 @@
         }
 
         if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
-            return self.scan_partition_by_series(partition);
+            return self.scan_partition_by_series(metrics_set, partition);
         }
 
         let stream_ctx = self.stream_ctx.clone();
@@ -215,7 +216,7 @@
         let partition_ranges = self.properties.partitions[partition].clone();
         let compaction = self.compaction;
         let distinguish_range = self.properties.distinguish_partition_range;
-        let part_metrics = self.new_partition_metrics(partition);
+        let part_metrics = self.new_partition_metrics(metrics_set, partition);
 
         let stream = try_stream! {
             part_metrics.on_first_poll();
@@ -310,13 +311,14 @@
     /// Otherwise the returned stream might not contain any data.
     fn scan_partition_by_series(
         &self,
+        metrics_set: &ExecutionPlanMetricsSet,
         partition: usize,
     ) -> Result<SendableRecordBatchStream, BoxedError> {
         let stream_ctx = self.stream_ctx.clone();
         let semaphore = self.new_semaphore();
         let partition_ranges = self.properties.partitions[partition].clone();
         let distinguish_range = self.properties.distinguish_partition_range;
-        let part_metrics = self.new_partition_metrics(partition);
+        let part_metrics = self.new_partition_metrics(metrics_set, partition);
         debug_assert!(!self.compaction);
 
         let stream = try_stream! {
@@ -411,17 +413,26 @@
         }
     }
 
-    fn new_partition_metrics(&self, partition: usize) -> PartitionMetrics {
-        PartitionMetrics::new(
+    /// Creates a new partition metrics instance.
+    /// Sets the partition metrics for the given partition if it is not for compaction.
+    fn new_partition_metrics(
+        &self,
+        metrics_set: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> PartitionMetrics {
+        let metrics = PartitionMetrics::new(
             self.stream_ctx.input.mapper.metadata().region_id,
             partition,
             get_scanner_type(self.compaction),
             self.stream_ctx.query_start,
-            ScannerMetrics {
-                prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
-                ..Default::default()
-            },
-        )
+            metrics_set,
+        );
+
+        if !self.compaction {
+            self.metrics_list.set(partition, metrics.clone());
+        }
+
+        metrics
     }
 }
 
@@ -438,8 +449,12 @@ impl RegionScanner for SeqScan {
         self.stream_ctx.input.mapper.metadata().clone()
     }
 
-    fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
-        self.scan_partition_impl(partition)
+    fn scan_partition(
+        &self,
+        metrics_set: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream, BoxedError> {
+        self.scan_partition_impl(metrics_set, partition)
     }
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
@@ -466,7 +481,10 @@ impl DisplayAs for SeqScan {
         )?;
         match t {
             DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
-            DisplayFormatType::Verbose => self.stream_ctx.format_for_explain(true, f),
+            DisplayFormatType::Verbose => {
+                self.stream_ctx.format_for_explain(true, f)?;
+                self.metrics_list.format_verbose_metrics(f)
+            }
         }
     }
 }
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 0fc1d8e4b4..3724075998 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -22,6 +22,7 @@ use async_stream::{stream, try_stream};
 use common_error::ext::BoxedError;
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
@@ -32,7 +33,9 @@ use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
 use crate::error::{PartitionOutOfRangeSnafu, Result};
 use crate::read::range::RangeBuilderList;
 use crate::read::scan_region::{ScanInput, StreamContext};
-use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
+use crate::read::scan_util::{
+    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
+};
 use crate::read::{Batch, ScannerMetrics};
 
 /// Scans a region without providing any output ordering guarantee.
@@ -43,6 +46,8 @@ pub struct UnorderedScan {
     properties: ScannerProperties,
     /// Context of streams.
     stream_ctx: Arc<StreamContext>,
+    /// Metrics for each partition.
+    metrics_list: PartitionMetricsList,
 }
 
 impl UnorderedScan {
@@ -57,14 +62,16 @@
         Self {
             properties,
             stream_ctx,
+            metrics_list: PartitionMetricsList::default(),
         }
     }
 
     /// Scans the region and returns a stream.
     pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
+        let metrics_set = ExecutionPlanMetricsSet::new();
         let part_num = self.properties.num_partitions();
         let streams = (0..part_num)
-            .map(|i| self.scan_partition(i))
+            .map(|i| self.scan_partition(&metrics_set, i))
             .collect::<Result<Vec<_>, BoxedError>>()?;
         let stream = stream! {
             for mut stream in streams {
@@ -119,6 +126,7 @@
     fn scan_partition_impl(
         &self,
+        metrics_set: &ExecutionPlanMetricsSet,
         partition: usize,
     ) -> Result<SendableRecordBatchStream, BoxedError> {
         if partition >= self.properties.partitions.len() {
@@ -136,11 +144,9 @@
             partition,
             "UnorderedScan",
             self.stream_ctx.query_start,
-            ScannerMetrics {
-                prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
-                ..Default::default()
-            },
+            metrics_set,
         );
+        self.metrics_list.set(partition, part_metrics.clone());
         let stream_ctx = self.stream_ctx.clone();
         let part_ranges = self.properties.partitions[partition].clone();
         let distinguish_range = self.properties.distinguish_partition_range;
@@ -239,8 +245,12 @@ impl RegionScanner for UnorderedScan {
         Ok(())
     }
 
-    fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
-        self.scan_partition_impl(partition)
+    fn scan_partition(
+        &self,
+        metrics_set: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream, BoxedError> {
+        self.scan_partition_impl(metrics_set, partition)
     }
 
     fn has_predicate(&self) -> bool {
@@ -262,7 +272,10 @@ impl DisplayAs for UnorderedScan {
         )?;
         match t {
             DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
-            DisplayFormatType::Verbose => self.stream_ctx.format_for_explain(true, f),
+            DisplayFormatType::Verbose => {
+                self.stream_ctx.format_for_explain(true, f)?;
+                self.metrics_list.format_verbose_metrics(f)
+            }
         }
     }
 }
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index d6a5aac4cc..1dc161b228 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -26,6 +26,7 @@ use common_error::ext::{BoxedError, PlainError};
 use common_error::status_code::StatusCode;
 use common_recordbatch::SendableRecordBatchStream;
 use common_time::Timestamp;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use futures::future::join_all;
@@ -342,7 +343,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
     ///
     /// # Panics
     /// Panics if the `partition` is out of bounds.
-    fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError>;
+    fn scan_partition(
+        &self,
+        metrics_set: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream, BoxedError>;
 
     /// Check if there is any predicate that may be executed in this scanner.
     fn has_predicate(&self) -> bool;
@@ -562,7 +567,11 @@ impl RegionScanner for SinglePartitionScanner {
         Ok(())
     }
 
-    fn scan_partition(&self, _partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
+    fn scan_partition(
+        &self,
+        _metrics_set: &ExecutionPlanMetricsSet,
+        _partition: usize,
+    ) -> Result<SendableRecordBatchStream, BoxedError> {
         let mut stream = self.stream.lock().unwrap();
         stream.take().ok_or_else(|| {
             BoxedError::new(PlainError::new(
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 9c9435d141..c72df1f1ac 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -276,7 +276,7 @@ impl ExecutionPlan for RegionScanExec {
             .scanner
             .lock()
             .unwrap()
-            .scan_partition(partition)
+            .scan_partition(&self.metric, partition)
             .map_err(|e| DataFusionError::External(Box::new(e)))?;
         let stream_metrics = StreamMetrics::new(&self.metric, partition);
         Ok(Box::pin(StreamWithMetricWrapper {
diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result
index b52ba04d13..a61246b25e 100644
--- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result
+++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result
@@ -105,7 +105,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
 |_|_|_MergeScanExec: REDACTED
 |_|_|_|
 | 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
-|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)] REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
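
The central mechanism in this patch is threading a DataFusion `ExecutionPlanMetricsSet` from `RegionScanExec` down into every `scan_partition` call, so per-partition costs surface in `EXPLAIN ANALYZE` instead of living only in logs. Below is a minimal standalone sketch of that metrics API, assuming only the `datafusion` crate; the `build_reader_cost` name mirrors the patch, and `clone_inner` is just one way to inspect the recorded values:

```rust
use std::time::Duration;

use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

fn main() {
    // One shared set per plan node; it is cheap to clone and thread-safe.
    let metrics_set = ExecutionPlanMetricsSet::new();
    let partition = 0;

    // `subset_time` registers a named timer scoped to a single partition.
    let build_reader_cost: Time =
        MetricBuilder::new(&metrics_set).subset_time("build_reader_cost", partition);

    // Scanners accumulate already-measured durations instead of timing inline.
    build_reader_cost.add_duration(Duration::from_millis(12));
    build_reader_cost.add_duration(Duration::from_millis(3));

    // Snapshot and print the set, roughly what EXPLAIN ANALYZE renders.
    println!("{}", metrics_set.clone_inner());
}
```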
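
`merge_scanner_metrics` and the manual `Debug` impl both destructure the source struct exhaustively rather than reading fields ad hoc, so adding a field to `ScannerMetrics` later becomes a compile error at every merge site instead of a silently dropped metric. A condensed sketch of that pattern with hypothetical two-field structs:

```rust
use std::time::Duration;

#[derive(Default)]
struct LocalMetrics {
    scan_cost: Duration,
    num_rows: usize,
}

#[derive(Default)]
struct TotalMetrics {
    scan_cost: Duration,
    num_rows: usize,
}

impl TotalMetrics {
    fn merge_from(&mut self, other: &LocalMetrics) {
        // Exhaustive destructuring: a new field added to `LocalMetrics`
        // becomes a compile error here rather than a forgotten merge.
        let LocalMetrics { scan_cost, num_rows } = other;
        self.scan_cost += *scan_cost;
        self.num_rows += *num_rows;
    }
}

fn main() {
    let mut total = TotalMetrics::default();
    let local = LocalMetrics {
        scan_cost: Duration::from_millis(5),
        num_rows: 42,
    };
    total.merge_from(&local);
    assert_eq!(total.num_rows, 42);
}
```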
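
`PartitionMetricsInner` observes its counters and decrements the in-progress gauge in `Drop`, so the flush happens exactly once whether the stream completes, fails, or is cancelled mid-scan. A minimal sketch of that RAII shape under simplifying assumptions: an atomic counter stands in for the Prometheus gauge and `println!` for the histogram observations:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

// Stand-in for the in-progress scan gauge.
static IN_PROGRESS: AtomicU64 = AtomicU64::new(0);

struct MetricsInner {
    query_start: Instant,
    num_rows: Mutex<usize>,
}

impl Drop for MetricsInner {
    fn drop(&mut self) {
        // Runs exactly once, when the last Arc clone is dropped, on any
        // termination path: completion, error, or cancellation.
        let rows = *self.num_rows.lock().unwrap();
        println!("scan finished: rows={rows}, total={:?}", self.query_start.elapsed());
        IN_PROGRESS.fetch_sub(1, Ordering::Relaxed);
    }
}

#[derive(Clone)]
struct PartitionMetrics(Arc<MetricsInner>);

fn main() {
    IN_PROGRESS.fetch_add(1, Ordering::Relaxed);
    let metrics = PartitionMetrics(Arc::new(MetricsInner {
        query_start: Instant::now(),
        num_rows: Mutex::new(0),
    }));

    let handle = metrics.clone(); // e.g. captured by a record batch stream
    *handle.0.num_rows.lock().unwrap() += 100;

    drop(handle);
    drop(metrics); // last owner: Drop flushes here
    assert_eq!(IN_PROGRESS.load(Ordering::Relaxed), 0);
}
```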
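
`PartitionMetricsList::set` grows its vector lazily because partitions may be registered in any order, and `format_verbose_metrics` later prints only the slots that were actually filled. A self-contained sketch of the same resize-and-set pattern, with `String` standing in for `PartitionMetrics`:

```rust
use std::sync::Mutex;

#[derive(Default)]
struct PartitionList(Mutex<Vec<Option<String>>>);

impl PartitionList {
    fn set(&self, partition: usize, value: String) {
        let mut list = self.0.lock().unwrap();
        // Partitions can arrive out of order; grow on demand and leave
        // never-scanned slots as `None`.
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(value);
    }

    fn format(&self) -> String {
        let list = self.0.lock().unwrap();
        // Skip empty slots, matching the `filter_map` in the patch.
        let entries: Vec<&String> = list.iter().filter_map(|p| p.as_ref()).collect();
        format!("{entries:?}")
    }
}

fn main() {
    let list = PartitionList::default();
    list.set(2, "[partition=2, num_rows=10]".to_string());
    list.set(0, "[partition=0, num_rows=7]".to_string());
    // Partition 1 never reported, so it is simply absent from the output.
    println!("{}", list.format());
}
```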
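
`ScanMetricsSet::observe_metrics` folds every stage duration into a single `READ_STAGE_ELAPSED` histogram keyed by a stage label, rather than declaring one metric per stage. A sketch of that labeling pattern with the `prometheus` crate; the `read_stage_elapsed` histogram here is a hypothetical stand-in for the one defined in `crate::metrics`:

```rust
use std::time::Duration;

use prometheus::{register_histogram_vec, HistogramVec};

fn main() {
    // Hypothetical stand-in for crate::metrics::READ_STAGE_ELAPSED.
    let read_stage_elapsed: HistogramVec = register_histogram_vec!(
        "read_stage_elapsed",
        "Elapsed time of each scan stage.",
        &["stage"]
    )
    .unwrap();

    let scan_cost = Duration::from_millis(25);
    let yield_cost = Duration::from_millis(4);

    // One histogram, one label value per stage, observed once per partition.
    read_stage_elapsed
        .with_label_values(&["scan"])
        .observe(scan_cost.as_secs_f64());
    read_stage_elapsed
        .with_label_values(&["yield"])
        .observe(yield_cost.as_secs_f64());
}
```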