feat: expose scanner metrics to df execution metrics (#5699)

* feat: add metrics list to scanner

* chore: add report metrics method

* feat: use df metrics in PartitionMetrics

* feat: pass execution metrics to scan partition

* refactor: remove PartitionMetricsList

* feat: better debug format for ScanMetricsSet

* feat: do not expose all metrics to execution metrics by default

* refactor: use struct destructuring

* feat: add metrics list to scanner

* chore: Add custom Debug for ScanMetricsSet and partition metrics display

* test: update sqlness result
Author: Yingwen
Date: 2025-03-28 07:40:39 +08:00
Committed by: GitHub
Parent: 76a58a07e1
Commit: dbc25dd8da

7 changed files with 428 additions and 121 deletions
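
At its core, the commit wires the scanners into DataFusion's `ExecutionPlanMetricsSet`: each partition registers named `Time` metrics through `MetricBuilder` and accumulates durations into them while scanning, so the costs surface in `EXPLAIN ANALYZE`. A minimal, self-contained sketch of that pattern (the metric name and values here are illustrative, not GreptimeDB's):

use std::time::Duration;

use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    // Shared, thread-safe metrics set, normally owned by the ExecutionPlan.
    let metrics_set = ExecutionPlanMetricsSet::new();
    let partition = 0;

    // `subset_time` registers a named per-partition timer in the set.
    let scan_cost = MetricBuilder::new(&metrics_set).subset_time("scan_cost", partition);

    // The scanner adds elapsed time to the handle as it runs.
    scan_cost.add_duration(Duration::from_millis(42));

    // DataFusion aggregates and renders these, e.g. in EXPLAIN ANALYZE output.
    println!("{}", metrics_set.clone_inner());
}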

View File

@@ -56,7 +56,6 @@ use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
@@ -991,13 +990,11 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
}
}
/// Metrics for scanners.
/// Local metrics for scanners.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
/// Duration to prepare the scan task.
prepare_scan_cost: Duration,
/// Duration to build file ranges.
build_parts_cost: Duration,
/// Duration to build the (merge) reader.
build_reader_cost: Duration,
/// Duration to scan data.
@@ -1006,8 +1003,6 @@ pub(crate) struct ScannerMetrics {
convert_cost: Duration,
/// Duration while waiting for `yield`.
yield_cost: Duration,
/// Duration of the scan.
total_cost: Duration,
/// Number of batches returned.
num_batches: usize,
/// Number of rows returned.
@@ -1018,50 +1013,6 @@ pub(crate) struct ScannerMetrics {
num_file_ranges: usize,
}
impl ScannerMetrics {
/// Observes metrics.
fn observe_metrics(&self) {
READ_STAGE_ELAPSED
.with_label_values(&["prepare_scan"])
.observe(self.prepare_scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["build_reader"])
.observe(self.build_reader_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["convert_rb"])
.observe(self.convert_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan"])
.observe(self.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["yield"])
.observe(self.yield_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(self.total_cost.as_secs_f64());
READ_ROWS_RETURN.observe(self.num_rows as f64);
READ_BATCHES_RETURN.observe(self.num_batches as f64);
}
/// Merges metrics from another [ScannerMetrics].
fn merge_from(&mut self, other: &ScannerMetrics) {
self.prepare_scan_cost += other.prepare_scan_cost;
self.build_parts_cost += other.build_parts_cost;
self.build_reader_cost += other.build_reader_cost;
self.scan_cost += other.scan_cost;
self.convert_cost += other.convert_cost;
self.yield_cost += other.yield_cost;
self.total_cost += other.total_cost;
self.num_batches += other.num_batches;
self.num_rows += other.num_rows;
self.num_mem_ranges += other.num_mem_ranges;
self.num_file_ranges += other.num_file_ranges;
}
}
#[cfg(test)]
mod tests {
use store_api::codec::PrimaryKeyEncoding;

View File

@@ -14,22 +14,280 @@
//! Utilities for scanners.
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_telemetry::debug;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use futures::Stream;
use prometheus::IntGauge;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::metrics::IN_PROGRESS_SCAN;
use crate::metrics::{
IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
/// Verbose scan metrics for a partition.
#[derive(Default)]
struct ScanMetricsSet {
/// Duration to prepare the scan task.
prepare_scan_cost: Duration,
/// Duration to build the (merge) reader.
build_reader_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration to convert batches.
convert_cost: Duration,
/// Duration while waiting for `yield`.
yield_cost: Duration,
/// Duration of the scan.
total_cost: Duration,
/// Number of rows returned.
num_rows: usize,
/// Number of batches returned.
num_batches: usize,
/// Number of mem ranges scanned.
num_mem_ranges: usize,
/// Number of file ranges scanned.
num_file_ranges: usize,
// SST related metrics:
/// Duration to build file ranges.
build_parts_cost: Duration,
/// Number of row groups before filtering.
rg_total: usize,
/// Number of row groups filtered by fulltext index.
rg_fulltext_filtered: usize,
/// Number of row groups filtered by inverted index.
rg_inverted_filtered: usize,
/// Number of row groups filtered by min-max index.
rg_minmax_filtered: usize,
/// Number of row groups filtered by bloom filter index.
rg_bloom_filtered: usize,
/// Number of rows in row group before filtering.
rows_before_filter: usize,
/// Number of rows in row group filtered by fulltext index.
rows_fulltext_filtered: usize,
/// Number of rows in row group filtered by inverted index.
rows_inverted_filtered: usize,
/// Number of rows in row group filtered by bloom filter index.
rows_bloom_filtered: usize,
/// Number of rows filtered by precise filter.
rows_precise_filtered: usize,
/// Number of record batches read from SST.
num_sst_record_batches: usize,
/// Number of batches decoded from SST.
num_sst_batches: usize,
/// Number of rows read from SST.
num_sst_rows: usize,
/// Elapsed time before the first poll operation.
first_poll: Duration,
}
impl fmt::Debug for ScanMetricsSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ScanMetricsSet {
prepare_scan_cost,
build_reader_cost,
scan_cost,
convert_cost,
yield_cost,
total_cost,
num_rows,
num_batches,
num_mem_ranges,
num_file_ranges,
build_parts_cost,
rg_total,
rg_fulltext_filtered,
rg_inverted_filtered,
rg_minmax_filtered,
rg_bloom_filtered,
rows_before_filter,
rows_fulltext_filtered,
rows_inverted_filtered,
rows_bloom_filtered,
rows_precise_filtered,
num_sst_record_batches,
num_sst_batches,
num_sst_rows,
first_poll,
} = self;
write!(
f,
"{{prepare_scan_cost={prepare_scan_cost:?}, \
build_reader_cost={build_reader_cost:?}, \
scan_cost={scan_cost:?}, \
convert_cost={convert_cost:?}, \
yield_cost={yield_cost:?}, \
total_cost={total_cost:?}, \
num_rows={num_rows}, \
num_batches={num_batches}, \
num_mem_ranges={num_mem_ranges}, \
num_file_ranges={num_file_ranges}, \
build_parts_cost={build_parts_cost:?}, \
rg_total={rg_total}, \
rg_fulltext_filtered={rg_fulltext_filtered}, \
rg_inverted_filtered={rg_inverted_filtered}, \
rg_minmax_filtered={rg_minmax_filtered}, \
rg_bloom_filtered={rg_bloom_filtered}, \
rows_before_filter={rows_before_filter}, \
rows_fulltext_filtered={rows_fulltext_filtered}, \
rows_inverted_filtered={rows_inverted_filtered}, \
rows_bloom_filtered={rows_bloom_filtered}, \
rows_precise_filtered={rows_precise_filtered}, \
num_sst_record_batches={num_sst_record_batches}, \
num_sst_batches={num_sst_batches}, \
num_sst_rows={num_sst_rows}, \
first_poll={first_poll:?}}}"
)
}
}
impl ScanMetricsSet {
/// Attaches the `prepare_scan_cost` to the metrics set.
fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
self.prepare_scan_cost += cost;
self
}
/// Merges the local scanner metrics.
fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
let ScannerMetrics {
prepare_scan_cost,
build_reader_cost,
scan_cost,
convert_cost,
yield_cost,
num_batches,
num_rows,
num_mem_ranges,
num_file_ranges,
} = other;
self.prepare_scan_cost += *prepare_scan_cost;
self.build_reader_cost += *build_reader_cost;
self.scan_cost += *scan_cost;
self.convert_cost += *convert_cost;
self.yield_cost += *yield_cost;
self.num_rows += *num_rows;
self.num_batches += *num_batches;
self.num_mem_ranges += *num_mem_ranges;
self.num_file_ranges += *num_file_ranges;
}
/// Merges the local reader metrics.
fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
let ReaderMetrics {
build_cost,
filter_metrics:
ReaderFilterMetrics {
rg_total,
rg_fulltext_filtered,
rg_inverted_filtered,
rg_minmax_filtered,
rg_bloom_filtered,
rows_total,
rows_fulltext_filtered,
rows_inverted_filtered,
rows_bloom_filtered,
rows_precise_filtered,
},
num_record_batches,
num_batches,
num_rows,
scan_cost: _,
} = other;
self.build_parts_cost += *build_cost;
self.rg_total += *rg_total;
self.rg_fulltext_filtered += *rg_fulltext_filtered;
self.rg_inverted_filtered += *rg_inverted_filtered;
self.rg_minmax_filtered += *rg_minmax_filtered;
self.rg_bloom_filtered += *rg_bloom_filtered;
self.rows_before_filter += *rows_total;
self.rows_fulltext_filtered += *rows_fulltext_filtered;
self.rows_inverted_filtered += *rows_inverted_filtered;
self.rows_bloom_filtered += *rows_bloom_filtered;
self.rows_precise_filtered += *rows_precise_filtered;
self.num_sst_record_batches += *num_record_batches;
self.num_sst_batches += *num_batches;
self.num_sst_rows += *num_rows;
}
/// Observes metrics.
fn observe_metrics(&self) {
READ_STAGE_ELAPSED
.with_label_values(&["prepare_scan"])
.observe(self.prepare_scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["build_reader"])
.observe(self.build_reader_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["convert_rb"])
.observe(self.convert_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan"])
.observe(self.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["yield"])
.observe(self.yield_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(self.total_cost.as_secs_f64());
READ_ROWS_RETURN.observe(self.num_rows as f64);
READ_BATCHES_RETURN.observe(self.num_batches as f64);
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.rg_total as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(self.rg_fulltext_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.rg_inverted_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["minmax_index_filtered"])
.inc_by(self.rg_minmax_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rg_bloom_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.rows_before_filter as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(self.rows_fulltext_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.rows_inverted_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rows_bloom_filtered as u64);
}
}
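
For context, the `READ_*` counters used by `observe_metrics` follow the standard prometheus-crate pattern: a `HistogramVec` (or counter vec) keyed by a label. A small sketch of that pattern under assumed names (mito2's actual metric definitions live in `crate::metrics`):

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    // Hypothetical stand-in for mito2's READ_STAGE_ELAPSED, labeled by stage.
    static ref READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
        "read_stage_elapsed",
        "Elapsed time of each read stage",
        &["stage"]
    )
    .unwrap();
}

fn record_scan(scan_secs: f64) {
    // Same call shape as ScanMetricsSet::observe_metrics above.
    READ_STAGE_ELAPSED
        .with_label_values(&["scan"])
        .observe(scan_secs);
}
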
struct PartitionMetricsInner {
region_id: RegionId,
@@ -39,38 +297,71 @@ struct PartitionMetricsInner {
scanner_type: &'static str,
/// Query start time.
query_start: Instant,
/// Elapsed time before the first poll operation.
first_poll: Duration,
metrics: ScannerMetrics,
reader_metrics: ReaderMetrics,
/// Verbose scan metrics that are only written to debug logs by default.
metrics: Mutex<ScanMetricsSet>,
in_progress_scan: IntGauge,
// Normal metrics that always report to the [ExecutionPlanMetricsSet]:
/// Duration to build file ranges.
build_parts_cost: Time,
/// Duration to build the (merge) reader.
build_reader_cost: Time,
/// Duration to scan data.
scan_cost: Time,
/// Duration while waiting for `yield`.
yield_cost: Time,
}
impl PartitionMetricsInner {
fn on_finish(&mut self) {
if self.metrics.total_cost.is_zero() {
self.metrics.total_cost = self.query_start.elapsed();
fn on_finish(&self) {
let mut metrics = self.metrics.lock().unwrap();
if metrics.total_cost.is_zero() {
metrics.total_cost = self.query_start.elapsed();
}
self.metrics.build_parts_cost = self.reader_metrics.build_cost;
}
}
impl Drop for PartitionMetricsInner {
fn drop(&mut self) {
self.on_finish();
self.metrics.observe_metrics();
let metrics = self.metrics.lock().unwrap();
metrics.observe_metrics();
self.in_progress_scan.dec();
debug!(
"{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}",
self.scanner_type, self.region_id, self.partition, self.first_poll, self.metrics, self.reader_metrics
"{} finished, region_id: {}, partition: {}, metrics: {:?}",
self.scanner_type, self.region_id, self.partition, metrics
);
}
}
/// List of PartitionMetrics.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
impl PartitionMetricsList {
/// Sets a new [PartitionMetrics] at the specified partition.
pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
let mut list = self.0.lock().unwrap();
if list.len() <= partition {
list.resize(partition + 1, None);
}
list[partition] = Some(metrics);
}
/// Formats each partition's verbose metrics for explain output.
pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
let list = self.0.lock().unwrap();
write!(f, ", metrics_per_partition: ")?;
f.debug_list()
.entries(list.iter().filter_map(|p| p.as_ref()))
.finish()
}
}
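
The `Option` slots let partitions register in any order, and `format_verbose_metrics` skips unset slots with `filter_map`. A tiny self-contained sketch of the same `debug_list` technique:

use std::fmt;

struct Sparse(Vec<Option<&'static str>>);

impl fmt::Debug for Sparse {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Skip empty slots, as format_verbose_metrics does above.
        f.debug_list()
            .entries(self.0.iter().filter_map(|p| p.as_ref()))
            .finish()
    }
}

fn main() {
    let s = Sparse(vec![Some("p0"), None, Some("p2")]);
    println!("{s:?}"); // ["p0", "p2"]
}
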
/// Metrics while reading a partition.
#[derive(Clone)]
pub(crate) struct PartitionMetrics(Arc<Mutex<PartitionMetricsInner>>);
pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);
impl PartitionMetrics {
pub(crate) fn new(
@@ -78,57 +369,82 @@ impl PartitionMetrics {
partition: usize,
scanner_type: &'static str,
query_start: Instant,
metrics: ScannerMetrics,
metrics_set: &ExecutionPlanMetricsSet,
) -> Self {
let partition_str = partition.to_string();
let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
in_progress_scan.inc();
let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
let inner = PartitionMetricsInner {
region_id,
partition,
scanner_type,
query_start,
first_poll: Duration::default(),
metrics,
reader_metrics: ReaderMetrics::default(),
metrics: Mutex::new(metrics),
in_progress_scan,
build_parts_cost: MetricBuilder::new(metrics_set)
.subset_time("build_parts_cost", partition),
build_reader_cost: MetricBuilder::new(metrics_set)
.subset_time("build_reader_cost", partition),
scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
};
Self(Arc::new(Mutex::new(inner)))
Self(Arc::new(inner))
}
pub(crate) fn on_first_poll(&self) {
let mut inner = self.0.lock().unwrap();
inner.first_poll = inner.query_start.elapsed();
let mut metrics = self.0.metrics.lock().unwrap();
metrics.first_poll = self.0.query_start.elapsed();
}
pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
let mut inner = self.0.lock().unwrap();
inner.metrics.num_mem_ranges += num;
let mut metrics = self.0.metrics.lock().unwrap();
metrics.num_mem_ranges += num;
}
pub(crate) fn inc_num_file_ranges(&self, num: usize) {
let mut inner = self.0.lock().unwrap();
inner.metrics.num_file_ranges += num;
let mut metrics = self.0.metrics.lock().unwrap();
metrics.num_file_ranges += num;
}
/// Merges `build_reader_cost`.
pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
let mut inner = self.0.lock().unwrap();
inner.metrics.build_reader_cost += cost;
self.0.build_reader_cost.add_duration(cost);
let mut metrics = self.0.metrics.lock().unwrap();
metrics.build_reader_cost += cost;
}
/// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
let mut inner = self.0.lock().unwrap();
inner.metrics.merge_from(metrics);
self.0
.build_reader_cost
.add_duration(metrics.build_reader_cost);
self.0.scan_cost.add_duration(metrics.scan_cost);
self.0.yield_cost.add_duration(metrics.yield_cost);
let mut metrics_set = self.0.metrics.lock().unwrap();
metrics_set.merge_scanner_metrics(metrics);
}
/// Merges [ReaderMetrics] and `build_reader_cost`.
pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
let mut inner = self.0.lock().unwrap();
inner.reader_metrics.merge_from(metrics);
self.0.build_parts_cost.add_duration(metrics.build_cost);
let mut metrics_set = self.0.metrics.lock().unwrap();
metrics_set.merge_reader_metrics(metrics);
}
/// Finishes the query.
pub(crate) fn on_finish(&self) {
let mut inner = self.0.lock().unwrap();
inner.on_finish();
self.0.on_finish();
}
}
impl fmt::Debug for PartitionMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let metrics = self.0.metrics.lock().unwrap();
write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
}
}
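
Note the ownership change in this file: `PartitionMetrics` now wraps `Arc<PartitionMetricsInner>` instead of `Arc<Mutex<PartitionMetricsInner>>`, so only the verbose `ScanMetricsSet` sits behind a mutex while the DataFusion `Time` handles are updated without locking (they are internally atomic and cheap to clone). A sketch of the resulting layout, with hypothetical field types:

use std::sync::{Arc, Mutex};
use std::time::Duration;

use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

struct Inner {
    verbose: Mutex<Vec<Duration>>, // stands in for Mutex<ScanMetricsSet>
    scan_cost: Time,               // reported to DataFusion, no lock needed
}

fn demo() {
    let set = ExecutionPlanMetricsSet::new();
    let inner = Arc::new(Inner {
        verbose: Mutex::new(Vec::new()),
        scan_cost: MetricBuilder::new(&set).subset_time("scan_cost", 0),
    });

    // Hot path: atomic update through the Time handle.
    inner.scan_cost.add_duration(Duration::from_millis(5));

    // Verbose metrics still take the short-lived lock.
    inner.verbose.lock().unwrap().push(Duration::from_millis(5));
}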

View File

@@ -24,6 +24,7 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
@@ -38,7 +39,9 @@ use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::region::options::MergeMode;
@@ -53,6 +56,9 @@ pub struct SeqScan {
stream_ctx: Arc<StreamContext>,
/// The scanner is used for compaction.
compaction: bool,
/// Metrics for each partition.
/// The scanner only sets this for queries and keeps it empty during compaction.
metrics_list: PartitionMetricsList,
}
impl SeqScan {
@@ -69,6 +75,7 @@ impl SeqScan {
properties,
stream_ctx,
compaction,
metrics_list: PartitionMetricsList::default(),
}
}
@@ -77,8 +84,9 @@ impl SeqScan {
/// The returned stream is not partitioned and will contain all the data. If you want a
/// partitioned scan, use [`RegionScanner::scan_partition`].
pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let metrics_set = ExecutionPlanMetricsSet::new();
let streams = (0..self.properties.partitions.len())
.map(|partition: usize| self.scan_partition(partition))
.map(|partition: usize| self.scan_partition(&metrics_set, partition))
.collect::<Result<Vec<_>, _>>()?;
let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
@@ -92,16 +100,8 @@ impl SeqScan {
pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
assert!(self.compaction);
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
0,
get_scanner_type(self.compaction),
self.stream_ctx.query_start,
ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
},
);
let metrics_set = ExecutionPlanMetricsSet::new();
let part_metrics = self.new_partition_metrics(&metrics_set, 0);
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];
@@ -194,6 +194,7 @@ impl SeqScan {
/// Otherwise the returned stream might not contain any data.
fn scan_partition_impl(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
if partition >= self.properties.partitions.len() {
@@ -207,7 +208,7 @@ impl SeqScan {
}
if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
return self.scan_partition_by_series(partition);
return self.scan_partition_by_series(metrics_set, partition);
}
let stream_ctx = self.stream_ctx.clone();
@@ -215,7 +216,7 @@ impl SeqScan {
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = self.new_partition_metrics(partition);
let part_metrics = self.new_partition_metrics(metrics_set, partition);
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -310,13 +311,14 @@ impl SeqScan {
/// Otherwise the returned stream might not contain any data.
fn scan_partition_by_series(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.new_semaphore();
let partition_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = self.new_partition_metrics(partition);
let part_metrics = self.new_partition_metrics(metrics_set, partition);
debug_assert!(!self.compaction);
let stream = try_stream! {
@@ -411,17 +413,26 @@ impl SeqScan {
}
}
fn new_partition_metrics(&self, partition: usize) -> PartitionMetrics {
PartitionMetrics::new(
/// Creates a new partition metrics instance.
/// Registers it in the metrics list for the given partition unless the scanner is used for compaction.
fn new_partition_metrics(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> PartitionMetrics {
let metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
get_scanner_type(self.compaction),
self.stream_ctx.query_start,
ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
},
)
metrics_set,
);
if !self.compaction {
self.metrics_list.set(partition, metrics.clone());
}
metrics
}
}
@@ -438,8 +449,12 @@ impl RegionScanner for SeqScan {
self.stream_ctx.input.mapper.metadata().clone()
}
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(partition)
fn scan_partition(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
@@ -466,7 +481,10 @@ impl DisplayAs for SeqScan {
)?;
match t {
DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
DisplayFormatType::Verbose => self.stream_ctx.format_for_explain(true, f),
DisplayFormatType::Verbose => {
self.stream_ctx.format_for_explain(true, f)?;
self.metrics_list.format_verbose_metrics(f)
}
}
}
}

View File

@@ -22,6 +22,7 @@ use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
@@ -32,7 +33,9 @@ use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties}
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::{Batch, ScannerMetrics};
/// Scans a region without providing any output ordering guarantee.
@@ -43,6 +46,8 @@ pub struct UnorderedScan {
properties: ScannerProperties,
/// Context of streams.
stream_ctx: Arc<StreamContext>,
/// Metrics for each partition.
metrics_list: PartitionMetricsList,
}
impl UnorderedScan {
@@ -57,14 +62,16 @@ impl UnorderedScan {
Self {
properties,
stream_ctx,
metrics_list: PartitionMetricsList::default(),
}
}
/// Scans the region and returns a stream.
pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let metrics_set = ExecutionPlanMetricsSet::new();
let part_num = self.properties.num_partitions();
let streams = (0..part_num)
.map(|i| self.scan_partition(i))
.map(|i| self.scan_partition(&metrics_set, i))
.collect::<Result<Vec<_>, BoxedError>>()?;
let stream = stream! {
for mut stream in streams {
@@ -119,6 +126,7 @@ impl UnorderedScan {
fn scan_partition_impl(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
if partition >= self.properties.partitions.len() {
@@ -136,11 +144,9 @@ impl UnorderedScan {
partition,
"UnorderedScan",
self.stream_ctx.query_start,
ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
},
metrics_set,
);
self.metrics_list.set(partition, part_metrics.clone());
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range;
@@ -239,8 +245,12 @@ impl RegionScanner for UnorderedScan {
Ok(())
}
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(partition)
fn scan_partition(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
}
fn has_predicate(&self) -> bool {
@@ -262,7 +272,10 @@ impl DisplayAs for UnorderedScan {
)?;
match t {
DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
DisplayFormatType::Verbose => self.stream_ctx.format_for_explain(true, f),
DisplayFormatType::Verbose => {
self.stream_ctx.format_for_explain(true, f)?;
self.metrics_list.format_verbose_metrics(f)
}
}
}
}

View File

@@ -26,6 +26,7 @@ use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use common_time::Timestamp;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::future::join_all;
@@ -342,7 +343,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
///
/// # Panics
/// Panics if the `partition` is out of bounds.
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError>;
fn scan_partition(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError>;
/// Check if there is any predicate that may be executed in this scanner.
fn has_predicate(&self) -> bool;
@@ -562,7 +567,11 @@ impl RegionScanner for SinglePartitionScanner {
Ok(())
}
fn scan_partition(&self, _partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
fn scan_partition(
&self,
_metrics_set: &ExecutionPlanMetricsSet,
_partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
let mut stream = self.stream.lock().unwrap();
stream.take().ok_or_else(|| {
BoxedError::new(PlainError::new(

View File

@@ -276,7 +276,7 @@ impl ExecutionPlan for RegionScanExec {
.scanner
.lock()
.unwrap()
.scan_partition(partition)
.scan_partition(&self.metric, partition)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream_metrics = StreamMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {

View File

@@ -105,7 +105,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
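
The extra trailing `REDACTED` on the updated `SeqScan` line reflects the new per-partition metrics: in verbose explain output, `format_verbose_metrics` now appends a `metrics_per_partition: [...]` list after the existing plan description, which the sqlness test redacts.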