feat(flow): support incremental read checkpoints (#8179)

* feat: flownode inc mode

Signed-off-by: discord9 <discord9@163.com>

* chore: rename fallback reason

Signed-off-by: discord9 <discord9@163.com>

* fix: harden flow incremental checkpoints

Signed-off-by: discord9 <discord9@163.com>

* fix: address flow watermark lint

Signed-off-by: discord9 <discord9@163.com>

* fix: address flow clippy

Signed-off-by: discord9 <discord9@163.com>

* refactor: clarify incremental plan preparation

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* test: more sqlness test

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-05-28 17:31:46 +08:00
committed by GitHub
parent 17815830ed
commit eccd97b5c7
24 changed files with 3201 additions and 103 deletions

View File

@@ -23,6 +23,7 @@ use session::ReadPreference;
mod checkpoint;
pub(crate) mod engine;
pub(crate) mod frontend_client;
mod incremental_filter;
mod state;
mod table_creator;
mod task;

View File

@@ -12,12 +12,116 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[allow(dead_code)]
use crate::batching_mode::state::CheckpointMode;
pub(super) const CHECKPOINT_DECISION_ADVANCE: &str = "advance";
pub(super) const CHECKPOINT_DECISION_FALLBACK: &str = "fallback";
pub(super) const CHECKPOINT_REASON_NONE: &str = "none";
/// Why the task fell back to full snapshot mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CheckpointMode {
/// Full-snapshot reads over the source tables.
FullSnapshot,
/// Incremental reads driven by explicitly emitted incremental scan
/// extensions.
Incremental,
pub(super) enum FlowQueryFallbackReason {
/// The query result did not include a region-watermark map at all.
MissingRegionWatermark,
/// Some participating regions could not prove safe advancement against
/// both the returned watermarks and the checkpoint map.
IncompleteRegionWatermark,
/// The query only covered part of the dirty backlog, so global checkpoints
/// cannot advance yet. Incremental SQL drains all dirty windows before
/// checkpoint advancement; this primarily protects scoped full-snapshot
/// runs capped by the per-query dirty-window limit.
DirtyBacklogPending,
/// The datanode detected a stale incremental cursor and the Flow
/// must recompute from scratch.
StaleCursor,
/// A non-stale-cursor query failure; the Flow resets to full snapshot
/// to avoid cascading errors.
IncrementalQueryFailure,
/// Incremental mode has been permanently disabled for this Flow
/// (e.g. because the query shape is not incrementally safe).
IncrementalDisabled,
}
impl FlowQueryFallbackReason {
pub(super) fn as_label(self) -> &'static str {
match self {
Self::MissingRegionWatermark => "missing_region_watermark",
Self::IncompleteRegionWatermark => "incomplete_region_watermark",
Self::DirtyBacklogPending => "dirty_backlog_pending",
Self::StaleCursor => "stale_cursor",
Self::IncrementalQueryFailure => "incremental_query_failure",
Self::IncrementalDisabled => "incremental_disabled",
}
}
}
/// Decision produced by `BatchingTask::apply_query_result_to_state` after
/// each Flow query execution. Describes whether the task advanced its
/// checkpoint state or fell back to full snapshot, and why.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum FlowCheckpointDecision {
/// FullSnapshot → Incremental transition.
///
/// The query exercised every participating region, all returned valid
/// watermarks, and the checkpoint map was populated from scratch.
/// Subsequent executions will use incremental after-seqs.
AdvancedFromFullSnapshot {
participating_regions: usize,
watermarks: usize,
},
/// Existing Incremental → Incremental (in-place advancement).
///
/// A subset of participating regions advanced their watermarks. The
/// task stays in incremental mode with an updated checkpoint map.
AdvancedIncremental {
participating_regions: usize,
watermarks: usize,
},
/// Any mode → FullSnapshot.
///
/// Watermark information was incomplete, a participating region was
/// absent from the existing checkpoint map, the task has permanently
/// disabled incremental mode, or the query itself failed. The task
/// resets to full snapshot semantics for the next execution.
FallbackToFullSnapshot {
previous_mode: CheckpointMode,
reason: FlowQueryFallbackReason,
},
}
impl FlowCheckpointDecision {
pub(super) fn mode_label(self) -> &'static str {
match self {
Self::AdvancedFromFullSnapshot { .. } => {
checkpoint_mode_label(CheckpointMode::FullSnapshot)
}
Self::AdvancedIncremental { .. } => checkpoint_mode_label(CheckpointMode::Incremental),
Self::FallbackToFullSnapshot { previous_mode, .. } => {
checkpoint_mode_label(previous_mode)
}
}
}
pub(super) fn decision_label(self) -> &'static str {
match self {
Self::AdvancedFromFullSnapshot { .. } | Self::AdvancedIncremental { .. } => {
CHECKPOINT_DECISION_ADVANCE
}
Self::FallbackToFullSnapshot { .. } => CHECKPOINT_DECISION_FALLBACK,
}
}
pub(super) fn reason_label(self) -> &'static str {
match self {
Self::FallbackToFullSnapshot { reason, .. } => reason.as_label(),
_ => CHECKPOINT_REASON_NONE,
}
}
}
pub(super) fn checkpoint_mode_label(mode: CheckpointMode) -> &'static str {
match mode {
CheckpointMode::FullSnapshot => "full_snapshot",
CheckpointMode::Incremental => "incremental",
}
}

View File

