From 1df605ec4b4769d6ed4625db0a6fcf5ae361cc7c Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Wed, 30 Jul 2025 17:23:32 +0800
Subject: [PATCH] feat: more logs and metrics under explain verbose mode (#6575)

* feat: collect region metrics

Signed-off-by: evenyag

* chore: log in info level

Signed-off-by: evenyag

* feat: add CoalescePartitionsExec to explain

Signed-off-by: evenyag

* fix: finish metrics in partition and add sender full to metrics

Signed-off-by: evenyag

* feat: add eof flag on finish

Signed-off-by: evenyag

* fix: output cost as string

Signed-off-by: evenyag

* feat: log on stream done

Signed-off-by: evenyag

* chore: region id as string

Signed-off-by: evenyag

* chore: enlarge send channel size

Signed-off-by: evenyag

* feat: more log in flight and scan

Signed-off-by: evenyag

* chore: logs about rows/batches/bytes

Signed-off-by: evenyag

* chore: enlarge channel size

Signed-off-by: evenyag

* chore: remote read only log in verbose

Signed-off-by: evenyag

* chore: revert channel change

Signed-off-by: evenyag

* refactor: get explain verbose in RegionScanExec

Signed-off-by: evenyag

* feat: print scan log in verbose mode

Signed-off-by: evenyag

* refactor: collect region metrics after finishing one region

Signed-off-by: evenyag

* refactor: define StreamMetrics and log in verbose mode

Signed-off-by: evenyag

* feat: only log non zero filter and distributor metrics

Signed-off-by: evenyag

* chore: revert displaying CoalescePartitions in explain

Signed-off-by: evenyag

* feat: collect memtable metrics in partition metrics

Signed-off-by: evenyag

---------

Signed-off-by: evenyag
---
 src/datanode/src/region_server.rs             |  16 +-
 src/mito2/src/engine/scan_test.rs             |   4 +-
 src/mito2/src/memtable.rs                     |  47 ++++-
 src/mito2/src/memtable/partition_tree.rs      |  13 +-
 src/mito2/src/memtable/partition_tree/tree.rs |  22 +++
 .../src/memtable/simple_bulk_memtable.rs      |  24 ++-
 .../simple_bulk_memtable/test_only.rs         |  21 ++-
 src/mito2/src/memtable/time_series.rs         |  35 +++-
 src/mito2/src/read/scan_util.rs               | 161 ++++++++++++++---
 src/mito2/src/read/seq_scan.rs                |  28 ++-
 src/mito2/src/read/series_scan.rs             |  49 ++++-
 src/mito2/src/read/unordered_scan.rs          |  26 ++-
 src/query/src/datafusion.rs                   |  24 ++-
 src/query/src/dist_plan/merge_scan.rs         | 171 +++++++++++++++++-
 src/query/src/dummy_catalog.rs                |  19 +-
 src/servers/src/grpc/flight/stream.rs         |  86 ++++++++-
 src/store-api/src/region_engine.rs            |   9 +
 src/table/src/table/scan.rs                   |  16 +-
 18 files changed, 680 insertions(+), 91 deletions(-)

diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 0b5d1f8590..13354f5bff 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -174,7 +174,7 @@ impl RegionServer {
     async fn table_provider(
         &self,
         region_id: RegionId,
-        ctx: Option<&session::context::QueryContext>,
+        ctx: Option<QueryContextRef>,
     ) -> Result<Arc<dyn TableProvider>> {
         let status = self
             .inner
@@ -207,9 +207,15 @@ impl RegionServer {
         };
         let region_id = RegionId::from_u64(request.region_id);
-        let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
+        let provider = self
+            .table_provider(region_id, Some(query_ctx.clone()))
+            .await?;
         let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));

+        if query_ctx.explain_verbose() {
+            common_telemetry::info!("Handle remote read for region: {}", region_id);
+        }
+
         let decoder = self
             .inner
             .query_engine
@@ -243,10 +249,12 @@ impl RegionServer {
         };
         let ctx: Option<QueryContext> = request.header.as_ref().map(|h| h.into());
-
-        let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;
         let
query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
+        let provider = self
+            .table_provider(request.region_id, Some(query_ctx.clone()))
+            .await?;
+
         struct RegionDataSourceInjector {
             source: Arc<dyn ExecutionPlan>,
         }
diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs
index 4391bbabba..81029ad13c 100644
--- a/src/mito2/src/engine/scan_test.rs
+++ b/src/mito2/src/engine/scan_test.rs
@@ -225,7 +225,9 @@ async fn test_series_scan() {
     let mut partition_batches = vec![vec![]; 3];
     let mut streams: Vec<_> = (0..3)
         .map(|partition| {
-            let stream = scanner.scan_partition(&metrics_set, partition).unwrap();
+            let stream = scanner
+                .scan_partition(&Default::default(), &metrics_set, partition)
+                .unwrap();
             Some(stream)
         })
         .collect();
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 6dceeb5faf..cf2aa4c576 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -17,7 +17,8 @@ use std::collections::BTreeMap;
 use std::fmt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;

 pub use bulk::part::EncodedBulkPart;
 use common_time::Timestamp;
@@ -354,11 +355,43 @@ impl MemtableBuilderProvider {
     }
 }

+/// Metrics for scanning a memtable.
+#[derive(Clone, Default)]
+pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);
+
+impl MemScanMetrics {
+    /// Merges the metrics.
+    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
+        let mut metrics = self.0.lock().unwrap();
+        metrics.total_series += inner.total_series;
+        metrics.num_rows += inner.num_rows;
+        metrics.num_batches += inner.num_batches;
+        metrics.scan_cost += inner.scan_cost;
+    }
+
+    /// Gets the metrics data.
+    pub(crate) fn data(&self) -> MemScanMetricsData {
+        self.0.lock().unwrap().clone()
+    }
+}
+
+#[derive(Clone, Default)]
+pub(crate) struct MemScanMetricsData {
+    /// Total series in the memtable.
+    pub(crate) total_series: usize,
+    /// Number of rows read.
+    pub(crate) num_rows: usize,
+    /// Number of batches read.
+    pub(crate) num_batches: usize,
+    /// Duration to scan the memtable.
+    pub(crate) scan_cost: Duration,
+}
+
 /// Builder to build an iterator to read the range.
 /// The builder should know the projection and the predicate to build the iterator.
 pub trait IterBuilder: Send + Sync {
     /// Returns the iterator to read the range.
-    fn build(&self) -> Result<BoxedBatchIterator>;
+    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;
 }

 pub type BoxedIterBuilder = Box<dyn IterBuilder>;
@@ -410,8 +443,12 @@ impl MemtableRange {
     /// Builds an iterator to read the range.
     /// Filters the result by the specific time range, this ensures memtable won't return
     /// rows out of the time range when new rows are inserted.
-    pub fn build_prune_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
-        let iter = self.context.builder.build()?;
+    pub fn build_prune_iter(
+        &self,
+        time_range: FileTimeRange,
+        metrics: Option<MemScanMetrics>,
+    ) -> Result<BoxedBatchIterator> {
+        let iter = self.context.builder.build(metrics)?;
         let time_filters = self.context.predicate.time_filters();
         Ok(Box::new(PruneTimeIterator::new(
             iter,
@@ -422,7 +459,7 @@ impl MemtableRange {
     /// Builds an iterator to read all rows in range.
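    /// This path passes `None` for the optional [MemScanMetrics], so plain
    /// `build_iter` scans are not recorded in the memtable scan metrics.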
pub fn build_iter(&self) -> Result { - self.context.builder.build() + self.context.builder.build(None) } pub fn num_rows(&self) -> usize { diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 53fa6a2f6a..50f8e4cc8e 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -41,9 +41,9 @@ use crate::memtable::bulk::part::BulkPart; use crate::memtable::partition_tree::tree::PartitionTree; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder, - MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, - PredicateGroup, + AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, + MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, + MemtableStats, PredicateGroup, }; use crate::region::options::MergeMode; @@ -183,7 +183,7 @@ impl Memtable for PartitionTreeMemtable { predicate: Option, sequence: Option, ) -> Result { - self.tree.read(projection, predicate, sequence) + self.tree.read(projection, predicate, sequence, None) } fn ranges( @@ -315,7 +315,7 @@ impl PartitionTreeMemtable { predicate: Option, sequence: Option, ) -> Result { - self.tree.read(projection, predicate, sequence) + self.tree.read(projection, predicate, sequence, None) } } @@ -360,11 +360,12 @@ struct PartitionTreeIterBuilder { } impl IterBuilder for PartitionTreeIterBuilder { - fn build(&self) -> Result { + fn build(&self, metrics: Option) -> Result { self.tree.read( self.projection.as_deref(), self.predicate.clone(), self.sequence, + metrics, ) } } diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index 909714ca92..09924469c8 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -230,6 +230,7 @@ impl PartitionTree { projection: Option<&[ColumnId]>, predicate: Option, sequence: Option, + mem_scan_metrics: Option, ) -> Result { let start = Instant::now(); // Creates the projection set. @@ -257,6 +258,7 @@ impl PartitionTree { partitions, current_reader: None, metrics: tree_iter_metric, + mem_scan_metrics, }; let context = ReadPartitionContext::new( self.metadata.clone(), @@ -467,10 +469,28 @@ struct TreeIter { partitions: VecDeque, current_reader: Option, metrics: TreeIterMetrics, + mem_scan_metrics: Option, +} + +impl TreeIter { + fn report_mem_scan_metrics(&mut self) { + if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() { + let inner = crate::memtable::MemScanMetricsData { + total_series: 0, // This is unavailable. + num_rows: self.metrics.rows_fetched, + num_batches: self.metrics.batches_fetched, + scan_cost: self.metrics.iter_elapsed, + }; + mem_scan_metrics.merge_inner(&inner); + } + } } impl Drop for TreeIter { fn drop(&mut self) { + // Report MemScanMetrics if not already reported + self.report_mem_scan_metrics(); + READ_ROWS_TOTAL .with_label_values(&["partition_tree_memtable"]) .inc_by(self.metrics.rows_fetched as u64); @@ -523,6 +543,8 @@ impl TreeIter { /// Fetches next batch. 
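    /// Reports the collected [MemScanMetrics] before returning the final `None`.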
fn next_batch(&mut self) -> Result<Option<Batch>> {
         let Some(part_reader) = &mut self.current_reader else {
+            // Report MemScanMetrics before returning None
+            self.report_mem_scan_metrics();
             return Ok(None);
         };
diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs
index 19b6a9c396..cb49974513 100644
--- a/src/mito2/src/memtable/simple_bulk_memtable.rs
+++ b/src/mito2/src/memtable/simple_bulk_memtable.rs
@@ -19,6 +19,7 @@ use std::collections::HashSet;
 use std::fmt::{Debug, Formatter};
 use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant};

 use api::v1::OpType;
 use datatypes::vectors::Helper;
@@ -33,8 +34,8 @@ use crate::memtable::bulk::part::BulkPart;
 use crate::memtable::stats::WriteMetrics;
 use crate::memtable::time_series::Series;
 use crate::memtable::{
-    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange,
-    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
+    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
+    MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
 };
 use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
 use crate::read::dedup::LastNonNullIter;
@@ -219,7 +220,7 @@ impl Memtable for SimpleBulkMemtable {
         _predicate: Option<Predicate>,
         sequence: Option<SequenceNumber>,
     ) -> error::Result<BoxedBatchIterator> {
-        let iter = self.create_iter(projection, sequence)?.build()?;
+        let iter = self.create_iter(projection, sequence)?.build(None)?;

         if self.merge_mode == MergeMode::LastNonNull {
             let iter = LastNonNullIter::new(iter);
@@ -235,6 +236,7 @@ impl Memtable for SimpleBulkMemtable {
         predicate: PredicateGroup,
         sequence: Option<SequenceNumber>,
     ) -> error::Result<MemtableRanges> {
+        let start_time = Instant::now();
         let projection = Arc::new(self.build_projection(projection));
         let values = self.series.read().unwrap().read_to_values();
         let contexts = values
@@ -263,6 +265,7 @@ impl Memtable for SimpleBulkMemtable {
                 let builder = BatchRangeBuilder {
                     batch,
                     merge_mode: self.merge_mode,
+                    scan_cost: start_time.elapsed(),
                 };
                 (
                     num_rows,
@@ -346,11 +349,22 @@ impl Memtable for SimpleBulkMemtable {
 pub struct BatchRangeBuilder {
     pub batch: Batch,
     pub merge_mode: MergeMode,
+    scan_cost: Duration,
 }

 impl IterBuilder for BatchRangeBuilder {
-    fn build(&self) -> error::Result<BoxedBatchIterator> {
+    fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
         let batch = self.batch.clone();
+        if let Some(metrics) = metrics {
+            let inner = crate::memtable::MemScanMetricsData {
+                total_series: 1,
+                num_rows: batch.num_rows(),
+                num_batches: 1,
+                scan_cost: self.scan_cost,
+            };
+            metrics.merge_inner(&inner);
+        }
+
         let iter = Iter {
             batch: Some(Ok(batch)),
         };
@@ -684,7 +698,7 @@ mod tests {
             .unwrap();
         assert_eq!(ranges.ranges.len(), 1);
         let range = ranges.ranges.into_values().next().unwrap();
-        let mut reader = range.context.builder.build().unwrap();
+        let mut reader = range.context.builder.build(None).unwrap();

         let mut num_rows = 0;
         while let Some(b) = reader.next().transpose().unwrap() {
diff --git a/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs b/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs
index 1850d1e804..f71385d78c 100644
--- a/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs
+++ b/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
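 // Test-only helpers for the simple bulk memtable. The deprecated builder below
 // threads the new optional MemScanMetrics through `build`, mirroring the
 // production builders.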
use std::collections::HashSet; +use std::time::Instant; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; @@ -20,7 +21,7 @@ use store_api::storage::{ColumnId, SequenceNumber}; use crate::error; use crate::memtable::simple_bulk_memtable::{Iter, SimpleBulkMemtable}; use crate::memtable::time_series::Values; -use crate::memtable::{BoxedBatchIterator, IterBuilder}; +use crate::memtable::{BoxedBatchIterator, IterBuilder, MemScanMetrics}; use crate::read::dedup::LastNonNullIter; use crate::region::options::MergeMode; @@ -64,7 +65,8 @@ pub(crate) struct BatchIterBuilderDeprecated { } impl IterBuilder for BatchIterBuilderDeprecated { - fn build(&self) -> error::Result { + fn build(&self, metrics: Option) -> error::Result { + let start_time = Instant::now(); let Some(values) = self.values.clone() else { return Ok(Box::new(Iter { batch: None })); }; @@ -78,6 +80,21 @@ impl IterBuilder for BatchIterBuilderDeprecated { .map(Some) .transpose(); + // Collect metrics from the batch + if let Some(metrics) = metrics { + let (num_rows, num_batches) = match &maybe_batch { + Some(Ok(batch)) => (batch.num_rows(), 1), + _ => (0, 0), + }; + let inner = crate::memtable::MemScanMetricsData { + total_series: 1, + num_rows, + num_batches, + scan_cost: start_time.elapsed(), + }; + metrics.merge_inner(&inner); + } + let iter = Iter { batch: maybe_batch }; if self.merge_mode == MergeMode::LastNonNull { diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index b8e1b15898..cdae1abdc6 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -51,9 +51,9 @@ use crate::memtable::bulk::part::BulkPart; use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder, - MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, - PredicateGroup, + AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, + MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, + MemtableStats, PredicateGroup, }; use crate::metrics::{ MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL, @@ -279,7 +279,7 @@ impl Memtable for TimeSeriesMemtable { let iter = self .series_set - .iter_series(projection, filters, self.dedup, sequence)?; + .iter_series(projection, filters, self.dedup, sequence, None)?; if self.merge_mode == MergeMode::LastNonNull { let iter = LastNonNullIter::new(iter); @@ -457,6 +457,7 @@ impl SeriesSet { predicate: Option, dedup: bool, sequence: Option, + mem_scan_metrics: Option, ) -> Result { let primary_key_schema = primary_key_schema(&self.region_metadata); let primary_key_datatypes = self @@ -475,6 +476,7 @@ impl SeriesSet { self.codec.clone(), dedup, sequence, + mem_scan_metrics, ) } } @@ -524,6 +526,7 @@ struct Iter { dedup: bool, sequence: Option, metrics: Metrics, + mem_scan_metrics: Option, } impl Iter { @@ -538,6 +541,7 @@ impl Iter { codec: Arc, dedup: bool, sequence: Option, + mem_scan_metrics: Option, ) -> Result { let predicate = predicate .map(|predicate| { @@ -560,8 +564,21 @@ impl Iter { dedup, sequence, metrics: Metrics::default(), + mem_scan_metrics, }) } + + fn report_mem_scan_metrics(&mut self) { + if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() { + let inner = crate::memtable::MemScanMetricsData { + 
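                // Snapshot this iterator's counters so they are merged into
                // the shared MemScanMetrics exactly once.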
total_series: self.metrics.total_series, + num_rows: self.metrics.num_rows, + num_batches: self.metrics.num_batches, + scan_cost: self.metrics.scan_cost, + }; + mem_scan_metrics.merge_inner(&inner); + } + } } impl Drop for Iter { @@ -571,6 +588,9 @@ impl Drop for Iter { self.metadata.region_id, self.metrics ); + // Report MemScanMetrics if not already reported + self.report_mem_scan_metrics(); + READ_ROWS_TOTAL .with_label_values(&["time_series_memtable"]) .inc_by(self.metrics.num_rows as u64); @@ -631,8 +651,12 @@ impl Iterator for Iter { }); return Some(batch); } + drop(map); // Explicitly drop the read lock self.metrics.scan_cost += start.elapsed(); + // Report MemScanMetrics before returning None + self.report_mem_scan_metrics(); + None } } @@ -1211,12 +1235,13 @@ struct TimeSeriesIterBuilder { } impl IterBuilder for TimeSeriesIterBuilder { - fn build(&self) -> Result { + fn build(&self, metrics: Option) -> Result { let iter = self.series_set.iter_series( self.projection.clone(), self.predicate.clone(), self.dedup, self.sequence, + metrics, )?; if self.merge_mode == MergeMode::LastNonNull { diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index a517274e74..a752301456 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -19,7 +19,6 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use async_stream::try_stream; -use common_telemetry::debug; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time}; use futures::Stream; use prometheus::IntGauge; @@ -27,6 +26,7 @@ use smallvec::SmallVec; use store_api::storage::RegionId; use crate::error::Result; +use crate::memtable::MemScanMetrics; use crate::metrics::{ IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED, @@ -40,7 +40,7 @@ use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; /// Verbose scan metrics for a partition. #[derive(Default)] -struct ScanMetricsSet { +pub(crate) struct ScanMetricsSet { /// Duration to prepare the scan task. prepare_scan_cost: Duration, /// Duration to build the (merge) reader. @@ -60,6 +60,16 @@ struct ScanMetricsSet { /// Number of file ranges scanned. num_file_ranges: usize, + // Memtable related metrics: + /// Duration to scan memtables. + mem_scan_cost: Duration, + /// Number of rows read from memtables. + mem_rows: usize, + /// Number of batches read from memtables. + mem_batches: usize, + /// Number of series read from memtables. + mem_series: usize, + // SST related metrics: /// Duration to build file ranges. build_parts_cost: Duration, @@ -95,6 +105,8 @@ struct ScanMetricsSet { /// Number of send timeout in SeriesScan. num_series_send_timeout: usize, + /// Number of send full in SeriesScan. + num_series_send_full: usize, /// Number of rows the series distributor scanned. num_distributor_rows: usize, /// Number of batches the series distributor scanned. @@ -103,6 +115,9 @@ struct ScanMetricsSet { distributor_scan_cost: Duration, /// Duration of the series distributor to yield. 
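    /// Accumulated from [SeriesDistributorMetrics] when the distributor finishes.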
distributor_yield_cost: Duration, + + /// The stream reached EOF + stream_eof: bool, } impl fmt::Debug for ScanMetricsSet { @@ -133,12 +148,19 @@ impl fmt::Debug for ScanMetricsSet { num_sst_rows, first_poll, num_series_send_timeout, + num_series_send_full, num_distributor_rows, num_distributor_batches, distributor_scan_cost, distributor_yield_cost, + stream_eof, + mem_scan_cost, + mem_rows, + mem_batches, + mem_series, } = self; + // Write core metrics write!( f, "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \ @@ -152,25 +174,80 @@ impl fmt::Debug for ScanMetricsSet { \"num_file_ranges\":{num_file_ranges}, \ \"build_parts_cost\":\"{build_parts_cost:?}\", \ \"rg_total\":{rg_total}, \ - \"rg_fulltext_filtered\":{rg_fulltext_filtered}, \ - \"rg_inverted_filtered\":{rg_inverted_filtered}, \ - \"rg_minmax_filtered\":{rg_minmax_filtered}, \ - \"rg_bloom_filtered\":{rg_bloom_filtered}, \ \"rows_before_filter\":{rows_before_filter}, \ - \"rows_fulltext_filtered\":{rows_fulltext_filtered}, \ - \"rows_inverted_filtered\":{rows_inverted_filtered}, \ - \"rows_bloom_filtered\":{rows_bloom_filtered}, \ - \"rows_precise_filtered\":{rows_precise_filtered}, \ \"num_sst_record_batches\":{num_sst_record_batches}, \ \"num_sst_batches\":{num_sst_batches}, \ \"num_sst_rows\":{num_sst_rows}, \ - \"first_poll\":\"{first_poll:?}\", \ - \"num_series_send_timeout\":{num_series_send_timeout}, \ - \"num_distributor_rows\":{num_distributor_rows}, \ - \"num_distributor_batches\":{num_distributor_batches}, \ - \"distributor_scan_cost\":\"{distributor_scan_cost:?}\", \ - \"distributor_yield_cost\":\"{distributor_yield_cost:?}\"}}" - ) + \"first_poll\":\"{first_poll:?}\"" + )?; + + // Write non-zero filter counters + if *rg_fulltext_filtered > 0 { + write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?; + } + if *rg_inverted_filtered > 0 { + write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?; + } + if *rg_minmax_filtered > 0 { + write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?; + } + if *rg_bloom_filtered > 0 { + write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?; + } + if *rows_fulltext_filtered > 0 { + write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?; + } + if *rows_inverted_filtered > 0 { + write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?; + } + if *rows_bloom_filtered > 0 { + write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?; + } + if *rows_precise_filtered > 0 { + write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?; + } + + // Write non-zero distributor metrics + if *num_series_send_timeout > 0 { + write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?; + } + if *num_series_send_full > 0 { + write!(f, ", \"num_series_send_full\":{num_series_send_full}")?; + } + if *num_distributor_rows > 0 { + write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?; + } + if *num_distributor_batches > 0 { + write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?; + } + if !distributor_scan_cost.is_zero() { + write!( + f, + ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\"" + )?; + } + if !distributor_yield_cost.is_zero() { + write!( + f, + ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\"" + )?; + } + + // Write non-zero memtable metrics + if *mem_rows > 0 { + write!(f, ", \"mem_rows\":{mem_rows}")?; + } + if *mem_batches > 0 { + write!(f, ", \"mem_batches\":{mem_batches}")?; + } + if *mem_series > 0 { + write!(f, ", \"mem_series\":{mem_series}")?; + } + if 
!mem_scan_cost.is_zero() { + write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?; + } + + write!(f, ", \"stream_eof\":{stream_eof}}}") } } impl ScanMetricsSet { @@ -249,6 +326,7 @@ impl ScanMetricsSet { fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) { let SeriesDistributorMetrics { num_series_send_timeout, + num_series_send_full, num_rows, num_batches, scan_cost, @@ -256,6 +334,7 @@ impl ScanMetricsSet { } = distributor_metrics; self.num_series_send_timeout += *num_series_send_timeout; + self.num_series_send_full += *num_series_send_full; self.num_distributor_rows += *num_rows; self.num_distributor_batches += *num_batches; self.distributor_scan_cost += *scan_cost; @@ -328,6 +407,8 @@ struct PartitionMetricsInner { scanner_type: &'static str, /// Query start time. query_start: Instant, + /// Whether to use verbose logging. + explain_verbose: bool, /// Verbose scan metrics that only log to debug logs by default. metrics: Mutex, in_progress_scan: IntGauge, @@ -346,25 +427,35 @@ struct PartitionMetricsInner { } impl PartitionMetricsInner { - fn on_finish(&self) { + fn on_finish(&self, stream_eof: bool) { let mut metrics = self.metrics.lock().unwrap(); if metrics.total_cost.is_zero() { metrics.total_cost = self.query_start.elapsed(); } + if !metrics.stream_eof { + metrics.stream_eof = stream_eof; + } } } impl Drop for PartitionMetricsInner { fn drop(&mut self) { - self.on_finish(); + self.on_finish(false); let metrics = self.metrics.lock().unwrap(); metrics.observe_metrics(); self.in_progress_scan.dec(); - debug!( - "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}", - self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost, - ); + if self.explain_verbose { + common_telemetry::info!( + "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}", + self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost, + ); + } else { + common_telemetry::debug!( + "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}", + self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost, + ); + } } } @@ -403,6 +494,7 @@ impl PartitionMetrics { partition: usize, scanner_type: &'static str, query_start: Instant, + explain_verbose: bool, metrics_set: &ExecutionPlanMetricsSet, ) -> Self { let partition_str = partition.to_string(); @@ -414,6 +506,7 @@ impl PartitionMetrics { partition, scanner_type, query_start, + explain_verbose, metrics: Mutex::new(metrics), in_progress_scan, build_parts_cost: MetricBuilder::new(metrics_set) @@ -454,6 +547,15 @@ impl PartitionMetrics { self.0.convert_cost.add_duration(cost); } + /// Reports memtable scan metrics. + pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) { + let mut metrics = self.0.metrics.lock().unwrap(); + metrics.mem_scan_cost += data.scan_cost; + metrics.mem_rows += data.num_rows; + metrics.mem_batches += data.num_batches; + metrics.mem_series += data.total_series; + } + /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`. pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) { self.0 @@ -476,7 +578,7 @@ impl PartitionMetrics { /// Finishes the query. pub(crate) fn on_finish(&self) { - self.0.on_finish(); + self.0.on_finish(true); } /// Sets the distributor metrics. 
@@ -502,6 +604,8 @@ impl fmt::Debug for PartitionMetrics {
 pub(crate) struct SeriesDistributorMetrics {
     /// Number of send timeout in SeriesScan.
     pub(crate) num_series_send_timeout: usize,
+    /// Number of send full in SeriesScan.
+    pub(crate) num_series_send_full: usize,
     /// Number of rows the series distributor scanned.
     pub(crate) num_rows: usize,
     /// Number of batches the series distributor scanned.
@@ -524,13 +628,20 @@ pub(crate) fn scan_mem_ranges(
     part_metrics.inc_num_mem_ranges(ranges.len());
     for range in ranges {
         let build_reader_start = Instant::now();
-        let iter = range.build_prune_iter(time_range)?;
+        let mem_scan_metrics = Some(MemScanMetrics::default());
+        let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
         part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

         let mut source = Source::Iter(iter);
         while let Some(batch) = source.next_batch().await? {
             yield batch;
         }
+
+        // Report the memtable scan metrics to partition metrics
+        if let Some(ref metrics) = mem_scan_metrics {
+            let data = metrics.data();
+            part_metrics.report_mem_scan_metrics(&data);
+        }
     }
 }
 }
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 93357da454..d53dc1e8f2 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -29,7 +29,9 @@ use datatypes::schema::SchemaRef;
 use futures::StreamExt;
 use snafu::ensure;
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
+use store_api::region_engine::{
+    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
+};
 use store_api::storage::TimeSeriesRowSelector;
 use tokio::sync::Semaphore;

@@ -87,7 +89,9 @@ impl SeqScan {
     pub fn build_stream(&self) -> Result {
         let metrics_set = ExecutionPlanMetricsSet::new();
         let streams = (0..self.properties.partitions.len())
-            .map(|partition: usize| self.scan_partition(&metrics_set, partition))
+            .map(|partition: usize| {
+                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
+            })
             .collect::<Result<Vec<_>, _>>()?;

         let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
@@ -100,7 +104,7 @@ impl SeqScan {

         let streams = (0..self.properties.partitions.len())
             .map(|partition| {
-                let metrics = self.new_partition_metrics(&metrics_set, partition);
+                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
                 self.scan_batch_in_partition(partition, metrics)
             })
             .collect::<Result<Vec<_>>>()?;
@@ -116,7 +120,7 @@ impl SeqScan {
         assert!(self.compaction);
         let metrics_set = ExecutionPlanMetricsSet::new();

-        let part_metrics = self.new_partition_metrics(&metrics_set, 0);
+        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
         debug_assert_eq!(1, self.properties.partitions.len());
         let partition_ranges = &self.properties.partitions[0];

@@ -210,10 +214,19 @@ impl SeqScan {
     /// Otherwise the returned stream might not contain any data.
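    /// The [QueryScanContext] carries the `explain_verbose` flag, which switches
    /// the per-partition scan logs from debug to info level.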
fn scan_partition_impl( &self, + ctx: &QueryScanContext, metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> Result { - let metrics = self.new_partition_metrics(metrics_set, partition); + if ctx.explain_verbose { + common_telemetry::info!( + "SeqScan partition {}, region_id: {}", + partition, + self.stream_ctx.input.region_metadata().region_id + ); + } + + let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition); let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?; @@ -345,6 +358,7 @@ impl SeqScan { /// Sets the partition metrics for the given partition if it is not for compaction. fn new_partition_metrics( &self, + explain_verbose: bool, metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> PartitionMetrics { @@ -353,6 +367,7 @@ impl SeqScan { partition, get_scanner_type(self.compaction), self.stream_ctx.query_start, + explain_verbose, metrics_set, ); @@ -379,10 +394,11 @@ impl RegionScanner for SeqScan { fn scan_partition( &self, + ctx: &QueryScanContext, metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> Result { - self.scan_partition_impl(metrics_set, partition) + self.scan_partition_impl(ctx, metrics_set, partition) .map_err(BoxedError::new) } diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index b031180bfe..1985481b60 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -29,7 +29,9 @@ use futures::StreamExt; use smallvec::{smallvec, SmallVec}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties}; +use store_api::region_engine::{ + PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, +}; use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::Semaphore; @@ -87,13 +89,20 @@ impl SeriesScan { fn scan_partition_impl( &self, + ctx: &QueryScanContext, metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> Result { - let metrics = - new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list); + let metrics = new_partition_metrics( + &self.stream_ctx, + ctx.explain_verbose, + metrics_set, + partition, + &self.metrics_list, + ); - let batch_stream = self.scan_batch_in_partition(partition, metrics.clone(), metrics_set)?; + let batch_stream = + self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?; let input = &self.stream_ctx.input; let record_batch_stream = ConvertBatchStream::new( @@ -111,10 +120,19 @@ impl SeriesScan { fn scan_batch_in_partition( &self, + ctx: &QueryScanContext, partition: usize, part_metrics: PartitionMetrics, metrics_set: &ExecutionPlanMetricsSet, ) -> Result { + if ctx.explain_verbose { + common_telemetry::info!( + "SeriesScan partition {}, region_id: {}", + partition, + self.stream_ctx.input.region_metadata().region_id + ); + } + ensure!( partition < self.properties.num_partitions(), PartitionOutOfRangeSnafu { @@ -146,6 +164,8 @@ impl SeriesScan { part_metrics.merge_metrics(&metrics); } + + part_metrics.on_finish(); }; Ok(Box::pin(stream)) } @@ -190,7 +210,7 @@ impl SeriesScan { let part_num = self.properties.num_partitions(); let metrics_set = ExecutionPlanMetricsSet::default(); let streams = (0..part_num) - .map(|i| self.scan_partition(&metrics_set, i)) + .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i)) 
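            // Internal callers never come from EXPLAIN, so a default
            // (non-verbose) QueryScanContext is used here.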
.collect::, BoxedError>>()?; let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?; Ok(Box::pin(chained_stream)) @@ -204,12 +224,18 @@ impl SeriesScan { .map(|partition| { let metrics = new_partition_metrics( &self.stream_ctx, + false, &metrics_set, partition, &self.metrics_list, ); - self.scan_batch_in_partition(partition, metrics, &metrics_set) + self.scan_batch_in_partition( + &QueryScanContext::default(), + partition, + metrics, + &metrics_set, + ) }) .collect::>>()?; @@ -242,10 +268,11 @@ impl RegionScanner for SeriesScan { fn scan_partition( &self, + ctx: &QueryScanContext, metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> Result { - self.scan_partition_impl(metrics_set, partition) + self.scan_partition_impl(ctx, metrics_set, partition) .map_err(BoxedError::new) } @@ -328,6 +355,7 @@ impl SeriesDistributor { async fn scan_partitions(&mut self) -> Result<()> { let part_metrics = new_partition_metrics( &self.stream_ctx, + false, &self.metrics_set, self.partitions.len(), &self.metrics_list, @@ -399,6 +427,7 @@ impl SeriesDistributor { metrics.scan_cost += fetch_start.elapsed(); metrics.num_series_send_timeout = self.senders.num_timeout; + metrics.num_series_send_full = self.senders.num_full; part_metrics.set_distributor_metrics(&metrics); part_metrics.on_finish(); @@ -444,6 +473,8 @@ struct SenderList { sender_idx: usize, /// Number of timeout. num_timeout: usize, + /// Number of full senders. + num_full: usize, } impl SenderList { @@ -454,6 +485,7 @@ impl SenderList { num_nones, sender_idx: 0, num_timeout: 0, + num_full: 0, } } @@ -471,6 +503,7 @@ impl SenderList { match sender.try_send(Ok(batch)) { Ok(()) => return Ok(None), Err(TrySendError::Full(res)) => { + self.num_full += 1; // Safety: we send Ok. batch = res.unwrap(); } @@ -546,6 +579,7 @@ impl SenderList { fn new_partition_metrics( stream_ctx: &StreamContext, + explain_verbose: bool, metrics_set: &ExecutionPlanMetricsSet, partition: usize, metrics_list: &PartitionMetricsList, @@ -555,6 +589,7 @@ fn new_partition_metrics( partition, "SeriesScan", stream_ctx.query_start, + explain_verbose, metrics_set, ); diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index f3ecc96301..a32d5e23b2 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -27,7 +27,9 @@ use datatypes::schema::SchemaRef; use futures::{Stream, StreamExt}; use snafu::ensure; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties}; +use store_api::region_engine::{ + PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, +}; use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::range::RangeBuilderList; @@ -71,7 +73,7 @@ impl UnorderedScan { let metrics_set = ExecutionPlanMetricsSet::new(); let part_num = self.properties.num_partitions(); let streams = (0..part_num) - .map(|i| self.scan_partition(&metrics_set, i)) + .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i)) .collect::, BoxedError>>()?; let stream = stream! 
{ for mut stream in streams { @@ -139,7 +141,7 @@ impl UnorderedScan { let streams = (0..self.properties.partitions.len()) .map(|partition| { - let metrics = self.partition_metrics(partition, &metrics_set); + let metrics = self.partition_metrics(false, partition, &metrics_set); self.scan_batch_in_partition(partition, metrics) }) .collect::>>()?; @@ -149,6 +151,7 @@ impl UnorderedScan { fn partition_metrics( &self, + explain_verbose: bool, partition: usize, metrics_set: &ExecutionPlanMetricsSet, ) -> PartitionMetrics { @@ -157,6 +160,7 @@ impl UnorderedScan { partition, "UnorderedScan", self.stream_ctx.query_start, + explain_verbose, metrics_set, ); self.metrics_list.set(partition, part_metrics.clone()); @@ -165,10 +169,19 @@ impl UnorderedScan { fn scan_partition_impl( &self, + ctx: &QueryScanContext, metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> Result { - let metrics = self.partition_metrics(partition, metrics_set); + if ctx.explain_verbose { + common_telemetry::info!( + "UnorderedScan partition {}, region_id: {}", + partition, + self.stream_ctx.input.region_metadata().region_id + ); + } + + let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set); let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?; @@ -263,6 +276,8 @@ impl UnorderedScan { metrics.scan_cost += fetch_start.elapsed(); part_metrics.merge_metrics(&metrics); } + + part_metrics.on_finish(); }; Ok(Box::pin(stream)) } @@ -288,10 +303,11 @@ impl RegionScanner for UnorderedScan { fn scan_partition( &self, + ctx: &QueryScanContext, metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> Result { - self.scan_partition_impl(metrics_set, partition) + self.scan_partition_impl(ctx, metrics_set, partition) .map_err(BoxedError::new) } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index aed94982c1..16e02f2a0b 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -539,6 +539,12 @@ impl QueryExecutor for DatafusionQueryEngine { ctx: &QueryEngineContext, plan: &Arc, ) -> Result { + let explain_verbose = ctx.query_ctx().explain_verbose(); + let output_partitions = plan.properties().output_partitioning().partition_count(); + if explain_verbose { + common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}"); + } + let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer(); let task_ctx = ctx.build_task_ctx(); @@ -562,9 +568,15 @@ impl QueryExecutor for DatafusionQueryEngine { .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; stream.set_metrics2(plan.clone()); - stream.set_explain_verbose(ctx.query_ctx().explain_verbose()); + stream.set_explain_verbose(explain_verbose); let stream = OnDone::new(Box::pin(stream), move || { - exec_timer.observe_duration(); + let exec_cost = exec_timer.stop_and_record(); + if explain_verbose { + common_telemetry::info!( + "DatafusionQueryEngine execute 1 stream, cost: {:?}s", + exec_cost, + ); + } }); Ok(Box::pin(stream)) } @@ -591,7 +603,13 @@ impl QueryExecutor for DatafusionQueryEngine { stream.set_metrics2(plan.clone()); stream.set_explain_verbose(ctx.query_ctx().explain_verbose()); let stream = OnDone::new(Box::pin(stream), move || { - exec_timer.observe_duration(); + let exec_cost = exec_timer.stop_and_record(); + if explain_verbose { + common_telemetry::info!( + "DatafusionQueryEngine execute {output_partitions} stream, cost: {:?}s", + exec_cost + ); + } }); Ok(Box::pin(stream)) } diff --git a/src/query/src/dist_plan/merge_scan.rs 
b/src/query/src/dist_plan/merge_scan.rs
index 87846ef510..b6dbda9125 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -148,6 +148,8 @@ pub struct MergeScanExec {
     properties: PlanProperties,
     /// Metrics from sub stages
     sub_stage_metrics: Arc>>,
+    /// Metrics for each partition
+    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
     query_ctx: QueryContextRef,
     target_partition: usize,
     partition_cols: Vec<String>,
@@ -244,6 +246,7 @@ impl MergeScanExec {
             region_query_handler,
             metric: ExecutionPlanMetricsSet::new(),
             sub_stage_metrics: Arc::default(),
+            partition_metrics: Arc::default(),
             properties,
             query_ctx,
             target_partition,
@@ -263,12 +266,14 @@ impl MergeScanExec {
         let schema = self.schema.clone();
         let query_ctx = self.query_ctx.clone();
         let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
+        let partition_metrics_moved = self.partition_metrics.clone();
         let plan = self.plan.clone();
         let target_partition = self.target_partition;
         let dbname = context.task_id().unwrap_or_default();
         let tracing_context = TracingContext::from_json(context.session_id().as_str());
         let current_channel = self.query_ctx.channel();
         let read_preference = self.query_ctx.read_preference();
+        let explain_verbose = self.query_ctx.explain_verbose();

         let stream = Box::pin(stream!({
             // only report metrics once for each MergeScan

                     region_id,
                     plan: plan.clone(),
                 };
+                let region_start = Instant::now();
                 let do_get_start = Instant::now();
+
+                if explain_verbose {
+                    common_telemetry::info!(
+                        "Merge scan starts one region, partition: {}, region_id: {}",
+                        partition,
+                        region_id
+                    );
+                }
+
                 let mut stream = region_query_handler
                     .do_get(read_preference, request)
                     .await

                     // reset poll timer
                     poll_timer = Instant::now();
                 }
-                common_telemetry::debug!(
-                    "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
-                    partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
-                );
+                let total_cost = region_start.elapsed();
+
+                // Record region metrics and push to global partition_metrics
+                let region_metrics = RegionMetrics {
+                    region_id,
+                    poll_duration,
+                    do_get_cost,
+                    total_cost,
+                };
+
+                // Push RegionMetrics to global partition_metrics immediately after scanning this region
+                {
+                    let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
+                    let partition_metrics = partition_metrics_guard
+                        .entry(partition)
+                        .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
+                    partition_metrics.add_region_metrics(region_metrics);
+                }
+
+                if explain_verbose {
+                    common_telemetry::info!(
+                        "Merge scan finished one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
+                        partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
+                    );
+                }

                 // process metrics after all data is drained.
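                // The remote stream only exposes its metrics behind the last
                // batch, so they are read here, once the stream is drained.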
if let Some(metrics) = stream.metrics() { @@ -358,6 +394,14 @@ impl MergeScanExec { MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); } + + // Finish partition metrics and log results + { + let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap(); + if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) { + partition_metrics.finish(); + } + } })); Ok(Box::pin(RecordBatchStreamWrapper { @@ -409,6 +453,7 @@ impl MergeScanExec { self.properties.boundedness, ), sub_stage_metrics: self.sub_stage_metrics.clone(), + partition_metrics: self.partition_metrics.clone(), query_ctx: self.query_ctx.clone(), target_partition: self.target_partition, partition_cols: self.partition_cols.clone(), @@ -436,6 +481,90 @@ impl MergeScanExec { pub fn region_count(&self) -> usize { self.regions.len() } + + fn partition_metrics(&self) -> Vec { + self.partition_metrics + .lock() + .unwrap() + .values() + .cloned() + .collect() + } +} + +/// Metrics for a region of a partition. +#[derive(Debug, Clone)] +struct RegionMetrics { + region_id: RegionId, + poll_duration: Duration, + do_get_cost: Duration, + /// Total cost to scan the region. + total_cost: Duration, +} + +/// Metrics for a partition of a MergeScanExec. +#[derive(Debug, Clone)] +struct PartitionMetrics { + partition: usize, + region_metrics: Vec, + total_poll_duration: Duration, + total_do_get_cost: Duration, + total_regions: usize, + explain_verbose: bool, + finished: bool, +} + +impl PartitionMetrics { + fn new(partition: usize, explain_verbose: bool) -> Self { + Self { + partition, + region_metrics: Vec::new(), + total_poll_duration: Duration::ZERO, + total_do_get_cost: Duration::ZERO, + total_regions: 0, + explain_verbose, + finished: false, + } + } + + fn add_region_metrics(&mut self, region_metrics: RegionMetrics) { + self.total_poll_duration += region_metrics.poll_duration; + self.total_do_get_cost += region_metrics.do_get_cost; + self.total_regions += 1; + self.region_metrics.push(region_metrics); + } + + /// Finish the partition metrics and log the results. + fn finish(&mut self) { + if self.finished { + return; + } + self.finished = true; + self.log_metrics(); + } + + /// Log partition metrics based on explain_verbose level. 
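+    /// Uses info level when verbose and debug level otherwise.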
+ fn log_metrics(&self) { + if self.explain_verbose { + common_telemetry::info!( + "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}", + self.partition, self.total_regions, self.total_poll_duration, self.total_do_get_cost + ); + } else { + common_telemetry::debug!( + "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}", + self.partition, self.total_regions, self.total_poll_duration, self.total_do_get_cost + ); + } + } +} + +impl Drop for PartitionMetrics { + fn drop(&mut self) { + if !self.finished { + self.log_metrics(); + } + } } impl ExecutionPlan for MergeScanExec { @@ -484,12 +613,42 @@ impl ExecutionPlan for MergeScanExec { } impl DisplayAs for MergeScanExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "MergeScanExec: peers=[")?; for region_id in self.regions.iter() { write!(f, "{}, ", region_id)?; } - write!(f, "]") + write!(f, "]")?; + + if matches!(t, DisplayFormatType::Verbose) { + let partition_metrics = self.partition_metrics(); + if !partition_metrics.is_empty() { + write!(f, ", metrics={{")?; + for (i, pm) in partition_metrics.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[", + pm.partition, pm.total_regions, + pm.total_poll_duration, + pm.total_do_get_cost)?; + for (j, rm) in pm.region_metrics.iter().enumerate() { + if j > 0 { + write!(f, ",")?; + } + write!(f, "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}", + rm.region_id, + rm.poll_duration, + rm.do_get_cost, + rm.total_cost)?; + } + write!(f, "]}}")?; + } + write!(f, "}}")?; + } + } + + Ok(()) } } diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 337722ced3..547bb7016e 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -28,6 +28,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion_common::DataFusionError; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; use datatypes::arrow::datatypes::SchemaRef; +use session::context::QueryContextRef; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngineRef; @@ -135,6 +136,7 @@ pub struct DummyTableProvider { metadata: RegionMetadataRef, /// Keeping a mutable request makes it possible to change in the optimize phase. 
scan_request: Arc<Mutex<ScanRequest>>,
+    query_ctx: Option<QueryContextRef>,
 }

 impl fmt::Debug for DummyTableProvider {
@@ -178,7 +180,11 @@ impl TableProvider for DummyTableProvider {
             .handle_query(self.region_id, request.clone())
             .await
             .map_err(|e| DataFusionError::External(Box::new(e)))?;
-        Ok(Arc::new(RegionScanExec::new(scanner, request)?))
+        let mut scan_exec = RegionScanExec::new(scanner, request)?;
+        if let Some(query_ctx) = &self.query_ctx {
+            scan_exec.set_explain_verbose(query_ctx.explain_verbose());
+        }
+        Ok(Arc::new(scan_exec))
     }

     fn supports_filters_pushdown(
@@ -221,6 +227,7 @@ impl DummyTableProvider {
             engine,
             metadata,
             scan_request: Default::default(),
+            query_ctx: None,
         }
     }

@@ -261,7 +268,7 @@ impl DummyTableProviderFactory {
         &self,
         region_id: RegionId,
         engine: RegionEngineRef,
-        ctx: Option<&session::context::QueryContext>,
+        query_ctx: Option<QueryContextRef>,
     ) -> Result<DummyTableProvider> {
         let metadata = engine
             .get_metadata(region_id)
@@ -272,7 +279,8 @@ impl DummyTableProviderFactory {
                 region_id,
             })?;

-        let scan_request = ctx
+        let scan_request = query_ctx
+            .as_ref()
            .map(|ctx| ScanRequest {
                 sequence: ctx.get_snapshot(region_id.as_u64()),
                 sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
@@ -285,6 +293,7 @@ impl DummyTableProviderFactory {
             engine,
             metadata,
             scan_request: Arc::new(Mutex::new(scan_request)),
+            query_ctx,
         })
     }
 }
@@ -295,7 +304,7 @@ impl TableProviderFactory for DummyTableProviderFactory {
         &self,
         region_id: RegionId,
         engine: RegionEngineRef,
-        ctx: Option<&session::context::QueryContext>,
+        ctx: Option<QueryContextRef>,
     ) -> Result<Arc<dyn TableProvider>> {
         let provider = self.create_table_provider(region_id, engine, ctx).await?;
         Ok(Arc::new(provider))
     }
@@ -308,7 +317,7 @@ pub trait TableProviderFactory: Send + Sync {
         &self,
         region_id: RegionId,
         engine: RegionEngineRef,
-        ctx: Option<&session::context::QueryContext>,
+        ctx: Option<QueryContextRef>,
     ) -> Result<Arc<dyn TableProvider>>;
 }

diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs
index 939fa8381c..2c208be04e 100644
--- a/src/servers/src/grpc/flight/stream.rs
+++ b/src/servers/src/grpc/flight/stream.rs
@@ -15,6 +15,7 @@
 use std::collections::VecDeque;
 use std::pin::Pin;
 use std::task::{Context, Poll};
+use std::time::{Duration, Instant};

 use arrow_flight::FlightData;
 use common_error::ext::ErrorExt;
@@ -22,7 +23,7 @@ use common_grpc::flight::{FlightEncoder, FlightMessage};
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::tracing::{info_span, Instrument};
 use common_telemetry::tracing_context::{FutureExt, TracingContext};
-use common_telemetry::{error, warn};
+use common_telemetry::{error, info, warn};
 use futures::channel::mpsc;
 use futures::channel::mpsc::Sender;
 use futures::{SinkExt, Stream, StreamExt};
@@ -35,6 +36,61 @@ use crate::error;
 use crate::grpc::flight::TonicResult;
 use crate::grpc::FlightCompression;

+/// Metrics collector for the Flight stream with an RAII logging pattern
+struct StreamMetrics {
+    send_schema_duration: Duration,
+    send_record_batch_duration: Duration,
+    send_metrics_duration: Duration,
+    fetch_content_duration: Duration,
+    record_batch_count: usize,
+    metrics_count: usize,
+    total_rows: usize,
+    total_bytes: usize,
+    should_log: bool,
+}
+
+impl StreamMetrics {
+    fn new(should_log: bool) -> Self {
+        Self {
+            send_schema_duration: Duration::ZERO,
+            send_record_batch_duration: Duration::ZERO,
+            send_metrics_duration: Duration::ZERO,
+            fetch_content_duration: Duration::ZERO,
+            record_batch_count: 0,
+            metrics_count: 0,
+            total_rows: 0,
+            total_bytes: 0,
+            should_log,
+        }
+    }
+}
+
+impl Drop for StreamMetrics {
+    fn drop(&mut self) {
+        if self.should_log {
+            info!(
"flight_data_stream finished: \ + send_schema_duration={:?}, \ + send_record_batch_duration={:?}, \ + send_metrics_duration={:?}, \ + fetch_content_duration={:?}, \ + record_batch_count={}, \ + metrics_count={}, \ + total_rows={}, \ + total_bytes={}", + self.send_schema_duration, + self.send_record_batch_duration, + self.send_metrics_duration, + self.fetch_content_duration, + self.record_batch_count, + self.metrics_count, + self.total_rows, + self.total_bytes + ); + } + } +} + #[pin_project(PinnedDrop)] pub struct FlightRecordBatchStream { #[pin] @@ -78,15 +134,29 @@ impl FlightRecordBatchStream { mut tx: Sender>, should_send_partial_metrics: bool, ) { + let mut metrics = StreamMetrics::new(should_send_partial_metrics); + let schema = recordbatches.schema().arrow_schema().clone(); + let start = Instant::now(); if let Err(e) = tx.send(Ok(FlightMessage::Schema(schema))).await { warn!(e; "stop sending Flight data"); return; } + metrics.send_schema_duration += start.elapsed(); - while let Some(batch_or_err) = recordbatches.next().in_current_span().await { + while let Some(batch_or_err) = { + let start = Instant::now(); + let result = recordbatches.next().in_current_span().await; + metrics.fetch_content_duration += start.elapsed(); + result + } { match batch_or_err { Ok(recordbatch) => { + metrics.total_rows += recordbatch.num_rows(); + metrics.record_batch_count += 1; + metrics.total_bytes += recordbatch.df_record_batch().get_array_memory_size(); + + let start = Instant::now(); if let Err(e) = tx .send(Ok(FlightMessage::RecordBatch( recordbatch.into_df_record_batch(), @@ -96,15 +166,20 @@ impl FlightRecordBatchStream { warn!(e; "stop sending Flight data"); return; } + metrics.send_record_batch_duration += start.elapsed(); + if should_send_partial_metrics { - if let Some(metrics) = recordbatches + if let Some(metrics_str) = recordbatches .metrics() .and_then(|m| serde_json::to_string(&m).ok()) { - if let Err(e) = tx.send(Ok(FlightMessage::Metrics(metrics))).await { + metrics.metrics_count += 1; + let start = Instant::now(); + if let Err(e) = tx.send(Ok(FlightMessage::Metrics(metrics_str))).await { warn!(e; "stop sending Flight data"); return; } + metrics.send_metrics_duration += start.elapsed(); } } } @@ -126,7 +201,10 @@ impl FlightRecordBatchStream { .metrics() .and_then(|m| serde_json::to_string(&m).ok()) { + metrics.metrics_count += 1; + let start = Instant::now(); let _ = tx.send(Ok(FlightMessage::Metrics(metrics_str))).await; + metrics.send_metrics_duration += start.elapsed(); } } } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 25951a4f6d..a7f9e7a0e9 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -390,6 +390,13 @@ impl PrepareRequest { } } +/// Necessary context of the query for the scanner. +#[derive(Clone, Default)] +pub struct QueryScanContext { + /// Whether the query is EXPLAIN ANALYZE VERBOSE. + pub explain_verbose: bool, +} + /// A scanner that provides a way to scan the region concurrently. /// /// The scanner splits the region into partitions so that each partition can be scanned concurrently. @@ -415,6 +422,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send { /// Panics if the `partition` is out of bound. 
fn scan_partition( &self, + ctx: &QueryScanContext, metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> Result; @@ -832,6 +840,7 @@ impl RegionScanner for SinglePartitionScanner { fn scan_partition( &self, + _ctx: &QueryScanContext, _metrics_set: &ExecutionPlanMetricsSet, _partition: usize, ) -> Result { diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 4b41da9c05..0a566fb8bc 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -40,7 +40,9 @@ use datafusion_physical_expr::{ use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datatypes::compute::SortOptions; use futures::{Stream, StreamExt}; -use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef}; +use store_api::region_engine::{ + PartitionRange, PrepareRequest, QueryScanContext, RegionScannerRef, +}; use store_api::storage::{ScanRequest, TimeSeriesDistribution}; use crate::table::metrics::StreamMetrics; @@ -59,6 +61,7 @@ pub struct RegionScanExec { is_partition_set: bool, // TODO(ruihang): handle TimeWindowed dist via this parameter distribution: Option, + explain_verbose: bool, } impl RegionScanExec { @@ -165,6 +168,7 @@ impl RegionScanExec { total_rows, is_partition_set: false, distribution: request.distribution, + explain_verbose: false, }) } @@ -231,6 +235,7 @@ impl RegionScanExec { total_rows: self.total_rows, is_partition_set: true, distribution: self.distribution, + explain_verbose: self.explain_verbose, }) } @@ -266,6 +271,10 @@ impl RegionScanExec { .map(|col| col.column_schema.name.clone()) .collect() } + + pub fn set_explain_verbose(&mut self, explain_verbose: bool) { + self.explain_verbose = explain_verbose; + } } impl ExecutionPlan for RegionScanExec { @@ -301,11 +310,14 @@ impl ExecutionPlan for RegionScanExec { let span = tracing_context.attach(common_telemetry::tracing::info_span!("read_from_region")); + let ctx = QueryScanContext { + explain_verbose: self.explain_verbose, + }; let stream = self .scanner .lock() .unwrap() - .scan_partition(&self.metric, partition) + .scan_partition(&ctx, &self.metric, partition) .map_err(|e| DataFusionError::External(Box::new(e)))?; let stream_metrics = StreamMetrics::new(&self.metric, partition); Ok(Box::pin(StreamWithMetricWrapper {