feat: more logs and metrics under explain verbose mode (#6575)

* feat: collect region metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: log at info level

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add CoalescePartitionsExec to explain

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: finish metrics in partition and add sender full to metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add eof flag on finish

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: output cost as string

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: log on stream done

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: region id as string

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: enlarge send channel size

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: more log in flight and scan

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: logs about rows/batches/bytes

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: enlarge channel size

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: log remote reads only in verbose mode

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: revert channel change

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: get explain verbose in RegionScanExec

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: print scan log in verbose mode

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: collect region metrics after finishing one region

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: define StreamMetrics and log in verbose mode

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: only log non-zero filter and distributor metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: revert displaying CoalescePartitions in explain

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect memtable metrics in partition metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-07-30 17:23:32 +08:00
Committed by: GitHub
Parent: ac8493ab4a
Commit: 1df605ec4b
18 changed files with 680 additions and 91 deletions

View File

@@ -174,7 +174,7 @@ impl RegionServer {
async fn table_provider(
&self,
region_id: RegionId,
ctx: Option<&session::context::QueryContext>,
ctx: Option<QueryContextRef>,
) -> Result<Arc<dyn TableProvider>> {
let status = self
.inner
@@ -207,9 +207,15 @@ impl RegionServer {
};
let region_id = RegionId::from_u64(request.region_id);
let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
let provider = self
.table_provider(region_id, Some(query_ctx.clone()))
.await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
if query_ctx.explain_verbose() {
common_telemetry::info!("Handle remote read for region: {}", region_id);
}
let decoder = self
.inner
.query_engine
@@ -243,10 +249,12 @@ impl RegionServer {
};
let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());
let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;
let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
let provider = self
.table_provider(request.region_id, Some(query_ctx.clone()))
.await?;
struct RegionDataSourceInjector {
source: Arc<dyn TableSource>,
}
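
Editor's note: the signature change from `Option<&session::context::QueryContext>` to `Option<QueryContextRef>` is about ownership. A minimal, self-contained sketch of the idea, with simplified stand-in types (assuming `QueryContextRef` is an `Arc` alias around the session's `QueryContext`, per the naming):

```rust
use std::sync::Arc;

// Simplified stand-ins; not the crate's actual definitions.
struct QueryContext {
    explain_verbose: bool,
}
type QueryContextRef = Arc<QueryContext>;

// Takes an owned handle, as `table_provider` now does.
fn table_provider(_ctx: Option<QueryContextRef>) {}

fn main() {
    let query_ctx: QueryContextRef = Arc::new(QueryContext { explain_verbose: true });
    // A cheap `Arc` clone goes into the call...
    table_provider(Some(query_ctx.clone()));
    // ...while the caller still owns a handle for the verbose check afterwards.
    if query_ctx.explain_verbose {
        println!("Handle remote read for region: <region_id>");
    }
}
```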

View File

@@ -225,7 +225,9 @@ async fn test_series_scan() {
let mut partition_batches = vec![vec![]; 3];
let mut streams: Vec<_> = (0..3)
.map(|partition| {
let stream = scanner.scan_partition(&metrics_set, partition).unwrap();
let stream = scanner
.scan_partition(&Default::default(), &metrics_set, partition)
.unwrap();
Some(stream)
})
.collect();

View File

@@ -17,7 +17,8 @@
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
@@ -354,11 +355,43 @@ impl MemtableBuilderProvider {
}
}
/// Metrics for scanning a memtable.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);
impl MemScanMetrics {
/// Merges the metrics.
pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
let mut metrics = self.0.lock().unwrap();
metrics.total_series += inner.total_series;
metrics.num_rows += inner.num_rows;
metrics.num_batches += inner.num_batches;
metrics.scan_cost += inner.scan_cost;
}
/// Gets the metrics data.
pub(crate) fn data(&self) -> MemScanMetricsData {
self.0.lock().unwrap().clone()
}
}
#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
/// Total series in the memtable.
pub(crate) total_series: usize,
/// Number of rows read.
pub(crate) num_rows: usize,
/// Number of batches read.
pub(crate) num_batches: usize,
/// Duration to scan the memtable.
pub(crate) scan_cost: Duration,
}
/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
/// Returns the iterator to read the range.
fn build(&self) -> Result<BoxedBatchIterator>;
fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;
}
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
@@ -410,8 +443,12 @@ impl MemtableRange {
/// Builds an iterator to read the range.
/// Filters the result by the specific time range, this ensures memtable won't return
/// rows out of the time range when new rows are inserted.
pub fn build_prune_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
let iter = self.context.builder.build()?;
pub fn build_prune_iter(
&self,
time_range: FileTimeRange,
metrics: Option<MemScanMetrics>,
) -> Result<BoxedBatchIterator> {
let iter = self.context.builder.build(metrics)?;
let time_filters = self.context.predicate.time_filters();
Ok(Box::new(PruneTimeIterator::new(
iter,
@@ -422,7 +459,7 @@ impl MemtableRange {
/// Builds an iterator to read all rows in range.
pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
self.context.builder.build()
self.context.builder.build(None)
}
pub fn num_rows(&self) -> usize {
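
Editor's note: `MemScanMetrics` is a cloneable handle over shared state, so every iterator holding a clone can merge its partial counters into one place. A standard-library-only sketch of the same merge pattern (names here are illustrative, not the crate's API):

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Clone, Default)]
struct ScanMetrics(Arc<Mutex<Data>>);

#[derive(Clone, Default, Debug)]
struct Data {
    num_rows: usize,
    num_batches: usize,
    scan_cost: Duration,
}

impl ScanMetrics {
    // Each iterator merges its partial counters into the shared state.
    fn merge(&self, other: &Data) {
        let mut d = self.0.lock().unwrap();
        d.num_rows += other.num_rows;
        d.num_batches += other.num_batches;
        d.scan_cost += other.scan_cost;
    }

    // The scan side snapshots the accumulated data for reporting.
    fn data(&self) -> Data {
        self.0.lock().unwrap().clone()
    }
}

fn main() {
    let metrics = ScanMetrics::default();
    // Two iterators, each holding a clone of the handle, report separately.
    metrics.merge(&Data { num_rows: 100, num_batches: 2, scan_cost: Duration::from_millis(3) });
    metrics.merge(&Data { num_rows: 50, num_batches: 1, scan_cost: Duration::from_millis(1) });
    assert_eq!(metrics.data().num_rows, 150);
}
```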

View File

@@ -41,9 +41,9 @@ use crate::memtable::bulk::part::BulkPart;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
PredicateGroup,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
MemtableStats, PredicateGroup,
};
use crate::region::options::MergeMode;
@@ -183,7 +183,7 @@ impl Memtable for PartitionTreeMemtable {
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator> {
self.tree.read(projection, predicate, sequence)
self.tree.read(projection, predicate, sequence, None)
}
fn ranges(
@@ -315,7 +315,7 @@ impl PartitionTreeMemtable {
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator> {
self.tree.read(projection, predicate, sequence)
self.tree.read(projection, predicate, sequence, None)
}
}
@@ -360,11 +360,12 @@ struct PartitionTreeIterBuilder {
}
impl IterBuilder for PartitionTreeIterBuilder {
fn build(&self) -> Result<BoxedBatchIterator> {
fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
self.tree.read(
self.projection.as_deref(),
self.predicate.clone(),
self.sequence,
metrics,
)
}
}

View File

@@ -230,6 +230,7 @@ impl PartitionTree {
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
mem_scan_metrics: Option<crate::memtable::MemScanMetrics>,
) -> Result<BoxedBatchIterator> {
let start = Instant::now();
// Creates the projection set.
@@ -257,6 +258,7 @@ impl PartitionTree {
partitions,
current_reader: None,
metrics: tree_iter_metric,
mem_scan_metrics,
};
let context = ReadPartitionContext::new(
self.metadata.clone(),
@@ -467,10 +469,28 @@ struct TreeIter {
partitions: VecDeque<PartitionRef>,
current_reader: Option<PartitionReader>,
metrics: TreeIterMetrics,
mem_scan_metrics: Option<crate::memtable::MemScanMetrics>,
}
impl TreeIter {
fn report_mem_scan_metrics(&mut self) {
if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
let inner = crate::memtable::MemScanMetricsData {
total_series: 0, // This is unavailable.
num_rows: self.metrics.rows_fetched,
num_batches: self.metrics.batches_fetched,
scan_cost: self.metrics.iter_elapsed,
};
mem_scan_metrics.merge_inner(&inner);
}
}
}
impl Drop for TreeIter {
fn drop(&mut self) {
// Report MemScanMetrics if not already reported
self.report_mem_scan_metrics();
READ_ROWS_TOTAL
.with_label_values(&["partition_tree_memtable"])
.inc_by(self.metrics.rows_fetched as u64);
@@ -523,6 +543,8 @@ impl TreeIter {
/// Fetches next batch.
fn next_batch(&mut self) -> Result<Option<Batch>> {
let Some(part_reader) = &mut self.current_reader else {
// Report MemScanMetrics before returning None
self.report_mem_scan_metrics();
return Ok(None);
};
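
Editor's note: `TreeIter` reports through an `Option` that is `take`n on the first report, so the `Drop` fallback cannot double-count. A standalone sketch of that one-shot idiom (illustrative names):

```rust
struct Reporter;
impl Reporter {
    fn report(&self) {
        println!("memtable scan metrics reported");
    }
}

struct Iter {
    metrics: Option<Reporter>,
}

impl Iter {
    // First call takes the handle; later calls (and `Drop`) see `None`.
    fn report_once(&mut self) {
        if let Some(r) = self.metrics.take() {
            r.report();
        }
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        // Safety net for iterators dropped before reaching EOF.
        self.report_once();
    }
}

fn main() {
    let mut it = Iter { metrics: Some(Reporter) };
    it.report_once(); // explicit report when the iterator returns `None`
} // `Drop` runs here but finds `None`, so nothing is reported twice
```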

View File

@@ -19,6 +19,7 @@ use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use api::v1::OpType;
use datatypes::vectors::Helper;
@@ -33,8 +34,8 @@ use crate::memtable::bulk::part::BulkPart;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::Series;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange,
MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
use crate::read::dedup::LastNonNullIter;
@@ -219,7 +220,7 @@ impl Memtable for SimpleBulkMemtable {
_predicate: Option<table::predicate::Predicate>,
sequence: Option<SequenceNumber>,
) -> error::Result<BoxedBatchIterator> {
let iter = self.create_iter(projection, sequence)?.build()?;
let iter = self.create_iter(projection, sequence)?.build(None)?;
if self.merge_mode == MergeMode::LastNonNull {
let iter = LastNonNullIter::new(iter);
@@ -235,6 +236,7 @@ impl Memtable for SimpleBulkMemtable {
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
) -> error::Result<MemtableRanges> {
let start_time = Instant::now();
let projection = Arc::new(self.build_projection(projection));
let values = self.series.read().unwrap().read_to_values();
let contexts = values
@@ -263,6 +265,7 @@ impl Memtable for SimpleBulkMemtable {
let builder = BatchRangeBuilder {
batch,
merge_mode: self.merge_mode,
scan_cost: start_time.elapsed(),
};
(
num_rows,
@@ -346,11 +349,22 @@ impl Memtable for SimpleBulkMemtable {
pub struct BatchRangeBuilder {
pub batch: Batch,
pub merge_mode: MergeMode,
scan_cost: Duration,
}
impl IterBuilder for BatchRangeBuilder {
fn build(&self) -> error::Result<BoxedBatchIterator> {
fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
let batch = self.batch.clone();
if let Some(metrics) = metrics {
let inner = crate::memtable::MemScanMetricsData {
total_series: 1,
num_rows: batch.num_rows(),
num_batches: 1,
scan_cost: self.scan_cost,
};
metrics.merge_inner(&inner);
}
let iter = Iter {
batch: Some(Ok(batch)),
};
@@ -684,7 +698,7 @@ mod tests {
.unwrap();
assert_eq!(ranges.ranges.len(), 1);
let range = ranges.ranges.into_values().next().unwrap();
let mut reader = range.context.builder.build().unwrap();
let mut reader = range.context.builder.build(None).unwrap();
let mut num_rows = 0;
while let Some(b) = reader.next().transpose().unwrap() {

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::HashSet;
use std::time::Instant;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -20,7 +21,7 @@ use store_api::storage::{ColumnId, SequenceNumber};
use crate::error;
use crate::memtable::simple_bulk_memtable::{Iter, SimpleBulkMemtable};
use crate::memtable::time_series::Values;
use crate::memtable::{BoxedBatchIterator, IterBuilder};
use crate::memtable::{BoxedBatchIterator, IterBuilder, MemScanMetrics};
use crate::read::dedup::LastNonNullIter;
use crate::region::options::MergeMode;
@@ -64,7 +65,8 @@ pub(crate) struct BatchIterBuilderDeprecated {
}
impl IterBuilder for BatchIterBuilderDeprecated {
fn build(&self) -> error::Result<BoxedBatchIterator> {
fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
let start_time = Instant::now();
let Some(values) = self.values.clone() else {
return Ok(Box::new(Iter { batch: None }));
};
@@ -78,6 +80,21 @@ impl IterBuilder for BatchIterBuilderDeprecated {
.map(Some)
.transpose();
// Collect metrics from the batch
if let Some(metrics) = metrics {
let (num_rows, num_batches) = match &maybe_batch {
Some(Ok(batch)) => (batch.num_rows(), 1),
_ => (0, 0),
};
let inner = crate::memtable::MemScanMetricsData {
total_series: 1,
num_rows,
num_batches,
scan_cost: start_time.elapsed(),
};
metrics.merge_inner(&inner);
}
let iter = Iter { batch: maybe_batch };
if self.merge_mode == MergeMode::LastNonNull {

View File

@@ -51,9 +51,9 @@ use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
PredicateGroup,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
MemtableStats, PredicateGroup,
};
use crate::metrics::{
MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
@@ -279,7 +279,7 @@ impl Memtable for TimeSeriesMemtable {
let iter = self
.series_set
.iter_series(projection, filters, self.dedup, sequence)?;
.iter_series(projection, filters, self.dedup, sequence, None)?;
if self.merge_mode == MergeMode::LastNonNull {
let iter = LastNonNullIter::new(iter);
@@ -457,6 +457,7 @@ impl SeriesSet {
predicate: Option<Predicate>,
dedup: bool,
sequence: Option<SequenceNumber>,
mem_scan_metrics: Option<MemScanMetrics>,
) -> Result<Iter> {
let primary_key_schema = primary_key_schema(&self.region_metadata);
let primary_key_datatypes = self
@@ -475,6 +476,7 @@ impl SeriesSet {
self.codec.clone(),
dedup,
sequence,
mem_scan_metrics,
)
}
}
@@ -524,6 +526,7 @@ struct Iter {
dedup: bool,
sequence: Option<SequenceNumber>,
metrics: Metrics,
mem_scan_metrics: Option<MemScanMetrics>,
}
impl Iter {
@@ -538,6 +541,7 @@ impl Iter {
codec: Arc<DensePrimaryKeyCodec>,
dedup: bool,
sequence: Option<SequenceNumber>,
mem_scan_metrics: Option<MemScanMetrics>,
) -> Result<Self> {
let predicate = predicate
.map(|predicate| {
@@ -560,8 +564,21 @@ impl Iter {
dedup,
sequence,
metrics: Metrics::default(),
mem_scan_metrics,
})
}
fn report_mem_scan_metrics(&mut self) {
if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
let inner = crate::memtable::MemScanMetricsData {
total_series: self.metrics.total_series,
num_rows: self.metrics.num_rows,
num_batches: self.metrics.num_batches,
scan_cost: self.metrics.scan_cost,
};
mem_scan_metrics.merge_inner(&inner);
}
}
}
impl Drop for Iter {
@@ -571,6 +588,9 @@ impl Drop for Iter {
self.metadata.region_id, self.metrics
);
// Report MemScanMetrics if not already reported
self.report_mem_scan_metrics();
READ_ROWS_TOTAL
.with_label_values(&["time_series_memtable"])
.inc_by(self.metrics.num_rows as u64);
@@ -631,8 +651,12 @@ impl Iterator for Iter {
});
return Some(batch);
}
drop(map); // Explicitly drop the read lock
self.metrics.scan_cost += start.elapsed();
// Report MemScanMetrics before returning None
self.report_mem_scan_metrics();
None
}
}
@@ -1211,12 +1235,13 @@ struct TimeSeriesIterBuilder {
}
impl IterBuilder for TimeSeriesIterBuilder {
fn build(&self) -> Result<BoxedBatchIterator> {
fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
let iter = self.series_set.iter_series(
self.projection.clone(),
self.predicate.clone(),
self.dedup,
self.sequence,
metrics,
)?;
if self.merge_mode == MergeMode::LastNonNull {

View File

@@ -19,7 +19,6 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_telemetry::debug;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use futures::Stream;
use prometheus::IntGauge;
@@ -27,6 +26,7 @@ use smallvec::SmallVec;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::memtable::MemScanMetrics;
use crate::metrics::{
IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
@@ -40,7 +40,7 @@ use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
/// Verbose scan metrics for a partition.
#[derive(Default)]
struct ScanMetricsSet {
pub(crate) struct ScanMetricsSet {
/// Duration to prepare the scan task.
prepare_scan_cost: Duration,
/// Duration to build the (merge) reader.
@@ -60,6 +60,16 @@ struct ScanMetricsSet {
/// Number of file ranges scanned.
num_file_ranges: usize,
// Memtable related metrics:
/// Duration to scan memtables.
mem_scan_cost: Duration,
/// Number of rows read from memtables.
mem_rows: usize,
/// Number of batches read from memtables.
mem_batches: usize,
/// Number of series read from memtables.
mem_series: usize,
// SST related metrics:
/// Duration to build file ranges.
build_parts_cost: Duration,
@@ -95,6 +105,8 @@ struct ScanMetricsSet {
/// Number of send timeout in SeriesScan.
num_series_send_timeout: usize,
/// Number of send full in SeriesScan.
num_series_send_full: usize,
/// Number of rows the series distributor scanned.
num_distributor_rows: usize,
/// Number of batches the series distributor scanned.
@@ -103,6 +115,9 @@ struct ScanMetricsSet {
distributor_scan_cost: Duration,
/// Duration of the series distributor to yield.
distributor_yield_cost: Duration,
/// The stream reached EOF
stream_eof: bool,
}
impl fmt::Debug for ScanMetricsSet {
@@ -133,12 +148,19 @@ impl fmt::Debug for ScanMetricsSet {
num_sst_rows,
first_poll,
num_series_send_timeout,
num_series_send_full,
num_distributor_rows,
num_distributor_batches,
distributor_scan_cost,
distributor_yield_cost,
stream_eof,
mem_scan_cost,
mem_rows,
mem_batches,
mem_series,
} = self;
// Write core metrics
write!(
f,
"{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
@@ -152,25 +174,80 @@ impl fmt::Debug for ScanMetricsSet {
\"num_file_ranges\":{num_file_ranges}, \
\"build_parts_cost\":\"{build_parts_cost:?}\", \
\"rg_total\":{rg_total}, \
\"rg_fulltext_filtered\":{rg_fulltext_filtered}, \
\"rg_inverted_filtered\":{rg_inverted_filtered}, \
\"rg_minmax_filtered\":{rg_minmax_filtered}, \
\"rg_bloom_filtered\":{rg_bloom_filtered}, \
\"rows_before_filter\":{rows_before_filter}, \
\"rows_fulltext_filtered\":{rows_fulltext_filtered}, \
\"rows_inverted_filtered\":{rows_inverted_filtered}, \
\"rows_bloom_filtered\":{rows_bloom_filtered}, \
\"rows_precise_filtered\":{rows_precise_filtered}, \
\"num_sst_record_batches\":{num_sst_record_batches}, \
\"num_sst_batches\":{num_sst_batches}, \
\"num_sst_rows\":{num_sst_rows}, \
\"first_poll\":\"{first_poll:?}\", \
\"num_series_send_timeout\":{num_series_send_timeout}, \
\"num_distributor_rows\":{num_distributor_rows}, \
\"num_distributor_batches\":{num_distributor_batches}, \
\"distributor_scan_cost\":\"{distributor_scan_cost:?}\", \
\"distributor_yield_cost\":\"{distributor_yield_cost:?}\"}}"
)
\"first_poll\":\"{first_poll:?}\""
)?;
// Write non-zero filter counters
if *rg_fulltext_filtered > 0 {
write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
}
if *rg_inverted_filtered > 0 {
write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
}
if *rg_minmax_filtered > 0 {
write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
}
if *rg_bloom_filtered > 0 {
write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
}
if *rows_fulltext_filtered > 0 {
write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
}
if *rows_inverted_filtered > 0 {
write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
}
if *rows_bloom_filtered > 0 {
write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
}
if *rows_precise_filtered > 0 {
write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
}
// Write non-zero distributor metrics
if *num_series_send_timeout > 0 {
write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
}
if *num_series_send_full > 0 {
write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
}
if *num_distributor_rows > 0 {
write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
}
if *num_distributor_batches > 0 {
write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
}
if !distributor_scan_cost.is_zero() {
write!(
f,
", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
)?;
}
if !distributor_yield_cost.is_zero() {
write!(
f,
", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
)?;
}
// Write non-zero memtable metrics
if *mem_rows > 0 {
write!(f, ", \"mem_rows\":{mem_rows}")?;
}
if *mem_batches > 0 {
write!(f, ", \"mem_batches\":{mem_batches}")?;
}
if *mem_series > 0 {
write!(f, ", \"mem_series\":{mem_series}")?;
}
if !mem_scan_cost.is_zero() {
write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
}
write!(f, ", \"stream_eof\":{stream_eof}}}")
}
}
impl ScanMetricsSet {
@@ -249,6 +326,7 @@ impl ScanMetricsSet {
fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
let SeriesDistributorMetrics {
num_series_send_timeout,
num_series_send_full,
num_rows,
num_batches,
scan_cost,
@@ -256,6 +334,7 @@ impl ScanMetricsSet {
} = distributor_metrics;
self.num_series_send_timeout += *num_series_send_timeout;
self.num_series_send_full += *num_series_send_full;
self.num_distributor_rows += *num_rows;
self.num_distributor_batches += *num_batches;
self.distributor_scan_cost += *scan_cost;
@@ -328,6 +407,8 @@ struct PartitionMetricsInner {
scanner_type: &'static str,
/// Query start time.
query_start: Instant,
/// Whether to use verbose logging.
explain_verbose: bool,
/// Verbose scan metrics that only log to debug logs by default.
metrics: Mutex<ScanMetricsSet>,
in_progress_scan: IntGauge,
@@ -346,25 +427,35 @@ struct PartitionMetricsInner {
}
impl PartitionMetricsInner {
fn on_finish(&self) {
fn on_finish(&self, stream_eof: bool) {
let mut metrics = self.metrics.lock().unwrap();
if metrics.total_cost.is_zero() {
metrics.total_cost = self.query_start.elapsed();
}
if !metrics.stream_eof {
metrics.stream_eof = stream_eof;
}
}
}
impl Drop for PartitionMetricsInner {
fn drop(&mut self) {
self.on_finish();
self.on_finish(false);
let metrics = self.metrics.lock().unwrap();
metrics.observe_metrics();
self.in_progress_scan.dec();
debug!(
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
);
if self.explain_verbose {
common_telemetry::info!(
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
);
} else {
common_telemetry::debug!(
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
);
}
}
}
@@ -403,6 +494,7 @@ impl PartitionMetrics {
partition: usize,
scanner_type: &'static str,
query_start: Instant,
explain_verbose: bool,
metrics_set: &ExecutionPlanMetricsSet,
) -> Self {
let partition_str = partition.to_string();
@@ -414,6 +506,7 @@ impl PartitionMetrics {
partition,
scanner_type,
query_start,
explain_verbose,
metrics: Mutex::new(metrics),
in_progress_scan,
build_parts_cost: MetricBuilder::new(metrics_set)
@@ -454,6 +547,15 @@ impl PartitionMetrics {
self.0.convert_cost.add_duration(cost);
}
/// Reports memtable scan metrics.
pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
let mut metrics = self.0.metrics.lock().unwrap();
metrics.mem_scan_cost += data.scan_cost;
metrics.mem_rows += data.num_rows;
metrics.mem_batches += data.num_batches;
metrics.mem_series += data.total_series;
}
/// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
self.0
@@ -476,7 +578,7 @@ impl PartitionMetrics {
/// Finishes the query.
pub(crate) fn on_finish(&self) {
self.0.on_finish();
self.0.on_finish(true);
}
/// Sets the distributor metrics.
@@ -502,6 +604,8 @@ impl fmt::Debug for PartitionMetrics {
pub(crate) struct SeriesDistributorMetrics {
/// Number of send timeout in SeriesScan.
pub(crate) num_series_send_timeout: usize,
/// Number of send full in SeriesScan.
pub(crate) num_series_send_full: usize,
/// Number of rows the series distributor scanned.
pub(crate) num_rows: usize,
/// Number of batches the series distributor scanned.
@@ -524,13 +628,20 @@ pub(crate) fn scan_mem_ranges(
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
let iter = range.build_prune_iter(time_range)?;
let mem_scan_metrics = Some(MemScanMetrics::default());
let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
let mut source = Source::Iter(iter);
while let Some(batch) = source.next_batch().await? {
yield batch;
}
// Report the memtable scan metrics to partition metrics
if let Some(ref metrics) = mem_scan_metrics {
let data = metrics.data();
part_metrics.report_mem_scan_metrics(&data);
}
}
}
}
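
Editor's note: the rewritten `Debug` impl keeps verbose logs compact by always printing core fields and appending the optional counters only when they are non-zero. A reduced, runnable sketch of the technique:

```rust
use std::fmt;
use std::time::Duration;

struct Metrics {
    total_cost: Duration,
    rows_filtered: usize,
    mem_rows: usize,
}

impl fmt::Debug for Metrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Core fields are always written.
        write!(f, "{{\"total_cost\":\"{:?}\"", self.total_cost)?;
        // Optional counters appear only when non-zero.
        if self.rows_filtered > 0 {
            write!(f, ", \"rows_filtered\":{}", self.rows_filtered)?;
        }
        if self.mem_rows > 0 {
            write!(f, ", \"mem_rows\":{}", self.mem_rows)?;
        }
        write!(f, "}}")
    }
}

fn main() {
    let m = Metrics {
        total_cost: Duration::from_millis(12),
        rows_filtered: 0,
        mem_rows: 42,
    };
    // Prints: {"total_cost":"12ms", "mem_rows":42}
    println!("{:?}", m);
}
```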

View File

@@ -29,7 +29,9 @@ use datatypes::schema::SchemaRef;
use futures::StreamExt;
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
@@ -87,7 +89,9 @@ impl SeqScan {
pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let metrics_set = ExecutionPlanMetricsSet::new();
let streams = (0..self.properties.partitions.len())
.map(|partition: usize| self.scan_partition(&metrics_set, partition))
.map(|partition: usize| {
self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
})
.collect::<Result<Vec<_>, _>>()?;
let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
@@ -100,7 +104,7 @@ impl SeqScan {
let streams = (0..self.properties.partitions.len())
.map(|partition| {
let metrics = self.new_partition_metrics(&metrics_set, partition);
let metrics = self.new_partition_metrics(false, &metrics_set, partition);
self.scan_batch_in_partition(partition, metrics)
})
.collect::<Result<Vec<_>>>()?;
@@ -116,7 +120,7 @@ impl SeqScan {
assert!(self.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();
let part_metrics = self.new_partition_metrics(&metrics_set, 0);
let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];
@@ -210,10 +214,19 @@ impl SeqScan {
/// Otherwise the returned stream might not contain any data.
fn scan_partition_impl(
&self,
ctx: &QueryScanContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let metrics = self.new_partition_metrics(metrics_set, partition);
if ctx.explain_verbose {
common_telemetry::info!(
"SeqScan partition {}, region_id: {}",
partition,
self.stream_ctx.input.region_metadata().region_id
);
}
let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
@@ -345,6 +358,7 @@ impl SeqScan {
/// Sets the partition metrics for the given partition if it is not for compaction.
fn new_partition_metrics(
&self,
explain_verbose: bool,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> PartitionMetrics {
@@ -353,6 +367,7 @@ impl SeqScan {
partition,
get_scanner_type(self.compaction),
self.stream_ctx.query_start,
explain_verbose,
metrics_set,
);
@@ -379,10 +394,11 @@ impl RegionScanner for SeqScan {
fn scan_partition(
&self,
ctx: &QueryScanContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
self.scan_partition_impl(ctx, metrics_set, partition)
.map_err(BoxedError::new)
}
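
Editor's note: the recurring pattern in this commit is that the same message goes to info level when the query is `EXPLAIN ANALYZE VERBOSE` and to debug level otherwise. Sketched here with `println!` standing in for `common_telemetry`'s `info!`/`debug!` macros:

```rust
// Stand-in for `common_telemetry::info!` / `debug!`; the real code logs
// through those macros instead of `println!`.
fn log_scan(explain_verbose: bool, msg: &str) {
    if explain_verbose {
        println!("[INFO] {msg}"); // surfaced when EXPLAIN ANALYZE VERBOSE runs
    } else {
        println!("[DEBUG] {msg}"); // ordinary queries stay at debug level
    }
}

fn main() {
    log_scan(true, "SeqScan partition 0, region_id: <region_id>");
    log_scan(false, "SeqScan partition 1, region_id: <region_id>");
}
```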

View File

@@ -29,7 +29,9 @@ use futures::StreamExt;
use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Semaphore;
@@ -87,13 +89,20 @@ impl SeriesScan {
fn scan_partition_impl(
&self,
ctx: &QueryScanContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let metrics =
new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
let metrics = new_partition_metrics(
&self.stream_ctx,
ctx.explain_verbose,
metrics_set,
partition,
&self.metrics_list,
);
let batch_stream = self.scan_batch_in_partition(partition, metrics.clone(), metrics_set)?;
let batch_stream =
self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
let input = &self.stream_ctx.input;
let record_batch_stream = ConvertBatchStream::new(
@@ -111,10 +120,19 @@ impl SeriesScan {
fn scan_batch_in_partition(
&self,
ctx: &QueryScanContext,
partition: usize,
part_metrics: PartitionMetrics,
metrics_set: &ExecutionPlanMetricsSet,
) -> Result<ScanBatchStream> {
if ctx.explain_verbose {
common_telemetry::info!(
"SeriesScan partition {}, region_id: {}",
partition,
self.stream_ctx.input.region_metadata().region_id
);
}
ensure!(
partition < self.properties.num_partitions(),
PartitionOutOfRangeSnafu {
@@ -146,6 +164,8 @@ impl SeriesScan {
part_metrics.merge_metrics(&metrics);
}
part_metrics.on_finish();
};
Ok(Box::pin(stream))
}
@@ -190,7 +210,7 @@ impl SeriesScan {
let part_num = self.properties.num_partitions();
let metrics_set = ExecutionPlanMetricsSet::default();
let streams = (0..part_num)
.map(|i| self.scan_partition(&metrics_set, i))
.map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
.collect::<Result<Vec<_>, BoxedError>>()?;
let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
Ok(Box::pin(chained_stream))
@@ -204,12 +224,18 @@ impl SeriesScan {
.map(|partition| {
let metrics = new_partition_metrics(
&self.stream_ctx,
false,
&metrics_set,
partition,
&self.metrics_list,
);
self.scan_batch_in_partition(partition, metrics, &metrics_set)
self.scan_batch_in_partition(
&QueryScanContext::default(),
partition,
metrics,
&metrics_set,
)
})
.collect::<Result<Vec<_>>>()?;
@@ -242,10 +268,11 @@ impl RegionScanner for SeriesScan {
fn scan_partition(
&self,
ctx: &QueryScanContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
self.scan_partition_impl(ctx, metrics_set, partition)
.map_err(BoxedError::new)
}
@@ -328,6 +355,7 @@ impl SeriesDistributor {
async fn scan_partitions(&mut self) -> Result<()> {
let part_metrics = new_partition_metrics(
&self.stream_ctx,
false,
&self.metrics_set,
self.partitions.len(),
&self.metrics_list,
@@ -399,6 +427,7 @@ impl SeriesDistributor {
metrics.scan_cost += fetch_start.elapsed();
metrics.num_series_send_timeout = self.senders.num_timeout;
metrics.num_series_send_full = self.senders.num_full;
part_metrics.set_distributor_metrics(&metrics);
part_metrics.on_finish();
@@ -444,6 +473,8 @@ struct SenderList {
sender_idx: usize,
/// Number of timeout.
num_timeout: usize,
/// Number of full senders.
num_full: usize,
}
impl SenderList {
@@ -454,6 +485,7 @@ impl SenderList {
num_nones,
sender_idx: 0,
num_timeout: 0,
num_full: 0,
}
}
@@ -471,6 +503,7 @@ impl SenderList {
match sender.try_send(Ok(batch)) {
Ok(()) => return Ok(None),
Err(TrySendError::Full(res)) => {
self.num_full += 1;
// Safety: we send Ok.
batch = res.unwrap();
}
@@ -546,6 +579,7 @@ impl SenderList {
fn new_partition_metrics(
stream_ctx: &StreamContext,
explain_verbose: bool,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
metrics_list: &PartitionMetricsList,
@@ -555,6 +589,7 @@ fn new_partition_metrics(
partition,
"SeriesScan",
stream_ctx.query_start,
explain_verbose,
metrics_set,
);
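
Editor's note: `SenderList` counts `TrySendError::Full` instead of blocking, which is what feeds the new `num_series_send_full` metric. The standard library's bounded channel exhibits the same control flow:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Bounded channel of size 1, standing in for a SeriesScan sender.
    let (tx, rx) = sync_channel::<u32>(1);
    let mut num_full = 0usize;

    for batch in 0..3 {
        match tx.try_send(batch) {
            Ok(()) => {}
            Err(TrySendError::Full(_batch)) => {
                // Count it and fall back (the real code retries the batch
                // with a timed send instead of dropping it).
                num_full += 1;
            }
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    println!("num_full = {num_full}, first received = {:?}", rx.try_recv());
}
```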

View File

@@ -27,7 +27,9 @@ use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
use store_api::region_engine::{
PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
@@ -71,7 +73,7 @@ impl UnorderedScan {
let metrics_set = ExecutionPlanMetricsSet::new();
let part_num = self.properties.num_partitions();
let streams = (0..part_num)
.map(|i| self.scan_partition(&metrics_set, i))
.map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
.collect::<Result<Vec<_>, BoxedError>>()?;
let stream = stream! {
for mut stream in streams {
@@ -139,7 +141,7 @@ impl UnorderedScan {
let streams = (0..self.properties.partitions.len())
.map(|partition| {
let metrics = self.partition_metrics(partition, &metrics_set);
let metrics = self.partition_metrics(false, partition, &metrics_set);
self.scan_batch_in_partition(partition, metrics)
})
.collect::<Result<Vec<_>>>()?;
@@ -149,6 +151,7 @@ impl UnorderedScan {
fn partition_metrics(
&self,
explain_verbose: bool,
partition: usize,
metrics_set: &ExecutionPlanMetricsSet,
) -> PartitionMetrics {
@@ -157,6 +160,7 @@ impl UnorderedScan {
partition,
"UnorderedScan",
self.stream_ctx.query_start,
explain_verbose,
metrics_set,
);
self.metrics_list.set(partition, part_metrics.clone());
@@ -165,10 +169,19 @@ impl UnorderedScan {
fn scan_partition_impl(
&self,
ctx: &QueryScanContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let metrics = self.partition_metrics(partition, metrics_set);
if ctx.explain_verbose {
common_telemetry::info!(
"UnorderedScan partition {}, region_id: {}",
partition,
self.stream_ctx.input.region_metadata().region_id
);
}
let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
@@ -263,6 +276,8 @@ impl UnorderedScan {
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
part_metrics.on_finish();
};
Ok(Box::pin(stream))
}
@@ -288,10 +303,11 @@ impl RegionScanner for UnorderedScan {
fn scan_partition(
&self,
ctx: &QueryScanContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
self.scan_partition_impl(ctx, metrics_set, partition)
.map_err(BoxedError::new)
}

View File

@@ -539,6 +539,12 @@ impl QueryExecutor for DatafusionQueryEngine {
ctx: &QueryEngineContext,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream> {
let explain_verbose = ctx.query_ctx().explain_verbose();
let output_partitions = plan.properties().output_partitioning().partition_count();
if explain_verbose {
common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
}
let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
let task_ctx = ctx.build_task_ctx();
@@ -562,9 +568,15 @@ impl QueryExecutor for DatafusionQueryEngine {
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
stream.set_metrics2(plan.clone());
stream.set_explain_verbose(ctx.query_ctx().explain_verbose());
stream.set_explain_verbose(explain_verbose);
let stream = OnDone::new(Box::pin(stream), move || {
exec_timer.observe_duration();
let exec_cost = exec_timer.stop_and_record();
if explain_verbose {
common_telemetry::info!(
"DatafusionQueryEngine execute 1 stream, cost: {:?}s",
exec_cost,
);
}
});
Ok(Box::pin(stream))
}
@@ -591,7 +603,13 @@ impl QueryExecutor for DatafusionQueryEngine {
stream.set_metrics2(plan.clone());
stream.set_explain_verbose(ctx.query_ctx().explain_verbose());
let stream = OnDone::new(Box::pin(stream), move || {
exec_timer.observe_duration();
let exec_cost = exec_timer.stop_and_record();
if explain_verbose {
common_telemetry::info!(
"DatafusionQueryEngine execute {output_partitions} stream, cost: {:?}s",
exec_cost
);
}
});
Ok(Box::pin(stream))
}
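
Editor's note: `OnDone` runs a closure when the stream completes; the commit uses it to record the execution cost and, under verbose mode, log it. A synchronous analogue over an iterator (the real wrapper is stream-based):

```rust
use std::time::Instant;

// Runs a closure exactly once when the inner iterator is exhausted.
struct OnDone<I, F: FnMut()> {
    inner: I,
    on_done: Option<F>,
}

impl<I: Iterator, F: FnMut()> Iterator for OnDone<I, F> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.next() {
            Some(item) => Some(item),
            None => {
                if let Some(mut f) = self.on_done.take() {
                    f();
                }
                None
            }
        }
    }
}

fn main() {
    let exec_timer = Instant::now();
    let stream = OnDone {
        inner: 0..3,
        on_done: Some(move || println!("execute stream done, cost: {:?}", exec_timer.elapsed())),
    };
    for batch in stream {
        println!("batch {batch}");
    }
}
```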

View File

@@ -148,6 +148,8 @@ pub struct MergeScanExec {
properties: PlanProperties,
/// Metrics from sub stages
sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
/// Metrics for each partition
partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
query_ctx: QueryContextRef,
target_partition: usize,
partition_cols: Vec<String>,
@@ -244,6 +246,7 @@ impl MergeScanExec {
region_query_handler,
metric: ExecutionPlanMetricsSet::new(),
sub_stage_metrics: Arc::default(),
partition_metrics: Arc::default(),
properties,
query_ctx,
target_partition,
@@ -263,12 +266,14 @@ impl MergeScanExec {
let schema = self.schema.clone();
let query_ctx = self.query_ctx.clone();
let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
let partition_metrics_moved = self.partition_metrics.clone();
let plan = self.plan.clone();
let target_partition = self.target_partition;
let dbname = context.task_id().unwrap_or_default();
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let current_channel = self.query_ctx.channel();
let read_preference = self.query_ctx.read_preference();
let explain_verbose = self.query_ctx.explain_verbose();
let stream = Box::pin(stream!({
// only report metrics once for each MergeScan
@@ -295,7 +300,17 @@ impl MergeScanExec {
region_id,
plan: plan.clone(),
};
let region_start = Instant::now();
let do_get_start = Instant::now();
if explain_verbose {
common_telemetry::info!(
"Merge scan one region, partition: {}, region_id: {}",
partition,
region_id
);
}
let mut stream = region_query_handler
.do_get(read_preference, request)
.await
@@ -332,10 +347,31 @@ impl MergeScanExec {
// reset poll timer
poll_timer = Instant::now();
}
common_telemetry::debug!(
"Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
);
let total_cost = region_start.elapsed();
// Record region metrics and push to global partition_metrics
let region_metrics = RegionMetrics {
region_id,
poll_duration,
do_get_cost,
total_cost,
};
// Push RegionMetrics to global partition_metrics immediately after scanning this region
{
let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
let partition_metrics = partition_metrics_guard
.entry(partition)
.or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
partition_metrics.add_region_metrics(region_metrics);
}
if explain_verbose {
common_telemetry::info!(
"Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
);
}
// process metrics after all data is drained.
if let Some(metrics) = stream.metrics() {
@@ -358,6 +394,14 @@ impl MergeScanExec {
MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
}
// Finish partition metrics and log results
{
let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
partition_metrics.finish();
}
}
}));
Ok(Box::pin(RecordBatchStreamWrapper {
@@ -409,6 +453,7 @@ impl MergeScanExec {
self.properties.boundedness,
),
sub_stage_metrics: self.sub_stage_metrics.clone(),
partition_metrics: self.partition_metrics.clone(),
query_ctx: self.query_ctx.clone(),
target_partition: self.target_partition,
partition_cols: self.partition_cols.clone(),
@@ -436,6 +481,90 @@ impl MergeScanExec {
pub fn region_count(&self) -> usize {
self.regions.len()
}
fn partition_metrics(&self) -> Vec<PartitionMetrics> {
self.partition_metrics
.lock()
.unwrap()
.values()
.cloned()
.collect()
}
}
/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
region_id: RegionId,
poll_duration: Duration,
do_get_cost: Duration,
/// Total cost to scan the region.
total_cost: Duration,
}
/// Metrics for a partition of a MergeScanExec.
#[derive(Debug, Clone)]
struct PartitionMetrics {
partition: usize,
region_metrics: Vec<RegionMetrics>,
total_poll_duration: Duration,
total_do_get_cost: Duration,
total_regions: usize,
explain_verbose: bool,
finished: bool,
}
impl PartitionMetrics {
fn new(partition: usize, explain_verbose: bool) -> Self {
Self {
partition,
region_metrics: Vec::new(),
total_poll_duration: Duration::ZERO,
total_do_get_cost: Duration::ZERO,
total_regions: 0,
explain_verbose,
finished: false,
}
}
fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
self.total_poll_duration += region_metrics.poll_duration;
self.total_do_get_cost += region_metrics.do_get_cost;
self.total_regions += 1;
self.region_metrics.push(region_metrics);
}
/// Finish the partition metrics and log the results.
fn finish(&mut self) {
if self.finished {
return;
}
self.finished = true;
self.log_metrics();
}
/// Log partition metrics based on explain_verbose level.
fn log_metrics(&self) {
if self.explain_verbose {
common_telemetry::info!(
"MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
self.partition, self.total_regions, self.total_poll_duration, self.total_do_get_cost
);
} else {
common_telemetry::debug!(
"MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
self.partition, self.total_regions, self.total_poll_duration, self.total_do_get_cost
);
}
}
}
impl Drop for PartitionMetrics {
fn drop(&mut self) {
if !self.finished {
self.log_metrics();
}
}
}
impl ExecutionPlan for MergeScanExec {
@@ -484,12 +613,42 @@ impl ExecutionPlan for MergeScanExec {
}
impl DisplayAs for MergeScanExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MergeScanExec: peers=[")?;
for region_id in self.regions.iter() {
write!(f, "{}, ", region_id)?;
}
write!(f, "]")
write!(f, "]")?;
if matches!(t, DisplayFormatType::Verbose) {
let partition_metrics = self.partition_metrics();
if !partition_metrics.is_empty() {
write!(f, ", metrics={{")?;
for (i, pm) in partition_metrics.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
pm.partition, pm.total_regions,
pm.total_poll_duration,
pm.total_do_get_cost)?;
for (j, rm) in pm.region_metrics.iter().enumerate() {
if j > 0 {
write!(f, ",")?;
}
write!(f, "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
rm.region_id,
rm.poll_duration,
rm.do_get_cost,
rm.total_cost)?;
}
write!(f, "]}}")?;
}
write!(f, "}}")?;
}
}
Ok(())
}
}
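
Editor's note: the new `PartitionMetrics` in `MergeScanExec` logs once via `finish()` and uses `Drop` as a fallback for streams that end early. A self-contained sketch of that finish-once guard (simplified fields):

```rust
use std::time::Duration;

struct PartitionMetrics {
    partition: usize,
    total_regions: usize,
    total_poll_duration: Duration,
    finished: bool,
}

impl PartitionMetrics {
    fn finish(&mut self) {
        if self.finished {
            return;
        }
        self.finished = true;
        self.log_metrics();
    }

    fn log_metrics(&self) {
        println!(
            "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}",
            self.partition, self.total_regions, self.total_poll_duration
        );
    }
}

impl Drop for PartitionMetrics {
    fn drop(&mut self) {
        // A stream dropped early still gets its metrics logged once.
        if !self.finished {
            self.log_metrics();
        }
    }
}

fn main() {
    let mut metrics = PartitionMetrics {
        partition: 0,
        total_regions: 2,
        total_poll_duration: Duration::from_millis(7),
        finished: false,
    };
    metrics.finish(); // logs once; `Drop` then stays silent
}
```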

View File

@@ -28,6 +28,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use session::context::QueryContextRef;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
@@ -135,6 +136,7 @@ pub struct DummyTableProvider {
metadata: RegionMetadataRef,
/// Keeping a mutable request makes it possible to change in the optimize phase.
scan_request: Arc<Mutex<ScanRequest>>,
query_ctx: Option<QueryContextRef>,
}
impl fmt::Debug for DummyTableProvider {
@@ -178,7 +180,11 @@ impl TableProvider for DummyTableProvider {
.handle_query(self.region_id, request.clone())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(Arc::new(RegionScanExec::new(scanner, request)?))
let mut scan_exec = RegionScanExec::new(scanner, request)?;
if let Some(query_ctx) = &self.query_ctx {
scan_exec.set_explain_verbose(query_ctx.explain_verbose());
}
Ok(Arc::new(scan_exec))
}
fn supports_filters_pushdown(
@@ -221,6 +227,7 @@ impl DummyTableProvider {
engine,
metadata,
scan_request: Default::default(),
query_ctx: None,
}
}
@@ -261,7 +268,7 @@ impl DummyTableProviderFactory {
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
query_ctx: Option<QueryContextRef>,
) -> Result<DummyTableProvider> {
let metadata =
engine
@@ -272,7 +279,8 @@ impl DummyTableProviderFactory {
region_id,
})?;
let scan_request = ctx
let scan_request = query_ctx
.as_ref()
.map(|ctx| ScanRequest {
sequence: ctx.get_snapshot(region_id.as_u64()),
sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
@@ -285,6 +293,7 @@ impl DummyTableProviderFactory {
engine,
metadata,
scan_request: Arc::new(Mutex::new(scan_request)),
query_ctx,
})
}
}
@@ -295,7 +304,7 @@ impl TableProviderFactory for DummyTableProviderFactory {
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
ctx: Option<QueryContextRef>,
) -> Result<Arc<dyn TableProvider>> {
let provider = self.create_table_provider(region_id, engine, ctx).await?;
Ok(Arc::new(provider))
@@ -308,7 +317,7 @@ pub trait TableProviderFactory: Send + Sync {
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
ctx: Option<QueryContextRef>,
) -> Result<Arc<dyn TableProvider>>;
}

View File

@@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use arrow_flight::FlightData;
use common_error::ext::ErrorExt;
@@ -22,7 +23,7 @@ use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing::{info_span, Instrument};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use futures::channel::mpsc;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, Stream, StreamExt};
@@ -35,6 +36,61 @@ use crate::error;
use crate::grpc::flight::TonicResult;
use crate::grpc::FlightCompression;
/// Metrics collector for Flight stream with RAII logging pattern
struct StreamMetrics {
send_schema_duration: Duration,
send_record_batch_duration: Duration,
send_metrics_duration: Duration,
fetch_content_duration: Duration,
record_batch_count: usize,
metrics_count: usize,
total_rows: usize,
total_bytes: usize,
should_log: bool,
}
impl StreamMetrics {
fn new(should_log: bool) -> Self {
Self {
send_schema_duration: Duration::ZERO,
send_record_batch_duration: Duration::ZERO,
send_metrics_duration: Duration::ZERO,
fetch_content_duration: Duration::ZERO,
record_batch_count: 0,
metrics_count: 0,
total_rows: 0,
total_bytes: 0,
should_log,
}
}
}
impl Drop for StreamMetrics {
fn drop(&mut self) {
if self.should_log {
info!(
"flight_data_stream finished: \
send_schema_duration={:?}, \
send_record_batch_duration={:?}, \
send_metrics_duration={:?}, \
fetch_content_duration={:?}, \
record_batch_count={}, \
metrics_count={}, \
total_rows={}, \
total_bytes={}",
self.send_schema_duration,
self.send_record_batch_duration,
self.send_metrics_duration,
self.fetch_content_duration,
self.record_batch_count,
self.metrics_count,
self.total_rows,
self.total_bytes
);
}
}
}
#[pin_project(PinnedDrop)]
pub struct FlightRecordBatchStream {
#[pin]
@@ -78,15 +134,29 @@ impl FlightRecordBatchStream {
mut tx: Sender<TonicResult<FlightMessage>>,
should_send_partial_metrics: bool,
) {
let mut metrics = StreamMetrics::new(should_send_partial_metrics);
let schema = recordbatches.schema().arrow_schema().clone();
let start = Instant::now();
if let Err(e) = tx.send(Ok(FlightMessage::Schema(schema))).await {
warn!(e; "stop sending Flight data");
return;
}
metrics.send_schema_duration += start.elapsed();
while let Some(batch_or_err) = recordbatches.next().in_current_span().await {
while let Some(batch_or_err) = {
let start = Instant::now();
let result = recordbatches.next().in_current_span().await;
metrics.fetch_content_duration += start.elapsed();
result
} {
match batch_or_err {
Ok(recordbatch) => {
metrics.total_rows += recordbatch.num_rows();
metrics.record_batch_count += 1;
metrics.total_bytes += recordbatch.df_record_batch().get_array_memory_size();
let start = Instant::now();
if let Err(e) = tx
.send(Ok(FlightMessage::RecordBatch(
recordbatch.into_df_record_batch(),
@@ -96,15 +166,20 @@ impl FlightRecordBatchStream {
warn!(e; "stop sending Flight data");
return;
}
metrics.send_record_batch_duration += start.elapsed();
if should_send_partial_metrics {
if let Some(metrics) = recordbatches
if let Some(metrics_str) = recordbatches
.metrics()
.and_then(|m| serde_json::to_string(&m).ok())
{
if let Err(e) = tx.send(Ok(FlightMessage::Metrics(metrics))).await {
metrics.metrics_count += 1;
let start = Instant::now();
if let Err(e) = tx.send(Ok(FlightMessage::Metrics(metrics_str))).await {
warn!(e; "stop sending Flight data");
return;
}
metrics.send_metrics_duration += start.elapsed();
}
}
}
@@ -126,7 +201,10 @@ impl FlightRecordBatchStream {
.metrics()
.and_then(|m| serde_json::to_string(&m).ok())
{
metrics.metrics_count += 1;
let start = Instant::now();
let _ = tx.send(Ok(FlightMessage::Metrics(metrics_str))).await;
metrics.send_metrics_duration += start.elapsed();
}
}
}
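
Editor's note: `StreamMetrics` follows the RAII logging pattern described in its doc comment: each phase of the loop is timed into a separate duration bucket, and `Drop` emits one summary line no matter how the stream ends. A reduced sketch:

```rust
use std::time::{Duration, Instant};

struct StreamMetrics {
    fetch_content_duration: Duration,
    send_record_batch_duration: Duration,
    record_batch_count: usize,
    should_log: bool,
}

impl Drop for StreamMetrics {
    fn drop(&mut self) {
        // Whatever path ends the stream (EOF, error, client hangup),
        // the accumulated numbers get logged exactly once.
        if self.should_log {
            println!(
                "flight stream finished: fetch={:?}, send={:?}, batches={}",
                self.fetch_content_duration,
                self.send_record_batch_duration,
                self.record_batch_count
            );
        }
    }
}

fn main() {
    let mut m = StreamMetrics {
        fetch_content_duration: Duration::ZERO,
        send_record_batch_duration: Duration::ZERO,
        record_batch_count: 0,
        should_log: true,
    };
    for _ in 0..3 {
        let start = Instant::now();
        std::thread::sleep(Duration::from_millis(1)); // stand-in for fetching a batch
        m.fetch_content_duration += start.elapsed();

        let start = Instant::now();
        m.record_batch_count += 1; // stand-in for sending the batch
        m.send_record_batch_duration += start.elapsed();
    }
} // metrics logged here when `m` drops
```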

View File

@@ -390,6 +390,13 @@ impl PrepareRequest {
}
}
/// Necessary context of the query for the scanner.
#[derive(Clone, Default)]
pub struct QueryScanContext {
/// Whether the query is EXPLAIN ANALYZE VERBOSE.
pub explain_verbose: bool,
}
/// A scanner that provides a way to scan the region concurrently.
///
/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
@@ -415,6 +422,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
/// Panics if the `partition` is out of bound.
fn scan_partition(
&self,
ctx: &QueryScanContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError>;
@@ -832,6 +840,7 @@ impl RegionScanner for SinglePartitionScanner {
fn scan_partition(
&self,
_ctx: &QueryScanContext,
_metrics_set: &ExecutionPlanMetricsSet,
_partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
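
Editor's note: `QueryScanContext` is how the verbose flag travels from the plan into every scanner. A simplified sketch of the trait threading (the real `scan_partition` also takes an `ExecutionPlanMetricsSet` and returns a record-batch stream; plain types are used here so the example stands alone):

```rust
#[derive(Clone, Default)]
struct QueryScanContext {
    // Whether the query is EXPLAIN ANALYZE VERBOSE.
    explain_verbose: bool,
}

trait RegionScanner {
    fn scan_partition(&self, ctx: &QueryScanContext, partition: usize) -> Result<(), String>;
}

struct DummyScanner;

impl RegionScanner for DummyScanner {
    fn scan_partition(&self, ctx: &QueryScanContext, partition: usize) -> Result<(), String> {
        if ctx.explain_verbose {
            println!("scan partition {partition} (verbose)");
        }
        Ok(())
    }
}

fn main() {
    let scanner = DummyScanner;
    // Internal callers (compaction, chained streams) pass a default context.
    scanner.scan_partition(&QueryScanContext::default(), 0).unwrap();
    // Query execution threads the plan's verbose flag through instead.
    scanner
        .scan_partition(&QueryScanContext { explain_verbose: true }, 0)
        .unwrap();
}
```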

View File

@@ -40,7 +40,9 @@ use datafusion_physical_expr::{
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datatypes::compute::SortOptions;
use futures::{Stream, StreamExt};
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef};
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScannerRef,
};
use store_api::storage::{ScanRequest, TimeSeriesDistribution};
use crate::table::metrics::StreamMetrics;
@@ -59,6 +61,7 @@ pub struct RegionScanExec {
is_partition_set: bool,
// TODO(ruihang): handle TimeWindowed dist via this parameter
distribution: Option<TimeSeriesDistribution>,
explain_verbose: bool,
}
impl RegionScanExec {
@@ -165,6 +168,7 @@ impl RegionScanExec {
total_rows,
is_partition_set: false,
distribution: request.distribution,
explain_verbose: false,
})
}
@@ -231,6 +235,7 @@ impl RegionScanExec {
total_rows: self.total_rows,
is_partition_set: true,
distribution: self.distribution,
explain_verbose: self.explain_verbose,
})
}
@@ -266,6 +271,10 @@ impl RegionScanExec {
.map(|col| col.column_schema.name.clone())
.collect()
}
pub fn set_explain_verbose(&mut self, explain_verbose: bool) {
self.explain_verbose = explain_verbose;
}
}
impl ExecutionPlan for RegionScanExec {
@@ -301,11 +310,14 @@ impl ExecutionPlan for RegionScanExec {
let span =
tracing_context.attach(common_telemetry::tracing::info_span!("read_from_region"));
let ctx = QueryScanContext {
explain_verbose: self.explain_verbose,
};
let stream = self
.scanner
.lock()
.unwrap()
.scan_partition(&self.metric, partition)
.scan_partition(&ctx, &self.metric, partition)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream_metrics = StreamMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {