feat: add seq watermark in record batch metrics (#8015)

* feat: add seq watermark in record batch metrics

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-24 15:54:10 +08:00
committed by GitHub
parent aaa3f2cdf9
commit 1167c36c4e
3 changed files with 387 additions and 1 deletions

View File

@@ -446,6 +446,27 @@ pub struct RecordBatchMetrics {
// Detailed per-plan metrics
/// An ordered list of plan metrics, from top to bottom in post-order.
pub plan_metrics: Vec<PlanMetrics>,
/// Per-region watermark for incremental-read checkpoint advancement.
///
/// The watermark is the latest sequence (`seq`) this query round safely read
/// for each participating region. Flow uses it to decide where the next
/// incremental round can resume.
///
/// - `Some(seq)`: the query proved it safely read up to `seq`; downstream
/// may advance the checkpoint to this value.
/// - `None`: the region participated but the query could not prove a safe
/// read upper-bound, so the checkpoint must not advance for this region.
///
/// Omitted when empty for backward compatibility.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub region_watermarks: Vec<RegionWatermarkEntry>,
}
/// Watermark result for a single region, as carried in
/// `RecordBatchMetrics::region_watermarks`.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RegionWatermarkEntry {
/// Numeric id of the region this entry describes.
pub region_id: u64,
/// Latest sequence the query proved it safely read for the region, or
/// `None` when no safe upper bound could be proven.
///
/// The key is left out of the serialized form when the value is `None`,
/// and a missing key deserializes back to `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub watermark: Option<u64>,
}
/// Determines if a metric name represents a time measurement that should be formatted.
@@ -706,6 +727,7 @@ mod test {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::Int32Vector;
use serde_json::json;
use snafu::IntoError;
use super::*;
@@ -879,4 +901,53 @@ mod test {
}
}
}
/// A payload that carries no `region_watermarks` key must still decode, with
/// the watermark list defaulting to empty.
#[test]
fn test_recordbatch_metrics_deserializes_without_region_watermarks() {
    let payload_without_watermarks = json!({
        "elapsed_compute": 12,
        "memory_usage": 34,
        "plan_metrics": []
    });

    let decoded =
        serde_json::from_value::<RecordBatchMetrics>(payload_without_watermarks).unwrap();

    assert!(decoded.region_watermarks.is_empty());
    assert_eq!(decoded.elapsed_compute, 12);
    assert_eq!(decoded.memory_usage, 34);
}
/// Watermark entries survive a JSON round trip, and an entry whose watermark
/// is `None` is serialized without the `watermark` key.
#[test]
fn test_recordbatch_metrics_region_watermarks_serde_roundtrip() {
    let proved = RegionWatermarkEntry {
        region_id: 1,
        watermark: Some(100),
    };
    let unproved = RegionWatermarkEntry {
        region_id: 2,
        watermark: None,
    };
    let original = RecordBatchMetrics {
        region_watermarks: vec![proved, unproved],
        ..Default::default()
    };

    let encoded = serde_json::to_value(&original).unwrap();
    let expected_watermarks = json!([
        { "region_id": 1, "watermark": 100 },
        { "region_id": 2 }
    ]);
    assert_eq!(encoded.get("region_watermarks"), Some(&expected_watermarks));

    let roundtripped: RecordBatchMetrics = serde_json::from_value(encoded).unwrap();
    assert_eq!(roundtripped.region_watermarks, original.region_watermarks);
}
/// Default metrics have no watermarks, so the key must be absent from the
/// serialized output.
#[test]
fn test_recordbatch_metrics_skips_empty_region_watermarks_on_serialize() {
    let default_metrics = RecordBatchMetrics::default();
    let encoded = serde_json::to_value(default_metrics).unwrap();
    assert_eq!(encoded.get("region_watermarks"), None);
}
}

View File

@@ -477,6 +477,10 @@ impl MergeScanExec {
.collect()
}
/// Returns the regions held by this merge scan, borrowed as a slice.
pub fn regions(&self) -> &[RegionId] {
&self.regions
}
/// Returns the partition count of this merge scan, which is the value of
/// its `target_partition` field.
pub fn partition_count(&self) -> usize {
self.target_partition
}

View File

