per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-23 20:20:07 +08:00
parent c6e881d68f
commit 91e015dca8

View File

@@ -29,12 +29,21 @@ use prometheus::*;
use crate::dist_plan::MergeScanExec;
#[derive(Clone)]
/// Intermediate merge state for one participating region while collecting
/// terminal correctness watermarks across merge-scan sub-stages.
enum MergeState {
/// The region participated, but no explicit watermark result has been seen
/// yet for this merge.
Participated,
/// At least one branch reported that this region cannot prove a safe
/// checkpoint watermark for the current query round.
Unproved,
/// All seen branches agree the region can advance safely to this sequence.
Proved(u64),
/// Different proved sequences were reported for the same region. The final
/// result is degraded to `None`, and the collected values are logged.
Conflict {
region_id: u64,
/// Distinct proved watermark candidates reported for the region.
watermarks: Vec<u64>,
},
}
@@ -234,53 +243,38 @@ fn merge_merge_scan_region_watermarks(
sub_stage_metrics: impl IntoIterator<Item = RecordBatchMetrics>,
) {
for region_id in regions {
merged.entry(region_id).or_insert(MergeState::Unproved);
merged.entry(region_id).or_insert(MergeState::Participated);
}
for metrics in sub_stage_metrics {
for entry in metrics.region_watermarks {
merged
.entry(entry.region_id)
.and_modify(|existing| {
*existing = match (existing.clone(), entry.watermark) {
(
MergeState::Conflict {
region_id,
mut watermarks,
},
Some(seq),
) => {
.and_modify(|existing| match entry.watermark {
None => match existing {
MergeState::Participated | MergeState::Proved(_) => {
*existing = MergeState::Unproved;
}
MergeState::Unproved | MergeState::Conflict { .. } => {}
},
Some(seq) => match existing {
MergeState::Participated => {
*existing = MergeState::Proved(seq);
}
MergeState::Unproved => {}
MergeState::Proved(existing_seq) if *existing_seq == seq => {}
MergeState::Proved(existing_seq) => {
let old_seq = *existing_seq;
*existing = MergeState::Conflict {
watermarks: vec![old_seq, seq],
};
}
MergeState::Conflict { watermarks } => {
if !watermarks.contains(&seq) {
watermarks.push(seq);
}
MergeState::Conflict {
region_id,
watermarks,
}
}
(
MergeState::Conflict {
region_id,
watermarks,
},
None,
) => MergeState::Conflict {
region_id,
watermarks,
},
(MergeState::Unproved, None) => MergeState::Unproved,
(MergeState::Unproved, Some(seq)) => MergeState::Proved(seq),
(MergeState::Proved(existing_seq), None) => {
MergeState::Proved(existing_seq)
}
(MergeState::Proved(existing_seq), Some(seq)) if existing_seq == seq => {
MergeState::Proved(existing_seq)
}
(MergeState::Proved(existing_seq), Some(seq)) => MergeState::Conflict {
region_id: entry.region_id,
watermarks: vec![existing_seq, seq],
},
}
},
})
.or_insert(match entry.watermark {
Some(seq) => MergeState::Proved(seq),
@@ -296,12 +290,10 @@ fn finalize_region_watermarks(merged: BTreeMap<u64, MergeState>) -> Vec<RegionWa
.map(|(region_id, state)| RegionWatermarkEntry {
region_id,
watermark: match state {
MergeState::Participated => None,
MergeState::Unproved => None,
MergeState::Proved(seq) => Some(seq),
MergeState::Conflict {
region_id,
watermarks,
} => {
MergeState::Conflict { watermarks } => {
warn!(
"Conflicting proved watermarks for region {}: {:?}; degrading to unproved",
region_id, watermarks
@@ -403,4 +395,48 @@ mod tests {
}]
);
}
#[test]
fn merge_merge_scan_region_watermarks_none_vetoes_proved_value() {
let mut merged = BTreeMap::new();
merge_merge_scan_region_watermarks(
&mut merged,
[9],
[
metrics_with_region_watermarks(&[(9, Some(21))]),
metrics_with_region_watermarks(&[(9, None)]),
],
);
assert_eq!(
finalize_region_watermarks(merged),
vec![RegionWatermarkEntry {
region_id: 9,
watermark: None,
}]
);
}
#[test]
fn merge_merge_scan_region_watermarks_none_vetoes_proved_value_regardless_of_order() {
let mut merged = BTreeMap::new();
merge_merge_scan_region_watermarks(
&mut merged,
[9],
[
metrics_with_region_watermarks(&[(9, None)]),
metrics_with_region_watermarks(&[(9, Some(21))]),
],
);
assert_eq!(
finalize_region_watermarks(merged),
vec![RegionWatermarkEntry {
region_id: 9,
watermark: None,
}]
);
}
}