@@ -340,12 +340,13 @@ impl FrontendClient {
}
}
pub async fn query_with_terminal_metrics(
pub(crate) async fn query_with_terminal_metrics(
&self,
catalog: &str,
schema: &str,
request: QueryRequest,
extensions: &[(&str, &str)],
peer_desc: &mut Option<PeerDesc>,
) -> Result<OutputWithMetrics, Error> {
let flow_extensions = build_flow_extensions(extensions)?;
match self {
@@ -358,6 +359,9 @@ impl FrontendClient {
(READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
];
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
db.database
.query_with_terminal_metrics_and_flow_extensions(request, &hints, extensions)
.await
@@ -368,6 +372,7 @@ impl FrontendClient {
database_client,
query,
} => {
*peer_desc = Some(PeerDesc::Standalone);
let mut extensions_map = HashMap::from([(
QUERY_PARALLELISM_HINT.to_string(),
query.parallelism.to_string(),
@@ -556,21 +561,24 @@ fn terminal_recordbatch_metrics_from_snapshots(
}
/// Describe a peer of frontend
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub(crate) enum PeerDesc {
/// The query failed before a frontend peer was selected.
#[default]
Unknown,
/// Distributed mode's frontend peer address
Dist {
/// frontend peer address
peer: Peer,
},
/// Standalone mode
#[default]
Standalone,
}
impl std::fmt::Display for PeerDesc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PeerDesc::Unknown => write!(f, "unknown"),
PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
PeerDesc::Standalone => write!(f, "standalone"),
}
@@ -768,6 +776,7 @@ mod tests {
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(MetricsHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
let mut peer_desc = None;
let result = client
.query_with_terminal_metrics(
@@ -777,9 +786,11 @@ mod tests {
query: Some(Query::Sql("select 1".to_string())),
},
&[],
&mut peer_desc,
)
.await
.unwrap();
assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
let terminal_metrics = result.metrics.clone();
assert!(!result.metrics.is_ready());
@@ -802,6 +813,7 @@ mod tests {
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(ExtensionAwareHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
let mut peer_desc = None;
let result = client
.query_with_terminal_metrics(
@@ -811,9 +823,11 @@ mod tests {
query: Some(Query::Sql("insert into t select 1".to_string())),
},
&[("flow.return_region_seq", "true")],
&mut peer_desc,
)
.await
.unwrap();
assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
assert!(result.metrics.is_ready());
assert!(result.region_watermark_map().is_none());
@@ -824,6 +838,7 @@ mod tests {
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(SnapshotBindingHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
let mut peer_desc = None;
let result = client
.query_with_terminal_metrics(
@@ -833,9 +848,11 @@ mod tests {
query: Some(Query::Sql("insert into t select * from src".to_string())),
},
&[("flow.return_region_seq", "true")],
&mut peer_desc,
)
.await
.unwrap();
assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
assert!(result.metrics.is_ready());
assert_eq!(
@@ -849,6 +866,7 @@ mod tests {
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
let mut peer_desc = None;
let err = client
.query_with_terminal_metrics(
@@ -858,6 +876,7 @@ mod tests {
query: Some(Query::Sql("select 1".to_string())),
},
&[("flow.return_region_seq", "not-a-bool")],
&mut peer_desc,
)
.await
.unwrap_err();

View File

@@ -0,0 +1,222 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::tracing::debug;
use datafusion_expr::Expr;
use datatypes::schema::Schema;
use crate::batching_mode::state::FilterExprInfo;
use crate::batching_mode::utils::IncrementalAggregateAnalysis;
use crate::{Error, FlowId};
pub(super) fn build_sink_dirty_time_window_filter_expr(
flow_id: FlowId,
analysis: &IncrementalAggregateAnalysis,
sink_schema: &Schema,
dirty_filter: Option<&FilterExprInfo>,
) -> Result<Option<Expr>, Error> {
let Some(dirty_filter) = dirty_filter else {
return Ok(None);
};
let Some(sink_filter_col) =
infer_sink_time_window_filter_col(flow_id, analysis, sink_schema, dirty_filter)
else {
return Ok(None);
};
dirty_filter.predicate_for_col(&sink_filter_col)
}
fn infer_sink_time_window_filter_col(
flow_id: FlowId,
analysis: &IncrementalAggregateAnalysis,
sink_schema: &Schema,
dirty_filter: &FilterExprInfo,
) -> Option<String> {
if analysis.group_key_names.is_empty() {
return None;
}
let is_timestamp_group_key = |name: &str| {
analysis.group_key_names.iter().any(|key| key == name)
&& sink_schema
.column_schema_by_name(name)
.is_some_and(|col| col.data_type.is_timestamp())
};
if is_timestamp_group_key(&dirty_filter.col_name) {
return Some(dirty_filter.col_name.clone());
}
let candidates = analysis
.group_key_names
.iter()
.filter(|name| is_timestamp_group_key(name))
.cloned()
.collect::<Vec<_>>();
match candidates.as_slice() {
[name] => Some(name.clone()),
[] => {
debug!(
"Flow {} cannot infer sink dirty-window filter column: no timestamp group key in {:?}",
flow_id, analysis.group_key_names
);
None
}
_ => {
debug!(
"Flow {} cannot infer sink dirty-window filter column: ambiguous timestamp group keys {:?}",
flow_id, candidates
);
None
}
}
}
#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use pretty_assertions::assert_eq;
use super::*;
use crate::adapter::AUTO_CREATED_UPDATE_AT_TS_COL;
use crate::batching_mode::state::FilterExprInfo;
use crate::batching_mode::utils::IncrementalAggregateAnalysis;
fn test_analysis_with_group_keys(group_key_names: Vec<&str>) -> IncrementalAggregateAnalysis {
IncrementalAggregateAnalysis {
group_key_names: group_key_names
.into_iter()
.map(|name| name.to_string())
.collect(),
merge_columns: vec![],
literal_columns: vec![],
output_field_names: vec![],
unsupported_exprs: vec![],
}
}
fn test_dirty_filter(col_name: &str) -> FilterExprInfo {
FilterExprInfo {
expr: datafusion_expr::col(col_name),
col_name: col_name.to_string(),
time_ranges: vec![],
window_size: chrono::Duration::seconds(1),
}
}
fn test_sink_schema(columns: Vec<(&str, ConcreteDataType)>) -> Schema {
Schema::new(
columns
.into_iter()
.map(|(name, data_type)| ColumnSchema::new(name, data_type, true))
.collect(),
)
}
#[test]
fn test_infer_sink_time_window_filter_col_uses_matching_source_group_key() {
let analysis = test_analysis_with_group_keys(vec!["ts", "host"]);
let sink_schema = test_sink_schema(vec![
("ts", ConcreteDataType::timestamp_millisecond_datatype()),
("host", ConcreteDataType::string_datatype()),
]);
let dirty_filter = test_dirty_filter("ts");
assert_eq!(
Some("ts".to_string()),
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
#[test]
fn test_infer_sink_time_window_filter_col_uses_unique_timestamp_group_key() {
let analysis = test_analysis_with_group_keys(vec!["host", "time_window"]);
let sink_schema = test_sink_schema(vec![
("host", ConcreteDataType::string_datatype()),
(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
),
(
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
),
]);
let dirty_filter = test_dirty_filter("ts");
assert_eq!(
Some("time_window".to_string()),
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
#[test]
fn test_infer_sink_time_window_filter_col_skips_global_aggregate() {
let analysis = test_analysis_with_group_keys(vec![]);
let sink_schema = test_sink_schema(vec![
("number", ConcreteDataType::uint32_datatype()),
(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
),
]);
let dirty_filter = test_dirty_filter("ts");
assert_eq!(
None,
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
#[test]
fn test_infer_sink_time_window_filter_col_skips_without_timestamp_group_key() {
let analysis = test_analysis_with_group_keys(vec!["host", "device"]);
let sink_schema = test_sink_schema(vec![
("host", ConcreteDataType::string_datatype()),
("device", ConcreteDataType::string_datatype()),
(
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
),
]);
let dirty_filter = test_dirty_filter("ts");
assert_eq!(
None,
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
#[test]
fn test_infer_sink_time_window_filter_col_skips_ambiguous_timestamp_group_keys() {
let analysis = test_analysis_with_group_keys(vec!["ts", "time_window"]);
let sink_schema = test_sink_schema(vec![
("ts", ConcreteDataType::timestamp_millisecond_datatype()),
(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
),
]);
let dirty_filter = test_dirty_filter("source_ts");
assert_eq!(
None,
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
}

View File

@@ -15,7 +15,7 @@
//! Batching mode task state, which changes frequently
//!
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::time::Duration;
use common_telemetry::debug;
@@ -50,6 +50,14 @@ pub struct TaskState {
/// Dirty Time windows need to be updated
/// mapping of `start -> end` and non-overlapping
pub(crate) dirty_time_windows: DirtyTimeWindows,
checkpoint_mode: CheckpointMode,
/// Region id -> last consumed watermark sequence. Incremental scans use
/// this as the next lower sequence bound for each source region.
checkpoints: BTreeMap<u64, u64>,
/// Once set, the task will never attempt incremental mode again.
/// Set when the flow's query shape is deterministically incompatible
/// with incremental execution (e.g. unsupported aggregate expressions).
incremental_disabled: bool,
exec_state: ExecState,
/// Shutdown receiver
pub(crate) shutdown_rx: oneshot::Receiver<()>,
@@ -64,6 +72,9 @@ impl TaskState {
last_query_duration: Duration::from_secs(0),
last_exec_time_millis: None,
dirty_time_windows: Default::default(),
checkpoint_mode: CheckpointMode::FullSnapshot,
checkpoints: Default::default(),
incremental_disabled: false,
exec_state: ExecState::Idle,
shutdown_rx,
task_handle: None,
@@ -85,6 +96,84 @@ impl TaskState {
self.last_exec_time_millis
}
pub fn checkpoint_mode(&self) -> CheckpointMode {
self.checkpoint_mode
}
pub fn checkpoints(&self) -> &BTreeMap<u64, u64> {
&self.checkpoints
}
pub fn is_incremental_disabled(&self) -> bool {
self.incremental_disabled
}
/// Permanently disable incremental mode for this task and
/// immediately fall back to full snapshot for the current cycle.
pub fn disable_incremental(&mut self) {
self.incremental_disabled = true;
self.mark_full_snapshot();
}
pub fn mark_full_snapshot(&mut self) {
self.checkpoint_mode = CheckpointMode::FullSnapshot;
}
pub fn advance_checkpoints(&mut self, watermark_map: HashMap<u64, u64>) {
self.checkpoints = watermark_map.into_iter().collect();
if !self.incremental_disabled {
self.checkpoint_mode = CheckpointMode::Incremental;
}
}
pub fn advance_incremental_checkpoints_with_participation(
&mut self,
participating_regions: &BTreeSet<u64>,
watermark_map: HashMap<u64, u64>,
) {
for region_id in participating_regions {
if let Some(seq) = watermark_map.get(region_id) {
self.checkpoints.insert(*region_id, *seq);
}
}
if !self.incremental_disabled {
self.checkpoint_mode = CheckpointMode::Incremental;
}
}
pub fn can_advance_full_snapshot_checkpoints(
&self,
participating_regions: &BTreeSet<u64>,
watermark_map: &HashMap<u64, u64>,
) -> bool {
!participating_regions.is_empty()
&& participating_regions.len() == watermark_map.len()
&& participating_regions
.iter()
.all(|region_id| watermark_map.contains_key(region_id))
}
pub fn can_advance_incremental_checkpoints_with_participation(
&self,
participating_regions: &BTreeSet<u64>,
watermark_map: &HashMap<u64, u64>,
) -> bool {
!self.incremental_disabled
&& !self.checkpoints.is_empty()
&& !participating_regions.is_empty()
&& participating_regions.len() == watermark_map.len()
&& participating_regions
.iter()
.all(|region_id| self.checkpoints.contains_key(region_id))
&& participating_regions.iter().all(|region_id| {
let checkpoint = self.checkpoints.get(region_id);
watermark_map
.get(region_id)
.zip(checkpoint)
.is_some_and(|(seq, checkpoint)| seq >= checkpoint)
})
}
/// Compute the next query delay based on the time window size or the last query duration.
/// Aiming to avoid too frequent queries. But also not too long delay.
///
@@ -95,6 +184,10 @@ impl TaskState {
/// if current the dirty time range is longer than one query can handle,
/// execute immediately to faster clean up dirty time windows.
///
/// If `prefer_short_incremental_cadence` is true, run incremental queries
/// more often when there is no large dirty backlog. This only reduces the
/// chance of hitting a stale cursor after flush; it is not required for
/// correctness.
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
@@ -102,6 +195,7 @@ impl TaskState {
min_refresh_duration: Duration,
max_timeout: Option<Duration>,
max_filter_num_per_query: usize,
prefer_short_incremental_cadence: bool,
) -> Instant {
// = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
let lower = time_window_size.unwrap_or(min_refresh_duration);
@@ -120,7 +214,20 @@ impl TaskState {
// if dirty time range is more than one query can handle, execute immediately
// to faster clean up dirty time windows
if cur_dirty_window_size < max_query_update_range {
self.last_update_time + next_duration
if prefer_short_incremental_cadence {
// Run incremental queries sooner than the normal time-window
// cadence, while still backing off by at least the previous
// query duration and respecting the max-timeout cap.
let next_duration = self.last_query_duration.max(min_refresh_duration);
let next_duration = if let Some(max_timeout) = max_timeout {
next_duration.min(max_timeout)
} else {
next_duration
};
self.last_update_time + next_duration
} else {
self.last_update_time + next_duration
}
} else {
// if dirty time windows can't be clean up in one query, execute immediately to faster
// clean up dirty time windows
@@ -314,7 +421,7 @@ impl DirtyTimeWindows {
);
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
if self.windows.len() > self.max_filter_num_per_query {
if self.windows.len() > window_cnt {
let first_time_window = self.windows.first_key_value();
let last_time_window = self.windows.last_key_value();
@@ -323,7 +430,7 @@ impl DirtyTimeWindows {
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
task_ctx.config.flow_id,
self.windows.len(),
self.max_filter_num_per_query,
window_cnt,
task_ctx.config.time_window_expr,
task_ctx.config.expire_after,
first_time_window,
@@ -335,7 +442,7 @@ impl DirtyTimeWindows {
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
flow_id,
self.windows.len(),
self.max_filter_num_per_query,
window_cnt,
first_time_window,
last_time_window
)
@@ -590,6 +697,12 @@ enum ExecState {
Executing,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointMode {
FullSnapshot,
Incremental,
}
/// Filter Expression's information
#[derive(Debug, Clone)]
pub struct FilterExprInfo {
@@ -607,6 +720,28 @@ impl FilterExprInfo {
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
})
}
pub fn predicate_for_col(
&self,
col_name: &str,
) -> Result<Option<datafusion_expr::Expr>, Error> {
use datafusion_common::Column;
use datafusion_expr::{Expr, lit};
let mut expr_lst = Vec::with_capacity(self.time_ranges.len());
for (start, end) in &self.time_ranges {
let lower = to_df_literal(*start)?;
let upper = to_df_literal(*end)?;
let filter_col = || Expr::Column(Column::new_unqualified(col_name));
expr_lst.push(
filter_col()
.gt_eq(lit(lower))
.and(filter_col().lt(lit(upper))),
);
}
Ok(expr_lst.into_iter().reduce(|a, b| a.or(b)))
}
}
#[cfg(test)]
@@ -851,4 +986,370 @@ mod test {
}
}
}
#[test]
fn test_task_state_checkpoint_mode_and_advancement() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert!(state.checkpoints().is_empty());
state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
);
state.mark_full_snapshot();
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
);
}
#[test]
fn test_disable_incremental_persists_full_snapshot_mode() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
assert!(!state.is_incremental_disabled());
// After disable, mode becomes FullSnapshot and flag is set.
state.disable_incremental();
assert!(state.is_incremental_disabled());
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
// `advance_checkpoints` will NOT transition to Incremental when disabled.
state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
);
// `mark_full_snapshot` does not re-enable incremental.
state.mark_full_snapshot();
assert!(state.is_incremental_disabled());
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
}
#[test]
fn test_full_snapshot_checkpoint_advancement_requires_participating_regions() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let state = TaskState::new(query_ctx, rx);
assert!(!state.can_advance_full_snapshot_checkpoints(&BTreeSet::new(), &HashMap::new()));
assert!(!state.can_advance_full_snapshot_checkpoints(
&BTreeSet::from([1_u64, 2_u64]),
&HashMap::from([(1_u64, 10_u64)]),
));
assert!(state.can_advance_full_snapshot_checkpoints(
&BTreeSet::from([1_u64, 2_u64]),
&HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]),
));
}
#[test]
fn test_incremental_checkpoint_advancement_requires_participation_alignment() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
assert!(
state.can_advance_incremental_checkpoints_with_participation(
&BTreeSet::from([1_u64]),
&HashMap::from([(1_u64, 11_u64)]),
)
);
assert!(
!state.can_advance_incremental_checkpoints_with_participation(
&BTreeSet::from([1_u64, 2_u64]),
&HashMap::from([(1_u64, 11_u64)]),
)
);
assert!(
!state.can_advance_incremental_checkpoints_with_participation(
&BTreeSet::from([3_u64]),
&HashMap::from([(3_u64, 11_u64)]),
)
);
assert!(
!state.can_advance_incremental_checkpoints_with_participation(
&BTreeSet::from([1_u64]),
&HashMap::from([(1_u64, 9_u64)]),
)
);
assert!(
state.can_advance_incremental_checkpoints_with_participation(
&BTreeSet::from([1_u64, 2_u64]),
&HashMap::from([(1_u64, 11_u64), (2_u64, 21_u64)]),
)
);
state.disable_incremental();
assert!(
!state.can_advance_incremental_checkpoints_with_participation(
&BTreeSet::from([1_u64, 2_u64]),
&HashMap::from([(1_u64, 12_u64), (2_u64, 22_u64)]),
)
);
}
#[test]
fn test_incremental_checkpoint_advancement_merges_participating_subset() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
state.advance_checkpoints(HashMap::from([
(1_u64, 10_u64),
(2_u64, 20_u64),
(3_u64, 30_u64),
]));
state.advance_incremental_checkpoints_with_participation(
&BTreeSet::from([1_u64, 3_u64]),
HashMap::from([(1_u64, 12_u64), (3_u64, 35_u64)]),
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 12_u64), (2_u64, 20_u64), (3_u64, 35_u64)])
);
}
#[test]
fn test_filter_expr_info_predicate_for_col_empty_ranges() {
let filter = FilterExprInfo {
expr: datafusion_expr::col("ts"),
col_name: "ts".to_string(),
time_ranges: vec![],
window_size: chrono::Duration::seconds(1),
};
assert!(filter.predicate_for_col("time_window").unwrap().is_none());
}
#[test]
fn test_filter_expr_info_predicate_for_col_single_range() {
let filter = FilterExprInfo {
expr: datafusion_expr::col("ts"),
col_name: "ts".to_string(),
time_ranges: vec![(Timestamp::new_second(0), Timestamp::new_second(1))],
window_size: chrono::Duration::seconds(1),
};
let predicate = filter.predicate_for_col("time_window").unwrap().unwrap();
let unparser = datafusion::sql::unparser::Unparser::default();
assert_eq!(
"((time_window >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:01' AS TIMESTAMP)))",
unparser.expr_to_sql(&predicate).unwrap().to_string()
);
}
#[test]
fn test_filter_expr_info_predicate_for_col_multiple_ranges() {
let filter = FilterExprInfo {
expr: datafusion_expr::col("ts"),
col_name: "ts".to_string(),
time_ranges: vec![
(Timestamp::new_second(0), Timestamp::new_second(1)),
(Timestamp::new_second(10), Timestamp::new_second(11)),
],
window_size: chrono::Duration::seconds(1),
};
let predicate = filter.predicate_for_col("time_window").unwrap().unwrap();
let unparser = datafusion::sql::unparser::Unparser::default();
assert_eq!(
"(((time_window >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:01' AS TIMESTAMP))) OR ((time_window >= CAST('1970-01-01 00:00:10' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:11' AS TIMESTAMP))))",
unparser.expr_to_sql(&predicate).unwrap().to_string()
);
}
/// Helper: create a `TaskState` whose `last_update_time` is a known duration in the past.
fn state_with_past_update(age: Duration) -> TaskState {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
state.last_update_time = Instant::now() - age;
state
}
#[test]
fn test_short_incremental_cadence_uses_min_refresh() {
// When prefer_short_incremental_cadence is true and dirty backlog is manageable,
// the next start time should be last_update_time + min_refresh (short cadence),
// ignoring the longer time_window_size.
let state = state_with_past_update(Duration::from_secs(10));
let time_window_size = Some(Duration::from_secs(60)); // large window
let min_refresh = Duration::from_secs(5);
let flow_id = 1;
let result = state.get_next_start_query_time(
flow_id,
&time_window_size,
min_refresh,
None,
20,
true, // prefer_short_incremental_cadence
);
// With short cadence, result should be last_update_time + min_refresh.
let expected = state.last_update_time + min_refresh;
assert_eq!(result, expected);
}
#[test]
fn test_short_incremental_cadence_respects_last_query_duration() {
let mut state = state_with_past_update(Duration::from_secs(10));
state.last_query_duration = Duration::from_secs(20);
let time_window_size = Some(Duration::from_secs(60));
let min_refresh = Duration::from_secs(5);
let flow_id = 1;
let result = state.get_next_start_query_time(
flow_id,
&time_window_size,
min_refresh,
None,
20,
true,
);
assert_eq!(result, state.last_update_time + state.last_query_duration);
}
#[test]
fn test_short_incremental_cadence_respects_max_timeout() {
let mut state = state_with_past_update(Duration::from_secs(10));
state.last_query_duration = Duration::from_secs(20);
let time_window_size = Some(Duration::from_secs(60));
let min_refresh = Duration::from_secs(30);
let max_timeout = Duration::from_secs(5);
let flow_id = 1;
let result = state.get_next_start_query_time(
flow_id,
&time_window_size,
min_refresh,
Some(max_timeout),
20,
true,
);
assert_eq!(result, state.last_update_time + max_timeout);
}
#[test]
fn test_full_snapshot_ignores_short_cadence() {
// When prefer_short_incremental_cadence is false (full snapshot mode),
// the normal long-cadence based on time_window_size applies.
let mut state = state_with_past_update(Duration::from_secs(10));
// Make last_query_duration small so the lower bound (time_window_size) dominates.
state.last_query_duration = Duration::from_secs(1);
let time_window_size = Some(Duration::from_secs(60)); // large window
let min_refresh = Duration::from_secs(5);
let flow_id = 1;
let result = state.get_next_start_query_time(
flow_id,
&time_window_size,
min_refresh,
None,
20,
false, // prefer_short_incremental_cadence = false
);
// With normal cadence, result should be last_update_time + time_window_size
// (since last_query_duration < time_window_size).
let expected = state.last_update_time + Duration::from_secs(60);
assert_eq!(result, expected);
}
#[test]
fn test_dirty_window_overflow_schedules_immediately_even_with_short_cadence() {
// Dirty-window overflow must always schedule immediately,
// regardless of prefer_short_incremental_cadence.
let mut state = state_with_past_update(Duration::from_secs(10));
// Create a very large dirty backlog.
state
.dirty_time_windows
.add_window(Timestamp::new_second(0), Some(Timestamp::new_second(3600)));
let time_window_size = Some(Duration::from_secs(1)); // tiny window => overflow
let min_refresh = Duration::from_secs(5);
let flow_id = 1;
// With short cadence flag.
let result = state.get_next_start_query_time(
flow_id,
&time_window_size,
min_refresh,
None,
1, // max 1 filter => tiny capacity
true,
);
assert!(
result <= Instant::now(),
"dirty overflow should schedule immediately"
);
// Without short cadence flag — same behavior.
let result2 = state.get_next_start_query_time(
flow_id,
&time_window_size,
min_refresh,
None,
1,
false,
);
assert!(
result2 <= Instant::now(),
"dirty overflow should schedule immediately"
);
}
#[test]
fn test_incremental_disabled_ignores_short_cadence() {
// When prefer_short_incremental_cadence is true but the dirty backlog is
// manageable, the short cadence is applied. This test verifies that the
// caller-side guard (checkpoint_mode + !is_incremental_disabled) controls
// whether short cadence is requested at all — when incremental is disabled,
// the flag is false, and the long cadence applies.
//
// This simulates the case where the caller computed
// prefer_short_incremental_cadence = false (e.g. incremental disabled
// or FullSnapshot mode), so the long cadence is used.
let mut state = state_with_past_update(Duration::from_secs(10));
state.last_query_duration = Duration::from_secs(1);
let time_window_size = Some(Duration::from_secs(60));
let min_refresh = Duration::from_secs(5);
let flow_id = 1;
let result = state.get_next_start_query_time(
flow_id,
&time_window_size,
min_refresh,
None,
20,
false, // prefer_short_incremental_cadence = false
);
// With normal cadence, result should be last_update_time + time_window_size.
let expected = state.last_update_time + Duration::from_secs(60);
assert_eq!(result, expected);
}
}

View File

@@ -30,6 +30,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::schema::Schema;
use query::QueryEngineRef;
use query::options::FLOW_INCREMENTAL_MODE;
use query::query_engine::DefaultSerializer;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -42,8 +43,9 @@ use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{DirtyTimeWindows, FilterExprInfo, TaskState};
use crate::batching_mode::checkpoint::checkpoint_mode_label;
use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
use crate::batching_mode::state::{CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState};
use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
@@ -62,6 +64,15 @@ use crate::metrics::{
};
use crate::{Error, FlowId};
mod ckpt;
mod inc;
/// Maximum number of dirty time-window predicates attached to one incremental
/// SQL query. This keeps generated OR filters bounded so Substrait encoding and
/// downstream planning remain predictable; if the backlog is larger, the flow
/// drains one capped batch and postpones checkpoint advancement to a later run.
const MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS: usize = 4096;
/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
@@ -123,6 +134,7 @@ pub struct TaskArgs<'a> {
pub struct PlanInfo {
pub plan: LogicalPlan,
pub dirty_restore: DirtyRestore,
pub can_advance_checkpoints: bool,
}
pub enum DirtyRestore {
@@ -247,8 +259,17 @@ impl BatchingTask {
) -> Result<Option<(usize, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
debug!("Generate new query: {}", new_query.plan);
let dirty_filter = match &new_query.dirty_restore {
DirtyRestore::Scoped(f) => Some(f),
_ => None,
};
match self
.execute_logical_plan(frontend_client, &new_query.plan)
.execute_logical_plan(
frontend_client,
&new_query.plan,
dirty_filter,
new_query.can_advance_checkpoints,
)
.await
{
Ok(result) => Ok(result),
@@ -330,6 +351,7 @@ impl BatchingTask {
let insert_into_info = PlanInfo {
plan,
dirty_restore: new_query.dirty_restore,
can_advance_checkpoints: new_query.can_advance_checkpoints,
};
let insert_into =
match insert_into_info
@@ -349,6 +371,7 @@ impl BatchingTask {
Ok(Some(PlanInfo {
plan: insert_into,
dirty_restore: insert_into_info.dirty_restore,
can_advance_checkpoints: insert_into_info.can_advance_checkpoints,
}))
}
@@ -369,6 +392,8 @@ impl BatchingTask {
&self,
frontend_client: &Arc<FrontendClient>,
plan: &LogicalPlan,
dirty_filter: Option<&FilterExprInfo>,
can_advance_checkpoints: bool,
) -> Result<Option<(usize, Duration)>, Error> {
let instant = Instant::now();
let flow_id = self.config.flow_id;
@@ -398,8 +423,40 @@ impl BatchingTask {
})?
.data;
let mut peer_desc = None;
// For incremental-mode SQL queries, attempt to rewrite the delta aggregate
// plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions.
let incremental_plan = if can_advance_checkpoints {
self.prepare_plan_for_incremental(&plan, dirty_filter)
.await?
} else {
None
};
let incremental_safe = incremental_plan.is_some();
let plan = incremental_plan.unwrap_or_else(|| plan.clone());
let extensions = self
.build_flow_query_extensions(incremental_safe, can_advance_checkpoints)
.await?;
let extension_refs = extensions
.iter()
.map(|(key, value)| (*key, value.as_str()))
.collect::<Vec<_>>();
let query_mode = if extensions
.iter()
.any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
{
CheckpointMode::Incremental
} else {
CheckpointMode::FullSnapshot
};
Self::record_query_mode(flow_id, query_mode);
debug!(
"Flow {flow_id} executing batching query with checkpoint_mode={}, extension_count={}",
checkpoint_mode_label(query_mode),
extensions.len()
);
let mut peer_desc = None;
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
@@ -411,66 +468,83 @@ impl BatchingTask {
let message = DFLogicalSubstraitConvertor {}
.encode(&insert_plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?;
api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::InsertIntoPlan(
api::v1::InsertIntoPlan {
table_name: Some(insert_to),
logical_plan: message.to_vec(),
},
)),
})
}
} else {
let message = DFLogicalSubstraitConvertor {}
.encode(&plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?;
api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
})
}
};
frontend_client
.handle(req, catalog, schema, &mut peer_desc)
.query_with_terminal_metrics(catalog, schema, req, &extension_refs, &mut peer_desc)
.await
};
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
debug!(
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
elapsed
);
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(*affected_rows as _);
} else if let Err(err) = &res {
let peer_label = peer_desc
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| PeerDesc::default().to_string());
if let Err(err) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
peer_desc, elapsed, &plan
"Failed to execute Flow {flow_id} on frontend {peer_label}, result: {err:?}, elapsed: {:?} with query: {}",
elapsed, &plan
);
let decision = {
let mut state = self.state.write().unwrap();
let reason = Self::query_failure_reason(err);
Self::apply_query_failure_to_state(&mut state, elapsed, reason)
};
if let Some(decision) = decision {
Self::record_checkpoint_decision(flow_id, decision);
}
}
// record slow query
if elapsed >= self.config.batch_opts.slow_query_threshold {
warn!(
"Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
peer_desc, elapsed, &plan
"Flow {flow_id} on frontend {peer_label} executed for {:?} before complete, query: {}",
elapsed, &plan
);
let flow_id = flow_id.to_string();
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_desc.unwrap_or_default().to_string(),
])
.with_label_values(&[flow_id.as_str(), peer_label.as_str()])
.observe(elapsed.as_secs_f64());
}
self.state
.write()
.unwrap()
.after_query_exec(elapsed, res.is_ok());
let res = res?;
Ok(Some((res as usize, elapsed)))
let (affected_rows, _) = res.output.extract_rows_and_cost();
debug!(
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}, watermark: {:?}",
elapsed,
res.region_watermark_map()
);
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(affected_rows as _);
{
let mut state = self.state.write().unwrap();
let decision = Self::apply_query_result_to_state(
&mut state,
&res,
elapsed,
can_advance_checkpoints,
);
Self::record_checkpoint_decision(flow_id, decision);
}
Ok(Some((affected_rows, elapsed)))
}
/// Restore dirty windows consumed by a failed query so they are retried on
@@ -563,8 +637,17 @@ impl BatchingTask {
};
let res = if let Some(new_query) = &new_query {
self.execute_logical_plan(&frontend_client, &new_query.plan)
.await
let dirty_filter = match &new_query.dirty_restore {
DirtyRestore::Scoped(f) => Some(f),
_ => None,
};
self.execute_logical_plan(
&frontend_client,
&new_query.plan,
dirty_filter,
new_query.can_advance_checkpoints,
)
.await
} else {
Ok(None)
};
@@ -592,12 +675,17 @@ impl BatchingTask {
.as_ref()
.and_then(|t| *t.time_window_size());
let prefer_short_incremental_cadence = state.checkpoint_mode()
== CheckpointMode::Incremental
&& !state.is_incremental_disabled();
state.get_next_start_query_time(
self.config.flow_id,
&time_window_size,
min_refresh,
Some(self.config.batch_opts.query_timeout),
self.config.batch_opts.experimental_max_filter_num_per_query,
prefer_short_incremental_cadence,
)
};
@@ -733,6 +821,7 @@ impl BatchingTask {
return Ok(Some(PlanInfo {
plan,
dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
can_advance_checkpoints: true,
}));
}
_ => {
@@ -769,6 +858,7 @@ impl BatchingTask {
return Ok(Some(PlanInfo {
plan,
dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
can_advance_checkpoints: true,
}));
}
};
@@ -799,20 +889,33 @@ impl BatchingTask {
),
})?;
let expr = self
.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
let (expr, can_advance_checkpoints) = {
let mut state = self.state.write().unwrap();
let window_cnt = if state.checkpoint_mode() == CheckpointMode::Incremental
&& !state.is_incremental_disabled()
&& matches!(self.config.query_type, QueryType::Sql)
{
// Incremental scans are bounded by region sequence checkpoints,
// so the dirty-window filter only narrows sink-side/time-window
// work. Drain more windows than normal, but keep a hard cap to
// avoid building a huge OR filter after a long downtime. If
// windows remain, checkpoints won't advance this round.
MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS
} else {
max_window_cnt
.unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query)
};
let expr = state.dirty_time_windows.gen_filter_exprs(
&col_name,
Some(expire_lower_bound),
window_size,
max_window_cnt
.unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
window_cnt,
self.config.flow_id,
Some(self),
)?;
let can_advance_checkpoints = state.dirty_time_windows.is_empty();
(expr, can_advance_checkpoints)
};
let Some(expr) = expr else {
// no new data, hence no need to update
@@ -859,6 +962,7 @@ impl BatchingTask {
let info = PlanInfo {
plan: new_plan.clone(),
dirty_restore: DirtyRestore::Scoped(expr),
can_advance_checkpoints,
};
Ok(Some(info))

View File

@@ -0,0 +1,181 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use client::OutputWithMetrics;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use crate::batching_mode::checkpoint::{
FlowCheckpointDecision, FlowQueryFallbackReason, checkpoint_mode_label,
};
use crate::batching_mode::state::{CheckpointMode, TaskState};
use crate::batching_mode::task::BatchingTask;
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT,
};
use crate::{Error, FlowId};
impl BatchingTask {
pub(super) fn query_failure_reason(err: &Error) -> FlowQueryFallbackReason {
if err.status_code() == StatusCode::RequestOutdated {
FlowQueryFallbackReason::StaleCursor
} else {
FlowQueryFallbackReason::IncrementalQueryFailure
}
}
pub(super) fn apply_query_failure_to_state(
state: &mut TaskState,
elapsed: Duration,
reason: FlowQueryFallbackReason,
) -> Option<FlowCheckpointDecision> {
state.after_query_exec(elapsed, false);
let checkpoint_mode = state.checkpoint_mode();
if checkpoint_mode == CheckpointMode::Incremental {
state.mark_full_snapshot();
Some(FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason,
})
} else {
None
}
}
pub(super) fn apply_query_result_to_state(
state: &mut TaskState,
res: &OutputWithMetrics,
elapsed: Duration,
can_advance_checkpoints: bool,
) -> FlowCheckpointDecision {
state.after_query_exec(elapsed, true);
let checkpoint_mode = state.checkpoint_mode();
if !can_advance_checkpoints {
state.mark_full_snapshot();
return FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::DirtyBacklogPending,
};
}
if let (Some(participating_regions), Some(watermark_map)) =
(res.participating_regions(), res.region_watermark_map())
{
let can_advance = match checkpoint_mode {
CheckpointMode::FullSnapshot => state
.can_advance_full_snapshot_checkpoints(&participating_regions, &watermark_map),
CheckpointMode::Incremental => state
.can_advance_incremental_checkpoints_with_participation(
&participating_regions,
&watermark_map,
),
};
if can_advance {
let participating_region_count = participating_regions.len();
let watermark_count = watermark_map.len();
match checkpoint_mode {
CheckpointMode::FullSnapshot => {
state.advance_checkpoints(watermark_map);
if state.is_incremental_disabled() {
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::FullSnapshot,
reason: FlowQueryFallbackReason::IncrementalDisabled,
}
} else {
FlowCheckpointDecision::AdvancedFromFullSnapshot {
participating_regions: participating_region_count,
watermarks: watermark_count,
}
}
}
CheckpointMode::Incremental => {
state.advance_incremental_checkpoints_with_participation(
&participating_regions,
watermark_map,
);
FlowCheckpointDecision::AdvancedIncremental {
participating_regions: participating_region_count,
watermarks: watermark_count,
}
}
}
} else {
state.mark_full_snapshot();
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
}
}
} else {
state.mark_full_snapshot();
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::MissingRegionWatermark,
}
}
}
pub(super) fn record_checkpoint_decision(flow_id: FlowId, decision: FlowCheckpointDecision) {
let flow_id = flow_id.to_string();
METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT
.with_label_values(&[
flow_id.as_str(),
decision.mode_label(),
decision.decision_label(),
decision.reason_label(),
])
.inc();
match decision {
FlowCheckpointDecision::AdvancedFromFullSnapshot {
participating_regions,
watermarks,
} => {
info!(
"Flow {flow_id} switched to incremental mode after full snapshot, participating_regions={participating_regions}, watermarks={watermarks}"
);
}
FlowCheckpointDecision::AdvancedIncremental {
participating_regions,
watermarks,
} => {
debug!(
"Flow {flow_id} advanced incremental checkpoints, participating_regions={participating_regions}, watermarks={watermarks}"
);
}
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode,
reason,
} => {
warn!(
"Flow {flow_id} switched to full snapshot mode, previous_mode={}, reason={}",
checkpoint_mode_label(previous_mode),
reason.as_label()
);
}
}
}
pub(super) fn record_query_mode(flow_id: FlowId, mode: CheckpointMode) {
let flow_id = flow_id.to_string();
METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT
.with_label_values(&[flow_id.as_str(), checkpoint_mode_label(mode)])
.inc();
}
}

