From 91e015dca8b761f61982e7fbd46041ff1dea8afd Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 23 Apr 2026 20:20:07 +0800 Subject: [PATCH] per review Signed-off-by: discord9 --- src/query/src/metrics.rs | 122 +++++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 43 deletions(-) diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index ca9df90518..9a376d748c 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -29,12 +29,21 @@ use prometheus::*; use crate::dist_plan::MergeScanExec; -#[derive(Clone)] +/// Intermediate merge state for one participating region while collecting +/// terminal correctness watermarks across merge-scan sub-stages. enum MergeState { + /// The region participated, but no explicit watermark result has been seen + /// yet for this merge. + Participated, + /// At least one branch reported that this region cannot prove a safe + /// checkpoint watermark for the current query round. Unproved, + /// All seen branches agree the region can advance safely to this sequence. Proved(u64), + /// Different proved sequences were reported for the same region. The final + /// result is degraded to `None`, and the collected values are logged. Conflict { - region_id: u64, + /// Distinct proved watermark candidates reported for the region. watermarks: Vec, }, } @@ -234,53 +243,38 @@ fn merge_merge_scan_region_watermarks( sub_stage_metrics: impl IntoIterator, ) { for region_id in regions { - merged.entry(region_id).or_insert(MergeState::Unproved); + merged.entry(region_id).or_insert(MergeState::Participated); } for metrics in sub_stage_metrics { for entry in metrics.region_watermarks { merged .entry(entry.region_id) - .and_modify(|existing| { - *existing = match (existing.clone(), entry.watermark) { - ( - MergeState::Conflict { - region_id, - mut watermarks, - }, - Some(seq), - ) => { + .and_modify(|existing| match entry.watermark { + None => match existing { + MergeState::Participated | MergeState::Proved(_) => { + *existing = MergeState::Unproved; + } + MergeState::Unproved | MergeState::Conflict { .. } => {} + }, + Some(seq) => match existing { + MergeState::Participated => { + *existing = MergeState::Proved(seq); + } + MergeState::Unproved => {} + MergeState::Proved(existing_seq) if *existing_seq == seq => {} + MergeState::Proved(existing_seq) => { + let old_seq = *existing_seq; + *existing = MergeState::Conflict { + watermarks: vec![old_seq, seq], + }; + } + MergeState::Conflict { watermarks } => { if !watermarks.contains(&seq) { watermarks.push(seq); } - MergeState::Conflict { - region_id, - watermarks, - } } - ( - MergeState::Conflict { - region_id, - watermarks, - }, - None, - ) => MergeState::Conflict { - region_id, - watermarks, - }, - (MergeState::Unproved, None) => MergeState::Unproved, - (MergeState::Unproved, Some(seq)) => MergeState::Proved(seq), - (MergeState::Proved(existing_seq), None) => { - MergeState::Proved(existing_seq) - } - (MergeState::Proved(existing_seq), Some(seq)) if existing_seq == seq => { - MergeState::Proved(existing_seq) - } - (MergeState::Proved(existing_seq), Some(seq)) => MergeState::Conflict { - region_id: entry.region_id, - watermarks: vec![existing_seq, seq], - }, - } + }, }) .or_insert(match entry.watermark { Some(seq) => MergeState::Proved(seq), @@ -296,12 +290,10 @@ fn finalize_region_watermarks(merged: BTreeMap) -> Vec None, MergeState::Unproved => None, MergeState::Proved(seq) => Some(seq), - MergeState::Conflict { - region_id, - watermarks, - } => { + MergeState::Conflict { watermarks } => { warn!( "Conflicting proved watermarks for region {}: {:?}; degrading to unproved", region_id, watermarks @@ -403,4 +395,48 @@ mod tests { }] ); } + + #[test] + fn merge_merge_scan_region_watermarks_none_vetoes_proved_value() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks( + &mut merged, + [9], + [ + metrics_with_region_watermarks(&[(9, Some(21))]), + metrics_with_region_watermarks(&[(9, None)]), + ], + ); + + assert_eq!( + finalize_region_watermarks(merged), + vec![RegionWatermarkEntry { + region_id: 9, + watermark: None, + }] + ); + } + + #[test] + fn merge_merge_scan_region_watermarks_none_vetoes_proved_value_regardless_of_order() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks( + &mut merged, + [9], + [ + metrics_with_region_watermarks(&[(9, None)]), + metrics_with_region_watermarks(&[(9, Some(21))]), + ], + ); + + assert_eq!( + finalize_region_watermarks(merged), + vec![RegionWatermarkEntry { + region_id: 9, + watermark: None, + }] + ); + } }