@@ -12,17 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::warn;
use datafusion::physical_plan::ExecutionPlan;
use datatypes::schema::SchemaRef;
use futures::Stream;
use futures_util::ready;
use lazy_static::lazy_static;
use prometheus::*;
use crate::dist_plan::MergeScanExec;
/// Intermediate merge state for one participating region while collecting
/// terminal correctness watermarks across merge-scan sub-stages.
///
/// States are advanced by `merge_merge_scan_region_watermarks` and converted
/// into the final optional watermark by `finalize_region_watermarks`: only
/// `Proved` finalizes to `Some(seq)`, every other state finalizes to `None`.
enum MergeState {
/// The region participated, but no explicit watermark result has been seen
/// yet for this merge.
Participated,
/// At least one branch reported that this region cannot prove a safe
/// checkpoint watermark for the current query round. A proved value
/// reported afterwards does not lift this state.
Unproved,
/// All seen branches agree the region can advance safely to this sequence.
Proved(u64),
/// Different proved sequences were reported for the same region. The final
/// result is degraded to `None`, and the collected values are logged.
Conflict {
/// Distinct proved watermark candidates reported for the region, in the
/// order they were first seen.
watermarks: Vec<u64>,
},
}
lazy_static! {
/// Timer of different stages in query.
pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
@@ -129,3 +154,289 @@ impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
self.stream.size_hint()
}
}
/// Record batch stream wrapper that adds per-region watermarks, collected from
/// an execution plan, to the metrics reported by the wrapped stream.
///
/// Polling, name, schema and output ordering are all forwarded to the wrapped
/// stream unchanged.
pub struct RegionWatermarkMetricsStream {
// The wrapped stream that actually produces record batches and base metrics.
stream: SendableRecordBatchStream,
// Plan tree that is searched for `MergeScanExec` nodes whenever metrics are
// requested.
plan: Arc<dyn ExecutionPlan>,
}
impl RegionWatermarkMetricsStream {
/// Wraps `stream`, keeping `plan` so region watermarks can be collected
/// from it when metrics are requested.
pub fn new(stream: SendableRecordBatchStream, plan: Arc<dyn ExecutionPlan>) -> Self {
Self { stream, plan }
}
}
impl RecordBatchStream for RegionWatermarkMetricsStream {
fn name(&self) -> &str {
self.stream.name()
}
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.stream.output_ordering()
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
let mut metrics = self.stream.metrics()?;
let region_watermarks = collect_region_watermarks(self.plan.clone());
if !region_watermarks.is_empty() {
metrics.region_watermarks = region_watermarks;
}
Some(metrics)
}
}
impl Stream for RegionWatermarkMetricsStream {
    type Item = common_recordbatch::error::Result<RecordBatch>;

    /// Polls the wrapped stream; batches pass through untouched.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        Pin::new(&mut this.stream).poll_next(cx)
    }

    /// Size hint of the wrapped stream.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}
/// Builds metrics that carry only the region watermarks collected from `plan`.
///
/// Returns `None` when the plan yields no watermark entries; otherwise every
/// field other than `region_watermarks` is left at its default.
pub fn terminal_recordbatch_metrics_from_plan(
    plan: Arc<dyn ExecutionPlan>,
) -> Option<RecordBatchMetrics> {
    let watermarks = collect_region_watermarks(plan);
    (!watermarks.is_empty()).then(|| RecordBatchMetrics {
        region_watermarks: watermarks,
        ..Default::default()
    })
}
/// Walks the whole plan tree and merges the region watermarks reported by
/// every `MergeScanExec` node into one entry per region.
///
/// The walk uses an explicit stack, so nodes are visited depth-first with the
/// most recently pushed child first.
fn collect_region_watermarks(plan: Arc<dyn ExecutionPlan>) -> Vec<RegionWatermarkEntry> {
    let mut states: BTreeMap<u64, MergeState> = BTreeMap::new();
    let mut pending = vec![plan];

    while let Some(node) = pending.pop() {
        if let Some(merge_scan) = node.as_any().downcast_ref::<MergeScanExec>() {
            let region_ids = merge_scan.regions().iter().map(|id| id.as_u64());
            merge_merge_scan_region_watermarks(
                &mut states,
                region_ids,
                merge_scan.sub_stage_metrics(),
            );
        }
        // Children are queued for every node, merge scan or not.
        for child in node.children() {
            pending.push(Arc::clone(child));
        }
    }

    finalize_region_watermarks(states)
}
fn merge_merge_scan_region_watermarks(
merged: &mut BTreeMap<u64, MergeState>,
regions: impl IntoIterator<Item = u64>,
sub_stage_metrics: impl IntoIterator<Item = RecordBatchMetrics>,
) {
for region_id in regions {
merged.entry(region_id).or_insert(MergeState::Participated);
}
for metrics in sub_stage_metrics {
for entry in metrics.region_watermarks {
merged
.entry(entry.region_id)
.and_modify(|existing| match entry.watermark {
None => match existing {
MergeState::Participated | MergeState::Proved(_) => {
*existing = MergeState::Unproved;
}
MergeState::Unproved | MergeState::Conflict { .. } => {}
},
Some(seq) => match existing {
MergeState::Participated => {
*existing = MergeState::Proved(seq);
}
MergeState::Unproved => {}
MergeState::Proved(existing_seq) if *existing_seq == seq => {}
MergeState::Proved(existing_seq) => {
let old_seq = *existing_seq;
*existing = MergeState::Conflict {
watermarks: vec![old_seq, seq],
};
}
MergeState::Conflict { watermarks } => {
if !watermarks.contains(&seq) {
watermarks.push(seq);
}
}
},
})
.or_insert(match entry.watermark {
Some(seq) => MergeState::Proved(seq),
None => MergeState::Unproved,
});
}
}
}
/// Converts the merged per-region states into output entries, ordered by
/// region id.
///
/// Only `MergeState::Proved` yields a watermark. A conflict is logged as a
/// warning and, like the remaining states, yields `None`.
fn finalize_region_watermarks(merged: BTreeMap<u64, MergeState>) -> Vec<RegionWatermarkEntry> {
    let mut entries = Vec::with_capacity(merged.len());
    for (region_id, state) in merged {
        let watermark = match state {
            MergeState::Proved(seq) => Some(seq),
            MergeState::Participated | MergeState::Unproved => None,
            MergeState::Conflict { watermarks } => {
                warn!(
                    "Conflicting proved watermarks for region {}: {:?}; degrading to unproved",
                    region_id, watermarks
                );
                None
            }
        };
        entries.push(RegionWatermarkEntry {
            region_id,
            watermark,
        });
    }
    entries
}
#[cfg(test)]
mod tests {
    use datafusion::arrow::datatypes::Schema as ArrowSchema;
    use datafusion::physical_plan::empty::EmptyExec;