View File

@@ -0,0 +1,252 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use common_telemetry::tracing::warn;
use datafusion_expr::{DmlStatement, LogicalPlan};
use query::options::{
FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY,
FLOW_SINK_TABLE_ID,
};
use snafu::ResultExt;
use table::metadata::TableId;
use crate::Error;
use crate::batching_mode::incremental_filter::build_sink_dirty_time_window_filter_expr;
use crate::batching_mode::state::{CheckpointMode, FilterExprInfo};
use crate::batching_mode::table_creator::QueryType;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::utils::{
analyze_incremental_aggregate_plan, get_table_info_df_schema,
rewrite_incremental_aggregate_with_sink_merge,
};
use crate::error::{ExternalSnafu, UnexpectedSnafu};
impl BatchingTask {
async fn sink_table_id(&self) -> Result<TableId, Error> {
let table = self
.config
.catalog_manager
.table(
&self.config.sink_table_name[0],
&self.config.sink_table_name[1],
&self.config.sink_table_name[2],
None,
)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.ok_or_else(|| {
UnexpectedSnafu {
reason: format!(
"Flow {} cannot build incremental extensions because sink table {:?} was not found",
self.config.flow_id, self.config.sink_table_name
),
}
.build()
})?;
Ok(table.table_info().table_id())
}
/// For incremental-mode SQL queries, attempt to prepare an executable plan
/// that is safe for incremental scan extensions.
///
/// Returns `Some(plan)` when incremental extensions are safe, and `None`
/// when the caller should execute the original plan without incremental
/// extensions. The returned plan may be either a rewritten
/// delta-LEFT-JOIN-sink merge plan or the original plan. In particular,
/// plain GROUP BY queries with no aggregate merge columns are incremental
/// safe without a rewrite, so they return `Some(original_plan)`.
pub(super) async fn prepare_plan_for_incremental(
&self,
plan: &LogicalPlan,
dirty_filter: Option<&FilterExprInfo>,
) -> Result<Option<LogicalPlan>, Error> {
let is_incremental_sql = {
let state = self.state.read().unwrap();
if state.is_incremental_disabled() {
return Ok(None);
}
state.checkpoint_mode() == CheckpointMode::Incremental
&& matches!(self.config.query_type, QueryType::Sql)
};
if !is_incremental_sql {
return Ok(None);
}
// Extract inner query plan from the DML wrapper.
// Non-DML or non-SQL plans bypass the rewrite and keep checkpoint mode;
// non-aggregate TQL or non-INSERT plans do not need incremental scan extensions.
let inner_plan = match plan {
LogicalPlan::Dml(dml) => dml.input.as_ref().clone(),
_ => return Ok(None),
};
// Analyze the plan for incremental rewritability.
// Incremental reads currently require aggregate / group-by plans that
// can be rewritten into a delta-left-join-sink merge. Non-aggregate SQL
// (projection, filter, or other non-aggregate shapes) stays full-snapshot
// until separately supported, and incremental mode is permanently
// disabled for this flow.
let Some(analysis) = analyze_incremental_aggregate_plan(&inner_plan)? else {
warn!(
"Flow {} incremental mode but plan is not an aggregate query; \
permanently disabling incremental for this flow",
self.config.flow_id
);
self.state.write().unwrap().disable_incremental();
return Ok(None);
};
if !analysis.unsupported_exprs.is_empty() {
warn!(
"Flow {} incremental aggregate contains unsupported expressions {:?}; \
permanently disabling incremental for this flow",
self.config.flow_id, analysis.unsupported_exprs
);
self.state.write().unwrap().disable_incremental();
return Ok(None);
}
// Plain GROUP BY without aggregate expressions has no values to
// merge between delta and sink. The incremental delta scan emits
// changed groups, and sink primary-key write semantics make this
// idempotent; no explicit left-join rewrite is needed.
if analysis.merge_columns.is_empty() {
return Ok(Some(plan.clone()));
}
// Fetch sink table for the merge rewrite.
// Transient errors (catalog, schema, filter, or rewrite) should not
// permanently disable incremental mode. Instead, we fall back to a
// full-snapshot plan for this round while keeping incremental retryable.
let sink_table = match get_table_info_df_schema(
self.config.catalog_manager.clone(),
self.config.sink_table_name.clone(),
)
.await
{
Ok((table, _)) => table,
Err(err) => {
warn!(
"Flow {} failed to fetch sink table for incremental rewrite; \
falling back to full snapshot for this round: {:?}",
self.config.flow_id, err
);
self.state.write().unwrap().mark_full_snapshot();
return Ok(None);
}
};
let sink_schema = sink_table.table_info().meta.schema.clone();
let sink_dirty_filter = match build_sink_dirty_time_window_filter_expr(
self.config.flow_id,
&analysis,
&sink_schema,
dirty_filter,
) {
Ok(filter) => filter,
Err(err) => {
warn!(
"Flow {} failed to build sink dirty time window filter; \
falling back to full snapshot for this round: {:?}",
self.config.flow_id, err
);
self.state.write().unwrap().mark_full_snapshot();
return Ok(None);
}
};
let rewritten_inner = match rewrite_incremental_aggregate_with_sink_merge(
&inner_plan,
&analysis,
sink_table,
&self.config.sink_table_name,
sink_dirty_filter,
)
.await
{
Ok(plan) => plan,
Err(err) => {
warn!(
"Flow {} failed to rewrite incremental aggregate with sink merge; \
falling back to full snapshot for this round: {:?}",
self.config.flow_id, err
);
self.state.write().unwrap().mark_full_snapshot();
return Ok(None);
}
};
// Reconstruct DML plan with the rewritten inner plan
let rewritten = match plan {
LogicalPlan::Dml(dml) => LogicalPlan::Dml(DmlStatement::new(
dml.table_name.clone(),
dml.target.clone(),
dml.op.clone(),
Arc::new(rewritten_inner),
)),
_ => unreachable!("already matched Dml above"),
};
debug!(
"Flow {} rewrote incremental SQL aggregate query with sink merge",
self.config.flow_id
);
Ok(Some(rewritten))
}
pub(super) async fn build_flow_query_extensions(
&self,
incremental_safe: bool,
can_advance_checkpoints: bool,
) -> Result<Vec<(&'static str, String)>, Error> {
let mut extensions = vec![("flow.return_region_seq", "true".to_string())];
let incremental_checkpoints_json = {
let state = self.state.read().unwrap();
if incremental_safe
&& can_advance_checkpoints
&& !state.is_incremental_disabled()
&& state.checkpoint_mode() == CheckpointMode::Incremental
&& !state.checkpoints().is_empty()
{
Some(serde_json::to_string(state.checkpoints()).map_err(|err| {
UnexpectedSnafu {
reason: format!("Failed to serialize checkpoint map: {err}"),
}
.build()
})?)
} else {
None
}
};
if let Some(checkpoints_json) = incremental_checkpoints_json {
let sink_table_id = self.sink_table_id().await?;
extensions.push((FLOW_SINK_TABLE_ID, sink_table_id.to_string()));
extensions.push((
FLOW_INCREMENTAL_MODE,
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
));
extensions.push((FLOW_INCREMENTAL_AFTER_SEQS, checkpoints_json));
}
Ok(extensions)
}
}

