From 1167c36c4e3524d9d7d43d4fef89e77acb1aea3d Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 24 Apr 2026 15:54:10 +0800 Subject: [PATCH] feat: add seq watermark in record batch metrics (#8015) * feat: add seq watermark in record batch metrics Signed-off-by: discord9 * per review Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/common/recordbatch/src/adapter.rs | 71 ++++++ src/query/src/dist_plan/merge_scan.rs | 4 + src/query/src/metrics.rs | 313 +++++++++++++++++++++++++- 3 files changed, 387 insertions(+), 1 deletion(-) diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index fc12d87dcf..a69eedf7eb 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -446,6 +446,27 @@ pub struct RecordBatchMetrics { // Detailed per-plan metrics /// An ordered list of plan metrics, from top to bottom in post-order. pub plan_metrics: Vec, + /// Per-region watermark for incremental-read checkpoint advancement. + /// + /// The watermark is the latest sequence (`seq`) this query round safely read + /// for each participating region. Flow uses it to decide where the next + /// incremental round can resume. + /// + /// - `Some(seq)`: the query proved it safely read up to `seq`; downstream + /// may advance the checkpoint to this value. + /// - `None`: the region participated but the query could not prove a safe + /// read upper-bound, so the checkpoint must not advance for this region. + /// + /// Omitted when empty for backward compatibility. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub region_watermarks: Vec, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct RegionWatermarkEntry { + pub region_id: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub watermark: Option, } /// Determines if a metric name represents a time measurement that should be formatted. @@ -706,6 +727,7 @@ mod test { use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::vectors::Int32Vector; + use serde_json::json; use snafu::IntoError; use super::*; @@ -879,4 +901,53 @@ mod test { } } } + + #[test] + fn test_recordbatch_metrics_deserializes_without_region_watermarks() { + let metrics: RecordBatchMetrics = serde_json::from_value(json!({ + "elapsed_compute": 12, + "memory_usage": 34, + "plan_metrics": [] + })) + .unwrap(); + + assert!(metrics.region_watermarks.is_empty()); + assert_eq!(metrics.elapsed_compute, 12); + assert_eq!(metrics.memory_usage, 34); + } + + #[test] + fn test_recordbatch_metrics_region_watermarks_serde_roundtrip() { + let metrics = RecordBatchMetrics { + region_watermarks: vec![ + RegionWatermarkEntry { + region_id: 1, + watermark: Some(100), + }, + RegionWatermarkEntry { + region_id: 2, + watermark: None, + }, + ], + ..Default::default() + }; + + let value = serde_json::to_value(&metrics).unwrap(); + assert_eq!( + value.get("region_watermarks").unwrap(), + &json!([ + { "region_id": 1, "watermark": 100 }, + { "region_id": 2 } + ]) + ); + + let decoded: RecordBatchMetrics = serde_json::from_value(value).unwrap(); + assert_eq!(decoded.region_watermarks, metrics.region_watermarks); + } + + #[test] + fn test_recordbatch_metrics_skips_empty_region_watermarks_on_serialize() { + let value = serde_json::to_value(RecordBatchMetrics::default()).unwrap(); + assert!(value.get("region_watermarks").is_none()); + } } diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 3c326cf390..470b4d325f 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -477,6 +477,10 @@ impl MergeScanExec { .collect() } + pub fn regions(&self) -> &[RegionId] { + &self.regions + } + pub fn partition_count(&self) -> usize { self.target_partition } diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index e0d02e9a3d..9a376d748c 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -12,17 +12,42 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; -use common_recordbatch::adapter::RecordBatchMetrics; +use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_telemetry::warn; +use datafusion::physical_plan::ExecutionPlan; use datatypes::schema::SchemaRef; use futures::Stream; use futures_util::ready; use lazy_static::lazy_static; use prometheus::*; +use crate::dist_plan::MergeScanExec; + +/// Intermediate merge state for one participating region while collecting +/// terminal correctness watermarks across merge-scan sub-stages. +enum MergeState { + /// The region participated, but no explicit watermark result has been seen + /// yet for this merge. + Participated, + /// At least one branch reported that this region cannot prove a safe + /// checkpoint watermark for the current query round. + Unproved, + /// All seen branches agree the region can advance safely to this sequence. + Proved(u64), + /// Different proved sequences were reported for the same region. The final + /// result is degraded to `None`, and the collected values are logged. + Conflict { + /// Distinct proved watermark candidates reported for the region. + watermarks: Vec, + }, +} + lazy_static! { /// Timer of different stages in query. pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( @@ -129,3 +154,289 @@ impl Stream for OnDone { self.stream.size_hint() } } + +pub struct RegionWatermarkMetricsStream { + stream: SendableRecordBatchStream, + plan: Arc, +} + +impl RegionWatermarkMetricsStream { + pub fn new(stream: SendableRecordBatchStream, plan: Arc) -> Self { + Self { stream, plan } + } +} + +impl RecordBatchStream for RegionWatermarkMetricsStream { + fn name(&self) -> &str { + self.stream.name() + } + + fn schema(&self) -> SchemaRef { + self.stream.schema() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + self.stream.output_ordering() + } + + fn metrics(&self) -> Option { + let mut metrics = self.stream.metrics()?; + let region_watermarks = collect_region_watermarks(self.plan.clone()); + if !region_watermarks.is_empty() { + metrics.region_watermarks = region_watermarks; + } + Some(metrics) + } +} + +impl Stream for RegionWatermarkMetricsStream { + type Item = common_recordbatch::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +pub fn terminal_recordbatch_metrics_from_plan( + plan: Arc, +) -> Option { + let region_watermarks = collect_region_watermarks(plan); + if region_watermarks.is_empty() { + None + } else { + Some(RecordBatchMetrics { + region_watermarks, + ..Default::default() + }) + } +} + +fn collect_region_watermarks(plan: Arc) -> Vec { + let mut merged = BTreeMap::::new(); + let mut stack = vec![plan]; + + while let Some(plan) = stack.pop() { + if let Some(merge_scan) = plan.as_any().downcast_ref::() { + merge_merge_scan_region_watermarks( + &mut merged, + merge_scan + .regions() + .iter() + .map(|region_id| region_id.as_u64()), + merge_scan.sub_stage_metrics(), + ); + } + + stack.extend(plan.children().into_iter().cloned()); + } + + finalize_region_watermarks(merged) +} + +fn merge_merge_scan_region_watermarks( + merged: &mut BTreeMap, + regions: impl IntoIterator, + sub_stage_metrics: impl IntoIterator, +) { + for region_id in regions { + merged.entry(region_id).or_insert(MergeState::Participated); + } + + for metrics in sub_stage_metrics { + for entry in metrics.region_watermarks { + merged + .entry(entry.region_id) + .and_modify(|existing| match entry.watermark { + None => match existing { + MergeState::Participated | MergeState::Proved(_) => { + *existing = MergeState::Unproved; + } + MergeState::Unproved | MergeState::Conflict { .. } => {} + }, + Some(seq) => match existing { + MergeState::Participated => { + *existing = MergeState::Proved(seq); + } + MergeState::Unproved => {} + MergeState::Proved(existing_seq) if *existing_seq == seq => {} + MergeState::Proved(existing_seq) => { + let old_seq = *existing_seq; + *existing = MergeState::Conflict { + watermarks: vec![old_seq, seq], + }; + } + MergeState::Conflict { watermarks } => { + if !watermarks.contains(&seq) { + watermarks.push(seq); + } + } + }, + }) + .or_insert(match entry.watermark { + Some(seq) => MergeState::Proved(seq), + None => MergeState::Unproved, + }); + } + } +} + +fn finalize_region_watermarks(merged: BTreeMap) -> Vec { + merged + .into_iter() + .map(|(region_id, state)| RegionWatermarkEntry { + region_id, + watermark: match state { + MergeState::Participated => None, + MergeState::Unproved => None, + MergeState::Proved(seq) => Some(seq), + MergeState::Conflict { watermarks } => { + warn!( + "Conflicting proved watermarks for region {}: {:?}; degrading to unproved", + region_id, watermarks + ); + None + } + }, + }) + .collect() +} + +#[cfg(test)] +mod tests { + use datafusion::arrow::datatypes::Schema as ArrowSchema; + use datafusion::physical_plan::empty::EmptyExec; + + use super::*; + + fn metrics_with_region_watermarks(entries: &[(u64, Option)]) -> RecordBatchMetrics { + RecordBatchMetrics { + region_watermarks: entries + .iter() + .map(|(region_id, watermark)| RegionWatermarkEntry { + region_id: *region_id, + watermark: *watermark, + }) + .collect(), + ..Default::default() + } + } + + #[test] + fn terminal_metrics_returns_none_without_merge_scan() { + let plan: Arc = Arc::new(EmptyExec::new(Arc::new(ArrowSchema::empty()))); + assert!(terminal_recordbatch_metrics_from_plan(plan).is_none()); + } + + #[test] + fn merge_merge_scan_region_watermarks_marks_missing_watermarks_unproved() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks(&mut merged, [1, 2], std::iter::empty()); + + assert_eq!( + finalize_region_watermarks(merged), + vec![ + RegionWatermarkEntry { + region_id: 1, + watermark: None, + }, + RegionWatermarkEntry { + region_id: 2, + watermark: None, + }, + ] + ); + } + + #[test] + fn merge_merge_scan_region_watermarks_keeps_matching_proved_values() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks( + &mut merged, + [42], + [ + metrics_with_region_watermarks(&[(42, Some(7))]), + metrics_with_region_watermarks(&[(42, Some(7))]), + ], + ); + + assert_eq!( + finalize_region_watermarks(merged), + vec![RegionWatermarkEntry { + region_id: 42, + watermark: Some(7), + }] + ); + } + + #[test] + fn merge_merge_scan_region_watermarks_degrades_conflicting_proved_values() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks( + &mut merged, + [7], + [ + metrics_with_region_watermarks(&[(7, Some(11))]), + metrics_with_region_watermarks(&[(7, Some(13))]), + ], + ); + + assert_eq!( + finalize_region_watermarks(merged), + vec![RegionWatermarkEntry { + region_id: 7, + watermark: None, + }] + ); + } + + #[test] + fn merge_merge_scan_region_watermarks_none_vetoes_proved_value() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks( + &mut merged, + [9], + [ + metrics_with_region_watermarks(&[(9, Some(21))]), + metrics_with_region_watermarks(&[(9, None)]), + ], + ); + + assert_eq!( + finalize_region_watermarks(merged), + vec![RegionWatermarkEntry { + region_id: 9, + watermark: None, + }] + ); + } + + #[test] + fn merge_merge_scan_region_watermarks_none_vetoes_proved_value_regardless_of_order() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks( + &mut merged, + [9], + [ + metrics_with_region_watermarks(&[(9, None)]), + metrics_with_region_watermarks(&[(9, Some(21))]), + ], + ); + + assert_eq!( + finalize_region_watermarks(merged), + vec![RegionWatermarkEntry { + region_id: 9, + watermark: None, + }] + ); + } +}