feat: collect merge and dedup metrics (#7375)

* feat: collect FlatMergeReader metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add MergeMetricsReporter, rename Metrics to MergeMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: remove num_input_rows from MergeMetrics

The merge reader won't dedup, so there is no need to collect input rows

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: report merge metrics to PartitionMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add dedup cost to DedupMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect dedup metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove metrics from FlatMergeIterator

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: remove num_output_rows from MergeMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: implement merge() for merge and dedup metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: report metrics after observe metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Yingwen
2025-12-10 17:16:20 +08:00
committed by GitHub
parent 6817a376b5
commit a22d08f1b1
10 changed files with 415 additions and 77 deletions
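
At a high level, each merge/dedup reader now keeps local metrics, forwards them to an optional reporter once the accumulated cost crosses 10ms (via maybe_report), and flushes whatever remains when the reader is dropped. A crate-internal sketch of the threshold behavior, assuming a reporter `r: Arc<dyn MergeMetricsReport>` is in scope:

    use std::sync::Arc;
    use std::time::Duration;

    let mut m = MergeMetrics::default();
    m.scan_cost += Duration::from_millis(5);
    m.maybe_report(&Some(Arc::clone(&r))); // 5ms is under the 10ms threshold: not reported
    m.scan_cost += Duration::from_millis(20);
    m.maybe_report(&Some(Arc::clone(&r))); // 25ms exceeds 10ms: reported; the reporter resets `m`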

View File

@@ -144,6 +144,7 @@ async fn flush(mem: &SimpleBulkMemtable) {
let reader = Box::new(DedupReader::new(
merge_reader,
read::dedup::LastRow::new(true),
None,
));
Source::Reader(reader)
};

View File

@@ -730,11 +730,13 @@ async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) ->
// dedup according to merge mode
match options.merge_mode.unwrap_or(MergeMode::LastRow) {
MergeMode::LastRow => {
Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
}
MergeMode::LastNonNull => {
Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
}
MergeMode::LastNonNull => Box::new(DedupReader::new(
merge_reader,
LastNonNull::new(false),
None,
)) as _,
}
};
Source::Reader(maybe_dedup)

View File

@@ -627,7 +627,7 @@ mod tests {
.await
.unwrap();
let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
let mut num_rows = 0;
while let Some(b) = reader.next_batch().await.unwrap() {
num_rows += b.num_rows();
@@ -659,7 +659,7 @@ mod tests {
.await
.unwrap();
let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
let mut num_rows = 0;
while let Some(b) = reader.next_batch().await.unwrap() {
num_rows += b.num_rows();

View File

@@ -14,6 +14,10 @@
//! Utilities to remove duplicate rows from a sorted batch.
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use api::v1::OpType;
use async_trait::async_trait;
use common_telemetry::debug;
@@ -27,21 +31,34 @@ use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
/// Trait for reporting dedup metrics.
pub trait DedupMetricsReport: Send + Sync {
/// Reports and resets the metrics.
fn report(&self, metrics: &mut DedupMetrics);
}
/// A reader that dedups sorted batches from a source based on the
/// dedup strategy.
pub struct DedupReader<R, S> {
source: R,
strategy: S,
metrics: DedupMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
}
impl<R, S> DedupReader<R, S> {
/// Creates a new dedup reader.
pub fn new(source: R, strategy: S) -> Self {
pub fn new(
source: R,
strategy: S,
metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
) -> Self {
Self {
source,
strategy,
metrics: DedupMetrics::default(),
metrics_reporter,
}
}
}
@@ -51,11 +68,14 @@ impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.source.next_batch().await? {
if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
self.metrics.maybe_report(&self.metrics_reporter);
return Ok(Some(batch));
}
}
self.strategy.finish(&mut self.metrics)
let result = self.strategy.finish(&mut self.metrics)?;
self.metrics.maybe_report(&self.metrics_reporter);
Ok(result)
}
}
@@ -76,6 +96,11 @@ impl<R, S> Drop for DedupReader<R, S> {
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["delete"])
.inc_by(self.metrics.num_unselected_rows as u64);
// Report any remaining metrics.
if let Some(reporter) = &self.metrics_reporter {
reporter.report(&mut self.metrics);
}
}
}
@@ -138,6 +163,8 @@ impl DedupStrategy for LastRow {
mut batch: Batch,
metrics: &mut DedupMetrics,
) -> Result<Option<Batch>> {
let start = Instant::now();
if batch.is_empty() {
return Ok(None);
}
@@ -160,6 +187,7 @@ impl DedupStrategy for LastRow {
if batch.num_rows() == 1 {
// We don't need to update `prev_batch` because they have the same
// key and timestamp.
metrics.dedup_cost += start.elapsed();
return Ok(None);
}
// Skips the first row.
@@ -189,6 +217,8 @@ impl DedupStrategy for LastRow {
filter_deleted_from_batch(&mut batch, metrics)?;
}
metrics.dedup_cost += start.elapsed();
// The batch can become empty if all rows are deleted.
if batch.is_empty() {
Ok(None)
@@ -215,12 +245,58 @@ fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> R
}
/// Metrics for deduplication.
#[derive(Debug, Default)]
#[derive(Default)]
pub struct DedupMetrics {
/// Number of rows removed during deduplication.
pub(crate) num_unselected_rows: usize,
/// Number of deleted rows.
pub(crate) num_deleted_rows: usize,
/// Time spent on deduplication.
pub(crate) dedup_cost: Duration,
}
impl fmt::Debug for DedupMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Skip output if dedup_cost is zero
if self.dedup_cost.is_zero() {
return write!(f, "{{}}");
}
write!(f, r#"{{"dedup_cost":"{:?}""#, self.dedup_cost)?;
if self.num_unselected_rows > 0 {
write!(f, r#", "num_unselected_rows":{}"#, self.num_unselected_rows)?;
}
if self.num_deleted_rows > 0 {
write!(f, r#", "num_deleted_rows":{}"#, self.num_deleted_rows)?;
}
write!(f, "}}")
}
}
impl DedupMetrics {
/// Merges metrics from another DedupMetrics instance.
pub(crate) fn merge(&mut self, other: &DedupMetrics) {
let DedupMetrics {
num_unselected_rows,
num_deleted_rows,
dedup_cost,
} = other;
self.num_unselected_rows += *num_unselected_rows;
self.num_deleted_rows += *num_deleted_rows;
self.dedup_cost += *dedup_cost;
}
/// Reports the metrics once dedup_cost exceeds 10ms; the reporter resets them.
pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn DedupMetricsReport>>) {
if self.dedup_cost.as_millis() > 10
&& let Some(r) = reporter
{
r.report(self);
}
}
}
/// Buffer to store fields in the last row to merge.
@@ -427,6 +503,8 @@ impl LastNonNull {
impl DedupStrategy for LastNonNull {
fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
let start = Instant::now();
if batch.is_empty() {
return Ok(None);
}
@@ -444,6 +522,7 @@ impl DedupStrategy for LastNonNull {
// Next key is different.
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
metrics.dedup_cost += start.elapsed();
return Ok(merged);
}
@@ -451,6 +530,7 @@ impl DedupStrategy for LastNonNull {
// The next batch has a different timestamp.
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
metrics.dedup_cost += start.elapsed();
return Ok(merged);
}
@@ -460,6 +540,7 @@ impl DedupStrategy for LastNonNull {
// We assume each batch doesn't contain duplicate rows, so we only need to check the first row.
if batch.num_rows() == 1 {
self.last_fields.push_first_row(&batch);
metrics.dedup_cost += start.elapsed();
return Ok(None);
}
@@ -472,10 +553,14 @@ impl DedupStrategy for LastNonNull {
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
metrics.dedup_cost += start.elapsed();
Ok(merged)
}
fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
let start = Instant::now();
let Some(buffer) = self.buffer.take() else {
return Ok(None);
};
@@ -485,6 +570,8 @@ impl DedupStrategy for LastNonNull {
let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
metrics.dedup_cost += start.elapsed();
Ok(merged)
}
}
@@ -614,14 +701,14 @@ mod tests {
// Test last row.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
let mut reader = DedupReader::new(reader, LastRow::new(true), None);
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
// Test last non-null.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
@@ -662,7 +749,7 @@ mod tests {
];
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
let mut reader = DedupReader::new(reader, LastRow::new(true), None);
check_reader_result(
&mut reader,
&[
@@ -684,7 +771,7 @@ mod tests {
// Does not filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(false));
let mut reader = DedupReader::new(reader, LastRow::new(false), None);
check_reader_result(
&mut reader,
&[
@@ -801,7 +888,7 @@ mod tests {
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[
@@ -835,7 +922,7 @@ mod tests {
// Does not filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(false));
let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
check_reader_result(
&mut reader,
&[
@@ -885,7 +972,7 @@ mod tests {
)];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[new_batch_multi_fields(
@@ -901,7 +988,7 @@ mod tests {
assert_eq!(1, reader.metrics().num_deleted_rows);
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(false));
let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
@@ -928,7 +1015,7 @@ mod tests {
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[
@@ -962,7 +1049,7 @@ mod tests {
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[
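
A reporter implementation only needs to fold the reader-local metrics into a shared accumulator and reset them. A minimal crate-internal sketch (the production reporter in this commit is PartitionMetricsInner, further down):

    use std::sync::{Arc, Mutex};

    #[derive(Default)]
    struct SharedDedupMetrics(Mutex<DedupMetrics>);

    impl DedupMetricsReport for SharedDedupMetrics {
        fn report(&self, metrics: &mut DedupMetrics) {
            // merge() sums the row counters and dedup_cost; resetting afterwards
            // keeps maybe_report() from reporting the same window twice.
            self.0.lock().unwrap().merge(metrics);
            *metrics = DedupMetrics::default();
        }
    }

    // Wiring, assuming a `source` and a DedupStrategy `strategy` are in scope:
    // let reporter: Arc<dyn DedupMetricsReport> = Arc::new(SharedDedupMetrics::default());
    // let mut reader = DedupReader::new(source, strategy, Some(reporter));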

View File

@@ -15,9 +15,12 @@
//! Dedup implementation for flat format.
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use api::v1::OpType;
use async_stream::try_stream;
use common_telemetry::debug;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt8Array, UInt64Array,
make_comparator,
@@ -36,7 +39,8 @@ use snafu::ResultExt;
use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
use crate::read::dedup::DedupMetrics;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
use crate::sst::parquet::flat_format::{
op_type_column_index, primary_key_column_index, time_index_column_index,
};
@@ -88,15 +92,22 @@ pub struct FlatDedupReader<I, S> {
stream: I,
strategy: S,
metrics: DedupMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
}
impl<I, S> FlatDedupReader<I, S> {
/// Creates a new dedup iterator.
pub fn new(stream: I, strategy: S) -> Self {
/// Creates a new dedup reader.
pub fn new(
stream: I,
strategy: S,
metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
) -> Self {
Self {
stream,
strategy,
metrics: DedupMetrics::default(),
metrics_reporter,
}
}
}
@@ -108,11 +119,14 @@ impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
while let Some(batch) = self.stream.try_next().await? {
if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
self.metrics.maybe_report(&self.metrics_reporter);
return Ok(Some(batch));
}
}
self.strategy.finish(&mut self.metrics)
let result = self.strategy.finish(&mut self.metrics)?;
self.metrics.maybe_report(&self.metrics_reporter);
Ok(result)
}
/// Converts the reader into a stream.
@@ -125,6 +139,24 @@ impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
}
}
impl<I, S> Drop for FlatDedupReader<I, S> {
fn drop(&mut self) {
debug!("Flat dedup reader finished, metrics: {:?}", self.metrics);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["dedup"])
.inc_by(self.metrics.num_unselected_rows as u64);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["delete"])
.inc_by(self.metrics.num_deleted_rows as u64);
// Report any remaining metrics.
if let Some(reporter) = &self.metrics_reporter {
reporter.report(&mut self.metrics);
}
}
}
/// Strategy to remove duplicate rows from sorted record batches.
pub trait RecordBatchDedupStrategy: Send {
/// Pushes a batch to the dedup strategy.
@@ -214,6 +246,8 @@ impl RecordBatchDedupStrategy for FlatLastRow {
batch: RecordBatch,
metrics: &mut DedupMetrics,
) -> Result<Option<RecordBatch>> {
let start = Instant::now();
if batch.num_rows() == 0 {
return Ok(None);
}
@@ -235,6 +269,7 @@ impl RecordBatchDedupStrategy for FlatLastRow {
// The batch after dedup is empty.
// We don't need to update `prev_batch` because they have the same
// key and timestamp.
metrics.dedup_cost += start.elapsed();
return Ok(None);
};
@@ -246,7 +281,11 @@ impl RecordBatchDedupStrategy for FlatLastRow {
self.prev_batch = Some(batch_last_row);
// Filters deleted rows at last.
maybe_filter_deleted(batch, self.filter_deleted, metrics)
let result = maybe_filter_deleted(batch, self.filter_deleted, metrics);
metrics.dedup_cost += start.elapsed();
result
}
fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
@@ -275,6 +314,8 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
batch: RecordBatch,
metrics: &mut DedupMetrics,
) -> Result<Option<RecordBatch>> {
let start = Instant::now();
if batch.num_rows() == 0 {
return Ok(None);
}
@@ -290,6 +331,7 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
self.buffer = BatchLastRow::try_new(record_batch);
self.contains_delete = contains_delete;
metrics.dedup_cost += start.elapsed();
return Ok(None);
};
@@ -305,7 +347,9 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
self.buffer = BatchLastRow::try_new(record_batch);
self.contains_delete = contains_delete;
return maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
metrics.dedup_cost += start.elapsed();
return result;
}
// The next batch has duplicated rows.
@@ -332,6 +376,8 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
self.buffer = BatchLastRow::try_new(record_batch);
self.contains_delete = contains_delete;
metrics.dedup_cost += start.elapsed();
Ok(output)
}
@@ -340,7 +386,13 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
return Ok(None);
};
maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics)
let start = Instant::now();
let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
metrics.dedup_cost += start.elapsed();
result
}
}
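
Usage on the flat path is stream-based; a sketch assuming `stream` is a boxed RecordBatch stream and `reporter` is an `Option<Arc<dyn DedupMetricsReport>>`:

    use futures::TryStreamExt;

    let reader = FlatDedupReader::new(stream, FlatLastRow::new(true), reporter);
    let mut deduped = Box::pin(reader.into_stream());
    while let Some(batch) = deduped.try_next().await? {
        // Consume deduplicated RecordBatches; dedup_cost accrues inside
        // push_batch()/finish() and is reported past the 10ms threshold.
    }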

View File

@@ -15,8 +15,10 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::Instant;
use async_stream::try_stream;
use common_telemetry::debug;
use datatypes::arrow::array::{Int64Array, UInt64Array};
use datatypes::arrow::compute::interleave;
use datatypes::arrow::datatypes::SchemaRef;
@@ -29,7 +31,9 @@ use store_api::storage::SequenceNumber;
use crate::error::{ComputeArrowSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::sst::parquet::flat_format::{
primary_key_column_index, sequence_column_index, time_index_column_index,
};
@@ -462,12 +466,14 @@ impl FlatMergeIterator {
let algo = MergeAlgo::new(nodes);
Ok(Self {
let iter = Self {
algo,
in_progress,
output_batch: None,
batch_size,
})
};
Ok(iter)
}
/// Fetches next sorted batch.
@@ -484,12 +490,7 @@ impl FlatMergeIterator {
}
}
if let Some(batch) = self.output_batch.take() {
Ok(Some(batch))
} else {
// No more batches.
Ok(None)
}
Ok(self.output_batch.take())
}
/// Fetches a batch from the hottest node.
@@ -562,6 +563,10 @@ pub struct FlatMergeReader {
/// This is not a hard limit; the iterator may return smaller batches to avoid concatenating
/// rows.
batch_size: usize,
/// Local metrics.
metrics: MergeMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
impl FlatMergeReader {
@@ -570,7 +575,10 @@ impl FlatMergeReader {
schema: SchemaRef,
iters: Vec<BoxedRecordBatchStream>,
batch_size: usize,
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
) -> Result<Self> {
let start = Instant::now();
let metrics = MergeMetrics::default();
let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
let mut nodes = Vec::with_capacity(iters.len());
// Initialize nodes and the buffer.
@@ -588,16 +596,24 @@ impl FlatMergeReader {
let algo = MergeAlgo::new(nodes);
Ok(Self {
let mut reader = Self {
algo,
in_progress,
output_batch: None,
batch_size,
})
metrics,
metrics_reporter,
};
let elapsed = start.elapsed();
reader.metrics.init_cost += elapsed;
reader.metrics.scan_cost += elapsed;
Ok(reader)
}
/// Fetches next sorted batch.
pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
let start = Instant::now();
while self.algo.has_rows() && self.output_batch.is_none() {
if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
// Only one batch is in the hot heap but there are pending rows; output the pending rows first.
@@ -605,15 +621,21 @@ impl FlatMergeReader {
debug_assert!(self.output_batch.is_some());
} else if self.algo.can_fetch_batch() {
self.fetch_batch_from_hottest().await?;
self.metrics.num_fetch_by_batches += 1;
} else {
self.fetch_row_from_hottest().await?;
self.metrics.num_fetch_by_rows += 1;
}
}
if let Some(batch) = self.output_batch.take() {
self.metrics.scan_cost += start.elapsed();
self.metrics.maybe_report(&self.metrics_reporter);
Ok(Some(batch))
} else {
// No more batches.
self.metrics.scan_cost += start.elapsed();
self.metrics.maybe_report(&self.metrics_reporter);
Ok(None)
}
}
@@ -634,7 +656,9 @@ impl FlatMergeReader {
// Safety: next_batch() ensures the heap is not empty.
let mut hottest = self.algo.pop_hot().unwrap();
debug_assert!(!hottest.current_cursor().is_finished());
let start = Instant::now();
let next = hottest.advance_batch().await?;
self.metrics.fetch_cost += start.elapsed();
// The node in the heap is not empty, so it must have existing rows in the builder.
let batch = self
.in_progress
@@ -658,8 +682,12 @@ impl FlatMergeReader {
}
}
let start = Instant::now();
if let Some(next) = hottest.advance_row().await? {
self.metrics.fetch_cost += start.elapsed();
self.in_progress.push_batch(hottest.node_index, next);
} else {
self.metrics.fetch_cost += start.elapsed();
}
self.algo.reheap(hottest);
@@ -675,6 +703,24 @@ impl FlatMergeReader {
}
}
impl Drop for FlatMergeReader {
fn drop(&mut self) {
debug!("Flat merge reader finished, metrics: {:?}", self.metrics);
READ_STAGE_ELAPSED
.with_label_values(&["flat_merge"])
.observe(self.metrics.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["flat_merge_fetch"])
.observe(self.metrics.fetch_cost.as_secs_f64());
// Report any remaining metrics.
if let Some(reporter) = &self.metrics_reporter {
reporter.report(&mut self.metrics);
}
}
}
/// A sync node in the merge iterator.
struct GenericNode<T> {
/// Index of the node.
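
Constructing the flat merge reader with the new reporter parameter, a sketch assuming `schema: SchemaRef`, `streams: Vec<BoxedRecordBatchStream>`, and an optional `reporter`:

    let mut reader =
        FlatMergeReader::new(schema, streams, DEFAULT_READ_BATCH_SIZE, reporter).await?;
    while let Some(batch) = reader.next_batch().await? {
        // Batches arrive merged and sorted; scan_cost and fetch_cost accumulate here.
    }
    // Dropping the reader flushes metrics still under the 10ms threshold to the
    // reporter and observes the flat_merge/flat_merge_fetch histograms.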

View File

@@ -16,8 +16,9 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fmt, mem};
use async_trait::async_trait;
use common_telemetry::debug;
@@ -27,6 +28,12 @@ use crate::memtable::BoxedBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// Trait for reporting merge metrics.
pub trait MergeMetricsReport: Send + Sync {
/// Reports and resets the metrics.
fn report(&self, metrics: &mut MergeMetrics);
}
/// Reader to merge sorted batches.
///
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
@@ -51,7 +58,9 @@ pub struct MergeReader {
/// Batch to output.
output_batch: Option<Batch>,
/// Local metrics.
metrics: Metrics,
metrics: MergeMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
#[async_trait]
@@ -72,11 +81,12 @@ impl BatchReader for MergeReader {
if let Some(batch) = self.output_batch.take() {
self.metrics.scan_cost += start.elapsed();
self.metrics.num_output_rows += batch.num_rows();
self.metrics.maybe_report(&self.metrics_reporter);
Ok(Some(batch))
} else {
// Nothing fetched.
self.metrics.scan_cost += start.elapsed();
self.metrics.maybe_report(&self.metrics_reporter);
Ok(None)
}
}
@@ -92,14 +102,22 @@ impl Drop for MergeReader {
READ_STAGE_ELAPSED
.with_label_values(&["merge_fetch"])
.observe(self.metrics.fetch_cost.as_secs_f64());
// Report any remaining metrics.
if let Some(reporter) = &self.metrics_reporter {
reporter.report(&mut self.metrics);
}
}
}
impl MergeReader {
/// Creates and initializes a new [MergeReader].
pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
pub async fn new(
sources: Vec<Source>,
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
) -> Result<MergeReader> {
let start = Instant::now();
let mut metrics = Metrics::default();
let mut metrics = MergeMetrics::default();
let mut cold = BinaryHeap::with_capacity(sources.len());
let hot = BinaryHeap::with_capacity(sources.len());
@@ -116,11 +134,14 @@ impl MergeReader {
cold,
output_batch: None,
metrics,
metrics_reporter,
};
// Initializes the reader.
reader.refill_hot();
reader.metrics.scan_cost += start.elapsed();
let elapsed = start.elapsed();
reader.metrics.init_cost += elapsed;
reader.metrics.scan_cost += elapsed;
Ok(reader)
}
@@ -250,6 +271,8 @@ pub struct MergeReaderBuilder {
///
/// All sources must yield batches with the same schema.
sources: Vec<Source>,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
impl MergeReaderBuilder {
@@ -260,7 +283,10 @@ impl MergeReaderBuilder {
/// Creates a builder from sources.
pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
MergeReaderBuilder { sources }
MergeReaderBuilder {
sources,
metrics_reporter: None,
}
}
/// Pushes a batch reader to sources.
@@ -275,28 +301,94 @@ impl MergeReaderBuilder {
self
}
/// Sets the metrics reporter.
pub fn with_metrics_reporter(
&mut self,
reporter: Option<Arc<dyn MergeMetricsReport>>,
) -> &mut Self {
self.metrics_reporter = reporter;
self
}
/// Builds and initializes the reader, then resets the builder.
pub async fn build(&mut self) -> Result<MergeReader> {
let sources = mem::take(&mut self.sources);
MergeReader::new(sources).await
let metrics_reporter = self.metrics_reporter.take();
MergeReader::new(sources, metrics_reporter).await
}
}
/// Metrics for the merge reader.
#[derive(Debug, Default)]
struct Metrics {
#[derive(Default)]
pub struct MergeMetrics {
/// Cost to initialize the reader.
pub(crate) init_cost: Duration,
/// Total scan cost of the reader.
scan_cost: Duration,
pub(crate) scan_cost: Duration,
/// Number of times to fetch batches.
num_fetch_by_batches: usize,
pub(crate) num_fetch_by_batches: usize,
/// Number of times to fetch rows.
num_fetch_by_rows: usize,
/// Number of input rows.
num_input_rows: usize,
/// Number of output rows.
num_output_rows: usize,
pub(crate) num_fetch_by_rows: usize,
/// Cost to fetch batches from sources.
fetch_cost: Duration,
pub(crate) fetch_cost: Duration,
}
impl fmt::Debug for MergeMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Skip output if scan_cost is zero
if self.scan_cost.is_zero() {
return write!(f, "{{}}");
}
write!(f, r#"{{"scan_cost":"{:?}""#, self.scan_cost)?;
if !self.init_cost.is_zero() {
write!(f, r#", "init_cost":"{:?}""#, self.init_cost)?;
}
if self.num_fetch_by_batches > 0 {
write!(
f,
r#", "num_fetch_by_batches":{}"#,
self.num_fetch_by_batches
)?;
}
if self.num_fetch_by_rows > 0 {
write!(f, r#", "num_fetch_by_rows":{}"#, self.num_fetch_by_rows)?;
}
if !self.fetch_cost.is_zero() {
write!(f, r#", "fetch_cost":"{:?}""#, self.fetch_cost)?;
}
write!(f, "}}")
}
}
impl MergeMetrics {
/// Merges metrics from another MergeMetrics instance.
pub(crate) fn merge(&mut self, other: &MergeMetrics) {
let MergeMetrics {
init_cost,
scan_cost,
num_fetch_by_batches,
num_fetch_by_rows,
fetch_cost,
} = other;
self.init_cost += *init_cost;
self.scan_cost += *scan_cost;
self.num_fetch_by_batches += *num_fetch_by_batches;
self.num_fetch_by_rows += *num_fetch_by_rows;
self.fetch_cost += *fetch_cost;
}
/// Reports the metrics once scan_cost exceeds 10ms; the reporter resets them.
pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn MergeMetricsReport>>) {
if self.scan_cost.as_millis() > 10
&& let Some(r) = reporter
{
r.report(self);
}
}
}
/// A `Node` represent an individual input data source to be merged.
@@ -313,12 +405,11 @@ impl Node {
/// Initialize a node.
///
/// It tries to fetch one batch from the `source`.
async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
async fn new(mut source: Source, metrics: &mut MergeMetrics) -> Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
let current_batch = source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
Ok(Node {
source,
@@ -352,17 +443,12 @@ impl Node {
///
/// # Panics
/// Panics if the node has reached EOF.
async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result<Batch> {
async fn fetch_batch(&mut self, metrics: &mut MergeMetrics) -> Result<Batch> {
let current = self.current_batch.take().unwrap();
let start = Instant::now();
// Ensures batch is not empty.
self.current_batch = self.source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += self
.current_batch
.as_ref()
.map(|b| b.0.num_rows())
.unwrap_or(0);
Ok(current.0)
}
@@ -390,7 +476,7 @@ impl Node {
///
/// # Panics
/// Panics if the node is EOF.
async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> Result<()> {
async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut MergeMetrics) -> Result<()> {
let batch = self.current_batch();
debug_assert!(batch.num_rows() >= num_to_skip);
@@ -547,9 +633,6 @@ mod tests {
],
)
.await;
assert_eq!(8, reader.metrics.num_input_rows);
assert_eq!(8, reader.metrics.num_output_rows);
}
#[tokio::test]
@@ -666,9 +749,6 @@ mod tests {
],
)
.await;
assert_eq!(11, reader.metrics.num_input_rows);
assert_eq!(11, reader.metrics.num_output_rows);
}
#[tokio::test]
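
On the non-flat path the builder owns the reporter; a sketch assuming `sources: Vec<Source>` and a `part_metrics: PartitionMetrics` handle:

    let mut builder = MergeReaderBuilder::from_sources(sources);
    builder.with_metrics_reporter(Some(part_metrics.merge_metrics_reporter()));
    let mut reader = builder.build().await?;
    while let Some(batch) = reader.next_batch().await? {
        // scan_cost and fetch_cost accumulate per call; maybe_report() pushes
        // them to the partition metrics once scan_cost exceeds 10ms.
    }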

View File

@@ -37,6 +37,8 @@ use crate::metrics::{
IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
@@ -130,6 +132,11 @@ pub(crate) struct ScanMetricsSet {
/// Duration of the series distributor to yield.
distributor_yield_cost: Duration,
/// Merge metrics.
merge_metrics: MergeMetrics,
/// Dedup metrics.
dedup_metrics: DedupMetrics,
/// The stream reached EOF.
stream_eof: bool,
@@ -180,6 +187,8 @@ impl fmt::Debug for ScanMetricsSet {
num_distributor_batches,
distributor_scan_cost,
distributor_yield_cost,
merge_metrics,
dedup_metrics,
stream_eof,
mem_scan_cost,
mem_rows,
@@ -307,6 +316,16 @@ impl fmt::Debug for ScanMetricsSet {
write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
}
// Write merge metrics if not empty
if !merge_metrics.scan_cost.is_zero() {
write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
}
// Write dedup metrics if not empty
if !dedup_metrics.dedup_cost.is_zero() {
write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
}
write!(f, ", \"stream_eof\":{stream_eof}}}")
}
}
@@ -531,6 +550,28 @@ impl PartitionMetricsInner {
}
}
impl MergeMetricsReport for PartitionMetricsInner {
fn report(&self, metrics: &mut MergeMetrics) {
let mut scan_metrics = self.metrics.lock().unwrap();
// Merge the metrics into scan_metrics
scan_metrics.merge_metrics.merge(metrics);
// Reset the input metrics
*metrics = MergeMetrics::default();
}
}
impl DedupMetricsReport for PartitionMetricsInner {
fn report(&self, metrics: &mut DedupMetrics) {
let mut scan_metrics = self.metrics.lock().unwrap();
// Merge the metrics into scan_metrics
scan_metrics.dedup_metrics.merge(metrics);
// Reset the input metrics
*metrics = DedupMetrics::default();
}
}
impl Drop for PartitionMetricsInner {
fn drop(&mut self) {
self.on_finish(false);
@@ -703,6 +744,16 @@ impl PartitionMetrics {
pub(crate) fn explain_verbose(&self) -> bool {
self.0.explain_verbose
}
/// Returns a MergeMetricsReport trait object for reporting merge metrics.
pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
self.0.clone()
}
/// Returns a DedupMetricsReport trait object for reporting dedup metrics.
pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
self.0.clone()
}
}
impl fmt::Debug for PartitionMetrics {
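
With both reporters implemented on PartitionMetricsInner, the accumulated costs surface in the verbose explain output. An illustrative (not captured) fragment, following the custom Debug impls above with hypothetical values:

    , "merge_metrics":{"scan_cost":"45.2ms", "init_cost":"1.3ms", "num_fetch_by_batches":12, "fetch_cost":"30.1ms"}, "dedup_metrics":{"dedup_cost":"12.4ms", "num_unselected_rows":128}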

View File

@@ -189,7 +189,7 @@ impl SeqScan {
partition_ranges.len(),
sources.len()
);
Self::build_reader_from_sources(stream_ctx, sources, None).await
Self::build_reader_from_sources(stream_ctx, sources, None, None).await
}
/// Builds a merge reader that reads all flat ranges.
@@ -223,7 +223,7 @@ impl SeqScan {
partition_ranges.len(),
sources.len()
);
Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
}
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
@@ -233,6 +233,7 @@ impl SeqScan {
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Option<Arc<Semaphore>>,
part_metrics: Option<&PartitionMetrics>,
) -> Result<BoxedBatchReader> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
@@ -244,18 +245,24 @@ impl SeqScan {
}
let mut builder = MergeReaderBuilder::from_sources(sources);
if let Some(metrics) = part_metrics {
builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
}
let reader = builder.build().await?;
let dedup = !stream_ctx.input.append_mode;
let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::new(DedupReader::new(
reader,
LastRow::new(stream_ctx.input.filter_deleted),
dedup_metrics_reporter,
)) as _,
MergeMode::LastNonNull => Box::new(DedupReader::new(
reader,
LastNonNull::new(stream_ctx.input.filter_deleted),
dedup_metrics_reporter,
)) as _,
}
} else {
@@ -277,6 +284,7 @@ impl SeqScan {
stream_ctx: &StreamContext,
mut sources: Vec<BoxedRecordBatchStream>,
semaphore: Option<Arc<Semaphore>>,
part_metrics: Option<&PartitionMetrics>,
) -> Result<BoxedRecordBatchStream> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
@@ -290,15 +298,20 @@ impl SeqScan {
let mapper = stream_ctx.input.mapper.as_flat().unwrap();
let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
let reader =
FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
.await?;
let dedup = !stream_ctx.input.append_mode;
let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::pin(
FlatDedupReader::new(
reader.into_stream().boxed(),
FlatLastRow::new(stream_ctx.input.filter_deleted),
dedup_metrics_reporter,
)
.into_stream(),
) as _,
@@ -309,6 +322,7 @@ impl SeqScan {
mapper.field_column_start(),
stream_ctx.input.filter_deleted,
),
dedup_metrics_reporter,
)
.into_stream(),
) as _,
@@ -409,7 +423,7 @@ impl SeqScan {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
.await?;
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default()
@@ -505,7 +519,7 @@ impl SeqScan {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let mut reader =
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
.await?;
while let Some(record_batch) = reader.try_next().await? {

View File

@@ -438,6 +438,7 @@ impl SeriesDistributor {
&self.stream_ctx,
sources,
self.semaphore.clone(),
Some(&part_metrics),
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();
@@ -519,9 +520,13 @@ impl SeriesDistributor {
}
// Builds a reader that merges sources from all parts.
let mut reader =
SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
.await?;
let mut reader = SeqScan::build_reader_from_sources(
&self.stream_ctx,
sources,
self.semaphore.clone(),
Some(&part_metrics),
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();
let mut fetch_start = Instant::now();