View File

@@ -12,18 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use catalog::RegisterTableRequest;
use catalog::memory::MemoryCatalogManager;
use client::OutputWithMetrics;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::RecordBatch;
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{UInt32Vector, VectorRef};
use datatypes::vectors::{TimestampMillisecondVector, UInt32Vector, VectorRef};
use pretty_assertions::assert_eq;
use query::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY};
use session::context::QueryContext;
use table::test_util::MemTable;
use super::*;
use crate::batching_mode::checkpoint::{
CHECKPOINT_DECISION_ADVANCE, CHECKPOINT_DECISION_FALLBACK, CHECKPOINT_REASON_NONE,
FlowCheckpointDecision, FlowQueryFallbackReason,
};
use crate::batching_mode::state::CheckpointMode;
use crate::batching_mode::time_window::find_time_window_expr;
use crate::test_utils::create_test_query_engine;
@@ -172,6 +183,34 @@ fn register_number_only_sink(query_engine: &QueryEngineRef, table_name: &str) {
memory_catalog.register_table_sync(request).unwrap();
}
fn register_auto_created_aggregate_sink(query_engine: &QueryEngineRef, table_name: &str) {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", CDT::uint32_datatype(), true),
ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true),
ColumnSchema::new("update_at", CDT::timestamp_millisecond_datatype(), true),
]));
let columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice([1_u32])),
Arc::new(TimestampMillisecondVector::from_slice([0_i64])),
Arc::new(TimestampMillisecondVector::from_slice([0_i64])),
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let table = MemTable::table(table_name, recordbatch);
let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id: 9002,
table,
};
let catalog_manager = query_engine.engine_state().catalog_manager();
let memory_catalog = catalog_manager
.as_any()
.downcast_ref::<MemoryCatalogManager>()
.unwrap();
memory_catalog.register_table_sync(request).unwrap();
}
fn dirty_marker() -> DirtyTimeWindows {
let mut dirty = DirtyTimeWindows::default();
dirty.set_dirty();
@@ -204,6 +243,7 @@ async fn assert_unscoped_failure_restore(
let unscoped_query = PlanInfo {
plan,
dirty_restore: DirtyRestore::Unscoped(consumed_dirty_windows),
can_advance_checkpoints: true,
};
task.handle_executed_query_failure(Some(&unscoped_query));
@@ -216,6 +256,442 @@ async fn assert_unscoped_failure_restore(
);
}
fn output_with_region_watermarks(
watermarks: impl IntoIterator<Item = (u64, Option<u64>)>,
) -> OutputWithMetrics {
let result = OutputWithMetrics::from_output(Output::new_with_affected_rows(0));
result.metrics.update(Some(RecordBatchMetrics {
region_watermarks: watermarks
.into_iter()
.map(|(region_id, watermark)| RegionWatermarkEntry {
region_id,
watermark,
})
.collect(),
..Default::default()
}));
result.metrics.mark_ready();
result
}
#[test]
fn test_apply_query_result_to_state_advances_full_snapshot_to_incremental() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]);
let decision = BatchingTask::apply_query_result_to_state(
&mut state,
&result,
std::time::Duration::from_millis(1),
true,
);
assert_eq!(
decision,
FlowCheckpointDecision::AdvancedFromFullSnapshot {
participating_regions: 2,
watermarks: 2,
}
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
);
}
#[test]
fn test_apply_query_result_to_state_stays_full_snapshot_when_incremental_disabled() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
state.disable_incremental();
assert!(state.is_incremental_disabled());
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]);
let decision = BatchingTask::apply_query_result_to_state(
&mut state,
&result,
std::time::Duration::from_millis(1),
true,
);
// Should NOT claim advancement to incremental; should fallback with correct reason.
assert_eq!(
decision,
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::FullSnapshot,
reason: FlowQueryFallbackReason::IncrementalDisabled,
}
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert!(state.is_incremental_disabled());
// Checkpoints are still updated even if mode doesn't advance.
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
);
}
#[test]
fn test_apply_query_result_to_state_rejects_unproved_watermark() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, None)]);
let decision = BatchingTask::apply_query_result_to_state(
&mut state,
&result,
std::time::Duration::from_millis(1),
true,
);
assert_eq!(
decision,
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::FullSnapshot,
reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
}
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert!(state.checkpoints().is_empty());
}
#[test]
fn test_apply_query_result_to_state_reports_missing_watermark() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
let result = OutputWithMetrics::from_output(Output::new_with_affected_rows(0));
let decision = BatchingTask::apply_query_result_to_state(
&mut state,
&result,
std::time::Duration::from_millis(1),
true,
);
assert_eq!(
decision,
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::FullSnapshot,
reason: FlowQueryFallbackReason::MissingRegionWatermark,
}
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert!(state.checkpoints().is_empty());
}
#[test]
fn test_apply_query_result_to_state_advances_incremental_subset() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
state.advance_checkpoints(HashMap::from([
(1_u64, 10_u64),
(2_u64, 20_u64),
(3_u64, 30_u64),
]));
let result = output_with_region_watermarks([(1_u64, Some(12_u64)), (3_u64, Some(35_u64))]);
let decision = BatchingTask::apply_query_result_to_state(
&mut state,
&result,
std::time::Duration::from_millis(1),
true,
);
assert_eq!(
decision,
FlowCheckpointDecision::AdvancedIncremental {
participating_regions: 2,
watermarks: 2,
}
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 12_u64), (2_u64, 20_u64), (3_u64, 35_u64)])
);
}
#[test]
fn test_apply_query_result_to_state_blocks_full_snapshot_when_dirty_backlog_pending() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]);
let decision = BatchingTask::apply_query_result_to_state(
&mut state,
&result,
std::time::Duration::from_millis(1),
false,
);
assert_eq!(
decision,
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::FullSnapshot,
reason: FlowQueryFallbackReason::DirtyBacklogPending,
}
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert!(state.checkpoints().is_empty());
}
#[test]
fn test_apply_query_result_to_state_blocks_incremental_when_dirty_backlog_pending() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
let result = output_with_region_watermarks([(1_u64, Some(12_u64)), (2_u64, Some(25_u64))]);
let decision = BatchingTask::apply_query_result_to_state(
&mut state,
&result,
std::time::Duration::from_millis(1),
false,
);
assert_eq!(
decision,
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::Incremental,
reason: FlowQueryFallbackReason::DirtyBacklogPending,
}
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
);
}
#[test]
fn test_apply_query_failure_to_state_falls_back_from_incremental() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
let decision = BatchingTask::apply_query_failure_to_state(
&mut state,
std::time::Duration::from_millis(1),
FlowQueryFallbackReason::IncrementalQueryFailure,
);
assert_eq!(
decision,
Some(FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::Incremental,
reason: FlowQueryFallbackReason::IncrementalQueryFailure,
})
);
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert_eq!(
state.checkpoints(),
&BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
);
}
#[test]
fn test_apply_query_failure_to_state_keeps_full_snapshot_without_decision() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
let decision = BatchingTask::apply_query_failure_to_state(
&mut state,
std::time::Duration::from_millis(1),
FlowQueryFallbackReason::StaleCursor,
);
assert_eq!(decision, None);
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert!(state.checkpoints().is_empty());
}
#[test]
fn test_checkpoint_decision_labels_are_stable() {
let advance = FlowCheckpointDecision::AdvancedIncremental {
participating_regions: 1,
watermarks: 1,
};
let fallback = FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::Incremental,
reason: FlowQueryFallbackReason::StaleCursor,
};
assert_eq!(advance.mode_label(), "incremental");
assert_eq!(advance.decision_label(), CHECKPOINT_DECISION_ADVANCE);
assert_eq!(advance.reason_label(), CHECKPOINT_REASON_NONE);
assert_eq!(fallback.mode_label(), "incremental");
assert_eq!(fallback.decision_label(), CHECKPOINT_DECISION_FALLBACK);
assert_eq!(fallback.reason_label(), "stale_cursor");
assert_eq!(
FlowQueryFallbackReason::DirtyBacklogPending.as_label(),
"dirty_backlog_pending"
);
}
#[tokio::test]
async fn test_build_flow_query_extensions_switches_with_checkpoint_mode() {
let (task, _) = new_test_task_engine_and_plan_with_query(
"SELECT number, ts FROM numbers_with_ts",
"numbers_with_ts",
)
.await
.into_task_and_plan();
let extensions = task.build_flow_query_extensions(false, true).await.unwrap();
assert_eq!(
extensions,
vec![("flow.return_region_seq", "true".to_string())]
);
task.state
.write()
.unwrap()
.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
let extensions = task.build_flow_query_extensions(false, true).await.unwrap();
assert!(extensions.contains(&("flow.return_region_seq", "true".to_string())));
assert!(
!extensions
.iter()
.any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
);
assert!(
!extensions
.iter()
.any(|(key, _)| *key == FLOW_INCREMENTAL_AFTER_SEQS)
);
let extensions = task.build_flow_query_extensions(true, true).await.unwrap();
assert!(extensions.contains(&("flow.return_region_seq", "true".to_string())));
assert!(extensions.contains(&(
FLOW_INCREMENTAL_MODE,
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string()
)));
assert!(extensions.contains(&(
FLOW_INCREMENTAL_AFTER_SEQS,
serde_json::json!({"1": 10, "2": 20}).to_string(),
)));
let extensions = task.build_flow_query_extensions(true, false).await.unwrap();
assert!(extensions.contains(&("flow.return_region_seq", "true".to_string())));
assert!(
!extensions
.iter()
.any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
);
assert!(
!extensions
.iter()
.any(|(key, _)| *key == FLOW_INCREMENTAL_AFTER_SEQS)
);
task.state.write().unwrap().disable_incremental();
let extensions = task.build_flow_query_extensions(true, true).await.unwrap();
assert!(extensions.contains(&("flow.return_region_seq", "true".to_string())));
assert!(
!extensions
.iter()
.any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
);
assert!(
!extensions
.iter()
.any(|(key, _)| *key == FLOW_INCREMENTAL_AFTER_SEQS)
);
}
#[tokio::test]
async fn test_full_snapshot_scoped_plan_marks_checkpoint_advance_safe_only_after_backlog_drained() {
let TestTaskParts {
task,
query_engine,
..
} = new_time_window_test_task_with_query(
"SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number",
)
.await;
{
let mut state = task.state.write().unwrap();
state
.dirty_time_windows
.add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5)));
state
.dirty_time_windows
.add_window(Timestamp::new_second(30), Some(Timestamp::new_second(35)));
}
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", CDT::uint32_datatype(), false),
ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
]));
let first = task
.gen_query_with_time_window(query_engine.clone(), &sink_schema, &[], false, Some(1))
.await
.unwrap()
.unwrap();
assert!(!first.can_advance_checkpoints);
assert_eq!(task.state.read().unwrap().dirty_time_windows.len(), 1);
let second = task
.gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1))
.await
.unwrap()
.unwrap();
assert!(second.can_advance_checkpoints);
assert!(task.state.read().unwrap().dirty_time_windows.is_empty());
}
#[tokio::test]
async fn test_incremental_scoped_plan_consumes_all_dirty_windows_for_checkpoint_safety() {
let TestTaskParts {
task,
query_engine,
..
} = new_time_window_test_task_with_query(
"SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number",
)
.await;
{
let mut state = task.state.write().unwrap();
state.advance_checkpoints(HashMap::from([(1_u64, 10_u64)]));
state
.dirty_time_windows
.add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5)));
state
.dirty_time_windows
.add_window(Timestamp::new_second(30), Some(Timestamp::new_second(35)));
}
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", CDT::uint32_datatype(), false),
ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
]));
let plan = task
.gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1))
.await
.unwrap()
.unwrap();
assert!(plan.can_advance_checkpoints);
assert!(task.state.read().unwrap().dirty_time_windows.is_empty());
}
#[tokio::test]
async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_path() {
let (task, plan) = new_test_task_and_plan_with_missing_sink().await;
@@ -231,12 +707,293 @@ async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_pat
time_ranges: vec![(Timestamp::new_second(10), Timestamp::new_second(20))],
window_size: chrono::Duration::seconds(10),
}),
can_advance_checkpoints: true,
};
task.handle_executed_query_failure(Some(&scoped_query));
let state = task.state.read().unwrap();
assert_eq!(state.dirty_time_windows.len(), 1);
assert_eq!(
state.dirty_time_windows.window_size(),
std::time::Duration::from_secs(10)
);
}
#[tokio::test]
async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let plan = sql_to_df_plan(
ctx.clone(),
query_engine.clone(),
"SELECT number, ts FROM numbers_with_ts",
true,
)
.await
.unwrap();
// Build a DML wrapper using a real sink table from the test engine.
let (sink_table, _) = get_table_info_df_schema(
query_engine.engine_state().catalog_manager().clone(),
[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
],
)
.await
.unwrap();
let table_provider = Arc::new(DfTableProviderAdapter::new(sink_table));
let table_source = Arc::new(DefaultTableSource::new(table_provider));
let dml_plan = LogicalPlan::Dml(DmlStatement::new(
datafusion_common::TableReference::bare("test"),
table_source,
WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
Arc::new(plan),
));
let (_tx, rx) = tokio::sync::oneshot::channel();
let task = BatchingTask::try_new(TaskArgs {
flow_id: 1,
query: "SELECT number, ts FROM numbers_with_ts",
plan: dml_plan.clone(),
time_window_expr: None,
expire_after: None,
sink_table_name: [
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
],
source_table_names: vec![[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
]],
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
flow_eval_interval: None,
})
.unwrap();
// Put the state into Incremental mode with checkpoints.
task.state
.write()
.unwrap()
.advance_checkpoints(HashMap::from([(1_u64, 10_u64)]));
assert_eq!(
task.state.read().unwrap().checkpoint_mode(),
CheckpointMode::Incremental
);
let incremental_plan = task
.prepare_plan_for_incremental(&dml_plan, None)
.await
.unwrap();
assert!(incremental_plan.is_none());
let state = task.state.read().unwrap();
assert!(state.is_incremental_disabled());
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
}
#[tokio::test]
async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite_error() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let plan = sql_to_df_plan(
ctx.clone(),
query_engine.clone(),
"SELECT sum(number) AS total, ts FROM numbers_with_ts GROUP BY ts",
true,
)
.await
.unwrap();
let (sink_table, _) = get_table_info_df_schema(
query_engine.engine_state().catalog_manager().clone(),
[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
],
)
.await
.unwrap();
let table_provider = Arc::new(DfTableProviderAdapter::new(sink_table));
let table_source = Arc::new(DefaultTableSource::new(table_provider));
let dml_plan = LogicalPlan::Dml(DmlStatement::new(
datafusion_common::TableReference::bare("test"),
table_source,
WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
Arc::new(plan),
));
let (_tx, rx) = tokio::sync::oneshot::channel();
let task = BatchingTask::try_new(TaskArgs {
flow_id: 1,
query: "SELECT sum(number) AS total, ts FROM numbers_with_ts GROUP BY ts",
plan: dml_plan.clone(),
time_window_expr: None,
expire_after: None,
// The sink table exists, but does not have the rewritten aggregate
// output column `total`, so the rewrite fails deterministically.
sink_table_name: [
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
],
source_table_names: vec![[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
]],
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
flow_eval_interval: None,
})
.unwrap();
task.state
.write()
.unwrap()
.advance_checkpoints(HashMap::from([(1_u64, 10_u64)]));
assert_eq!(
task.state.read().unwrap().checkpoint_mode(),
CheckpointMode::Incremental
);
let incremental_plan = task
.prepare_plan_for_incremental(&dml_plan, None)
.await
.unwrap();
assert!(incremental_plan.is_none());
let state = task.state.read().unwrap();
assert!(!state.is_incremental_disabled());
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
}
#[tokio::test]
async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_original_plan() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let plan = sql_to_df_plan(
ctx.clone(),
query_engine.clone(),
"SELECT ts FROM numbers_with_ts GROUP BY ts",
true,
)
.await
.unwrap();
let (sink_table, _) = get_table_info_df_schema(
query_engine.engine_state().catalog_manager().clone(),
[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
],
)
.await
.unwrap();
let table_provider = Arc::new(DfTableProviderAdapter::new(sink_table));
let table_source = Arc::new(DefaultTableSource::new(table_provider));
let dml_plan = LogicalPlan::Dml(DmlStatement::new(
datafusion_common::TableReference::bare("test"),
table_source,
WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
Arc::new(plan),
));
let (_tx, rx) = tokio::sync::oneshot::channel();
let task = BatchingTask::try_new(TaskArgs {
flow_id: 1,
query: "SELECT ts FROM numbers_with_ts GROUP BY ts",
plan: dml_plan.clone(),
time_window_expr: None,
expire_after: None,
sink_table_name: [
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
],
source_table_names: vec![[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
]],
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
flow_eval_interval: None,
})
.unwrap();
task.state
.write()
.unwrap()
.advance_checkpoints(HashMap::from([(1_u64, 10_u64)]));
let incremental_plan = task
.prepare_plan_for_incremental(&dml_plan, None)
.await
.unwrap()
.expect("plain GROUP BY is incremental-safe without a rewrite");
assert_eq!(format!("{incremental_plan}"), format!("{dml_plan}"));
assert!(!task.state.read().unwrap().is_incremental_disabled());
}
#[tokio::test]
async fn test_auto_created_sql_aggregate_sink_reaches_incremental_safe() {
let sink_table = "auto_created_aggregate_sink";
let TestTaskParts {
task, query_engine, ..
} = new_test_task_engine_and_plan_with_query(
"SELECT max(number) AS number, ts FROM numbers_with_ts GROUP BY ts",
sink_table,
)
.await;
register_auto_created_aggregate_sink(&query_engine, sink_table);
task.state.write().unwrap().dirty_time_windows.set_dirty();
let plan_info = task
.gen_insert_plan(&query_engine, None)
.await
.unwrap()
.unwrap();
assert!(plan_info.can_advance_checkpoints);
task.state
.write()
.unwrap()
.advance_checkpoints(HashMap::from([(1_u64, 10_u64)]));
let incremental_plan = task
.prepare_plan_for_incremental(&plan_info.plan, None)
.await
.unwrap();
let incremental_safe = incremental_plan.is_some();
assert!(incremental_safe);
assert!(!task.state.read().unwrap().is_incremental_disabled());
let extensions = task
.build_flow_query_extensions(incremental_safe, plan_info.can_advance_checkpoints)
.await
.unwrap();
assert!(extensions.contains(&(
FLOW_INCREMENTAL_MODE,
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string()
)));
assert!(
extensions
.iter()
.any(|(key, _)| *key == FLOW_INCREMENTAL_AFTER_SEQS)
);
}
#[tokio::test]

