From a22d08f1b13096574551c0f9c63158ffc4e93207 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 10 Dec 2025 17:16:20 +0800 Subject: [PATCH] feat: collect merge and dedup metrics (#7375) * feat: collect FlatMergeReader metrics Signed-off-by: evenyag * feat: add MergeMetricsReporter, rename Metrics to MergeMetrics Signed-off-by: evenyag * feat: remove num_input_rows from MergeMetrics The merge reader won't dedup so there is no need to collect input rows Signed-off-by: evenyag * feat: report merge metrics to PartitionMetrics Signed-off-by: evenyag * feat: add dedup cost to DedupMetrics Signed-off-by: evenyag * feat: collect dedup metrics Signed-off-by: evenyag * refactor: remove metrics from FlatMergeIterator Signed-off-by: evenyag * feat: remove num_output_rows from MergeMetrics Signed-off-by: evenyag * chore: fix clippy Signed-off-by: evenyag * feat: implement merge() for merge and dedup metrics Signed-off-by: evenyag * fix: report metrics after observe metrics Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/benches/simple_bulk_memtable.rs | 1 + src/mito2/src/flush.rs | 10 +- .../src/memtable/simple_bulk_memtable.rs | 4 +- src/mito2/src/read/dedup.rs | 113 ++++++++++++-- src/mito2/src/read/flat_dedup.rs | 66 +++++++- src/mito2/src/read/flat_merge.rs | 66 ++++++-- src/mito2/src/read/merge.rs | 146 ++++++++++++++---- src/mito2/src/read/scan_util.rs | 51 ++++++ src/mito2/src/read/seq_scan.rs | 24 ++- src/mito2/src/read/series_scan.rs | 11 +- 10 files changed, 415 insertions(+), 77 deletions(-) diff --git a/src/mito2/benches/simple_bulk_memtable.rs b/src/mito2/benches/simple_bulk_memtable.rs index 02b6538aa9..7166c54afd 100644 --- a/src/mito2/benches/simple_bulk_memtable.rs +++ b/src/mito2/benches/simple_bulk_memtable.rs @@ -144,6 +144,7 @@ async fn flush(mem: &SimpleBulkMemtable) { let reader = Box::new(DedupReader::new( merge_reader, read::dedup::LastRow::new(true), + None, )); Source::Reader(reader) }; diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index eff6529223..8371fd52c7 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -730,11 +730,13 @@ async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> // dedup according to merge mode match options.merge_mode.unwrap_or(MergeMode::LastRow) { MergeMode::LastRow => { - Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _ - } - MergeMode::LastNonNull => { - Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _ + Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _ } + MergeMode::LastNonNull => Box::new(DedupReader::new( + merge_reader, + LastNonNull::new(false), + None, + )) as _, } }; Source::Reader(maybe_dedup) diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index 4e0b9ac525..4c3f31c2b8 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -627,7 +627,7 @@ mod tests { .await .unwrap(); - let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false)); + let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None); let mut num_rows = 0; while let Some(b) = reader.next_batch().await.unwrap() { num_rows += b.num_rows(); @@ -659,7 +659,7 @@ mod tests { .await .unwrap(); - let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false)); + let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None); let mut num_rows = 0; while let Some(b) 
= reader.next_batch().await.unwrap() { num_rows += b.num_rows(); diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index c3db629f84..5c881459b2 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -14,6 +14,10 @@ //! Utilities to remove duplicate rows from a sorted batch. +use std::fmt; +use std::sync::Arc; +use std::time::{Duration, Instant}; + use api::v1::OpType; use async_trait::async_trait; use common_telemetry::debug; @@ -27,21 +31,34 @@ use crate::error::Result; use crate::metrics::MERGE_FILTER_ROWS_TOTAL; use crate::read::{Batch, BatchColumn, BatchReader}; +/// Trait for reporting dedup metrics. +pub trait DedupMetricsReport: Send + Sync { + /// Reports and resets the metrics. + fn report(&self, metrics: &mut DedupMetrics); +} + /// A reader that dedup sorted batches from a source based on the /// dedup strategy. pub struct DedupReader { source: R, strategy: S, metrics: DedupMetrics, + /// Optional metrics reporter. + metrics_reporter: Option>, } impl DedupReader { /// Creates a new dedup reader. - pub fn new(source: R, strategy: S) -> Self { + pub fn new( + source: R, + strategy: S, + metrics_reporter: Option>, + ) -> Self { Self { source, strategy, metrics: DedupMetrics::default(), + metrics_reporter, } } } @@ -51,11 +68,14 @@ impl DedupReader { async fn fetch_next_batch(&mut self) -> Result> { while let Some(batch) = self.source.next_batch().await? { if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? { + self.metrics.maybe_report(&self.metrics_reporter); return Ok(Some(batch)); } } - self.strategy.finish(&mut self.metrics) + let result = self.strategy.finish(&mut self.metrics)?; + self.metrics.maybe_report(&self.metrics_reporter); + Ok(result) } } @@ -76,6 +96,11 @@ impl Drop for DedupReader { MERGE_FILTER_ROWS_TOTAL .with_label_values(&["delete"]) .inc_by(self.metrics.num_unselected_rows as u64); + + // Report any remaining metrics. + if let Some(reporter) = &self.metrics_reporter { + reporter.report(&mut self.metrics); + } } } @@ -138,6 +163,8 @@ impl DedupStrategy for LastRow { mut batch: Batch, metrics: &mut DedupMetrics, ) -> Result> { + let start = Instant::now(); + if batch.is_empty() { return Ok(None); } @@ -160,6 +187,7 @@ impl DedupStrategy for LastRow { if batch.num_rows() == 1 { // We don't need to update `prev_batch` because they have the same // key and timestamp. + metrics.dedup_cost += start.elapsed(); return Ok(None); } // Skips the first row. @@ -189,6 +217,8 @@ impl DedupStrategy for LastRow { filter_deleted_from_batch(&mut batch, metrics)?; } + metrics.dedup_cost += start.elapsed(); + // The batch can become empty if all rows are deleted. if batch.is_empty() { Ok(None) @@ -215,12 +245,58 @@ fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> R } /// Metrics for deduplication. -#[derive(Debug, Default)] +#[derive(Default)] pub struct DedupMetrics { /// Number of rows removed during deduplication. pub(crate) num_unselected_rows: usize, /// Number of deleted rows. pub(crate) num_deleted_rows: usize, + /// Time spent on deduplication. 
+ pub(crate) dedup_cost: Duration, +} + +impl fmt::Debug for DedupMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Skip output if dedup_cost is zero + if self.dedup_cost.is_zero() { + return write!(f, "{{}}"); + } + + write!(f, r#"{{"dedup_cost":"{:?}""#, self.dedup_cost)?; + + if self.num_unselected_rows > 0 { + write!(f, r#", "num_unselected_rows":{}"#, self.num_unselected_rows)?; + } + if self.num_deleted_rows > 0 { + write!(f, r#", "num_deleted_rows":{}"#, self.num_deleted_rows)?; + } + + write!(f, "}}") + } +} + +impl DedupMetrics { + /// Merges metrics from another DedupMetrics instance. + pub(crate) fn merge(&mut self, other: &DedupMetrics) { + let DedupMetrics { + num_unselected_rows, + num_deleted_rows, + dedup_cost, + } = other; + + self.num_unselected_rows += *num_unselected_rows; + self.num_deleted_rows += *num_deleted_rows; + self.dedup_cost += *dedup_cost; + } + + /// Reports the metrics if dedup_cost exceeds 10ms and resets them. + pub(crate) fn maybe_report(&mut self, reporter: &Option>) { + if self.dedup_cost.as_millis() > 10 + && let Some(r) = reporter + { + r.report(self); + } + } } /// Buffer to store fields in the last row to merge. @@ -427,6 +503,8 @@ impl LastNonNull { impl DedupStrategy for LastNonNull { fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result> { + let start = Instant::now(); + if batch.is_empty() { return Ok(None); } @@ -444,6 +522,7 @@ impl DedupStrategy for LastNonNull { // Next key is different. let buffer = std::mem::replace(buffer, batch); let merged = self.last_fields.merge_last_non_null(buffer, metrics)?; + metrics.dedup_cost += start.elapsed(); return Ok(merged); } @@ -451,6 +530,7 @@ impl DedupStrategy for LastNonNull { // The next batch has a different timestamp. let buffer = std::mem::replace(buffer, batch); let merged = self.last_fields.merge_last_non_null(buffer, metrics)?; + metrics.dedup_cost += start.elapsed(); return Ok(merged); } @@ -460,6 +540,7 @@ impl DedupStrategy for LastNonNull { // We assumes each batch doesn't contain duplicate rows so we only need to check the first row. if batch.num_rows() == 1 { self.last_fields.push_first_row(&batch); + metrics.dedup_cost += start.elapsed(); return Ok(None); } @@ -472,10 +553,14 @@ impl DedupStrategy for LastNonNull { let buffer = std::mem::replace(buffer, batch); let merged = self.last_fields.merge_last_non_null(buffer, metrics)?; + metrics.dedup_cost += start.elapsed(); + Ok(merged) } fn finish(&mut self, metrics: &mut DedupMetrics) -> Result> { + let start = Instant::now(); + let Some(buffer) = self.buffer.take() else { return Ok(None); }; @@ -485,6 +570,8 @@ impl DedupStrategy for LastNonNull { let merged = self.last_fields.merge_last_non_null(buffer, metrics)?; + metrics.dedup_cost += start.elapsed(); + Ok(merged) } } @@ -614,14 +701,14 @@ mod tests { // Test last row. let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastRow::new(true)); + let mut reader = DedupReader::new(reader, LastRow::new(true), None); check_reader_result(&mut reader, &input).await; assert_eq!(0, reader.metrics().num_unselected_rows); assert_eq!(0, reader.metrics().num_deleted_rows); // Test last non-null. 
let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); check_reader_result(&mut reader, &input).await; assert_eq!(0, reader.metrics().num_unselected_rows); assert_eq!(0, reader.metrics().num_deleted_rows); @@ -662,7 +749,7 @@ mod tests { ]; // Filter deleted. let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastRow::new(true)); + let mut reader = DedupReader::new(reader, LastRow::new(true), None); check_reader_result( &mut reader, &[ @@ -684,7 +771,7 @@ mod tests { // Does not filter deleted. let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastRow::new(false)); + let mut reader = DedupReader::new(reader, LastRow::new(false), None); check_reader_result( &mut reader, &[ @@ -801,7 +888,7 @@ mod tests { // Filter deleted. let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); check_reader_result( &mut reader, &[ @@ -835,7 +922,7 @@ mod tests { // Does not filter deleted. let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(false)); + let mut reader = DedupReader::new(reader, LastNonNull::new(false), None); check_reader_result( &mut reader, &[ @@ -885,7 +972,7 @@ mod tests { )]; let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); check_reader_result( &mut reader, &[new_batch_multi_fields( @@ -901,7 +988,7 @@ mod tests { assert_eq!(1, reader.metrics().num_deleted_rows); let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(false)); + let mut reader = DedupReader::new(reader, LastNonNull::new(false), None); check_reader_result(&mut reader, &input).await; assert_eq!(0, reader.metrics().num_unselected_rows); assert_eq!(0, reader.metrics().num_deleted_rows); @@ -928,7 +1015,7 @@ mod tests { ]; let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); check_reader_result( &mut reader, &[ @@ -962,7 +1049,7 @@ mod tests { ]; let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); check_reader_result( &mut reader, &[ diff --git a/src/mito2/src/read/flat_dedup.rs b/src/mito2/src/read/flat_dedup.rs index 62484f9c12..3f8a7ae507 100644 --- a/src/mito2/src/read/flat_dedup.rs +++ b/src/mito2/src/read/flat_dedup.rs @@ -15,9 +15,12 @@ //! Dedup implementation for flat format. 
use std::ops::Range; +use std::sync::Arc; +use std::time::Instant; use api::v1::OpType; use async_stream::try_stream; +use common_telemetry::debug; use datatypes::arrow::array::{ Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt8Array, UInt64Array, make_comparator, @@ -36,7 +39,8 @@ use snafu::ResultExt; use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result}; use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice; -use crate::read::dedup::DedupMetrics; +use crate::metrics::MERGE_FILTER_ROWS_TOTAL; +use crate::read::dedup::{DedupMetrics, DedupMetricsReport}; use crate::sst::parquet::flat_format::{ op_type_column_index, primary_key_column_index, time_index_column_index, }; @@ -88,15 +92,22 @@ pub struct FlatDedupReader { stream: I, strategy: S, metrics: DedupMetrics, + /// Optional metrics reporter. + metrics_reporter: Option>, } impl FlatDedupReader { - /// Creates a new dedup iterator. - pub fn new(stream: I, strategy: S) -> Self { + /// Creates a new dedup reader. + pub fn new( + stream: I, + strategy: S, + metrics_reporter: Option>, + ) -> Self { Self { stream, strategy, metrics: DedupMetrics::default(), + metrics_reporter, } } } @@ -108,11 +119,14 @@ impl> + Unpin, S: RecordBatchDedupStrategy> async fn fetch_next_batch(&mut self) -> Result> { while let Some(batch) = self.stream.try_next().await? { if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? { + self.metrics.maybe_report(&self.metrics_reporter); return Ok(Some(batch)); } } - self.strategy.finish(&mut self.metrics) + let result = self.strategy.finish(&mut self.metrics)?; + self.metrics.maybe_report(&self.metrics_reporter); + Ok(result) } /// Converts the reader into a stream. @@ -125,6 +139,24 @@ impl> + Unpin, S: RecordBatchDedupStrategy> } } +impl Drop for FlatDedupReader { + fn drop(&mut self) { + debug!("Flat dedup reader finished, metrics: {:?}", self.metrics); + + MERGE_FILTER_ROWS_TOTAL + .with_label_values(&["dedup"]) + .inc_by(self.metrics.num_unselected_rows as u64); + MERGE_FILTER_ROWS_TOTAL + .with_label_values(&["delete"]) + .inc_by(self.metrics.num_deleted_rows as u64); + + // Report any remaining metrics. + if let Some(reporter) = &self.metrics_reporter { + reporter.report(&mut self.metrics); + } + } +} + /// Strategy to remove duplicate rows from sorted record batches. pub trait RecordBatchDedupStrategy: Send { /// Pushes a batch to the dedup strategy. @@ -214,6 +246,8 @@ impl RecordBatchDedupStrategy for FlatLastRow { batch: RecordBatch, metrics: &mut DedupMetrics, ) -> Result> { + let start = Instant::now(); + if batch.num_rows() == 0 { return Ok(None); } @@ -235,6 +269,7 @@ impl RecordBatchDedupStrategy for FlatLastRow { // The batch after dedup is empty. // We don't need to update `prev_batch` because they have the same // key and timestamp. + metrics.dedup_cost += start.elapsed(); return Ok(None); }; @@ -246,7 +281,11 @@ impl RecordBatchDedupStrategy for FlatLastRow { self.prev_batch = Some(batch_last_row); // Filters deleted rows at last. 
- maybe_filter_deleted(batch, self.filter_deleted, metrics) + let result = maybe_filter_deleted(batch, self.filter_deleted, metrics); + + metrics.dedup_cost += start.elapsed(); + + result } fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result> { @@ -275,6 +314,8 @@ impl RecordBatchDedupStrategy for FlatLastNonNull { batch: RecordBatch, metrics: &mut DedupMetrics, ) -> Result> { + let start = Instant::now(); + if batch.num_rows() == 0 { return Ok(None); } @@ -290,6 +331,7 @@ impl RecordBatchDedupStrategy for FlatLastNonNull { self.buffer = BatchLastRow::try_new(record_batch); self.contains_delete = contains_delete; + metrics.dedup_cost += start.elapsed(); return Ok(None); }; @@ -305,7 +347,9 @@ impl RecordBatchDedupStrategy for FlatLastNonNull { self.buffer = BatchLastRow::try_new(record_batch); self.contains_delete = contains_delete; - return maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics); + let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics); + metrics.dedup_cost += start.elapsed(); + return result; } // The next batch has duplicated rows. @@ -332,6 +376,8 @@ impl RecordBatchDedupStrategy for FlatLastNonNull { self.buffer = BatchLastRow::try_new(record_batch); self.contains_delete = contains_delete; + metrics.dedup_cost += start.elapsed(); + Ok(output) } @@ -340,7 +386,13 @@ impl RecordBatchDedupStrategy for FlatLastNonNull { return Ok(None); }; - maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics) + let start = Instant::now(); + + let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics); + + metrics.dedup_cost += start.elapsed(); + + result } } diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs index 890334f91c..90df227ae9 100644 --- a/src/mito2/src/read/flat_merge.rs +++ b/src/mito2/src/read/flat_merge.rs @@ -15,8 +15,10 @@ use std::cmp::Ordering; use std::collections::BinaryHeap; use std::sync::Arc; +use std::time::Instant; use async_stream::try_stream; +use common_telemetry::debug; use datatypes::arrow::array::{Int64Array, UInt64Array}; use datatypes::arrow::compute::interleave; use datatypes::arrow::datatypes::SchemaRef; @@ -29,7 +31,9 @@ use store_api::storage::SequenceNumber; use crate::error::{ComputeArrowSnafu, Result}; use crate::memtable::BoxedRecordBatchIterator; +use crate::metrics::READ_STAGE_ELAPSED; use crate::read::BoxedRecordBatchStream; +use crate::read::merge::{MergeMetrics, MergeMetricsReport}; use crate::sst::parquet::flat_format::{ primary_key_column_index, sequence_column_index, time_index_column_index, }; @@ -462,12 +466,14 @@ impl FlatMergeIterator { let algo = MergeAlgo::new(nodes); - Ok(Self { + let iter = Self { algo, in_progress, output_batch: None, batch_size, - }) + }; + + Ok(iter) } /// Fetches next sorted batch. @@ -484,12 +490,7 @@ impl FlatMergeIterator { } } - if let Some(batch) = self.output_batch.take() { - Ok(Some(batch)) - } else { - // No more batches. - Ok(None) - } + Ok(self.output_batch.take()) } /// Fetches a batch from the hottest node. @@ -562,6 +563,10 @@ pub struct FlatMergeReader { /// This is not a hard limit, the iterator may return smaller batches to avoid concatenating /// rows. batch_size: usize, + /// Local metrics. + metrics: MergeMetrics, + /// Optional metrics reporter. 
+ metrics_reporter: Option>, } impl FlatMergeReader { @@ -570,7 +575,10 @@ impl FlatMergeReader { schema: SchemaRef, iters: Vec, batch_size: usize, + metrics_reporter: Option>, ) -> Result { + let start = Instant::now(); + let metrics = MergeMetrics::default(); let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size); let mut nodes = Vec::with_capacity(iters.len()); // Initialize nodes and the buffer. @@ -588,16 +596,24 @@ impl FlatMergeReader { let algo = MergeAlgo::new(nodes); - Ok(Self { + let mut reader = Self { algo, in_progress, output_batch: None, batch_size, - }) + metrics, + metrics_reporter, + }; + let elapsed = start.elapsed(); + reader.metrics.init_cost += elapsed; + reader.metrics.scan_cost += elapsed; + + Ok(reader) } /// Fetches next sorted batch. pub async fn next_batch(&mut self) -> Result> { + let start = Instant::now(); while self.algo.has_rows() && self.output_batch.is_none() { if self.algo.can_fetch_batch() && !self.in_progress.is_empty() { // Only one batch in the hot heap, but we have pending rows, output the pending rows first. @@ -605,15 +621,21 @@ impl FlatMergeReader { debug_assert!(self.output_batch.is_some()); } else if self.algo.can_fetch_batch() { self.fetch_batch_from_hottest().await?; + self.metrics.num_fetch_by_batches += 1; } else { self.fetch_row_from_hottest().await?; + self.metrics.num_fetch_by_rows += 1; } } if let Some(batch) = self.output_batch.take() { + self.metrics.scan_cost += start.elapsed(); + self.metrics.maybe_report(&self.metrics_reporter); Ok(Some(batch)) } else { // No more batches. + self.metrics.scan_cost += start.elapsed(); + self.metrics.maybe_report(&self.metrics_reporter); Ok(None) } } @@ -634,7 +656,9 @@ impl FlatMergeReader { // Safety: next_batch() ensures the heap is not empty. let mut hottest = self.algo.pop_hot().unwrap(); debug_assert!(!hottest.current_cursor().is_finished()); + let start = Instant::now(); let next = hottest.advance_batch().await?; + self.metrics.fetch_cost += start.elapsed(); // The node is the heap is not empty, so it must have existing rows in the builder. let batch = self .in_progress @@ -658,8 +682,12 @@ impl FlatMergeReader { } } + let start = Instant::now(); if let Some(next) = hottest.advance_row().await? { + self.metrics.fetch_cost += start.elapsed(); self.in_progress.push_batch(hottest.node_index, next); + } else { + self.metrics.fetch_cost += start.elapsed(); } self.algo.reheap(hottest); @@ -675,6 +703,24 @@ impl FlatMergeReader { } } +impl Drop for FlatMergeReader { + fn drop(&mut self) { + debug!("Flat merge reader finished, metrics: {:?}", self.metrics); + + READ_STAGE_ELAPSED + .with_label_values(&["flat_merge"]) + .observe(self.metrics.scan_cost.as_secs_f64()); + READ_STAGE_ELAPSED + .with_label_values(&["flat_merge_fetch"]) + .observe(self.metrics.fetch_cost.as_secs_f64()); + + // Report any remaining metrics. + if let Some(reporter) = &self.metrics_reporter { + reporter.report(&mut self.metrics); + } + } +} + /// A sync node in the merge iterator. struct GenericNode { /// Index of the node. 
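The flat and non-flat merge readers in this patch share one reporting contract: each reader keeps local MergeMetrics, and report() both folds them into a shared sink and resets them so repeated reports never double-count. Below is a minimal, self-contained sketch of that contract. SharedMetrics is a hypothetical stand-in for PartitionMetricsInner, and the Arc<dyn MergeMetricsReport> handle type is an assumption consistent with the Arc returned by merge_metrics_reporter() later in this patch.

use std::sync::{Arc, Mutex};
use std::time::Duration;

// Field set mirrors the MergeMetrics added in merge.rs.
#[derive(Default, Debug)]
struct MergeMetrics {
    init_cost: Duration,
    scan_cost: Duration,
    num_fetch_by_batches: usize,
    num_fetch_by_rows: usize,
    fetch_cost: Duration,
}

impl MergeMetrics {
    // Accumulates another instance, as merge() does in the patch.
    fn merge(&mut self, other: &MergeMetrics) {
        self.init_cost += other.init_cost;
        self.scan_cost += other.scan_cost;
        self.num_fetch_by_batches += other.num_fetch_by_batches;
        self.num_fetch_by_rows += other.num_fetch_by_rows;
        self.fetch_cost += other.fetch_cost;
    }
}

// The reporter contract: report() consumes *and* resets the local metrics.
trait MergeMetricsReport: Send + Sync {
    fn report(&self, metrics: &mut MergeMetrics);
}

// Hypothetical stand-in for PartitionMetricsInner.
#[derive(Default)]
struct SharedMetrics(Mutex<MergeMetrics>);

impl MergeMetricsReport for SharedMetrics {
    fn report(&self, metrics: &mut MergeMetrics) {
        self.0.lock().unwrap().merge(metrics);
        // Reset so repeated reports from the same reader never double-count.
        *metrics = MergeMetrics::default();
    }
}

fn main() {
    let sink = Arc::new(SharedMetrics::default());
    let reporter: Arc<dyn MergeMetricsReport> = sink.clone();

    let mut local = MergeMetrics {
        scan_cost: Duration::from_millis(12),
        num_fetch_by_rows: 3,
        ..Default::default()
    };
    reporter.report(&mut local);

    assert_eq!(local.num_fetch_by_rows, 0); // local state was reset
    assert_eq!(sink.0.lock().unwrap().num_fetch_by_rows, 3);
}
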
diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index f9afbe66fd..0470e4b01a 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -16,8 +16,9 @@ use std::cmp::Ordering; use std::collections::BinaryHeap; -use std::mem; +use std::sync::Arc; use std::time::{Duration, Instant}; +use std::{fmt, mem}; use async_trait::async_trait; use common_telemetry::debug; @@ -27,6 +28,12 @@ use crate::memtable::BoxedBatchIterator; use crate::metrics::READ_STAGE_ELAPSED; use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; +/// Trait for reporting merge metrics. +pub trait MergeMetricsReport: Send + Sync { + /// Reports and resets the metrics. + fn report(&self, metrics: &mut MergeMetrics); +} + /// Reader to merge sorted batches. /// /// The merge reader merges [Batch]es from multiple sources that yield sorted batches. @@ -51,7 +58,9 @@ pub struct MergeReader { /// Batch to output. output_batch: Option, /// Local metrics. - metrics: Metrics, + metrics: MergeMetrics, + /// Optional metrics reporter. + metrics_reporter: Option>, } #[async_trait] @@ -72,11 +81,12 @@ impl BatchReader for MergeReader { if let Some(batch) = self.output_batch.take() { self.metrics.scan_cost += start.elapsed(); - self.metrics.num_output_rows += batch.num_rows(); + self.metrics.maybe_report(&self.metrics_reporter); Ok(Some(batch)) } else { // Nothing fetched. self.metrics.scan_cost += start.elapsed(); + self.metrics.maybe_report(&self.metrics_reporter); Ok(None) } } @@ -92,14 +102,22 @@ impl Drop for MergeReader { READ_STAGE_ELAPSED .with_label_values(&["merge_fetch"]) .observe(self.metrics.fetch_cost.as_secs_f64()); + + // Report any remaining metrics. + if let Some(reporter) = &self.metrics_reporter { + reporter.report(&mut self.metrics); + } } } impl MergeReader { /// Creates and initializes a new [MergeReader]. - pub async fn new(sources: Vec) -> Result { + pub async fn new( + sources: Vec, + metrics_reporter: Option>, + ) -> Result { let start = Instant::now(); - let mut metrics = Metrics::default(); + let mut metrics = MergeMetrics::default(); let mut cold = BinaryHeap::with_capacity(sources.len()); let hot = BinaryHeap::with_capacity(sources.len()); @@ -116,11 +134,14 @@ impl MergeReader { cold, output_batch: None, metrics, + metrics_reporter, }; // Initializes the reader. reader.refill_hot(); - reader.metrics.scan_cost += start.elapsed(); + let elapsed = start.elapsed(); + reader.metrics.init_cost += elapsed; + reader.metrics.scan_cost += elapsed; Ok(reader) } @@ -250,6 +271,8 @@ pub struct MergeReaderBuilder { /// /// All source must yield batches with the same schema. sources: Vec, + /// Optional metrics reporter. + metrics_reporter: Option>, } impl MergeReaderBuilder { @@ -260,7 +283,10 @@ impl MergeReaderBuilder { /// Creates a builder from sources. pub fn from_sources(sources: Vec) -> MergeReaderBuilder { - MergeReaderBuilder { sources } + MergeReaderBuilder { + sources, + metrics_reporter: None, + } } /// Pushes a batch reader to sources. @@ -275,28 +301,94 @@ impl MergeReaderBuilder { self } + /// Sets the metrics reporter. + pub fn with_metrics_reporter( + &mut self, + reporter: Option>, + ) -> &mut Self { + self.metrics_reporter = reporter; + self + } + /// Builds and initializes the reader, then resets the builder. 
pub async fn build(&mut self) -> Result { let sources = mem::take(&mut self.sources); - MergeReader::new(sources).await + let metrics_reporter = self.metrics_reporter.take(); + MergeReader::new(sources, metrics_reporter).await } } /// Metrics for the merge reader. -#[derive(Debug, Default)] -struct Metrics { +#[derive(Default)] +pub struct MergeMetrics { + /// Cost to initialize the reader. + pub(crate) init_cost: Duration, /// Total scan cost of the reader. - scan_cost: Duration, + pub(crate) scan_cost: Duration, /// Number of times to fetch batches. - num_fetch_by_batches: usize, + pub(crate) num_fetch_by_batches: usize, /// Number of times to fetch rows. - num_fetch_by_rows: usize, - /// Number of input rows. - num_input_rows: usize, - /// Number of output rows. - num_output_rows: usize, + pub(crate) num_fetch_by_rows: usize, /// Cost to fetch batches from sources. - fetch_cost: Duration, + pub(crate) fetch_cost: Duration, +} + +impl fmt::Debug for MergeMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Skip output if scan_cost is zero + if self.scan_cost.is_zero() { + return write!(f, "{{}}"); + } + + write!(f, r#"{{"scan_cost":"{:?}""#, self.scan_cost)?; + + if !self.init_cost.is_zero() { + write!(f, r#", "init_cost":"{:?}""#, self.init_cost)?; + } + if self.num_fetch_by_batches > 0 { + write!( + f, + r#", "num_fetch_by_batches":{}"#, + self.num_fetch_by_batches + )?; + } + if self.num_fetch_by_rows > 0 { + write!(f, r#", "num_fetch_by_rows":{}"#, self.num_fetch_by_rows)?; + } + if !self.fetch_cost.is_zero() { + write!(f, r#", "fetch_cost":"{:?}""#, self.fetch_cost)?; + } + + write!(f, "}}") + } +} + +impl MergeMetrics { + /// Merges metrics from another MergeMetrics instance. + pub(crate) fn merge(&mut self, other: &MergeMetrics) { + let MergeMetrics { + init_cost, + scan_cost, + num_fetch_by_batches, + num_fetch_by_rows, + fetch_cost, + } = other; + + self.init_cost += *init_cost; + self.scan_cost += *scan_cost; + self.num_fetch_by_batches += *num_fetch_by_batches; + self.num_fetch_by_rows += *num_fetch_by_rows; + self.fetch_cost += *fetch_cost; + } + + /// Reports the metrics if scan_cost exceeds 10ms and resets them. + pub(crate) fn maybe_report(&mut self, reporter: &Option>) { + if self.scan_cost.as_millis() > 10 + && let Some(r) = reporter + { + r.report(self); + } + } } /// A `Node` represent an individual input data source to be merged. @@ -313,12 +405,11 @@ impl Node { /// Initialize a node. /// /// It tries to fetch one batch from the `source`. - async fn new(mut source: Source, metrics: &mut Metrics) -> Result { + async fn new(mut source: Source, metrics: &mut MergeMetrics) -> Result { // Ensures batch is not empty. let start = Instant::now(); let current_batch = source.next_batch().await?.map(CompareFirst); metrics.fetch_cost += start.elapsed(); - metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0); Ok(Node { source, @@ -352,17 +443,12 @@ impl Node { /// /// # Panics /// Panics if the node has reached EOF. - async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result { + async fn fetch_batch(&mut self, metrics: &mut MergeMetrics) -> Result { let current = self.current_batch.take().unwrap(); let start = Instant::now(); // Ensures batch is not empty. 
self.current_batch = self.source.next_batch().await?.map(CompareFirst); metrics.fetch_cost += start.elapsed(); - metrics.num_input_rows += self - .current_batch - .as_ref() - .map(|b| b.0.num_rows()) - .unwrap_or(0); Ok(current.0) } @@ -390,7 +476,7 @@ impl Node { /// /// # Panics /// Panics if the node is EOF. - async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> Result<()> { + async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut MergeMetrics) -> Result<()> { let batch = self.current_batch(); debug_assert!(batch.num_rows() >= num_to_skip); @@ -547,9 +633,6 @@ mod tests { ], ) .await; - - assert_eq!(8, reader.metrics.num_input_rows); - assert_eq!(8, reader.metrics.num_output_rows); } #[tokio::test] @@ -666,9 +749,6 @@ mod tests { ], ) .await; - - assert_eq!(11, reader.metrics.num_input_rows); - assert_eq!(11, reader.metrics.num_output_rows); } #[tokio::test] diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 5d4769703f..7c69dee845 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -37,6 +37,8 @@ use crate::metrics::{ IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED, }; +use crate::read::dedup::{DedupMetrics, DedupMetricsReport}; +use crate::read::merge::{MergeMetrics, MergeMetricsReport}; use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex}; use crate::read::scan_region::StreamContext; use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source}; @@ -130,6 +132,11 @@ pub(crate) struct ScanMetricsSet { /// Duration of the series distributor to yield. distributor_yield_cost: Duration, + /// Merge metrics. + merge_metrics: MergeMetrics, + /// Dedup metrics. + dedup_metrics: DedupMetrics, + /// The stream reached EOF stream_eof: bool, @@ -180,6 +187,8 @@ impl fmt::Debug for ScanMetricsSet { num_distributor_batches, distributor_scan_cost, distributor_yield_cost, + merge_metrics, + dedup_metrics, stream_eof, mem_scan_cost, mem_rows, @@ -307,6 +316,16 @@ impl fmt::Debug for ScanMetricsSet { write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?; } + // Write merge metrics if not empty + if !merge_metrics.scan_cost.is_zero() { + write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?; + } + + // Write dedup metrics if not empty + if !dedup_metrics.dedup_cost.is_zero() { + write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?; + } + write!(f, ", \"stream_eof\":{stream_eof}}}") } } @@ -531,6 +550,28 @@ impl PartitionMetricsInner { } } +impl MergeMetricsReport for PartitionMetricsInner { + fn report(&self, metrics: &mut MergeMetrics) { + let mut scan_metrics = self.metrics.lock().unwrap(); + // Merge the metrics into scan_metrics + scan_metrics.merge_metrics.merge(metrics); + + // Reset the input metrics + *metrics = MergeMetrics::default(); + } +} + +impl DedupMetricsReport for PartitionMetricsInner { + fn report(&self, metrics: &mut DedupMetrics) { + let mut scan_metrics = self.metrics.lock().unwrap(); + // Merge the metrics into scan_metrics + scan_metrics.dedup_metrics.merge(metrics); + + // Reset the input metrics + *metrics = DedupMetrics::default(); + } +} + impl Drop for PartitionMetricsInner { fn drop(&mut self) { self.on_finish(false); @@ -703,6 +744,16 @@ impl PartitionMetrics { pub(crate) fn explain_verbose(&self) -> bool { self.0.explain_verbose } + + /// Returns a MergeMetricsReport trait object for reporting merge metrics. 
+ pub(crate) fn merge_metrics_reporter(&self) -> Arc { + self.0.clone() + } + + /// Returns a DedupMetricsReport trait object for reporting dedup metrics. + pub(crate) fn dedup_metrics_reporter(&self) -> Arc { + self.0.clone() + } } impl fmt::Debug for PartitionMetrics { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 8df8d6fb48..41f6dc7772 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -189,7 +189,7 @@ impl SeqScan { partition_ranges.len(), sources.len() ); - Self::build_reader_from_sources(stream_ctx, sources, None).await + Self::build_reader_from_sources(stream_ctx, sources, None, None).await } /// Builds a merge reader that reads all flat ranges. @@ -223,7 +223,7 @@ impl SeqScan { partition_ranges.len(), sources.len() ); - Self::build_flat_reader_from_sources(stream_ctx, sources, None).await + Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await } /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel @@ -233,6 +233,7 @@ impl SeqScan { stream_ctx: &StreamContext, mut sources: Vec, semaphore: Option>, + part_metrics: Option<&PartitionMetrics>, ) -> Result { if let Some(semaphore) = semaphore.as_ref() { // Read sources in parallel. @@ -244,18 +245,24 @@ impl SeqScan { } let mut builder = MergeReaderBuilder::from_sources(sources); + if let Some(metrics) = part_metrics { + builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter())); + } let reader = builder.build().await?; let dedup = !stream_ctx.input.append_mode; + let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter()); let reader = if dedup { match stream_ctx.input.merge_mode { MergeMode::LastRow => Box::new(DedupReader::new( reader, LastRow::new(stream_ctx.input.filter_deleted), + dedup_metrics_reporter, )) as _, MergeMode::LastNonNull => Box::new(DedupReader::new( reader, LastNonNull::new(stream_ctx.input.filter_deleted), + dedup_metrics_reporter, )) as _, } } else { @@ -277,6 +284,7 @@ impl SeqScan { stream_ctx: &StreamContext, mut sources: Vec, semaphore: Option>, + part_metrics: Option<&PartitionMetrics>, ) -> Result { if let Some(semaphore) = semaphore.as_ref() { // Read sources in parallel. 
@@ -290,15 +298,20 @@ impl SeqScan { let mapper = stream_ctx.input.mapper.as_flat().unwrap(); let schema = mapper.input_arrow_schema(stream_ctx.input.compaction); - let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?; + let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter()); + let reader = + FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter) + .await?; let dedup = !stream_ctx.input.append_mode; + let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter()); let reader = if dedup { match stream_ctx.input.merge_mode { MergeMode::LastRow => Box::pin( FlatDedupReader::new( reader.into_stream().boxed(), FlatLastRow::new(stream_ctx.input.filter_deleted), + dedup_metrics_reporter, ) .into_stream(), ) as _, @@ -309,6 +322,7 @@ impl SeqScan { mapper.field_column_start(), stream_ctx.input.filter_deleted, ), + dedup_metrics_reporter, ) .into_stream(), ) as _, @@ -409,7 +423,7 @@ impl SeqScan { let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); let mut reader = - Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone()) + Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics)) .await?; #[cfg(debug_assertions)] let mut checker = crate::read::BatchChecker::default() @@ -505,7 +519,7 @@ impl SeqScan { let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); let mut reader = - Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone()) + Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics)) .await?; while let Some(record_batch) = reader.try_next().await? { diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index ecb40d438b..c485348806 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -438,6 +438,7 @@ impl SeriesDistributor { &self.stream_ctx, sources, self.semaphore.clone(), + Some(&part_metrics), ) .await?; let mut metrics = SeriesDistributorMetrics::default(); @@ -519,9 +520,13 @@ impl SeriesDistributor { } // Builds a reader that merge sources from all parts. - let mut reader = - SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone()) - .await?; + let mut reader = SeqScan::build_reader_from_sources( + &self.stream_ctx, + sources, + self.semaphore.clone(), + Some(&part_metrics), + ) + .await?; let mut metrics = SeriesDistributorMetrics::default(); let mut fetch_start = Instant::now();
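
Taken together, the readers in this patch report on two paths: a throttled maybe_report() on the hot path, which only touches the shared sink once more than 10ms of cost has accumulated locally, and an unconditional flush from Drop, so metrics survive early stream termination. The sketch below shows that lifecycle on the dedup side. Reader and PartitionDedup are hypothetical stand-ins for DedupReader and PartitionMetricsInner, and the Option<Arc<dyn DedupMetricsReport>> field type is an assumption consistent with the Arc returned by dedup_metrics_reporter().

use std::sync::{Arc, Mutex};
use std::time::Duration;

// Field set mirrors the DedupMetrics extended in dedup.rs.
#[derive(Default)]
struct DedupMetrics {
    num_unselected_rows: usize,
    num_deleted_rows: usize,
    dedup_cost: Duration,
}

impl DedupMetrics {
    fn merge(&mut self, other: &DedupMetrics) {
        self.num_unselected_rows += other.num_unselected_rows;
        self.num_deleted_rows += other.num_deleted_rows;
        self.dedup_cost += other.dedup_cost;
    }
}

trait DedupMetricsReport: Send + Sync {
    fn report(&self, metrics: &mut DedupMetrics);
}

// Hypothetical partition-wide sink: merges and resets, like the
// DedupMetricsReport impl on PartitionMetricsInner in scan_util.rs.
#[derive(Default)]
struct PartitionDedup(Mutex<DedupMetrics>);

impl DedupMetricsReport for PartitionDedup {
    fn report(&self, metrics: &mut DedupMetrics) {
        self.0.lock().unwrap().merge(metrics);
        *metrics = DedupMetrics::default();
    }
}

// Skeleton of the reader-side lifecycle: throttled reports on the hot
// path, plus one final flush from Drop so nothing is lost.
struct Reader {
    metrics: DedupMetrics,
    reporter: Option<Arc<dyn DedupMetricsReport>>,
}

impl Reader {
    fn maybe_report(&mut self) {
        // Only cross the mutex once >10ms of dedup work has accumulated.
        if self.metrics.dedup_cost.as_millis() > 10 {
            if let Some(r) = &self.reporter {
                r.report(&mut self.metrics);
            }
        }
    }
}

impl Drop for Reader {
    fn drop(&mut self) {
        if let Some(r) = &self.reporter {
            r.report(&mut self.metrics); // unconditional final flush
        }
    }
}

fn main() {
    let part = Arc::new(PartitionDedup::default());
    let mut reader = Reader {
        metrics: DedupMetrics {
            num_deleted_rows: 2,
            dedup_cost: Duration::from_millis(4), // below the threshold
            ..Default::default()
        },
        reporter: Some(part.clone()),
    };
    reader.maybe_report(); // no-op: under 10ms of accumulated cost
    drop(reader);          // Drop flushes the remainder
    assert_eq!(part.0.lock().unwrap().num_deleted_rows, 2);
}

The reset inside report() is what lets maybe_report() fire repeatedly on long scans without double-counting rows or durations in the partition-wide ScanMetricsSet.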