    use super::*;

    /// Builds metrics whose only populated field is `region_watermarks`.
    fn metrics_with_region_watermarks(entries: &[(u64, Option<u64>)]) -> RecordBatchMetrics {
        let region_watermarks = entries
            .iter()
            .copied()
            .map(|(region_id, watermark)| RegionWatermarkEntry {
                region_id,
                watermark,
            })
            .collect();
        RecordBatchMetrics {
            region_watermarks,
            ..Default::default()
        }
    }

    /// Shorthand for an expected output entry.
    fn entry(region_id: u64, watermark: Option<u64>) -> RegionWatermarkEntry {
        RegionWatermarkEntry {
            region_id,
            watermark,
        }
    }

    /// Runs a single merge over an empty map and returns the finalized entries.
    fn merge_then_finalize(
        regions: &[u64],
        reports: Vec<RecordBatchMetrics>,
    ) -> Vec<RegionWatermarkEntry> {
        let mut merged = BTreeMap::new();
        merge_merge_scan_region_watermarks(&mut merged, regions.iter().copied(), reports);
        finalize_region_watermarks(merged)
    }

    #[test]
    fn terminal_metrics_returns_none_without_merge_scan() {
        let schema = Arc::new(ArrowSchema::empty());
        let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
        assert!(terminal_recordbatch_metrics_from_plan(plan).is_none());
    }

    #[test]
    fn merge_merge_scan_region_watermarks_marks_missing_watermarks_unproved() {
        let finalized = merge_then_finalize(&[1, 2], Vec::new());
        assert_eq!(finalized, vec![entry(1, None), entry(2, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_keeps_matching_proved_values() {
        let reports = vec![
            metrics_with_region_watermarks(&[(42, Some(7))]),
            metrics_with_region_watermarks(&[(42, Some(7))]),
        ];
        assert_eq!(
            merge_then_finalize(&[42], reports),
            vec![entry(42, Some(7))]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_degrades_conflicting_proved_values() {
        let reports = vec![
            metrics_with_region_watermarks(&[(7, Some(11))]),
            metrics_with_region_watermarks(&[(7, Some(13))]),
        ];
        assert_eq!(merge_then_finalize(&[7], reports), vec![entry(7, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_none_vetoes_proved_value() {
        let reports = vec![
            metrics_with_region_watermarks(&[(9, Some(21))]),
            metrics_with_region_watermarks(&[(9, None)]),
        ];
        assert_eq!(merge_then_finalize(&[9], reports), vec![entry(9, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_none_vetoes_proved_value_regardless_of_order() {
        let reports = vec![
            metrics_with_region_watermarks(&[(9, None)]),
            metrics_with_region_watermarks(&[(9, Some(21))]),
        ];
        assert_eq!(merge_then_finalize(&[9], reports), vec![entry(9, None)]);
    }
}