View File

@@ -278,7 +278,7 @@ fn collect_output_projection_info(plan: &LogicalPlan) -> OutputProjectionInfo {
let mut col_names = Vec::new();
find_column_names(&alias.expr, &mut col_names);
match col_names.len() {
0 if matches!(alias.expr.as_ref(), Expr::Literal(_, _)) => {
0 if is_passthrough_output_column(&alias_name, alias.expr.as_ref()) => {
projection_info.literal_columns.insert(alias_name);
}
1 => {
@@ -315,10 +315,38 @@ fn collect_output_projection_info(plan: &LogicalPlan) -> OutputProjectionInfo {
}
}
if projection_info
.output_field_names
.iter()
.any(|name| name == AUTO_CREATED_PLACEHOLDER_TS_COL)
{
projection_info
.literal_columns
.insert(AUTO_CREATED_PLACEHOLDER_TS_COL.to_string());
}
projection_info.output_aliases = output_aliases;
projection_info
}
fn is_passthrough_output_column(alias_name: &str, expr: &Expr) -> bool {
matches!(expr, Expr::Literal(_, _))
|| match alias_name {
AUTO_CREATED_UPDATE_AT_TS_COL => expr == &datafusion::prelude::now(),
AUTO_CREATED_PLACEHOLDER_TS_COL => is_literal_or_cast_literal(expr),
_ => false,
}
}
fn is_literal_or_cast_literal(expr: &Expr) -> bool {
match expr {
Expr::Literal(_, _) => true,
Expr::Cast(cast) => is_literal_or_cast_literal(cast.expr.as_ref()),
Expr::TryCast(cast) => is_literal_or_cast_literal(cast.expr.as_ref()),
_ => false,
}
}
fn merge_op_for_aggregate_expr(aggr_expr: &Expr) -> Result<IncrementalAggregateMergeOp, String> {
let Some(aggr_func) = get_aggr_func(aggr_expr) else {
return Err(aggr_expr.to_string());
@@ -385,6 +413,11 @@ fn find_uncovered_output_fields(
!group_key_names.contains(*name)
&& !merge_column_names.contains(*name)
&& !projection_info.literal_columns.contains(*name)
// Auto-created sink columns injected by ColumnMatcherRewriter
// are not part of the original aggregate semantics and must
// not prevent incremental aggregate rewrites.
&& name.as_str() != AUTO_CREATED_UPDATE_AT_TS_COL
&& name.as_str() != AUTO_CREATED_PLACEHOLDER_TS_COL
})
.cloned()
.collect()
@@ -536,7 +569,8 @@ pub fn analyze_incremental_aggregate_plan(
///
/// ```text
/// delta = SELECT ts, number FROM <delta_plan> AS __flow_delta
/// sink = SELECT ts, number FROM <sink_table> AS __flow_sink
/// sink_scan = SELECT * FROM <sink_table> [WHERE <sink_dirty_filter>]
/// sink = SELECT ts, number FROM sink_scan AS __flow_sink
/// SELECT
/// CASE
/// WHEN __flow_sink.number IS NULL THEN __flow_delta.number
@@ -548,11 +582,17 @@ pub fn analyze_incremental_aggregate_plan(
/// LEFT JOIN sink
/// ON __flow_delta.ts IS NOT DISTINCT FROM __flow_sink.ts
/// ```
///
/// If `sink_dirty_filter` is provided, it is applied to the sink table scan
/// before projection, aliasing, and the left join. The predicate must reference
/// raw sink table columns structurally (unqualified), before the `__flow_sink`
/// alias exists.
pub async fn rewrite_incremental_aggregate_with_sink_merge(
delta_plan: &LogicalPlan,
analysis: &IncrementalAggregateAnalysis,
sink_table: TableRef,
sink_table_name: &TableName,
sink_dirty_filter: Option<Expr>,
) -> Result<LogicalPlan, Error> {
ensure!(
analysis.unsupported_exprs.is_empty(),
@@ -637,7 +677,22 @@ pub async fn rewrite_incremental_aggregate_with_sink_merge(
.cloned()
.map(unqualified_col)
.collect::<Vec<_>>();
let sink_selected = LogicalPlanBuilder::from(sink_scan)
let sink_input = if let Some(predicate) = sink_dirty_filter {
LogicalPlanBuilder::from(sink_scan)
.filter(predicate)
.with_context(|_| DatafusionSnafu {
context: "Failed to filter sink table scan for incremental sink merge".to_string(),
})?
.build()
.with_context(|_| DatafusionSnafu {
context: "Failed to build filtered sink plan for incremental sink merge"
.to_string(),
})?
} else {
sink_scan
};
let sink_selected = LogicalPlanBuilder::from(sink_input)
.project(sink_selected_exprs)
.with_context(|_| DatafusionSnafu {
context: "Failed to project sink table scan for incremental sink merge".to_string(),

View File

@@ -15,10 +15,13 @@
use std::sync::Arc;
use common_recordbatch::RecordBatch;
use common_time::Timestamp;
use datafusion_common::tree_node::TreeNode as _;
use datafusion_expr::GroupingSet;
use datatypes::prelude::{ConcreteDataType, Scalar, VectorRef};
use datatypes::prelude::{ConcreteDataType, MutableVector, Scalar, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::vectors::TimestampMillisecondVectorBuilder;
use pretty_assertions::assert_eq;
use query::query_engine::DefaultSerializer;
use session::context::QueryContext;
@@ -26,6 +29,7 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::test_util::MemTable;
use super::*;
use crate::batching_mode::state::FilterExprInfo;
use crate::test_utils::create_test_query_engine;
fn u32_table(table_name: &str, columns: Vec<&str>, rows: usize) -> TableRef {
@@ -50,6 +54,30 @@ fn empty_u32_table(table_name: &str, columns: Vec<&str>) -> TableRef {
u32_table(table_name, columns, 0)
}
fn time_window_u32_table(table_name: &str) -> TableRef {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
]));
let mut time_window_builder = TimestampMillisecondVectorBuilder::with_capacity(1);
time_window_builder.push(Some(TimestampMillisecond::new(0)));
let recordbatch = RecordBatch::new(
schema,
vec![
time_window_builder.to_vector_cloned(),
Arc::new(<u32 as Scalar>::VectorType::from_vec(vec![1])) as VectorRef,
],
)
.unwrap();
MemTable::table(table_name, recordbatch)
}
fn assert_same_logical_plan(actual: &LogicalPlan, expected: &LogicalPlan) {
assert_eq!(
format!("{}", expected.display_indent()),
@@ -84,6 +112,29 @@ fn expected_left_join_rewrite(
sink_selected_exprs: Vec<Expr>,
join_keys: (Vec<Column>, Vec<Column>),
projection_exprs: Vec<Expr>,
) -> LogicalPlan {
expected_left_join_rewrite_with_sink_filter(
delta_plan,
sink_table,
sink_table_name,
delta_selected_exprs,
sink_selected_exprs,
None,
join_keys,
projection_exprs,
)
}
#[allow(clippy::too_many_arguments)]
fn expected_left_join_rewrite_with_sink_filter(
delta_plan: &LogicalPlan,
sink_table: TableRef,
sink_table_name: &TableName,
delta_selected_exprs: Vec<Expr>,
sink_selected_exprs: Vec<Expr>,
sink_filter: Option<Expr>,
join_keys: (Vec<Column>, Vec<Column>),
projection_exprs: Vec<Expr>,
) -> LogicalPlan {
let delta_alias = "__flow_delta";
let sink_alias = "__flow_sink";
@@ -94,7 +145,17 @@ fn expected_left_join_rewrite(
.unwrap()
.build()
.unwrap();
let sink_selected = LogicalPlanBuilder::from(test_sink_scan(sink_table, sink_table_name))
let sink_scan = test_sink_scan(sink_table, sink_table_name);
let sink_input = if let Some(predicate) = sink_filter {
LogicalPlanBuilder::from(sink_scan)
.filter(predicate)
.unwrap()
.build()
.unwrap()
} else {
sink_scan
};
let sink_selected = LogicalPlanBuilder::from(sink_input)
.project(sink_selected_exprs)
.unwrap()
.alias(sink_alias)
@@ -576,6 +637,44 @@ async fn test_analyze_incremental_aggregate_plan_keeps_aliases_for_multiple_aggr
}));
}
#[tokio::test]
async fn test_analyze_incremental_aggregate_plan_allows_auto_created_sink_columns() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql = format!(
"SELECT max(number) AS total, ts, now() AS {}, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS {} FROM numbers_with_ts GROUP BY ts",
AUTO_CREATED_UPDATE_AT_TS_COL, AUTO_CREATED_PLACEHOLDER_TS_COL
);
let plan = sql_to_df_plan(ctx, query_engine, &sql, false)
.await
.unwrap();
let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
assert!(
analysis.unsupported_exprs.is_empty(),
"auto-created sink columns should not disable incremental analysis: {:?}",
analysis.unsupported_exprs
);
assert!(
analysis
.literal_columns
.iter()
.any(|name| name == AUTO_CREATED_UPDATE_AT_TS_COL)
);
assert!(
analysis
.literal_columns
.iter()
.any(|name| name == AUTO_CREATED_PLACEHOLDER_TS_COL)
);
assert_eq!(analysis.merge_columns.len(), 1);
assert_eq!(analysis.merge_columns[0].output_field_name, "total");
assert_eq!(
analysis.merge_columns[0].merge_op,
IncrementalAggregateMergeOp::Max
);
}
#[tokio::test]
async fn test_analyze_incremental_aggregate_plan_allows_where_before_aggregate() {
let query_engine = create_test_query_engine();
@@ -641,6 +740,7 @@ async fn test_rewrite_incremental_aggregate_allows_alias_wrapped_scan() {
"public".to_string(),
"alias_wrapped_sink".to_string(),
],
None,
)
.await
.unwrap();
@@ -887,6 +987,7 @@ async fn test_analyze_incremental_aggregate_plan_allows_literal_outputs() {
&analysis,
sink_table.clone(),
&sink_table_name,
None,
)
.await
.unwrap();
@@ -975,6 +1076,7 @@ async fn test_rewrite_incremental_aggregate_preserves_non_identifier_aliases() {
"public".to_string(),
"non_identifier_alias_sink".to_string(),
],
None,
)
.await
.unwrap();
@@ -1161,6 +1263,7 @@ async fn test_rewrite_incremental_aggregate_with_left_join() {
&analysis,
sink_table.clone(),
&sink_table_name,
None,
)
.await
.unwrap();
@@ -1183,6 +1286,67 @@ async fn test_rewrite_incremental_aggregate_with_left_join() {
assert_same_logical_plan(&rewritten, &expected);
}
#[tokio::test]
async fn test_rewrite_incremental_aggregate_filters_sink_dirty_time_window() {
// This verifies the rewrite placement when callers supply an already
// inferred sink dirty-window predicate. The task-level inference rules are
// covered by `infer_sink_time_window_filter_col` tests in task.rs.
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql = "SELECT max(number) AS number, date_bin(INTERVAL '1 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window";
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
.await
.unwrap();
let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
let sink_table = time_window_u32_table("time_window_sink");
let sink_table_name = [
"greptime".to_string(),
"public".to_string(),
"time_window_sink".to_string(),
];
let dirty_filter = FilterExprInfo {
expr: unqualified_col("ts"),
col_name: "ts".to_string(),
time_ranges: vec![(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
)],
window_size: chrono::Duration::seconds(1),
};
let sink_filter = dirty_filter
.predicate_for_col("time_window")
.unwrap()
.unwrap();
let rewritten = rewrite_incremental_aggregate_with_sink_merge(
&plan,
&analysis,
sink_table.clone(),
&sink_table_name,
Some(sink_filter.clone()),
)
.await
.unwrap();
let expected = expected_left_join_rewrite_with_sink_filter(
&plan,
sink_table,
&sink_table_name,
vec![unqualified_col("time_window"), unqualified_col("number")],
vec![unqualified_col("time_window"), unqualified_col("number")],
Some(sink_filter),
(
vec![qualified_column("__flow_delta", "time_window")],
vec![qualified_column("__flow_sink", "time_window")],
),
vec![
max_merge_expr("number"),
qualified_col("__flow_delta", "time_window").alias("time_window"),
],
);
assert_same_logical_plan(&rewritten, &expected);
}
#[tokio::test]
async fn test_analyze_incremental_aggregate_plan_rejects_global_aggregate() {
let query_engine = create_test_query_engine();
@@ -1230,6 +1394,7 @@ async fn test_rewrite_incremental_aggregate_rejects_empty_group_keys() {
&analysis,
sink_table,
&sink_table_name,
None,
)
.await
.unwrap_err();
@@ -1261,6 +1426,7 @@ async fn test_rewrite_incremental_aggregate_preserves_raw_aggregate_field_name()
&analysis,
sink_table.clone(),
&sink_table_name,
None,
)
.await
.unwrap();

View File

@@ -87,6 +87,20 @@ lazy_static! {
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_checkpoint_decision_count",
"flow batching checkpoint state-machine decisions",
&["flow_id", "mode", "decision", "reason"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_query_mode_count",
"flow batching query attempts by checkpoint mode",
&["flow_id", "mode"],
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(

View File

@@ -100,7 +100,7 @@ async fn test_incremental_query_stale_error() {
region_id,
ScanRequest {
memtable_min_sequence: Some(min_readable_seq),
sst_min_sequence: Some(u64::MAX),
skip_sst_files: true,
..Default::default()
},
)

View File

@@ -456,26 +456,28 @@ impl ScanRegion {
let ssts = &self.version.ssts;
let mut files = Vec::new();
for level in ssts.levels() {
for file in level.files.values() {
let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
(Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
// If the file's sequence is None (or actually is zero), it could mean the file
// is generated and added to the region "directly". In this case, its data should
// be considered as fresh as the memtable. So its sequence is treated greater than
// the min_sequence, whatever the value of min_sequence is. Hence the default
// "true" in this arm.
(Some(_), None) => true,
(None, _) => true,
};
if !self.request.skip_sst_files {
for level in ssts.levels() {
for file in level.files.values() {
let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
(Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
// If the file's sequence is None (or actually is zero), it could mean the file
// is generated and added to the region "directly". In this case, its data should
// be considered as fresh as the memtable. So its sequence is treated greater than
// the min_sequence, whatever the value of min_sequence is. Hence the default
// "true" in this arm.
(Some(_), None) => true,
(None, _) => true,
};
// Finds SST files in range.
if exceed_min_sequence && file_in_range(file, &time_range) {
files.push(file.clone());
// Finds SST files in range.
if exceed_min_sequence && file_in_range(file, &time_range) {
files.push(file.clone());
}
// There is no need to check and prune for file's sequence here as the sequence number is usually very new,
// unless the timing is too good, or the sequence number wouldn't be in file.
// and the batch will be filtered out by tree reader anyway.
}
// There is no need to check and prune for file's sequence here as the sequence number is usually very new,
// unless the timing is too good, or the sequence number wouldn't be in file.
// and the batch will be filtered out by tree reader anyway.
}
}
@@ -579,7 +581,9 @@ impl ScanRegion {
.with_vector_index_k(vector_index_k);
#[cfg(feature = "enterprise")]
let input = if let Some(provider) = self.extension_range_provider {
let input = if !self.request.skip_sst_files
&& let Some(provider) = self.extension_range_provider
{
let ranges = provider
.find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
.await?;

View File

@@ -51,6 +51,7 @@ use tracing::{Instrument, Span};
use crate::dist_plan::analyzer::AliasMapping;
use crate::dist_plan::analyzer::utils::patch_batch_timezone;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::options::FlowQueryExtensions;
use crate::region_query::RegionQueryHandlerRef;
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
@@ -481,6 +482,23 @@ impl MergeScanExec {
&self.regions
}
pub fn is_flow_sink_scan(&self) -> bool {
let Some(sink_table_id) =
FlowQueryExtensions::parse_flow_extensions(&self.query_ctx.extensions())
.ok()
.flatten()
.and_then(|extensions| extensions.sink_table_id)
else {
return false;
};
!self.regions.is_empty()
&& self
.regions
.iter()
.all(|region_id| region_id.table_id() == sink_table_id)
}
pub fn partition_count(&self) -> usize {
self.target_partition
}

View File

@@ -45,7 +45,7 @@ use table::metadata::{TableId, TableInfoRef};
use table::table::scan::RegionScanExec;
use crate::error::{GetRegionMetadataSnafu, Result};
use crate::options::FlowQueryExtensions;
use crate::options::{FlowIncrementalMode, FlowQueryExtensions};
/// Resolve to the given region (specified by [RegionId]) unconditionally.
#[derive(Clone, Debug)]
@@ -357,6 +357,8 @@ struct FlowScanDecision {
/// When present, this becomes the effective memtable upper bound and suppresses
/// binding a new snapshot on scan open.
memtable_max_sequence: Option<u64>,
/// Whether to skip SST files for memtable-only incremental source scans.
skip_sst_files: bool,
}
impl FlowScanDecision {
@@ -366,6 +368,7 @@ impl FlowScanDecision {
snapshot_on_scan: false,
memtable_min_sequence: None,
memtable_max_sequence: None,
skip_sst_files: false,
}
}
}
@@ -379,6 +382,7 @@ fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<Flo
snapshot_on_scan: false,
memtable_min_sequence: None,
memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()),
skip_sst_files: false,
});
};
@@ -403,12 +407,16 @@ fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<Flo
let memtable_max_sequence = query_ctx.get_snapshot(region_id.as_u64());
let skip_sst_files = apply_incremental
&& flow_extensions.incremental_mode == Some(FlowIncrementalMode::MemtableOnly);
Ok(FlowScanDecision {
is_sink_scan: false,
snapshot_on_scan: memtable_max_sequence.is_none()
&& flow_extensions.should_collect_region_watermark(),
memtable_min_sequence,
memtable_max_sequence,
skip_sst_files,
})
}
@@ -424,6 +432,7 @@ fn build_scan_request(
sst_min_sequence: (!decision.is_sink_scan)
.then(|| query_ctx.sst_min_sequence(region_id.as_u64()))
.flatten(),
skip_sst_files: decision.skip_sst_files,
snapshot_on_scan: decision.snapshot_on_scan,
memtable_min_sequence: decision.memtable_min_sequence,
memtable_max_sequence: decision.memtable_max_sequence,
@@ -841,6 +850,8 @@ mod tests {
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert_eq!(request.memtable_min_sequence, Some(55));
assert_eq!(request.sst_min_sequence, None);
assert!(request.skip_sst_files);
}
#[test]
@@ -875,6 +886,7 @@ mod tests {
assert_eq!(request.memtable_min_sequence, None);
assert_eq!(request.memtable_max_sequence, None);
assert_eq!(request.sst_min_sequence, None);
assert!(!request.skip_sst_files);
assert!(!request.snapshot_on_scan);
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -35,9 +35,6 @@ use crate::options::FlowQueryExtensions;
/// Intermediate merge state for one participating region while collecting
/// terminal correctness watermarks across merge-scan sub-stages.
enum MergeState {
/// The region participated, but no explicit watermark result has been seen
/// yet for this merge.
Participated,
/// At least one branch reported that this region cannot prove a safe
/// checkpoint watermark for the current query round.
Unproved,
@@ -256,7 +253,9 @@ fn collect_region_watermarks(plan: Arc<dyn ExecutionPlan>) -> Vec<RegionWatermar
let mut stack = vec![plan];
while let Some(plan) = stack.pop() {
if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>()
&& !merge_scan.is_flow_sink_scan()
{
merge_merge_scan_region_watermarks(
&mut merged,
merge_scan
@@ -281,8 +280,6 @@ fn collect_region_watermarks(plan: Arc<dyn ExecutionPlan>) -> Vec<RegionWatermar
///
/// | Current state | New entry | Result | Rationale |
/// |---------------|-----------------|-------------------|-----------|
/// | Participated | Proved(seq) | Proved(seq) | First proof for the region |
/// | Participated | Unproved | Unproved | One branch cannot prove → region is unsafe |
/// | Proved(old) | Proved(same) | Proved(old) | Convergent proof, keep |
/// | Proved(old) | Proved(diff) | Conflict([old,diff]) | Ambiguous → degrade to unproved |
/// | Unproved | _anything_ | Unproved | Already unsafe, stays unsafe |
@@ -300,15 +297,12 @@ fn merge_region_watermark_entries(
.entry(entry.region_id)
.and_modify(|existing| match entry.watermark {
None => match existing {
MergeState::Participated | MergeState::Proved(_) => {
MergeState::Proved(_) => {
*existing = MergeState::Unproved;
}
MergeState::Unproved | MergeState::Conflict { .. } => {}
},
Some(seq) => match existing {
MergeState::Participated => {
*existing = MergeState::Proved(seq);
}
MergeState::Unproved => {}
MergeState::Proved(existing_seq) if *existing_seq == seq => {}
MergeState::Proved(existing_seq) => {
@@ -336,16 +330,32 @@ fn merge_merge_scan_region_watermarks(
regions: impl IntoIterator<Item = u64>,
sub_stage_metrics: impl IntoIterator<Item = RecordBatchMetrics>,
) {
// Regions listed by MergeScanExec participated even when no sub-stage can
// prove a watermark. Keep them as explicit `None` entries so callers can
// distinguish unproved participation from non-participation.
for region_id in regions {
merged.entry(region_id).or_insert(MergeState::Participated);
}
let regions = regions.into_iter().collect::<Vec<_>>();
let mut proved_or_unproved_regions = BTreeSet::new();
for metrics in sub_stage_metrics {
proved_or_unproved_regions.extend(
metrics
.region_watermarks
.iter()
.map(|entry| entry.region_id),
);
merge_region_watermark_entries(merged, metrics.region_watermarks);
}
// Regions listed by a MergeScanExec participated even when no sub-stage can
// prove a watermark. Merge missing per-scan region entries as explicit
// `None` entries so an unproved participating branch vetoes any proof from
// another branch for the same region.
merge_region_watermark_entries(
merged,
regions
.into_iter()
.filter(|region_id| !proved_or_unproved_regions.contains(region_id))
.map(|region_id| RegionWatermarkEntry {
region_id,
watermark: None,
}),
);
}
fn finalize_region_watermarks(merged: BTreeMap<u64, MergeState>) -> Vec<RegionWatermarkEntry> {
@@ -354,7 +364,6 @@ fn finalize_region_watermarks(merged: BTreeMap<u64, MergeState>) -> Vec<RegionWa
.map(|(region_id, state)| RegionWatermarkEntry {
region_id,
watermark: match state {
MergeState::Participated => None,
MergeState::Unproved => None,
MergeState::Proved(seq) => Some(seq),
MergeState::Conflict { watermarks } => {
@@ -371,10 +380,35 @@ fn finalize_region_watermarks(merged: BTreeMap<u64, MergeState>) -> Vec<RegionWa
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema as ArrowSchema;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_expr::LogicalPlanBuilder;
use session::ReadPreference;
use session::context::QueryContextBuilder;
use store_api::storage::RegionId;
use table::table_name::TableName;
use super::*;
use crate::options::{FLOW_RETURN_REGION_SEQ, FLOW_SINK_TABLE_ID};
use crate::region_query::RegionQueryHandler;
struct NoopRegionQueryHandler;
#[async_trait]
impl RegionQueryHandler for NoopRegionQueryHandler {
async fn do_get(
&self,
_read_preference: ReadPreference,
_request: common_query::request::QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!("metrics tests should not execute remote queries")
}
}
fn metrics_with_region_watermarks(entries: &[(u64, Option<u64>)]) -> RecordBatchMetrics {
RecordBatchMetrics {
@@ -389,12 +423,66 @@ mod tests {
}
}
fn test_merge_scan_exec(table_id: u32, query_ctx: QueryContextRef) -> Arc<dyn ExecutionPlan> {
let session_state = SessionStateBuilder::new().with_default_features().build();
let plan = LogicalPlanBuilder::empty(false).build().unwrap();
let schema = ArrowSchema::empty();
Arc::new(
MergeScanExec::new(
&session_state,
TableName::new("greptime", "public", "test"),
vec![RegionId::new(table_id, 0)],
plan,
&schema,
Arc::new(NoopRegionQueryHandler),
query_ctx,
1,
BTreeMap::<String, BTreeSet<datafusion_common::Column>>::new(),
)
.unwrap(),
)
}
fn flow_query_ctx_with_sink_table_id(sink_table_id: u32) -> QueryContextRef {
Arc::new(
QueryContextBuilder::default()
.set_extension(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())
.set_extension(FLOW_SINK_TABLE_ID.to_string(), sink_table_id.to_string())
.build(),
)
}
#[test]
fn terminal_metrics_returns_none_without_merge_scan() {
let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(ArrowSchema::empty())));
assert!(terminal_recordbatch_metrics_from_plan(plan).is_none());
}
#[test]
fn terminal_metrics_skip_flow_sink_merge_scan_regions() {
let query_ctx = flow_query_ctx_with_sink_table_id(42);
let plan = test_merge_scan_exec(42, query_ctx);
assert!(terminal_recordbatch_metrics_from_plan(plan).is_none());
}
#[test]
fn terminal_metrics_keep_source_merge_scan_regions_with_sink_extension() {
let query_ctx = flow_query_ctx_with_sink_table_id(42);
let plan = test_merge_scan_exec(43, query_ctx);
assert_eq!(
terminal_recordbatch_metrics_from_plan(plan)
.unwrap()
.region_watermarks,
vec![RegionWatermarkEntry {
region_id: RegionId::new(43, 0).as_u64(),
watermark: None,
}]
);
}
#[test]
fn merge_merge_scan_region_watermarks_marks_missing_watermarks_unproved() {
let mut merged = BTreeMap::new();
@@ -503,4 +591,44 @@ mod tests {
}]
);
}
#[test]
fn merge_merge_scan_region_watermarks_missing_branch_vetoes_proved_value() {
let mut merged = BTreeMap::new();
merge_merge_scan_region_watermarks(
&mut merged,
[9],
[metrics_with_region_watermarks(&[(9, Some(21))])],
);
merge_merge_scan_region_watermarks(&mut merged, [9], std::iter::empty());
assert_eq!(
finalize_region_watermarks(merged),
vec![RegionWatermarkEntry {
region_id: 9,
watermark: None,
}]
);
}
#[test]
fn merge_merge_scan_region_watermarks_missing_branch_vetoes_proved_value_regardless_of_order() {
let mut merged = BTreeMap::new();
merge_merge_scan_region_watermarks(&mut merged, [9], std::iter::empty());
merge_merge_scan_region_watermarks(
&mut merged,
[9],
[metrics_with_region_watermarks(&[(9, Some(21))])],
);
assert_eq!(
finalize_region_watermarks(merged),
vec![RegionWatermarkEntry {
region_id: 9,
watermark: None,
}]
);
}
}

View File

@@ -124,6 +124,9 @@ pub struct ScanRequest {
/// Optional constraint on the minimal sequence number in the SST files.
/// If set, only the SST files that contain sequences greater than this value will be scanned.
pub sst_min_sequence: Option<SequenceNumber>,
/// Whether to skip all SST files.
/// This is stronger than `sst_min_sequence` and also skips SST files without sequence metadata.
pub skip_sst_files: bool,
/// Whether to bind the effective snapshot upper bound when opening the scan.
pub snapshot_on_scan: bool,
/// Optional hint for the distribution of time-series data.
@@ -211,6 +214,14 @@ impl Display for ScanRequest {
sst_min_sequence
)?;
}
if self.skip_sst_files {
write!(
f,
"{}skip_sst_files: {}",
delimiter.as_str(),
self.skip_sst_files
)?;
}
if self.snapshot_on_scan {
write!(
f,
@@ -321,5 +332,11 @@ mod tests {
request.to_string(),
"ScanRequest { snapshot_on_scan: true }"
);
let request = ScanRequest {
skip_sst_files: true,
..Default::default()
};
assert_eq!(request.to_string(), "ScanRequest { skip_sst_files: true }");
}
}

View File

@@ -0,0 +1,119 @@
CREATE TABLE incremental_aggr_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
) WITH (
append_mode = 'true'
);
Affected Rows: 0
CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink AS
SELECT
sum(n) AS total,
min(n) AS min_n,
max(n) AS max_n,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
incremental_aggr_input
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO incremental_aggr_input VALUES
(1, 10, '2024-01-01 00:00:00'),
(2, 20, '2024-01-01 00:00:30');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 30 | 10 | 20 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- Move already checkpointed source rows into SST. The next incremental run
-- must still read only the memtable delta and must not merge these old SST
-- rows again.
ADMIN FLUSH_TABLE('incremental_aggr_input');
+---------------------------------------------+
| ADMIN FLUSH_TABLE('incremental_aggr_input') |
+---------------------------------------------+
| 0 |
+---------------------------------------------+
-- Insert more rows into the same time window. An incremental-safe flow should
-- merge the delta aggregate with the existing sink aggregate state.
INSERT INTO incremental_aggr_input VALUES
(3, 30, '2024-01-01 00:00:15'),
(4, 40, '2024-01-01 00:00:45');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 100 | 10 | 40 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- Insert a row into a new time window to cover append of a new aggregate key.
INSERT INTO incremental_aggr_input VALUES
(5, 50, '2024-01-01 00:01:00');
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 100 | 10 | 40 | 2024-01-01T00:00:00 |
| 50 | 50 | 50 | 2024-01-01T00:01:00 |
+-------+-------+-------+---------------------+
DROP FLOW incremental_aggr_flow;
Affected Rows: 0
DROP TABLE incremental_aggr_input;
Affected Rows: 0
DROP TABLE incremental_aggr_sink;
Affected Rows: 0

View File

@@ -0,0 +1,57 @@
CREATE TABLE incremental_aggr_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
) WITH (
append_mode = 'true'
);
CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink AS
SELECT
sum(n) AS total,
min(n) AS min_n,
max(n) AS max_n,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
incremental_aggr_input
GROUP BY
time_window;
INSERT INTO incremental_aggr_input VALUES
(1, 10, '2024-01-01 00:00:00'),
(2, 20, '2024-01-01 00:00:30');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
-- Move already checkpointed source rows into SST. The next incremental run
-- must still read only the memtable delta and must not merge these old SST
-- rows again.
ADMIN FLUSH_TABLE('incremental_aggr_input');
-- Insert more rows into the same time window. An incremental-safe flow should
-- merge the delta aggregate with the existing sink aggregate state.
INSERT INTO incremental_aggr_input VALUES
(3, 30, '2024-01-01 00:00:15'),
(4, 40, '2024-01-01 00:00:45');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
-- Insert a row into a new time window to cover append of a new aggregate key.
INSERT INTO incremental_aggr_input VALUES
(5, 50, '2024-01-01 00:01:00');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
DROP FLOW incremental_aggr_flow;
DROP TABLE incremental_aggr_input;
DROP TABLE incremental_aggr_sink;

View File

@@ -0,0 +1,132 @@
-- Validate that a flow performing an incremental aggregate read only reads memtable
-- data and does NOT re-read source rows that have already been flushed to SST after
-- a previous checkpoint.
CREATE TABLE flow_incr_memtable_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
) WITH (
append_mode = 'true'
);
Affected Rows: 0
CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink AS
SELECT
sum(n) AS total,
min(n) AS min_n,
max(n) AS max_n,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
flow_incr_memtable_input
GROUP BY
time_window;
Affected Rows: 0
-- ==== Phase 1: initial insert + checkpoint ====
INSERT INTO flow_incr_memtable_input VALUES
(1, 10, '2024-01-01 00:00:00'),
(2, 20, '2024-01-01 00:00:30');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_memtable');
+----------------------------------------+
| ADMIN FLUSH_FLOW('flow_incr_memtable') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 30 | 10 | 20 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- ==== Phase 2: flush sink and source tables to SST ====
-- The next incremental run must still read the flushed sink aggregate state,
-- while skipping already-checkpointed source SST files.
ADMIN FLUSH_TABLE('flow_incr_memtable_sink');
+----------------------------------------------+
| ADMIN FLUSH_TABLE('flow_incr_memtable_sink') |
+----------------------------------------------+
| 0 |
+----------------------------------------------+
ADMIN FLUSH_TABLE('flow_incr_memtable_input');
+-----------------------------------------------+
| ADMIN FLUSH_TABLE('flow_incr_memtable_input') |
+-----------------------------------------------+
| 0 |
+-----------------------------------------------+
-- ==== Phase 3: empty incremental window ====
-- Flush the flow without inserting any new source rows to verify that
-- the incremental read correctly handles the case where no new memtable
-- data exists.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_memtable');
+----------------------------------------+
| ADMIN FLUSH_FLOW('flow_incr_memtable') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 30 | 10 | 20 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- ==== Phase 4: insert new delta within the same time window ====
INSERT INTO flow_incr_memtable_input VALUES
(3, 30, '2024-01-01 00:00:15'),
(4, 40, '2024-01-01 00:00:45');
Affected Rows: 2
-- ==== Phase 5: flush flow again (incremental read) ====
-- The flow must only read the new memtable delta and merge with the existing
-- sink aggregate. If it mistakenly re-reads the SST, the result will be
-- inflated (initial data counted twice).
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_memtable');
+----------------------------------------+
| ADMIN FLUSH_FLOW('flow_incr_memtable') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 100 | 10 | 40 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- Clean up
DROP FLOW flow_incr_memtable;
Affected Rows: 0
DROP TABLE flow_incr_memtable_input;
Affected Rows: 0
DROP TABLE flow_incr_memtable_sink;
Affected Rows: 0

View File

@@ -0,0 +1,66 @@
-- Validate that a flow performing an incremental aggregate read only reads memtable
-- data and does NOT re-read source rows that have already been flushed to SST after
-- a previous checkpoint.
CREATE TABLE flow_incr_memtable_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
) WITH (
append_mode = 'true'
);
CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink AS
SELECT
sum(n) AS total,
min(n) AS min_n,
max(n) AS max_n,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
flow_incr_memtable_input
GROUP BY
time_window;
-- ==== Phase 1: initial insert + checkpoint ====
INSERT INTO flow_incr_memtable_input VALUES
(1, 10, '2024-01-01 00:00:00'),
(2, 20, '2024-01-01 00:00:30');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_memtable');
SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window;
-- ==== Phase 2: flush sink and source tables to SST ====
-- The next incremental run must still read the flushed sink aggregate state,
-- while skipping already-checkpointed source SST files.
ADMIN FLUSH_TABLE('flow_incr_memtable_sink');
ADMIN FLUSH_TABLE('flow_incr_memtable_input');
-- ==== Phase 3: empty incremental window ====
-- Flush the flow without inserting any new source rows to verify that
-- the incremental read correctly handles the case where no new memtable
-- data exists.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_memtable');
SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window;
-- ==== Phase 4: insert new delta within the same time window ====
INSERT INTO flow_incr_memtable_input VALUES
(3, 30, '2024-01-01 00:00:15'),
(4, 40, '2024-01-01 00:00:45');
-- ==== Phase 5: flush flow again (incremental read) ====
-- The flow must only read the new memtable delta and merge with the existing
-- sink aggregate. If it mistakenly re-reads the SST, the result will be
-- inflated (initial data counted twice).
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_memtable');
SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window;
-- Clean up
DROP FLOW flow_incr_memtable;
DROP TABLE flow_incr_memtable_input;
DROP TABLE flow_incr_memtable_sink;

View File

@@ -0,0 +1,108 @@
-- Validate that a flow performing an incremental aggregate read on a
-- partitioned source table (multiple regions) only reads memtable data
-- and does NOT re-read source rows that have already been flushed to SST.
CREATE TABLE flow_incr_part_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
)
PARTITION ON COLUMNS (host_id) (
host_id < 3,
host_id >= 3
)
WITH (
append_mode = 'true'
);
Affected Rows: 0
CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink AS
SELECT
sum(n) AS total,
min(n) AS min_n,
max(n) AS max_n,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
flow_incr_part_input
GROUP BY
time_window;
Affected Rows: 0
-- ==== Phase 1: initial insert across both partitions ====
INSERT INTO flow_incr_part_input VALUES
(1, 10, '2024-01-01 00:00:00'),
(4, 20, '2024-01-01 00:00:30');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_part');
+------------------------------------+
| ADMIN FLUSH_FLOW('flow_incr_part') |
+------------------------------------+
| FLOW_FLUSHED |
+------------------------------------+
SELECT total, min_n, max_n, time_window FROM flow_incr_part_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 30 | 10 | 20 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- ==== Phase 2: flush source table to SST ====
-- Move already checkpointed source rows into SST so the next incremental run
-- must skip them.
ADMIN FLUSH_TABLE('flow_incr_part_input');
+-------------------------------------------+
| ADMIN FLUSH_TABLE('flow_incr_part_input') |
+-------------------------------------------+
| 0 |
+-------------------------------------------+
-- ==== Phase 3: insert new delta across both partitions, same time window ====
INSERT INTO flow_incr_part_input VALUES
(2, 30, '2024-01-01 00:00:15'),
(3, 40, '2024-01-01 00:00:45');
Affected Rows: 2
-- ==== Phase 4: flush flow again (incremental read) ====
-- The flow must only read the new memtable delta from both regions and merge
-- with the existing sink aggregate. If it mistakenly re-reads the SST, the
-- result will be inflated (initial data counted twice).
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_part');
+------------------------------------+
| ADMIN FLUSH_FLOW('flow_incr_part') |
+------------------------------------+
| FLOW_FLUSHED |
+------------------------------------+
SELECT total, min_n, max_n, time_window FROM flow_incr_part_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 100 | 10 | 40 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- Clean up
DROP FLOW flow_incr_part;
Affected Rows: 0
DROP TABLE flow_incr_part_input;
Affected Rows: 0
DROP TABLE flow_incr_part_sink;
Affected Rows: 0

View File

@@ -0,0 +1,61 @@
-- Validate that a flow performing an incremental aggregate read on a
-- partitioned source table (multiple regions) only reads memtable data
-- and does NOT re-read source rows that have already been flushed to SST.
CREATE TABLE flow_incr_part_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
)
PARTITION ON COLUMNS (host_id) (
host_id < 3,
host_id >= 3
)
WITH (
append_mode = 'true'
);
CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink AS
SELECT
sum(n) AS total,
min(n) AS min_n,
max(n) AS max_n,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
flow_incr_part_input
GROUP BY
time_window;
-- ==== Phase 1: initial insert across both partitions ====
INSERT INTO flow_incr_part_input VALUES
(1, 10, '2024-01-01 00:00:00'),
(4, 20, '2024-01-01 00:00:30');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_part');
SELECT total, min_n, max_n, time_window FROM flow_incr_part_sink ORDER BY time_window;
-- ==== Phase 2: flush source table to SST ====
-- Move already checkpointed source rows into SST so the next incremental run
-- must skip them.
ADMIN FLUSH_TABLE('flow_incr_part_input');
-- ==== Phase 3: insert new delta across both partitions, same time window ====
INSERT INTO flow_incr_part_input VALUES
(2, 30, '2024-01-01 00:00:15'),
(3, 40, '2024-01-01 00:00:45');
-- ==== Phase 4: flush flow again (incremental read) ====
-- The flow must only read the new memtable delta from both regions and merge
-- with the existing sink aggregate. If it mistakenly re-reads the SST, the
-- result will be inflated (initial data counted twice).
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_incr_part');
SELECT total, min_n, max_n, time_window FROM flow_incr_part_sink ORDER BY time_window;
-- Clean up
DROP FLOW flow_incr_part;
DROP TABLE flow_incr_part_input;
DROP TABLE flow_incr_part_sink;