diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs index 47b3054f54..580762a142 100644 --- a/src/flow/src/batching_mode.rs +++ b/src/flow/src/batching_mode.rs @@ -23,6 +23,7 @@ use session::ReadPreference; mod checkpoint; pub(crate) mod engine; pub(crate) mod frontend_client; +mod incremental_filter; mod state; mod table_creator; mod task; diff --git a/src/flow/src/batching_mode/checkpoint.rs b/src/flow/src/batching_mode/checkpoint.rs index c359360dc5..7341d3d9e7 100644 --- a/src/flow/src/batching_mode/checkpoint.rs +++ b/src/flow/src/batching_mode/checkpoint.rs @@ -12,12 +12,116 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[allow(dead_code)] +use crate::batching_mode::state::CheckpointMode; + +pub(super) const CHECKPOINT_DECISION_ADVANCE: &str = "advance"; +pub(super) const CHECKPOINT_DECISION_FALLBACK: &str = "fallback"; +pub(super) const CHECKPOINT_REASON_NONE: &str = "none"; + +/// Why the task fell back to full snapshot mode. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(super) enum CheckpointMode { - /// Full-snapshot reads over the source tables. - FullSnapshot, - /// Incremental reads driven by explicitly emitted incremental scan - /// extensions. - Incremental, +pub(super) enum FlowQueryFallbackReason { + /// The query result did not include a region-watermark map at all. + MissingRegionWatermark, + /// Some participating regions could not prove safe advancement against + /// both the returned watermarks and the checkpoint map. + IncompleteRegionWatermark, + /// The query only covered part of the dirty backlog, so global checkpoints + /// cannot advance yet. Incremental SQL drains all dirty windows before + /// checkpoint advancement; this primarily protects scoped full-snapshot + /// runs capped by the per-query dirty-window limit. + DirtyBacklogPending, + /// The datanode detected a stale incremental cursor and the Flow + /// must recompute from scratch. + StaleCursor, + /// A non-stale-cursor query failure; the Flow resets to full snapshot + /// to avoid cascading errors. + IncrementalQueryFailure, + /// Incremental mode has been permanently disabled for this Flow + /// (e.g. because the query shape is not incrementally safe). + IncrementalDisabled, +} + +impl FlowQueryFallbackReason { + pub(super) fn as_label(self) -> &'static str { + match self { + Self::MissingRegionWatermark => "missing_region_watermark", + Self::IncompleteRegionWatermark => "incomplete_region_watermark", + Self::DirtyBacklogPending => "dirty_backlog_pending", + Self::StaleCursor => "stale_cursor", + Self::IncrementalQueryFailure => "incremental_query_failure", + Self::IncrementalDisabled => "incremental_disabled", + } + } +} + +/// Decision produced by `BatchingTask::apply_query_result_to_state` after +/// each Flow query execution. Describes whether the task advanced its +/// checkpoint state or fell back to full snapshot, and why. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum FlowCheckpointDecision { + /// FullSnapshot → Incremental transition. + /// + /// The query exercised every participating region, all returned valid + /// watermarks, and the checkpoint map was populated from scratch. + /// Subsequent executions will use incremental after-seqs. + AdvancedFromFullSnapshot { + participating_regions: usize, + watermarks: usize, + }, + /// Existing Incremental → Incremental (in-place advancement). + /// + /// A subset of participating regions advanced their watermarks. The + /// task stays in incremental mode with an updated checkpoint map. + AdvancedIncremental { + participating_regions: usize, + watermarks: usize, + }, + /// Any mode → FullSnapshot. + /// + /// Watermark information was incomplete, a participating region was + /// absent from the existing checkpoint map, the task has permanently + /// disabled incremental mode, or the query itself failed. The task + /// resets to full snapshot semantics for the next execution. + FallbackToFullSnapshot { + previous_mode: CheckpointMode, + reason: FlowQueryFallbackReason, + }, +} + +impl FlowCheckpointDecision { + pub(super) fn mode_label(self) -> &'static str { + match self { + Self::AdvancedFromFullSnapshot { .. } => { + checkpoint_mode_label(CheckpointMode::FullSnapshot) + } + Self::AdvancedIncremental { .. } => checkpoint_mode_label(CheckpointMode::Incremental), + Self::FallbackToFullSnapshot { previous_mode, .. } => { + checkpoint_mode_label(previous_mode) + } + } + } + + pub(super) fn decision_label(self) -> &'static str { + match self { + Self::AdvancedFromFullSnapshot { .. } | Self::AdvancedIncremental { .. } => { + CHECKPOINT_DECISION_ADVANCE + } + Self::FallbackToFullSnapshot { .. } => CHECKPOINT_DECISION_FALLBACK, + } + } + + pub(super) fn reason_label(self) -> &'static str { + match self { + Self::FallbackToFullSnapshot { reason, .. } => reason.as_label(), + _ => CHECKPOINT_REASON_NONE, + } + } +} + +pub(super) fn checkpoint_mode_label(mode: CheckpointMode) -> &'static str { + match mode { + CheckpointMode::FullSnapshot => "full_snapshot", + CheckpointMode::Incremental => "incremental", + } } diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 8fbffc5a38..c6194d96b3 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -340,12 +340,13 @@ impl FrontendClient { } } - pub async fn query_with_terminal_metrics( + pub(crate) async fn query_with_terminal_metrics( &self, catalog: &str, schema: &str, request: QueryRequest, extensions: &[(&str, &str)], + peer_desc: &mut Option, ) -> Result { let flow_extensions = build_flow_extensions(extensions)?; match self { @@ -358,6 +359,9 @@ impl FrontendClient { (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()), ]; let db = self.get_random_active_frontend(catalog, schema).await?; + *peer_desc = Some(PeerDesc::Dist { + peer: db.peer.clone(), + }); db.database .query_with_terminal_metrics_and_flow_extensions(request, &hints, extensions) .await @@ -368,6 +372,7 @@ impl FrontendClient { database_client, query, } => { + *peer_desc = Some(PeerDesc::Standalone); let mut extensions_map = HashMap::from([( QUERY_PARALLELISM_HINT.to_string(), query.parallelism.to_string(), @@ -556,21 +561,24 @@ fn terminal_recordbatch_metrics_from_snapshots( } /// Describe a peer of frontend -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub(crate) enum PeerDesc { + /// The query failed before a frontend peer was selected. + #[default] + Unknown, /// Distributed mode's frontend peer address Dist { /// frontend peer address peer: Peer, }, /// Standalone mode - #[default] Standalone, } impl std::fmt::Display for PeerDesc { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { + PeerDesc::Unknown => write!(f, "unknown"), PeerDesc::Dist { peer } => write!(f, "{}", peer.addr), PeerDesc::Standalone => write!(f, "standalone"), } @@ -768,6 +776,7 @@ mod tests { let handler: Arc = Arc::new(MetricsHandler); let client = FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + let mut peer_desc = None; let result = client .query_with_terminal_metrics( @@ -777,9 +786,11 @@ mod tests { query: Some(Query::Sql("select 1".to_string())), }, &[], + &mut peer_desc, ) .await .unwrap(); + assert!(matches!(peer_desc, Some(PeerDesc::Standalone))); let terminal_metrics = result.metrics.clone(); assert!(!result.metrics.is_ready()); @@ -802,6 +813,7 @@ mod tests { let handler: Arc = Arc::new(ExtensionAwareHandler); let client = FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + let mut peer_desc = None; let result = client .query_with_terminal_metrics( @@ -811,9 +823,11 @@ mod tests { query: Some(Query::Sql("insert into t select 1".to_string())), }, &[("flow.return_region_seq", "true")], + &mut peer_desc, ) .await .unwrap(); + assert!(matches!(peer_desc, Some(PeerDesc::Standalone))); assert!(result.metrics.is_ready()); assert!(result.region_watermark_map().is_none()); @@ -824,6 +838,7 @@ mod tests { let handler: Arc = Arc::new(SnapshotBindingHandler); let client = FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + let mut peer_desc = None; let result = client .query_with_terminal_metrics( @@ -833,9 +848,11 @@ mod tests { query: Some(Query::Sql("insert into t select * from src".to_string())), }, &[("flow.return_region_seq", "true")], + &mut peer_desc, ) .await .unwrap(); + assert!(matches!(peer_desc, Some(PeerDesc::Standalone))); assert!(result.metrics.is_ready()); assert_eq!( @@ -849,6 +866,7 @@ mod tests { let handler: Arc = Arc::new(NoopHandler); let client = FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + let mut peer_desc = None; let err = client .query_with_terminal_metrics( @@ -858,6 +876,7 @@ mod tests { query: Some(Query::Sql("select 1".to_string())), }, &[("flow.return_region_seq", "not-a-bool")], + &mut peer_desc, ) .await .unwrap_err(); diff --git a/src/flow/src/batching_mode/incremental_filter.rs b/src/flow/src/batching_mode/incremental_filter.rs new file mode 100644 index 0000000000..ddc58d0378 --- /dev/null +++ b/src/flow/src/batching_mode/incremental_filter.rs @@ -0,0 +1,222 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::tracing::debug; +use datafusion_expr::Expr; +use datatypes::schema::Schema; + +use crate::batching_mode::state::FilterExprInfo; +use crate::batching_mode::utils::IncrementalAggregateAnalysis; +use crate::{Error, FlowId}; + +pub(super) fn build_sink_dirty_time_window_filter_expr( + flow_id: FlowId, + analysis: &IncrementalAggregateAnalysis, + sink_schema: &Schema, + dirty_filter: Option<&FilterExprInfo>, +) -> Result, Error> { + let Some(dirty_filter) = dirty_filter else { + return Ok(None); + }; + + let Some(sink_filter_col) = + infer_sink_time_window_filter_col(flow_id, analysis, sink_schema, dirty_filter) + else { + return Ok(None); + }; + + dirty_filter.predicate_for_col(&sink_filter_col) +} + +fn infer_sink_time_window_filter_col( + flow_id: FlowId, + analysis: &IncrementalAggregateAnalysis, + sink_schema: &Schema, + dirty_filter: &FilterExprInfo, +) -> Option { + if analysis.group_key_names.is_empty() { + return None; + } + + let is_timestamp_group_key = |name: &str| { + analysis.group_key_names.iter().any(|key| key == name) + && sink_schema + .column_schema_by_name(name) + .is_some_and(|col| col.data_type.is_timestamp()) + }; + + if is_timestamp_group_key(&dirty_filter.col_name) { + return Some(dirty_filter.col_name.clone()); + } + + let candidates = analysis + .group_key_names + .iter() + .filter(|name| is_timestamp_group_key(name)) + .cloned() + .collect::>(); + + match candidates.as_slice() { + [name] => Some(name.clone()), + [] => { + debug!( + "Flow {} cannot infer sink dirty-window filter column: no timestamp group key in {:?}", + flow_id, analysis.group_key_names + ); + None + } + _ => { + debug!( + "Flow {} cannot infer sink dirty-window filter column: ambiguous timestamp group keys {:?}", + flow_id, candidates + ); + None + } + } +} + +#[cfg(test)] +mod test { + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use pretty_assertions::assert_eq; + + use super::*; + use crate::adapter::AUTO_CREATED_UPDATE_AT_TS_COL; + use crate::batching_mode::state::FilterExprInfo; + use crate::batching_mode::utils::IncrementalAggregateAnalysis; + + fn test_analysis_with_group_keys(group_key_names: Vec<&str>) -> IncrementalAggregateAnalysis { + IncrementalAggregateAnalysis { + group_key_names: group_key_names + .into_iter() + .map(|name| name.to_string()) + .collect(), + merge_columns: vec![], + literal_columns: vec![], + output_field_names: vec![], + unsupported_exprs: vec![], + } + } + + fn test_dirty_filter(col_name: &str) -> FilterExprInfo { + FilterExprInfo { + expr: datafusion_expr::col(col_name), + col_name: col_name.to_string(), + time_ranges: vec![], + window_size: chrono::Duration::seconds(1), + } + } + + fn test_sink_schema(columns: Vec<(&str, ConcreteDataType)>) -> Schema { + Schema::new( + columns + .into_iter() + .map(|(name, data_type)| ColumnSchema::new(name, data_type, true)) + .collect(), + ) + } + + #[test] + fn test_infer_sink_time_window_filter_col_uses_matching_source_group_key() { + let analysis = test_analysis_with_group_keys(vec!["ts", "host"]); + let sink_schema = test_sink_schema(vec![ + ("ts", ConcreteDataType::timestamp_millisecond_datatype()), + ("host", ConcreteDataType::string_datatype()), + ]); + let dirty_filter = test_dirty_filter("ts"); + + assert_eq!( + Some("ts".to_string()), + infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) + ); + } + + #[test] + fn test_infer_sink_time_window_filter_col_uses_unique_timestamp_group_key() { + let analysis = test_analysis_with_group_keys(vec!["host", "time_window"]); + let sink_schema = test_sink_schema(vec![ + ("host", ConcreteDataType::string_datatype()), + ( + "time_window", + ConcreteDataType::timestamp_millisecond_datatype(), + ), + ( + AUTO_CREATED_UPDATE_AT_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + ]); + let dirty_filter = test_dirty_filter("ts"); + + assert_eq!( + Some("time_window".to_string()), + infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) + ); + } + + #[test] + fn test_infer_sink_time_window_filter_col_skips_global_aggregate() { + let analysis = test_analysis_with_group_keys(vec![]); + let sink_schema = test_sink_schema(vec![ + ("number", ConcreteDataType::uint32_datatype()), + ( + "time_window", + ConcreteDataType::timestamp_millisecond_datatype(), + ), + ]); + let dirty_filter = test_dirty_filter("ts"); + + assert_eq!( + None, + infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) + ); + } + + #[test] + fn test_infer_sink_time_window_filter_col_skips_without_timestamp_group_key() { + let analysis = test_analysis_with_group_keys(vec!["host", "device"]); + let sink_schema = test_sink_schema(vec![ + ("host", ConcreteDataType::string_datatype()), + ("device", ConcreteDataType::string_datatype()), + ( + AUTO_CREATED_UPDATE_AT_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + ]); + let dirty_filter = test_dirty_filter("ts"); + + assert_eq!( + None, + infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) + ); + } + + #[test] + fn test_infer_sink_time_window_filter_col_skips_ambiguous_timestamp_group_keys() { + let analysis = test_analysis_with_group_keys(vec!["ts", "time_window"]); + let sink_schema = test_sink_schema(vec![ + ("ts", ConcreteDataType::timestamp_millisecond_datatype()), + ( + "time_window", + ConcreteDataType::timestamp_millisecond_datatype(), + ), + ]); + let dirty_filter = test_dirty_filter("source_ts"); + + assert_eq!( + None, + infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) + ); + } +} diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index c470a2f2c1..42b71a4ec7 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -15,7 +15,7 @@ //! Batching mode task state, which changes frequently //! -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::time::Duration; use common_telemetry::debug; @@ -50,6 +50,14 @@ pub struct TaskState { /// Dirty Time windows need to be updated /// mapping of `start -> end` and non-overlapping pub(crate) dirty_time_windows: DirtyTimeWindows, + checkpoint_mode: CheckpointMode, + /// Region id -> last consumed watermark sequence. Incremental scans use + /// this as the next lower sequence bound for each source region. + checkpoints: BTreeMap, + /// Once set, the task will never attempt incremental mode again. + /// Set when the flow's query shape is deterministically incompatible + /// with incremental execution (e.g. unsupported aggregate expressions). + incremental_disabled: bool, exec_state: ExecState, /// Shutdown receiver pub(crate) shutdown_rx: oneshot::Receiver<()>, @@ -64,6 +72,9 @@ impl TaskState { last_query_duration: Duration::from_secs(0), last_exec_time_millis: None, dirty_time_windows: Default::default(), + checkpoint_mode: CheckpointMode::FullSnapshot, + checkpoints: Default::default(), + incremental_disabled: false, exec_state: ExecState::Idle, shutdown_rx, task_handle: None, @@ -85,6 +96,84 @@ impl TaskState { self.last_exec_time_millis } + pub fn checkpoint_mode(&self) -> CheckpointMode { + self.checkpoint_mode + } + + pub fn checkpoints(&self) -> &BTreeMap { + &self.checkpoints + } + + pub fn is_incremental_disabled(&self) -> bool { + self.incremental_disabled + } + + /// Permanently disable incremental mode for this task and + /// immediately fall back to full snapshot for the current cycle. + pub fn disable_incremental(&mut self) { + self.incremental_disabled = true; + self.mark_full_snapshot(); + } + + pub fn mark_full_snapshot(&mut self) { + self.checkpoint_mode = CheckpointMode::FullSnapshot; + } + + pub fn advance_checkpoints(&mut self, watermark_map: HashMap) { + self.checkpoints = watermark_map.into_iter().collect(); + if !self.incremental_disabled { + self.checkpoint_mode = CheckpointMode::Incremental; + } + } + + pub fn advance_incremental_checkpoints_with_participation( + &mut self, + participating_regions: &BTreeSet, + watermark_map: HashMap, + ) { + for region_id in participating_regions { + if let Some(seq) = watermark_map.get(region_id) { + self.checkpoints.insert(*region_id, *seq); + } + } + if !self.incremental_disabled { + self.checkpoint_mode = CheckpointMode::Incremental; + } + } + + pub fn can_advance_full_snapshot_checkpoints( + &self, + participating_regions: &BTreeSet, + watermark_map: &HashMap, + ) -> bool { + !participating_regions.is_empty() + && participating_regions.len() == watermark_map.len() + && participating_regions + .iter() + .all(|region_id| watermark_map.contains_key(region_id)) + } + + pub fn can_advance_incremental_checkpoints_with_participation( + &self, + participating_regions: &BTreeSet, + watermark_map: &HashMap, + ) -> bool { + !self.incremental_disabled + && !self.checkpoints.is_empty() + && !participating_regions.is_empty() + && participating_regions.len() == watermark_map.len() + && participating_regions + .iter() + .all(|region_id| self.checkpoints.contains_key(region_id)) + && participating_regions.iter().all(|region_id| { + let checkpoint = self.checkpoints.get(region_id); + watermark_map + .get(region_id) + .zip(checkpoint) + .is_some_and(|(seq, checkpoint)| seq >= checkpoint) + }) + } + /// Compute the next query delay based on the time window size or the last query duration. /// Aiming to avoid too frequent queries. But also not too long delay. /// @@ -95,6 +184,10 @@ impl TaskState { /// if current the dirty time range is longer than one query can handle, /// execute immediately to faster clean up dirty time windows. /// + /// If `prefer_short_incremental_cadence` is true, run incremental queries + /// more often when there is no large dirty backlog. This only reduces the + /// chance of hitting a stale cursor after flush; it is not required for + /// correctness. pub fn get_next_start_query_time( &self, flow_id: FlowId, @@ -102,6 +195,7 @@ impl TaskState { min_refresh_duration: Duration, max_timeout: Option, max_filter_num_per_query: usize, + prefer_short_incremental_cadence: bool, ) -> Instant { // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout` let lower = time_window_size.unwrap_or(min_refresh_duration); @@ -120,7 +214,20 @@ impl TaskState { // if dirty time range is more than one query can handle, execute immediately // to faster clean up dirty time windows if cur_dirty_window_size < max_query_update_range { - self.last_update_time + next_duration + if prefer_short_incremental_cadence { + // Run incremental queries sooner than the normal time-window + // cadence, while still backing off by at least the previous + // query duration and respecting the max-timeout cap. + let next_duration = self.last_query_duration.max(min_refresh_duration); + let next_duration = if let Some(max_timeout) = max_timeout { + next_duration.min(max_timeout) + } else { + next_duration + }; + self.last_update_time + next_duration + } else { + self.last_update_time + next_duration + } } else { // if dirty time windows can't be clean up in one query, execute immediately to faster // clean up dirty time windows @@ -314,7 +421,7 @@ impl DirtyTimeWindows { ); self.merge_dirty_time_windows(window_size, expire_lower_bound)?; - if self.windows.len() > self.max_filter_num_per_query { + if self.windows.len() > window_cnt { let first_time_window = self.windows.first_key_value(); let last_time_window = self.windows.last_key_value(); @@ -323,7 +430,7 @@ impl DirtyTimeWindows { "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}", task_ctx.config.flow_id, self.windows.len(), - self.max_filter_num_per_query, + window_cnt, task_ctx.config.time_window_expr, task_ctx.config.expire_after, first_time_window, @@ -335,7 +442,7 @@ impl DirtyTimeWindows { "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}", flow_id, self.windows.len(), - self.max_filter_num_per_query, + window_cnt, first_time_window, last_time_window ) @@ -590,6 +697,12 @@ enum ExecState { Executing, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CheckpointMode { + FullSnapshot, + Incremental, +} + /// Filter Expression's information #[derive(Debug, Clone)] pub struct FilterExprInfo { @@ -607,6 +720,28 @@ impl FilterExprInfo { acc + end.sub(start).unwrap_or(chrono::Duration::zero()) }) } + + pub fn predicate_for_col( + &self, + col_name: &str, + ) -> Result, Error> { + use datafusion_common::Column; + use datafusion_expr::{Expr, lit}; + + let mut expr_lst = Vec::with_capacity(self.time_ranges.len()); + for (start, end) in &self.time_ranges { + let lower = to_df_literal(*start)?; + let upper = to_df_literal(*end)?; + let filter_col = || Expr::Column(Column::new_unqualified(col_name)); + expr_lst.push( + filter_col() + .gt_eq(lit(lower)) + .and(filter_col().lt(lit(upper))), + ); + } + + Ok(expr_lst.into_iter().reduce(|a, b| a.or(b))) + } } #[cfg(test)] @@ -851,4 +986,370 @@ mod test { } } } + + #[test] + fn test_task_state_checkpoint_mode_and_advancement() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.checkpoints().is_empty()); + + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])); + assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental); + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + ); + + state.mark_full_snapshot(); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + ); + } + + #[test] + fn test_disable_incremental_persists_full_snapshot_mode() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + + assert!(!state.is_incremental_disabled()); + + // After disable, mode becomes FullSnapshot and flag is set. + state.disable_incremental(); + assert!(state.is_incremental_disabled()); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + + // `advance_checkpoints` will NOT transition to Incremental when disabled. + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + ); + + // `mark_full_snapshot` does not re-enable incremental. + state.mark_full_snapshot(); + assert!(state.is_incremental_disabled()); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + } + + #[test] + fn test_full_snapshot_checkpoint_advancement_requires_participating_regions() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let state = TaskState::new(query_ctx, rx); + + assert!(!state.can_advance_full_snapshot_checkpoints(&BTreeSet::new(), &HashMap::new())); + assert!(!state.can_advance_full_snapshot_checkpoints( + &BTreeSet::from([1_u64, 2_u64]), + &HashMap::from([(1_u64, 10_u64)]), + )); + assert!(state.can_advance_full_snapshot_checkpoints( + &BTreeSet::from([1_u64, 2_u64]), + &HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]), + )); + } + + #[test] + fn test_incremental_checkpoint_advancement_requires_participation_alignment() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])); + + assert!( + state.can_advance_incremental_checkpoints_with_participation( + &BTreeSet::from([1_u64]), + &HashMap::from([(1_u64, 11_u64)]), + ) + ); + assert!( + !state.can_advance_incremental_checkpoints_with_participation( + &BTreeSet::from([1_u64, 2_u64]), + &HashMap::from([(1_u64, 11_u64)]), + ) + ); + assert!( + !state.can_advance_incremental_checkpoints_with_participation( + &BTreeSet::from([3_u64]), + &HashMap::from([(3_u64, 11_u64)]), + ) + ); + assert!( + !state.can_advance_incremental_checkpoints_with_participation( + &BTreeSet::from([1_u64]), + &HashMap::from([(1_u64, 9_u64)]), + ) + ); + assert!( + state.can_advance_incremental_checkpoints_with_participation( + &BTreeSet::from([1_u64, 2_u64]), + &HashMap::from([(1_u64, 11_u64), (2_u64, 21_u64)]), + ) + ); + + state.disable_incremental(); + assert!( + !state.can_advance_incremental_checkpoints_with_participation( + &BTreeSet::from([1_u64, 2_u64]), + &HashMap::from([(1_u64, 12_u64), (2_u64, 22_u64)]), + ) + ); + } + + #[test] + fn test_incremental_checkpoint_advancement_merges_participating_subset() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state.advance_checkpoints(HashMap::from([ + (1_u64, 10_u64), + (2_u64, 20_u64), + (3_u64, 30_u64), + ])); + + state.advance_incremental_checkpoints_with_participation( + &BTreeSet::from([1_u64, 3_u64]), + HashMap::from([(1_u64, 12_u64), (3_u64, 35_u64)]), + ); + + assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental); + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 12_u64), (2_u64, 20_u64), (3_u64, 35_u64)]) + ); + } + + #[test] + fn test_filter_expr_info_predicate_for_col_empty_ranges() { + let filter = FilterExprInfo { + expr: datafusion_expr::col("ts"), + col_name: "ts".to_string(), + time_ranges: vec![], + window_size: chrono::Duration::seconds(1), + }; + + assert!(filter.predicate_for_col("time_window").unwrap().is_none()); + } + + #[test] + fn test_filter_expr_info_predicate_for_col_single_range() { + let filter = FilterExprInfo { + expr: datafusion_expr::col("ts"), + col_name: "ts".to_string(), + time_ranges: vec![(Timestamp::new_second(0), Timestamp::new_second(1))], + window_size: chrono::Duration::seconds(1), + }; + + let predicate = filter.predicate_for_col("time_window").unwrap().unwrap(); + let unparser = datafusion::sql::unparser::Unparser::default(); + assert_eq!( + "((time_window >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:01' AS TIMESTAMP)))", + unparser.expr_to_sql(&predicate).unwrap().to_string() + ); + } + + #[test] + fn test_filter_expr_info_predicate_for_col_multiple_ranges() { + let filter = FilterExprInfo { + expr: datafusion_expr::col("ts"), + col_name: "ts".to_string(), + time_ranges: vec![ + (Timestamp::new_second(0), Timestamp::new_second(1)), + (Timestamp::new_second(10), Timestamp::new_second(11)), + ], + window_size: chrono::Duration::seconds(1), + }; + + let predicate = filter.predicate_for_col("time_window").unwrap().unwrap(); + let unparser = datafusion::sql::unparser::Unparser::default(); + assert_eq!( + "(((time_window >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:01' AS TIMESTAMP))) OR ((time_window >= CAST('1970-01-01 00:00:10' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:11' AS TIMESTAMP))))", + unparser.expr_to_sql(&predicate).unwrap().to_string() + ); + } + + /// Helper: create a `TaskState` whose `last_update_time` is a known duration in the past. + fn state_with_past_update(age: Duration) -> TaskState { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state.last_update_time = Instant::now() - age; + state + } + + #[test] + fn test_short_incremental_cadence_uses_min_refresh() { + // When prefer_short_incremental_cadence is true and dirty backlog is manageable, + // the next start time should be last_update_time + min_refresh (short cadence), + // ignoring the longer time_window_size. + let state = state_with_past_update(Duration::from_secs(10)); + + let time_window_size = Some(Duration::from_secs(60)); // large window + let min_refresh = Duration::from_secs(5); + let flow_id = 1; + + let result = state.get_next_start_query_time( + flow_id, + &time_window_size, + min_refresh, + None, + 20, + true, // prefer_short_incremental_cadence + ); + + // With short cadence, result should be last_update_time + min_refresh. + let expected = state.last_update_time + min_refresh; + assert_eq!(result, expected); + } + + #[test] + fn test_short_incremental_cadence_respects_last_query_duration() { + let mut state = state_with_past_update(Duration::from_secs(10)); + state.last_query_duration = Duration::from_secs(20); + + let time_window_size = Some(Duration::from_secs(60)); + let min_refresh = Duration::from_secs(5); + let flow_id = 1; + + let result = state.get_next_start_query_time( + flow_id, + &time_window_size, + min_refresh, + None, + 20, + true, + ); + + assert_eq!(result, state.last_update_time + state.last_query_duration); + } + + #[test] + fn test_short_incremental_cadence_respects_max_timeout() { + let mut state = state_with_past_update(Duration::from_secs(10)); + state.last_query_duration = Duration::from_secs(20); + + let time_window_size = Some(Duration::from_secs(60)); + let min_refresh = Duration::from_secs(30); + let max_timeout = Duration::from_secs(5); + let flow_id = 1; + + let result = state.get_next_start_query_time( + flow_id, + &time_window_size, + min_refresh, + Some(max_timeout), + 20, + true, + ); + + assert_eq!(result, state.last_update_time + max_timeout); + } + + #[test] + fn test_full_snapshot_ignores_short_cadence() { + // When prefer_short_incremental_cadence is false (full snapshot mode), + // the normal long-cadence based on time_window_size applies. + let mut state = state_with_past_update(Duration::from_secs(10)); + // Make last_query_duration small so the lower bound (time_window_size) dominates. + state.last_query_duration = Duration::from_secs(1); + + let time_window_size = Some(Duration::from_secs(60)); // large window + let min_refresh = Duration::from_secs(5); + let flow_id = 1; + + let result = state.get_next_start_query_time( + flow_id, + &time_window_size, + min_refresh, + None, + 20, + false, // prefer_short_incremental_cadence = false + ); + + // With normal cadence, result should be last_update_time + time_window_size + // (since last_query_duration < time_window_size). + let expected = state.last_update_time + Duration::from_secs(60); + assert_eq!(result, expected); + } + + #[test] + fn test_dirty_window_overflow_schedules_immediately_even_with_short_cadence() { + // Dirty-window overflow must always schedule immediately, + // regardless of prefer_short_incremental_cadence. + let mut state = state_with_past_update(Duration::from_secs(10)); + // Create a very large dirty backlog. + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(3600))); + + let time_window_size = Some(Duration::from_secs(1)); // tiny window => overflow + let min_refresh = Duration::from_secs(5); + let flow_id = 1; + + // With short cadence flag. + let result = state.get_next_start_query_time( + flow_id, + &time_window_size, + min_refresh, + None, + 1, // max 1 filter => tiny capacity + true, + ); + assert!( + result <= Instant::now(), + "dirty overflow should schedule immediately" + ); + + // Without short cadence flag — same behavior. + let result2 = state.get_next_start_query_time( + flow_id, + &time_window_size, + min_refresh, + None, + 1, + false, + ); + assert!( + result2 <= Instant::now(), + "dirty overflow should schedule immediately" + ); + } + + #[test] + fn test_incremental_disabled_ignores_short_cadence() { + // When prefer_short_incremental_cadence is true but the dirty backlog is + // manageable, the short cadence is applied. This test verifies that the + // caller-side guard (checkpoint_mode + !is_incremental_disabled) controls + // whether short cadence is requested at all — when incremental is disabled, + // the flag is false, and the long cadence applies. + // + // This simulates the case where the caller computed + // prefer_short_incremental_cadence = false (e.g. incremental disabled + // or FullSnapshot mode), so the long cadence is used. + let mut state = state_with_past_update(Duration::from_secs(10)); + state.last_query_duration = Duration::from_secs(1); + + let time_window_size = Some(Duration::from_secs(60)); + let min_refresh = Duration::from_secs(5); + let flow_id = 1; + + let result = state.get_next_start_query_time( + flow_id, + &time_window_size, + min_refresh, + None, + 20, + false, // prefer_short_incremental_cadence = false + ); + + // With normal cadence, result should be last_update_time + time_window_size. + let expected = state.last_update_time + Duration::from_secs(60); + assert_eq!(result, expected); + } } diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 51a417c0d1..3cdf7899a6 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -30,6 +30,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp}; use datatypes::schema::Schema; use query::QueryEngineRef; +use query::options::FLOW_INCREMENTAL_MODE; use query::query_engine::DefaultSerializer; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; @@ -42,8 +43,9 @@ use tokio::sync::oneshot::error::TryRecvError; use tokio::time::Instant; use crate::batching_mode::BatchingModeOptions; -use crate::batching_mode::frontend_client::FrontendClient; -use crate::batching_mode::state::{DirtyTimeWindows, FilterExprInfo, TaskState}; +use crate::batching_mode::checkpoint::checkpoint_mode_label; +use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc}; +use crate::batching_mode::state::{CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState}; use crate::batching_mode::table_creator::{QueryType, create_table_with_expr}; use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::utils::{ @@ -62,6 +64,15 @@ use crate::metrics::{ }; use crate::{Error, FlowId}; +mod ckpt; +mod inc; + +/// Maximum number of dirty time-window predicates attached to one incremental +/// SQL query. This keeps generated OR filters bounded so Substrait encoding and +/// downstream planning remain predictable; if the backlog is larger, the flow +/// drains one capped batch and postpones checkpoint advancement to a later run. +const MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS: usize = 4096; + /// The task's config, immutable once created #[derive(Clone)] pub struct TaskConfig { @@ -123,6 +134,7 @@ pub struct TaskArgs<'a> { pub struct PlanInfo { pub plan: LogicalPlan, pub dirty_restore: DirtyRestore, + pub can_advance_checkpoints: bool, } pub enum DirtyRestore { @@ -247,8 +259,17 @@ impl BatchingTask { ) -> Result, Error> { if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? { debug!("Generate new query: {}", new_query.plan); + let dirty_filter = match &new_query.dirty_restore { + DirtyRestore::Scoped(f) => Some(f), + _ => None, + }; match self - .execute_logical_plan(frontend_client, &new_query.plan) + .execute_logical_plan( + frontend_client, + &new_query.plan, + dirty_filter, + new_query.can_advance_checkpoints, + ) .await { Ok(result) => Ok(result), @@ -330,6 +351,7 @@ impl BatchingTask { let insert_into_info = PlanInfo { plan, dirty_restore: new_query.dirty_restore, + can_advance_checkpoints: new_query.can_advance_checkpoints, }; let insert_into = match insert_into_info @@ -349,6 +371,7 @@ impl BatchingTask { Ok(Some(PlanInfo { plan: insert_into, dirty_restore: insert_into_info.dirty_restore, + can_advance_checkpoints: insert_into_info.can_advance_checkpoints, })) } @@ -369,6 +392,8 @@ impl BatchingTask { &self, frontend_client: &Arc, plan: &LogicalPlan, + dirty_filter: Option<&FilterExprInfo>, + can_advance_checkpoints: bool, ) -> Result, Error> { let instant = Instant::now(); let flow_id = self.config.flow_id; @@ -398,8 +423,40 @@ impl BatchingTask { })? .data; - let mut peer_desc = None; + // For incremental-mode SQL queries, attempt to rewrite the delta aggregate + // plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions. + let incremental_plan = if can_advance_checkpoints { + self.prepare_plan_for_incremental(&plan, dirty_filter) + .await? + } else { + None + }; + let incremental_safe = incremental_plan.is_some(); + let plan = incremental_plan.unwrap_or_else(|| plan.clone()); + let extensions = self + .build_flow_query_extensions(incremental_safe, can_advance_checkpoints) + .await?; + let extension_refs = extensions + .iter() + .map(|(key, value)| (*key, value.as_str())) + .collect::>(); + let query_mode = if extensions + .iter() + .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE) + { + CheckpointMode::Incremental + } else { + CheckpointMode::FullSnapshot + }; + Self::record_query_mode(flow_id, query_mode); + debug!( + "Flow {flow_id} executing batching query with checkpoint_mode={}, extension_count={}", + checkpoint_mode_label(query_mode), + extensions.len() + ); + + let mut peer_desc = None; let res = { let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME .with_label_values(&[flow_id.to_string().as_str()]) @@ -411,66 +468,83 @@ impl BatchingTask { let message = DFLogicalSubstraitConvertor {} .encode(&insert_plan, DefaultSerializer) .context(SubstraitEncodeLogicalPlanSnafu)?; - api::v1::greptime_request::Request::Query(api::v1::QueryRequest { + api::v1::QueryRequest { query: Some(api::v1::query_request::Query::InsertIntoPlan( api::v1::InsertIntoPlan { table_name: Some(insert_to), logical_plan: message.to_vec(), }, )), - }) + } } else { let message = DFLogicalSubstraitConvertor {} .encode(&plan, DefaultSerializer) .context(SubstraitEncodeLogicalPlanSnafu)?; - api::v1::greptime_request::Request::Query(api::v1::QueryRequest { + api::v1::QueryRequest { query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())), - }) + } }; frontend_client - .handle(req, catalog, schema, &mut peer_desc) + .query_with_terminal_metrics(catalog, schema, req, &extension_refs, &mut peer_desc) .await }; let elapsed = instant.elapsed(); - if let Ok(affected_rows) = &res { - debug!( - "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}", - elapsed - ); - METRIC_FLOW_ROWS - .with_label_values(&[format!("{}-out-batching", flow_id).as_str()]) - .inc_by(*affected_rows as _); - } else if let Err(err) = &res { + let peer_label = peer_desc + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| PeerDesc::default().to_string()); + if let Err(err) = &res { warn!( - "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}", - peer_desc, elapsed, &plan + "Failed to execute Flow {flow_id} on frontend {peer_label}, result: {err:?}, elapsed: {:?} with query: {}", + elapsed, &plan ); + let decision = { + let mut state = self.state.write().unwrap(); + let reason = Self::query_failure_reason(err); + Self::apply_query_failure_to_state(&mut state, elapsed, reason) + }; + if let Some(decision) = decision { + Self::record_checkpoint_decision(flow_id, decision); + } } // record slow query if elapsed >= self.config.batch_opts.slow_query_threshold { warn!( - "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}", - peer_desc, elapsed, &plan + "Flow {flow_id} on frontend {peer_label} executed for {:?} before complete, query: {}", + elapsed, &plan ); + let flow_id = flow_id.to_string(); METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY - .with_label_values(&[ - flow_id.to_string().as_str(), - &peer_desc.unwrap_or_default().to_string(), - ]) + .with_label_values(&[flow_id.as_str(), peer_label.as_str()]) .observe(elapsed.as_secs_f64()); } - self.state - .write() - .unwrap() - .after_query_exec(elapsed, res.is_ok()); - let res = res?; - Ok(Some((res as usize, elapsed))) + let (affected_rows, _) = res.output.extract_rows_and_cost(); + debug!( + "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}, watermark: {:?}", + elapsed, + res.region_watermark_map() + ); + METRIC_FLOW_ROWS + .with_label_values(&[format!("{}-out-batching", flow_id).as_str()]) + .inc_by(affected_rows as _); + { + let mut state = self.state.write().unwrap(); + let decision = Self::apply_query_result_to_state( + &mut state, + &res, + elapsed, + can_advance_checkpoints, + ); + Self::record_checkpoint_decision(flow_id, decision); + } + + Ok(Some((affected_rows, elapsed))) } /// Restore dirty windows consumed by a failed query so they are retried on @@ -563,8 +637,17 @@ impl BatchingTask { }; let res = if let Some(new_query) = &new_query { - self.execute_logical_plan(&frontend_client, &new_query.plan) - .await + let dirty_filter = match &new_query.dirty_restore { + DirtyRestore::Scoped(f) => Some(f), + _ => None, + }; + self.execute_logical_plan( + &frontend_client, + &new_query.plan, + dirty_filter, + new_query.can_advance_checkpoints, + ) + .await } else { Ok(None) }; @@ -592,12 +675,17 @@ impl BatchingTask { .as_ref() .and_then(|t| *t.time_window_size()); + let prefer_short_incremental_cadence = state.checkpoint_mode() + == CheckpointMode::Incremental + && !state.is_incremental_disabled(); + state.get_next_start_query_time( self.config.flow_id, &time_window_size, min_refresh, Some(self.config.batch_opts.query_timeout), self.config.batch_opts.experimental_max_filter_num_per_query, + prefer_short_incremental_cadence, ) }; @@ -733,6 +821,7 @@ impl BatchingTask { return Ok(Some(PlanInfo { plan, dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore), + can_advance_checkpoints: true, })); } _ => { @@ -769,6 +858,7 @@ impl BatchingTask { return Ok(Some(PlanInfo { plan, dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore), + can_advance_checkpoints: true, })); } }; @@ -799,20 +889,33 @@ impl BatchingTask { ), })?; - let expr = self - .state - .write() - .unwrap() - .dirty_time_windows - .gen_filter_exprs( + let (expr, can_advance_checkpoints) = { + let mut state = self.state.write().unwrap(); + let window_cnt = if state.checkpoint_mode() == CheckpointMode::Incremental + && !state.is_incremental_disabled() + && matches!(self.config.query_type, QueryType::Sql) + { + // Incremental scans are bounded by region sequence checkpoints, + // so the dirty-window filter only narrows sink-side/time-window + // work. Drain more windows than normal, but keep a hard cap to + // avoid building a huge OR filter after a long downtime. If + // windows remain, checkpoints won't advance this round. + MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS + } else { + max_window_cnt + .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query) + }; + let expr = state.dirty_time_windows.gen_filter_exprs( &col_name, Some(expire_lower_bound), window_size, - max_window_cnt - .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query), + window_cnt, self.config.flow_id, Some(self), )?; + let can_advance_checkpoints = state.dirty_time_windows.is_empty(); + (expr, can_advance_checkpoints) + }; let Some(expr) = expr else { // no new data, hence no need to update @@ -859,6 +962,7 @@ impl BatchingTask { let info = PlanInfo { plan: new_plan.clone(), dirty_restore: DirtyRestore::Scoped(expr), + can_advance_checkpoints, }; Ok(Some(info)) diff --git a/src/flow/src/batching_mode/task/ckpt.rs b/src/flow/src/batching_mode/task/ckpt.rs new file mode 100644 index 0000000000..035d30a079 --- /dev/null +++ b/src/flow/src/batching_mode/task/ckpt.rs @@ -0,0 +1,181 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use client::OutputWithMetrics; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_telemetry::tracing::warn; +use common_telemetry::{debug, info}; + +use crate::batching_mode::checkpoint::{ + FlowCheckpointDecision, FlowQueryFallbackReason, checkpoint_mode_label, +}; +use crate::batching_mode::state::{CheckpointMode, TaskState}; +use crate::batching_mode::task::BatchingTask; +use crate::metrics::{ + METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT, +}; +use crate::{Error, FlowId}; + +impl BatchingTask { + pub(super) fn query_failure_reason(err: &Error) -> FlowQueryFallbackReason { + if err.status_code() == StatusCode::RequestOutdated { + FlowQueryFallbackReason::StaleCursor + } else { + FlowQueryFallbackReason::IncrementalQueryFailure + } + } + + pub(super) fn apply_query_failure_to_state( + state: &mut TaskState, + elapsed: Duration, + reason: FlowQueryFallbackReason, + ) -> Option { + state.after_query_exec(elapsed, false); + let checkpoint_mode = state.checkpoint_mode(); + if checkpoint_mode == CheckpointMode::Incremental { + state.mark_full_snapshot(); + Some(FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason, + }) + } else { + None + } + } + + pub(super) fn apply_query_result_to_state( + state: &mut TaskState, + res: &OutputWithMetrics, + elapsed: Duration, + can_advance_checkpoints: bool, + ) -> FlowCheckpointDecision { + state.after_query_exec(elapsed, true); + let checkpoint_mode = state.checkpoint_mode(); + if !can_advance_checkpoints { + state.mark_full_snapshot(); + return FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason: FlowQueryFallbackReason::DirtyBacklogPending, + }; + } + + if let (Some(participating_regions), Some(watermark_map)) = + (res.participating_regions(), res.region_watermark_map()) + { + let can_advance = match checkpoint_mode { + CheckpointMode::FullSnapshot => state + .can_advance_full_snapshot_checkpoints(&participating_regions, &watermark_map), + CheckpointMode::Incremental => state + .can_advance_incremental_checkpoints_with_participation( + &participating_regions, + &watermark_map, + ), + }; + + if can_advance { + let participating_region_count = participating_regions.len(); + let watermark_count = watermark_map.len(); + match checkpoint_mode { + CheckpointMode::FullSnapshot => { + state.advance_checkpoints(watermark_map); + if state.is_incremental_disabled() { + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::IncrementalDisabled, + } + } else { + FlowCheckpointDecision::AdvancedFromFullSnapshot { + participating_regions: participating_region_count, + watermarks: watermark_count, + } + } + } + CheckpointMode::Incremental => { + state.advance_incremental_checkpoints_with_participation( + &participating_regions, + watermark_map, + ); + FlowCheckpointDecision::AdvancedIncremental { + participating_regions: participating_region_count, + watermarks: watermark_count, + } + } + } + } else { + state.mark_full_snapshot(); + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, + } + } + } else { + state.mark_full_snapshot(); + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason: FlowQueryFallbackReason::MissingRegionWatermark, + } + } + } + + pub(super) fn record_checkpoint_decision(flow_id: FlowId, decision: FlowCheckpointDecision) { + let flow_id = flow_id.to_string(); + METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT + .with_label_values(&[ + flow_id.as_str(), + decision.mode_label(), + decision.decision_label(), + decision.reason_label(), + ]) + .inc(); + + match decision { + FlowCheckpointDecision::AdvancedFromFullSnapshot { + participating_regions, + watermarks, + } => { + info!( + "Flow {flow_id} switched to incremental mode after full snapshot, participating_regions={participating_regions}, watermarks={watermarks}" + ); + } + FlowCheckpointDecision::AdvancedIncremental { + participating_regions, + watermarks, + } => { + debug!( + "Flow {flow_id} advanced incremental checkpoints, participating_regions={participating_regions}, watermarks={watermarks}" + ); + } + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode, + reason, + } => { + warn!( + "Flow {flow_id} switched to full snapshot mode, previous_mode={}, reason={}", + checkpoint_mode_label(previous_mode), + reason.as_label() + ); + } + } + } + + pub(super) fn record_query_mode(flow_id: FlowId, mode: CheckpointMode) { + let flow_id = flow_id.to_string(); + METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT + .with_label_values(&[flow_id.as_str(), checkpoint_mode_label(mode)]) + .inc(); + } +} diff --git a/src/flow/src/batching_mode/task/inc.rs b/src/flow/src/batching_mode/task/inc.rs new file mode 100644 index 0000000000..4fb64a676e --- /dev/null +++ b/src/flow/src/batching_mode/task/inc.rs @@ -0,0 +1,252 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_error::ext::BoxedError; +use common_telemetry::debug; +use common_telemetry::tracing::warn; +use datafusion_expr::{DmlStatement, LogicalPlan}; +use query::options::{ + FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY, + FLOW_SINK_TABLE_ID, +}; +use snafu::ResultExt; +use table::metadata::TableId; + +use crate::Error; +use crate::batching_mode::incremental_filter::build_sink_dirty_time_window_filter_expr; +use crate::batching_mode::state::{CheckpointMode, FilterExprInfo}; +use crate::batching_mode::table_creator::QueryType; +use crate::batching_mode::task::BatchingTask; +use crate::batching_mode::utils::{ + analyze_incremental_aggregate_plan, get_table_info_df_schema, + rewrite_incremental_aggregate_with_sink_merge, +}; +use crate::error::{ExternalSnafu, UnexpectedSnafu}; + +impl BatchingTask { + async fn sink_table_id(&self) -> Result { + let table = self + .config + .catalog_manager + .table( + &self.config.sink_table_name[0], + &self.config.sink_table_name[1], + &self.config.sink_table_name[2], + None, + ) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .ok_or_else(|| { + UnexpectedSnafu { + reason: format!( + "Flow {} cannot build incremental extensions because sink table {:?} was not found", + self.config.flow_id, self.config.sink_table_name + ), + } + .build() + })?; + Ok(table.table_info().table_id()) + } + + /// For incremental-mode SQL queries, attempt to prepare an executable plan + /// that is safe for incremental scan extensions. + /// + /// Returns `Some(plan)` when incremental extensions are safe, and `None` + /// when the caller should execute the original plan without incremental + /// extensions. The returned plan may be either a rewritten + /// delta-LEFT-JOIN-sink merge plan or the original plan. In particular, + /// plain GROUP BY queries with no aggregate merge columns are incremental + /// safe without a rewrite, so they return `Some(original_plan)`. + pub(super) async fn prepare_plan_for_incremental( + &self, + plan: &LogicalPlan, + dirty_filter: Option<&FilterExprInfo>, + ) -> Result, Error> { + let is_incremental_sql = { + let state = self.state.read().unwrap(); + if state.is_incremental_disabled() { + return Ok(None); + } + state.checkpoint_mode() == CheckpointMode::Incremental + && matches!(self.config.query_type, QueryType::Sql) + }; + + if !is_incremental_sql { + return Ok(None); + } + + // Extract inner query plan from the DML wrapper. + // Non-DML or non-SQL plans bypass the rewrite and keep checkpoint mode; + // non-aggregate TQL or non-INSERT plans do not need incremental scan extensions. + let inner_plan = match plan { + LogicalPlan::Dml(dml) => dml.input.as_ref().clone(), + _ => return Ok(None), + }; + + // Analyze the plan for incremental rewritability. + // Incremental reads currently require aggregate / group-by plans that + // can be rewritten into a delta-left-join-sink merge. Non-aggregate SQL + // (projection, filter, or other non-aggregate shapes) stays full-snapshot + // until separately supported, and incremental mode is permanently + // disabled for this flow. + let Some(analysis) = analyze_incremental_aggregate_plan(&inner_plan)? else { + warn!( + "Flow {} incremental mode but plan is not an aggregate query; \ + permanently disabling incremental for this flow", + self.config.flow_id + ); + self.state.write().unwrap().disable_incremental(); + return Ok(None); + }; + + if !analysis.unsupported_exprs.is_empty() { + warn!( + "Flow {} incremental aggregate contains unsupported expressions {:?}; \ + permanently disabling incremental for this flow", + self.config.flow_id, analysis.unsupported_exprs + ); + self.state.write().unwrap().disable_incremental(); + return Ok(None); + } + + // Plain GROUP BY without aggregate expressions has no values to + // merge between delta and sink. The incremental delta scan emits + // changed groups, and sink primary-key write semantics make this + // idempotent; no explicit left-join rewrite is needed. + if analysis.merge_columns.is_empty() { + return Ok(Some(plan.clone())); + } + + // Fetch sink table for the merge rewrite. + // Transient errors (catalog, schema, filter, or rewrite) should not + // permanently disable incremental mode. Instead, we fall back to a + // full-snapshot plan for this round while keeping incremental retryable. + let sink_table = match get_table_info_df_schema( + self.config.catalog_manager.clone(), + self.config.sink_table_name.clone(), + ) + .await + { + Ok((table, _)) => table, + Err(err) => { + warn!( + "Flow {} failed to fetch sink table for incremental rewrite; \ + falling back to full snapshot for this round: {:?}", + self.config.flow_id, err + ); + self.state.write().unwrap().mark_full_snapshot(); + return Ok(None); + } + }; + let sink_schema = sink_table.table_info().meta.schema.clone(); + let sink_dirty_filter = match build_sink_dirty_time_window_filter_expr( + self.config.flow_id, + &analysis, + &sink_schema, + dirty_filter, + ) { + Ok(filter) => filter, + Err(err) => { + warn!( + "Flow {} failed to build sink dirty time window filter; \ + falling back to full snapshot for this round: {:?}", + self.config.flow_id, err + ); + self.state.write().unwrap().mark_full_snapshot(); + return Ok(None); + } + }; + + let rewritten_inner = match rewrite_incremental_aggregate_with_sink_merge( + &inner_plan, + &analysis, + sink_table, + &self.config.sink_table_name, + sink_dirty_filter, + ) + .await + { + Ok(plan) => plan, + Err(err) => { + warn!( + "Flow {} failed to rewrite incremental aggregate with sink merge; \ + falling back to full snapshot for this round: {:?}", + self.config.flow_id, err + ); + self.state.write().unwrap().mark_full_snapshot(); + return Ok(None); + } + }; + + // Reconstruct DML plan with the rewritten inner plan + let rewritten = match plan { + LogicalPlan::Dml(dml) => LogicalPlan::Dml(DmlStatement::new( + dml.table_name.clone(), + dml.target.clone(), + dml.op.clone(), + Arc::new(rewritten_inner), + )), + _ => unreachable!("already matched Dml above"), + }; + + debug!( + "Flow {} rewrote incremental SQL aggregate query with sink merge", + self.config.flow_id + ); + + Ok(Some(rewritten)) + } + + pub(super) async fn build_flow_query_extensions( + &self, + incremental_safe: bool, + can_advance_checkpoints: bool, + ) -> Result, Error> { + let mut extensions = vec![("flow.return_region_seq", "true".to_string())]; + + let incremental_checkpoints_json = { + let state = self.state.read().unwrap(); + if incremental_safe + && can_advance_checkpoints + && !state.is_incremental_disabled() + && state.checkpoint_mode() == CheckpointMode::Incremental + && !state.checkpoints().is_empty() + { + Some(serde_json::to_string(state.checkpoints()).map_err(|err| { + UnexpectedSnafu { + reason: format!("Failed to serialize checkpoint map: {err}"), + } + .build() + })?) + } else { + None + } + }; + + if let Some(checkpoints_json) = incremental_checkpoints_json { + let sink_table_id = self.sink_table_id().await?; + extensions.push((FLOW_SINK_TABLE_ID, sink_table_id.to_string())); + extensions.push(( + FLOW_INCREMENTAL_MODE, + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + )); + extensions.push((FLOW_INCREMENTAL_AFTER_SEQS, checkpoints_json)); + } + + Ok(extensions) + } +} diff --git a/src/flow/src/batching_mode/task/test.rs b/src/flow/src/batching_mode/task/test.rs index 55a0a3057f..959aeb00c9 100644 --- a/src/flow/src/batching_mode/task/test.rs +++ b/src/flow/src/batching_mode/task/test.rs @@ -12,18 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; + use catalog::RegisterTableRequest; use catalog::memory::MemoryCatalogManager; +use client::OutputWithMetrics; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_query::Output; use common_recordbatch::RecordBatch; +use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use datatypes::data_type::ConcreteDataType as CDT; use datatypes::schema::ColumnSchema; -use datatypes::vectors::{UInt32Vector, VectorRef}; +use datatypes::vectors::{TimestampMillisecondVector, UInt32Vector, VectorRef}; use pretty_assertions::assert_eq; +use query::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY}; use session::context::QueryContext; use table::test_util::MemTable; use super::*; +use crate::batching_mode::checkpoint::{ + CHECKPOINT_DECISION_ADVANCE, CHECKPOINT_DECISION_FALLBACK, CHECKPOINT_REASON_NONE, + FlowCheckpointDecision, FlowQueryFallbackReason, +}; +use crate::batching_mode::state::CheckpointMode; use crate::batching_mode::time_window::find_time_window_expr; use crate::test_utils::create_test_query_engine; @@ -172,6 +183,34 @@ fn register_number_only_sink(query_engine: &QueryEngineRef, table_name: &str) { memory_catalog.register_table_sync(request).unwrap(); } +fn register_auto_created_aggregate_sink(query_engine: &QueryEngineRef, table_name: &str) { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), true), + ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true), + ColumnSchema::new("update_at", CDT::timestamp_millisecond_datatype(), true), + ])); + let columns: Vec = vec![ + Arc::new(UInt32Vector::from_slice([1_u32])), + Arc::new(TimestampMillisecondVector::from_slice([0_i64])), + Arc::new(TimestampMillisecondVector::from_slice([0_i64])), + ]; + let recordbatch = RecordBatch::new(schema, columns).unwrap(); + let table = MemTable::table(table_name, recordbatch); + let request = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id: 9002, + table, + }; + let catalog_manager = query_engine.engine_state().catalog_manager(); + let memory_catalog = catalog_manager + .as_any() + .downcast_ref::() + .unwrap(); + memory_catalog.register_table_sync(request).unwrap(); +} + fn dirty_marker() -> DirtyTimeWindows { let mut dirty = DirtyTimeWindows::default(); dirty.set_dirty(); @@ -204,6 +243,7 @@ async fn assert_unscoped_failure_restore( let unscoped_query = PlanInfo { plan, dirty_restore: DirtyRestore::Unscoped(consumed_dirty_windows), + can_advance_checkpoints: true, }; task.handle_executed_query_failure(Some(&unscoped_query)); @@ -216,6 +256,442 @@ async fn assert_unscoped_failure_restore( ); } +fn output_with_region_watermarks( + watermarks: impl IntoIterator)>, +) -> OutputWithMetrics { + let result = OutputWithMetrics::from_output(Output::new_with_affected_rows(0)); + result.metrics.update(Some(RecordBatchMetrics { + region_watermarks: watermarks + .into_iter() + .map(|(region_id, watermark)| RegionWatermarkEntry { + region_id, + watermark, + }) + .collect(), + ..Default::default() + })); + result.metrics.mark_ready(); + result +} + +#[test] +fn test_apply_query_result_to_state_advances_full_snapshot_to_incremental() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &result, + std::time::Duration::from_millis(1), + true, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::AdvancedFromFullSnapshot { + participating_regions: 2, + watermarks: 2, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental); + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + ); +} + +#[test] +fn test_apply_query_result_to_state_stays_full_snapshot_when_incremental_disabled() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state.disable_incremental(); + assert!(state.is_incremental_disabled()); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + + let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]); + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &result, + std::time::Duration::from_millis(1), + true, + ); + + // Should NOT claim advancement to incremental; should fallback with correct reason. + assert_eq!( + decision, + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::IncrementalDisabled, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.is_incremental_disabled()); + // Checkpoints are still updated even if mode doesn't advance. + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + ); +} + +#[test] +fn test_apply_query_result_to_state_rejects_unproved_watermark() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, None)]); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &result, + std::time::Duration::from_millis(1), + true, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.checkpoints().is_empty()); +} + +#[test] +fn test_apply_query_result_to_state_reports_missing_watermark() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + let result = OutputWithMetrics::from_output(Output::new_with_affected_rows(0)); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &result, + std::time::Duration::from_millis(1), + true, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::MissingRegionWatermark, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.checkpoints().is_empty()); +} + +#[test] +fn test_apply_query_result_to_state_advances_incremental_subset() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state.advance_checkpoints(HashMap::from([ + (1_u64, 10_u64), + (2_u64, 20_u64), + (3_u64, 30_u64), + ])); + let result = output_with_region_watermarks([(1_u64, Some(12_u64)), (3_u64, Some(35_u64))]); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &result, + std::time::Duration::from_millis(1), + true, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::AdvancedIncremental { + participating_regions: 2, + watermarks: 2, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental); + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 12_u64), (2_u64, 20_u64), (3_u64, 35_u64)]) + ); +} + +#[test] +fn test_apply_query_result_to_state_blocks_full_snapshot_when_dirty_backlog_pending() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &result, + std::time::Duration::from_millis(1), + false, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::DirtyBacklogPending, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.checkpoints().is_empty()); +} + +#[test] +fn test_apply_query_result_to_state_blocks_incremental_when_dirty_backlog_pending() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])); + let result = output_with_region_watermarks([(1_u64, Some(12_u64)), (2_u64, Some(25_u64))]); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &result, + std::time::Duration::from_millis(1), + false, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::Incremental, + reason: FlowQueryFallbackReason::DirtyBacklogPending, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + ); +} + +#[test] +fn test_apply_query_failure_to_state_falls_back_from_incremental() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])); + assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental); + + let decision = BatchingTask::apply_query_failure_to_state( + &mut state, + std::time::Duration::from_millis(1), + FlowQueryFallbackReason::IncrementalQueryFailure, + ); + + assert_eq!( + decision, + Some(FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::Incremental, + reason: FlowQueryFallbackReason::IncrementalQueryFailure, + }) + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert_eq!( + state.checkpoints(), + &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + ); +} + +#[test] +fn test_apply_query_failure_to_state_keeps_full_snapshot_without_decision() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + + let decision = BatchingTask::apply_query_failure_to_state( + &mut state, + std::time::Duration::from_millis(1), + FlowQueryFallbackReason::StaleCursor, + ); + + assert_eq!(decision, None); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.checkpoints().is_empty()); +} + +#[test] +fn test_checkpoint_decision_labels_are_stable() { + let advance = FlowCheckpointDecision::AdvancedIncremental { + participating_regions: 1, + watermarks: 1, + }; + let fallback = FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::Incremental, + reason: FlowQueryFallbackReason::StaleCursor, + }; + + assert_eq!(advance.mode_label(), "incremental"); + assert_eq!(advance.decision_label(), CHECKPOINT_DECISION_ADVANCE); + assert_eq!(advance.reason_label(), CHECKPOINT_REASON_NONE); + assert_eq!(fallback.mode_label(), "incremental"); + assert_eq!(fallback.decision_label(), CHECKPOINT_DECISION_FALLBACK); + assert_eq!(fallback.reason_label(), "stale_cursor"); + assert_eq!( + FlowQueryFallbackReason::DirtyBacklogPending.as_label(), + "dirty_backlog_pending" + ); +} + +#[tokio::test] +async fn test_build_flow_query_extensions_switches_with_checkpoint_mode() { + let (task, _) = new_test_task_engine_and_plan_with_query( + "SELECT number, ts FROM numbers_with_ts", + "numbers_with_ts", + ) + .await + .into_task_and_plan(); + + let extensions = task.build_flow_query_extensions(false, true).await.unwrap(); + assert_eq!( + extensions, + vec![("flow.return_region_seq", "true".to_string())] + ); + + task.state + .write() + .unwrap() + .advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])); + + let extensions = task.build_flow_query_extensions(false, true).await.unwrap(); + assert!(extensions.contains(&("flow.return_region_seq", "true".to_string()))); + assert!( + !extensions + .iter() + .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE) + ); + assert!( + !extensions + .iter() + .any(|(key, _)| *key == FLOW_INCREMENTAL_AFTER_SEQS) + ); + + let extensions = task.build_flow_query_extensions(true, true).await.unwrap(); + + assert!(extensions.contains(&("flow.return_region_seq", "true".to_string()))); + assert!(extensions.contains(&( + FLOW_INCREMENTAL_MODE, + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string() + ))); + assert!(extensions.contains(&( + FLOW_INCREMENTAL_AFTER_SEQS, + serde_json::json!({"1": 10, "2": 20}).to_string(), + ))); + + let extensions = task.build_flow_query_extensions(true, false).await.unwrap(); + assert!(extensions.contains(&("flow.return_region_seq", "true".to_string()))); + assert!( + !extensions + .iter() + .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE) + ); + assert!( + !extensions + .iter() + .any(|(key, _)| *key == FLOW_INCREMENTAL_AFTER_SEQS) + ); + + task.state.write().unwrap().disable_incremental(); + let extensions = task.build_flow_query_extensions(true, true).await.unwrap(); + assert!(extensions.contains(&("flow.return_region_seq", "true".to_string()))); + assert!( + !extensions + .iter() + .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE) + ); + assert!( + !extensions + .iter() + .any(|(key, _)| *key == FLOW_INCREMENTAL_AFTER_SEQS) + ); +} + +#[tokio::test] +async fn test_full_snapshot_scoped_plan_marks_checkpoint_advance_safe_only_after_backlog_drained() { + let TestTaskParts { + task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number", + ) + .await; + { + let mut state = task.state.write().unwrap(); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(30), Some(Timestamp::new_second(35))); + } + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); + + let first = task + .gen_query_with_time_window(query_engine.clone(), &sink_schema, &[], false, Some(1)) + .await + .unwrap() + .unwrap(); + assert!(!first.can_advance_checkpoints); + assert_eq!(task.state.read().unwrap().dirty_time_windows.len(), 1); + + let second = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1)) + .await + .unwrap() + .unwrap(); + assert!(second.can_advance_checkpoints); + assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); +} + +#[tokio::test] +async fn test_incremental_scoped_plan_consumes_all_dirty_windows_for_checkpoint_safety() { + let TestTaskParts { + task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number", + ) + .await; + { + let mut state = task.state.write().unwrap(); + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(30), Some(Timestamp::new_second(35))); + } + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); + + let plan = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1)) + .await + .unwrap() + .unwrap(); + + assert!(plan.can_advance_checkpoints); + assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); +} + #[tokio::test] async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_path() { let (task, plan) = new_test_task_and_plan_with_missing_sink().await; @@ -231,12 +707,293 @@ async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_pat time_ranges: vec![(Timestamp::new_second(10), Timestamp::new_second(20))], window_size: chrono::Duration::seconds(10), }), + can_advance_checkpoints: true, }; task.handle_executed_query_failure(Some(&scoped_query)); let state = task.state.read().unwrap(); assert_eq!(state.dirty_time_windows.len(), 1); + assert_eq!( + state.dirty_time_windows.window_size(), + std::time::Duration::from_secs(10) + ); +} + +#[tokio::test] +async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let plan = sql_to_df_plan( + ctx.clone(), + query_engine.clone(), + "SELECT number, ts FROM numbers_with_ts", + true, + ) + .await + .unwrap(); + + // Build a DML wrapper using a real sink table from the test engine. + let (sink_table, _) = get_table_info_df_schema( + query_engine.engine_state().catalog_manager().clone(), + [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ], + ) + .await + .unwrap(); + let table_provider = Arc::new(DfTableProviderAdapter::new(sink_table)); + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + let dml_plan = LogicalPlan::Dml(DmlStatement::new( + datafusion_common::TableReference::bare("test"), + table_source, + WriteOp::Insert(datafusion_expr::dml::InsertOp::Append), + Arc::new(plan), + )); + + let (_tx, rx) = tokio::sync::oneshot::channel(); + let task = BatchingTask::try_new(TaskArgs { + flow_id: 1, + query: "SELECT number, ts FROM numbers_with_ts", + plan: dml_plan.clone(), + time_window_expr: None, + expire_after: None, + sink_table_name: [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ], + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx: ctx, + catalog_manager: query_engine.engine_state().catalog_manager().clone(), + shutdown_rx: rx, + batch_opts: Arc::new(BatchingModeOptions::default()), + flow_eval_interval: None, + }) + .unwrap(); + + // Put the state into Incremental mode with checkpoints. + task.state + .write() + .unwrap() + .advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); + assert_eq!( + task.state.read().unwrap().checkpoint_mode(), + CheckpointMode::Incremental + ); + + let incremental_plan = task + .prepare_plan_for_incremental(&dml_plan, None) + .await + .unwrap(); + assert!(incremental_plan.is_none()); + let state = task.state.read().unwrap(); + assert!(state.is_incremental_disabled()); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); +} + +#[tokio::test] +async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite_error() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let plan = sql_to_df_plan( + ctx.clone(), + query_engine.clone(), + "SELECT sum(number) AS total, ts FROM numbers_with_ts GROUP BY ts", + true, + ) + .await + .unwrap(); + + let (sink_table, _) = get_table_info_df_schema( + query_engine.engine_state().catalog_manager().clone(), + [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ], + ) + .await + .unwrap(); + let table_provider = Arc::new(DfTableProviderAdapter::new(sink_table)); + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + let dml_plan = LogicalPlan::Dml(DmlStatement::new( + datafusion_common::TableReference::bare("test"), + table_source, + WriteOp::Insert(datafusion_expr::dml::InsertOp::Append), + Arc::new(plan), + )); + + let (_tx, rx) = tokio::sync::oneshot::channel(); + let task = BatchingTask::try_new(TaskArgs { + flow_id: 1, + query: "SELECT sum(number) AS total, ts FROM numbers_with_ts GROUP BY ts", + plan: dml_plan.clone(), + time_window_expr: None, + expire_after: None, + // The sink table exists, but does not have the rewritten aggregate + // output column `total`, so the rewrite fails deterministically. + sink_table_name: [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ], + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx: ctx, + catalog_manager: query_engine.engine_state().catalog_manager().clone(), + shutdown_rx: rx, + batch_opts: Arc::new(BatchingModeOptions::default()), + flow_eval_interval: None, + }) + .unwrap(); + + task.state + .write() + .unwrap() + .advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); + assert_eq!( + task.state.read().unwrap().checkpoint_mode(), + CheckpointMode::Incremental + ); + + let incremental_plan = task + .prepare_plan_for_incremental(&dml_plan, None) + .await + .unwrap(); + assert!(incremental_plan.is_none()); + let state = task.state.read().unwrap(); + assert!(!state.is_incremental_disabled()); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); +} + +#[tokio::test] +async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_original_plan() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let plan = sql_to_df_plan( + ctx.clone(), + query_engine.clone(), + "SELECT ts FROM numbers_with_ts GROUP BY ts", + true, + ) + .await + .unwrap(); + + let (sink_table, _) = get_table_info_df_schema( + query_engine.engine_state().catalog_manager().clone(), + [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ], + ) + .await + .unwrap(); + let table_provider = Arc::new(DfTableProviderAdapter::new(sink_table)); + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + let dml_plan = LogicalPlan::Dml(DmlStatement::new( + datafusion_common::TableReference::bare("test"), + table_source, + WriteOp::Insert(datafusion_expr::dml::InsertOp::Append), + Arc::new(plan), + )); + + let (_tx, rx) = tokio::sync::oneshot::channel(); + let task = BatchingTask::try_new(TaskArgs { + flow_id: 1, + query: "SELECT ts FROM numbers_with_ts GROUP BY ts", + plan: dml_plan.clone(), + time_window_expr: None, + expire_after: None, + sink_table_name: [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ], + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx: ctx, + catalog_manager: query_engine.engine_state().catalog_manager().clone(), + shutdown_rx: rx, + batch_opts: Arc::new(BatchingModeOptions::default()), + flow_eval_interval: None, + }) + .unwrap(); + + task.state + .write() + .unwrap() + .advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); + + let incremental_plan = task + .prepare_plan_for_incremental(&dml_plan, None) + .await + .unwrap() + .expect("plain GROUP BY is incremental-safe without a rewrite"); + + assert_eq!(format!("{incremental_plan}"), format!("{dml_plan}")); + assert!(!task.state.read().unwrap().is_incremental_disabled()); +} + +#[tokio::test] +async fn test_auto_created_sql_aggregate_sink_reaches_incremental_safe() { + let sink_table = "auto_created_aggregate_sink"; + let TestTaskParts { + task, query_engine, .. + } = new_test_task_engine_and_plan_with_query( + "SELECT max(number) AS number, ts FROM numbers_with_ts GROUP BY ts", + sink_table, + ) + .await; + register_auto_created_aggregate_sink(&query_engine, sink_table); + task.state.write().unwrap().dirty_time_windows.set_dirty(); + + let plan_info = task + .gen_insert_plan(&query_engine, None) + .await + .unwrap() + .unwrap(); + assert!(plan_info.can_advance_checkpoints); + + task.state + .write() + .unwrap() + .advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); + let incremental_plan = task + .prepare_plan_for_incremental(&plan_info.plan, None) + .await + .unwrap(); + let incremental_safe = incremental_plan.is_some(); + + assert!(incremental_safe); + assert!(!task.state.read().unwrap().is_incremental_disabled()); + + let extensions = task + .build_flow_query_extensions(incremental_safe, plan_info.can_advance_checkpoints) + .await + .unwrap(); + assert!(extensions.contains(&( + FLOW_INCREMENTAL_MODE, + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string() + ))); + assert!( + extensions + .iter() + .any(|(key, _)| *key == FLOW_INCREMENTAL_AFTER_SEQS) + ); } #[tokio::test] diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index 7b066388ec..e86b1ee3be 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -278,7 +278,7 @@ fn collect_output_projection_info(plan: &LogicalPlan) -> OutputProjectionInfo { let mut col_names = Vec::new(); find_column_names(&alias.expr, &mut col_names); match col_names.len() { - 0 if matches!(alias.expr.as_ref(), Expr::Literal(_, _)) => { + 0 if is_passthrough_output_column(&alias_name, alias.expr.as_ref()) => { projection_info.literal_columns.insert(alias_name); } 1 => { @@ -315,10 +315,38 @@ fn collect_output_projection_info(plan: &LogicalPlan) -> OutputProjectionInfo { } } + if projection_info + .output_field_names + .iter() + .any(|name| name == AUTO_CREATED_PLACEHOLDER_TS_COL) + { + projection_info + .literal_columns + .insert(AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()); + } + projection_info.output_aliases = output_aliases; projection_info } +fn is_passthrough_output_column(alias_name: &str, expr: &Expr) -> bool { + matches!(expr, Expr::Literal(_, _)) + || match alias_name { + AUTO_CREATED_UPDATE_AT_TS_COL => expr == &datafusion::prelude::now(), + AUTO_CREATED_PLACEHOLDER_TS_COL => is_literal_or_cast_literal(expr), + _ => false, + } +} + +fn is_literal_or_cast_literal(expr: &Expr) -> bool { + match expr { + Expr::Literal(_, _) => true, + Expr::Cast(cast) => is_literal_or_cast_literal(cast.expr.as_ref()), + Expr::TryCast(cast) => is_literal_or_cast_literal(cast.expr.as_ref()), + _ => false, + } +} + fn merge_op_for_aggregate_expr(aggr_expr: &Expr) -> Result { let Some(aggr_func) = get_aggr_func(aggr_expr) else { return Err(aggr_expr.to_string()); @@ -385,6 +413,11 @@ fn find_uncovered_output_fields( !group_key_names.contains(*name) && !merge_column_names.contains(*name) && !projection_info.literal_columns.contains(*name) + // Auto-created sink columns injected by ColumnMatcherRewriter + // are not part of the original aggregate semantics and must + // not prevent incremental aggregate rewrites. + && name.as_str() != AUTO_CREATED_UPDATE_AT_TS_COL + && name.as_str() != AUTO_CREATED_PLACEHOLDER_TS_COL }) .cloned() .collect() @@ -536,7 +569,8 @@ pub fn analyze_incremental_aggregate_plan( /// /// ```text /// delta = SELECT ts, number FROM AS __flow_delta -/// sink = SELECT ts, number FROM AS __flow_sink +/// sink_scan = SELECT * FROM [WHERE ] +/// sink = SELECT ts, number FROM sink_scan AS __flow_sink /// SELECT /// CASE /// WHEN __flow_sink.number IS NULL THEN __flow_delta.number @@ -548,11 +582,17 @@ pub fn analyze_incremental_aggregate_plan( /// LEFT JOIN sink /// ON __flow_delta.ts IS NOT DISTINCT FROM __flow_sink.ts /// ``` +/// +/// If `sink_dirty_filter` is provided, it is applied to the sink table scan +/// before projection, aliasing, and the left join. The predicate must reference +/// raw sink table columns structurally (unqualified), before the `__flow_sink` +/// alias exists. pub async fn rewrite_incremental_aggregate_with_sink_merge( delta_plan: &LogicalPlan, analysis: &IncrementalAggregateAnalysis, sink_table: TableRef, sink_table_name: &TableName, + sink_dirty_filter: Option, ) -> Result { ensure!( analysis.unsupported_exprs.is_empty(), @@ -637,7 +677,22 @@ pub async fn rewrite_incremental_aggregate_with_sink_merge( .cloned() .map(unqualified_col) .collect::>(); - let sink_selected = LogicalPlanBuilder::from(sink_scan) + let sink_input = if let Some(predicate) = sink_dirty_filter { + LogicalPlanBuilder::from(sink_scan) + .filter(predicate) + .with_context(|_| DatafusionSnafu { + context: "Failed to filter sink table scan for incremental sink merge".to_string(), + })? + .build() + .with_context(|_| DatafusionSnafu { + context: "Failed to build filtered sink plan for incremental sink merge" + .to_string(), + })? + } else { + sink_scan + }; + + let sink_selected = LogicalPlanBuilder::from(sink_input) .project(sink_selected_exprs) .with_context(|_| DatafusionSnafu { context: "Failed to project sink table scan for incremental sink merge".to_string(), diff --git a/src/flow/src/batching_mode/utils/test.rs b/src/flow/src/batching_mode/utils/test.rs index 863580b4ae..5b9cf7f507 100644 --- a/src/flow/src/batching_mode/utils/test.rs +++ b/src/flow/src/batching_mode/utils/test.rs @@ -15,10 +15,13 @@ use std::sync::Arc; use common_recordbatch::RecordBatch; +use common_time::Timestamp; use datafusion_common::tree_node::TreeNode as _; use datafusion_expr::GroupingSet; -use datatypes::prelude::{ConcreteDataType, Scalar, VectorRef}; +use datatypes::prelude::{ConcreteDataType, MutableVector, Scalar, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::timestamp::TimestampMillisecond; +use datatypes::vectors::TimestampMillisecondVectorBuilder; use pretty_assertions::assert_eq; use query::query_engine::DefaultSerializer; use session::context::QueryContext; @@ -26,6 +29,7 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::test_util::MemTable; use super::*; +use crate::batching_mode::state::FilterExprInfo; use crate::test_utils::create_test_query_engine; fn u32_table(table_name: &str, columns: Vec<&str>, rows: usize) -> TableRef { @@ -50,6 +54,30 @@ fn empty_u32_table(table_name: &str, columns: Vec<&str>) -> TableRef { u32_table(table_name, columns, 0) } +fn time_window_u32_table(table_name: &str) -> TableRef { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new( + "time_window", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ])); + + let mut time_window_builder = TimestampMillisecondVectorBuilder::with_capacity(1); + time_window_builder.push(Some(TimestampMillisecond::new(0))); + let recordbatch = RecordBatch::new( + schema, + vec![ + time_window_builder.to_vector_cloned(), + Arc::new(::VectorType::from_vec(vec![1])) as VectorRef, + ], + ) + .unwrap(); + MemTable::table(table_name, recordbatch) +} + fn assert_same_logical_plan(actual: &LogicalPlan, expected: &LogicalPlan) { assert_eq!( format!("{}", expected.display_indent()), @@ -84,6 +112,29 @@ fn expected_left_join_rewrite( sink_selected_exprs: Vec, join_keys: (Vec, Vec), projection_exprs: Vec, +) -> LogicalPlan { + expected_left_join_rewrite_with_sink_filter( + delta_plan, + sink_table, + sink_table_name, + delta_selected_exprs, + sink_selected_exprs, + None, + join_keys, + projection_exprs, + ) +} + +#[allow(clippy::too_many_arguments)] +fn expected_left_join_rewrite_with_sink_filter( + delta_plan: &LogicalPlan, + sink_table: TableRef, + sink_table_name: &TableName, + delta_selected_exprs: Vec, + sink_selected_exprs: Vec, + sink_filter: Option, + join_keys: (Vec, Vec), + projection_exprs: Vec, ) -> LogicalPlan { let delta_alias = "__flow_delta"; let sink_alias = "__flow_sink"; @@ -94,7 +145,17 @@ fn expected_left_join_rewrite( .unwrap() .build() .unwrap(); - let sink_selected = LogicalPlanBuilder::from(test_sink_scan(sink_table, sink_table_name)) + let sink_scan = test_sink_scan(sink_table, sink_table_name); + let sink_input = if let Some(predicate) = sink_filter { + LogicalPlanBuilder::from(sink_scan) + .filter(predicate) + .unwrap() + .build() + .unwrap() + } else { + sink_scan + }; + let sink_selected = LogicalPlanBuilder::from(sink_input) .project(sink_selected_exprs) .unwrap() .alias(sink_alias) @@ -576,6 +637,44 @@ async fn test_analyze_incremental_aggregate_plan_keeps_aliases_for_multiple_aggr })); } +#[tokio::test] +async fn test_analyze_incremental_aggregate_plan_allows_auto_created_sink_columns() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sql = format!( + "SELECT max(number) AS total, ts, now() AS {}, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS {} FROM numbers_with_ts GROUP BY ts", + AUTO_CREATED_UPDATE_AT_TS_COL, AUTO_CREATED_PLACEHOLDER_TS_COL + ); + let plan = sql_to_df_plan(ctx, query_engine, &sql, false) + .await + .unwrap(); + + let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap(); + assert!( + analysis.unsupported_exprs.is_empty(), + "auto-created sink columns should not disable incremental analysis: {:?}", + analysis.unsupported_exprs + ); + assert!( + analysis + .literal_columns + .iter() + .any(|name| name == AUTO_CREATED_UPDATE_AT_TS_COL) + ); + assert!( + analysis + .literal_columns + .iter() + .any(|name| name == AUTO_CREATED_PLACEHOLDER_TS_COL) + ); + assert_eq!(analysis.merge_columns.len(), 1); + assert_eq!(analysis.merge_columns[0].output_field_name, "total"); + assert_eq!( + analysis.merge_columns[0].merge_op, + IncrementalAggregateMergeOp::Max + ); +} + #[tokio::test] async fn test_analyze_incremental_aggregate_plan_allows_where_before_aggregate() { let query_engine = create_test_query_engine(); @@ -641,6 +740,7 @@ async fn test_rewrite_incremental_aggregate_allows_alias_wrapped_scan() { "public".to_string(), "alias_wrapped_sink".to_string(), ], + None, ) .await .unwrap(); @@ -887,6 +987,7 @@ async fn test_analyze_incremental_aggregate_plan_allows_literal_outputs() { &analysis, sink_table.clone(), &sink_table_name, + None, ) .await .unwrap(); @@ -975,6 +1076,7 @@ async fn test_rewrite_incremental_aggregate_preserves_non_identifier_aliases() { "public".to_string(), "non_identifier_alias_sink".to_string(), ], + None, ) .await .unwrap(); @@ -1161,6 +1263,7 @@ async fn test_rewrite_incremental_aggregate_with_left_join() { &analysis, sink_table.clone(), &sink_table_name, + None, ) .await .unwrap(); @@ -1183,6 +1286,67 @@ async fn test_rewrite_incremental_aggregate_with_left_join() { assert_same_logical_plan(&rewritten, &expected); } +#[tokio::test] +async fn test_rewrite_incremental_aggregate_filters_sink_dirty_time_window() { + // This verifies the rewrite placement when callers supply an already + // inferred sink dirty-window predicate. The task-level inference rules are + // covered by `infer_sink_time_window_filter_col` tests in task.rs. + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sql = "SELECT max(number) AS number, date_bin(INTERVAL '1 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window"; + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false) + .await + .unwrap(); + let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap(); + let sink_table = time_window_u32_table("time_window_sink"); + let sink_table_name = [ + "greptime".to_string(), + "public".to_string(), + "time_window_sink".to_string(), + ]; + let dirty_filter = FilterExprInfo { + expr: unqualified_col("ts"), + col_name: "ts".to_string(), + time_ranges: vec![( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(1000), + )], + window_size: chrono::Duration::seconds(1), + }; + let sink_filter = dirty_filter + .predicate_for_col("time_window") + .unwrap() + .unwrap(); + + let rewritten = rewrite_incremental_aggregate_with_sink_merge( + &plan, + &analysis, + sink_table.clone(), + &sink_table_name, + Some(sink_filter.clone()), + ) + .await + .unwrap(); + + let expected = expected_left_join_rewrite_with_sink_filter( + &plan, + sink_table, + &sink_table_name, + vec![unqualified_col("time_window"), unqualified_col("number")], + vec![unqualified_col("time_window"), unqualified_col("number")], + Some(sink_filter), + ( + vec![qualified_column("__flow_delta", "time_window")], + vec![qualified_column("__flow_sink", "time_window")], + ), + vec![ + max_merge_expr("number"), + qualified_col("__flow_delta", "time_window").alias("time_window"), + ], + ); + assert_same_logical_plan(&rewritten, &expected); +} + #[tokio::test] async fn test_analyze_incremental_aggregate_plan_rejects_global_aggregate() { let query_engine = create_test_query_engine(); @@ -1230,6 +1394,7 @@ async fn test_rewrite_incremental_aggregate_rejects_empty_group_keys() { &analysis, sink_table, &sink_table_name, + None, ) .await .unwrap_err(); @@ -1261,6 +1426,7 @@ async fn test_rewrite_incremental_aggregate_preserves_raw_aggregate_field_name() &analysis, sink_table.clone(), &sink_table_name, + None, ) .await .unwrap(); diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 58c01793cc..00f93d47ab 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -87,6 +87,20 @@ lazy_static! { &["flow_id"], ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT: IntCounterVec = + register_int_counter_vec!( + "greptime_flow_batching_checkpoint_decision_count", + "flow batching checkpoint state-machine decisions", + &["flow_id", "mode", "decision", "reason"], + ) + .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT: IntCounterVec = + register_int_counter_vec!( + "greptime_flow_batching_query_mode_count", + "flow batching query attempts by checkpoint mode", + &["flow_id", "mode"], + ) + .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index 75fbc848ea..e4010940fa 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -100,7 +100,7 @@ async fn test_incremental_query_stale_error() { region_id, ScanRequest { memtable_min_sequence: Some(min_readable_seq), - sst_min_sequence: Some(u64::MAX), + skip_sst_files: true, ..Default::default() }, ) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index ea779c145f..baf6964c27 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -456,26 +456,28 @@ impl ScanRegion { let ssts = &self.version.ssts; let mut files = Vec::new(); - for level in ssts.levels() { - for file in level.files.values() { - let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) { - (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence, - // If the file's sequence is None (or actually is zero), it could mean the file - // is generated and added to the region "directly". In this case, its data should - // be considered as fresh as the memtable. So its sequence is treated greater than - // the min_sequence, whatever the value of min_sequence is. Hence the default - // "true" in this arm. - (Some(_), None) => true, - (None, _) => true, - }; + if !self.request.skip_sst_files { + for level in ssts.levels() { + for file in level.files.values() { + let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) { + (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence, + // If the file's sequence is None (or actually is zero), it could mean the file + // is generated and added to the region "directly". In this case, its data should + // be considered as fresh as the memtable. So its sequence is treated greater than + // the min_sequence, whatever the value of min_sequence is. Hence the default + // "true" in this arm. + (Some(_), None) => true, + (None, _) => true, + }; - // Finds SST files in range. - if exceed_min_sequence && file_in_range(file, &time_range) { - files.push(file.clone()); + // Finds SST files in range. + if exceed_min_sequence && file_in_range(file, &time_range) { + files.push(file.clone()); + } + // There is no need to check and prune for file's sequence here as the sequence number is usually very new, + // unless the timing is too good, or the sequence number wouldn't be in file. + // and the batch will be filtered out by tree reader anyway. } - // There is no need to check and prune for file's sequence here as the sequence number is usually very new, - // unless the timing is too good, or the sequence number wouldn't be in file. - // and the batch will be filtered out by tree reader anyway. } } @@ -579,7 +581,9 @@ impl ScanRegion { .with_vector_index_k(vector_index_k); #[cfg(feature = "enterprise")] - let input = if let Some(provider) = self.extension_range_provider { + let input = if !self.request.skip_sst_files + && let Some(provider) = self.extension_range_provider + { let ranges = provider .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request) .await?; diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 470b4d325f..e1dd635de7 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -51,6 +51,7 @@ use tracing::{Instrument, Span}; use crate::dist_plan::analyzer::AliasMapping; use crate::dist_plan::analyzer::utils::patch_batch_timezone; use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS}; +use crate::options::FlowQueryExtensions; use crate::region_query::RegionQueryHandlerRef; #[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)] @@ -481,6 +482,23 @@ impl MergeScanExec { &self.regions } + pub fn is_flow_sink_scan(&self) -> bool { + let Some(sink_table_id) = + FlowQueryExtensions::parse_flow_extensions(&self.query_ctx.extensions()) + .ok() + .flatten() + .and_then(|extensions| extensions.sink_table_id) + else { + return false; + }; + + !self.regions.is_empty() + && self + .regions + .iter() + .all(|region_id| region_id.table_id() == sink_table_id) + } + pub fn partition_count(&self) -> usize { self.target_partition } diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index f08c6e6ec6..c79c8d88ea 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -45,7 +45,7 @@ use table::metadata::{TableId, TableInfoRef}; use table::table::scan::RegionScanExec; use crate::error::{GetRegionMetadataSnafu, Result}; -use crate::options::FlowQueryExtensions; +use crate::options::{FlowIncrementalMode, FlowQueryExtensions}; /// Resolve to the given region (specified by [RegionId]) unconditionally. #[derive(Clone, Debug)] @@ -357,6 +357,8 @@ struct FlowScanDecision { /// When present, this becomes the effective memtable upper bound and suppresses /// binding a new snapshot on scan open. memtable_max_sequence: Option, + /// Whether to skip SST files for memtable-only incremental source scans. + skip_sst_files: bool, } impl FlowScanDecision { @@ -366,6 +368,7 @@ impl FlowScanDecision { snapshot_on_scan: false, memtable_min_sequence: None, memtable_max_sequence: None, + skip_sst_files: false, } } } @@ -379,6 +382,7 @@ fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result Result) -> Vec() { + if let Some(merge_scan) = plan.as_any().downcast_ref::() + && !merge_scan.is_flow_sink_scan() + { merge_merge_scan_region_watermarks( &mut merged, merge_scan @@ -281,8 +280,6 @@ fn collect_region_watermarks(plan: Arc) -> Vec match existing { - MergeState::Participated | MergeState::Proved(_) => { + MergeState::Proved(_) => { *existing = MergeState::Unproved; } MergeState::Unproved | MergeState::Conflict { .. } => {} }, Some(seq) => match existing { - MergeState::Participated => { - *existing = MergeState::Proved(seq); - } MergeState::Unproved => {} MergeState::Proved(existing_seq) if *existing_seq == seq => {} MergeState::Proved(existing_seq) => { @@ -336,16 +330,32 @@ fn merge_merge_scan_region_watermarks( regions: impl IntoIterator, sub_stage_metrics: impl IntoIterator, ) { - // Regions listed by MergeScanExec participated even when no sub-stage can - // prove a watermark. Keep them as explicit `None` entries so callers can - // distinguish unproved participation from non-participation. - for region_id in regions { - merged.entry(region_id).or_insert(MergeState::Participated); - } - + let regions = regions.into_iter().collect::>(); + let mut proved_or_unproved_regions = BTreeSet::new(); for metrics in sub_stage_metrics { + proved_or_unproved_regions.extend( + metrics + .region_watermarks + .iter() + .map(|entry| entry.region_id), + ); merge_region_watermark_entries(merged, metrics.region_watermarks); } + + // Regions listed by a MergeScanExec participated even when no sub-stage can + // prove a watermark. Merge missing per-scan region entries as explicit + // `None` entries so an unproved participating branch vetoes any proof from + // another branch for the same region. + merge_region_watermark_entries( + merged, + regions + .into_iter() + .filter(|region_id| !proved_or_unproved_regions.contains(region_id)) + .map(|region_id| RegionWatermarkEntry { + region_id, + watermark: None, + }), + ); } fn finalize_region_watermarks(merged: BTreeMap) -> Vec { @@ -354,7 +364,6 @@ fn finalize_region_watermarks(merged: BTreeMap) -> Vec None, MergeState::Unproved => None, MergeState::Proved(seq) => Some(seq), MergeState::Conflict { watermarks } => { @@ -371,10 +380,35 @@ fn finalize_region_watermarks(merged: BTreeMap) -> Vec Result { + unreachable!("metrics tests should not execute remote queries") + } + } fn metrics_with_region_watermarks(entries: &[(u64, Option)]) -> RecordBatchMetrics { RecordBatchMetrics { @@ -389,12 +423,66 @@ mod tests { } } + fn test_merge_scan_exec(table_id: u32, query_ctx: QueryContextRef) -> Arc { + let session_state = SessionStateBuilder::new().with_default_features().build(); + let plan = LogicalPlanBuilder::empty(false).build().unwrap(); + let schema = ArrowSchema::empty(); + + Arc::new( + MergeScanExec::new( + &session_state, + TableName::new("greptime", "public", "test"), + vec![RegionId::new(table_id, 0)], + plan, + &schema, + Arc::new(NoopRegionQueryHandler), + query_ctx, + 1, + BTreeMap::>::new(), + ) + .unwrap(), + ) + } + + fn flow_query_ctx_with_sink_table_id(sink_table_id: u32) -> QueryContextRef { + Arc::new( + QueryContextBuilder::default() + .set_extension(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string()) + .set_extension(FLOW_SINK_TABLE_ID.to_string(), sink_table_id.to_string()) + .build(), + ) + } + #[test] fn terminal_metrics_returns_none_without_merge_scan() { let plan: Arc = Arc::new(EmptyExec::new(Arc::new(ArrowSchema::empty()))); assert!(terminal_recordbatch_metrics_from_plan(plan).is_none()); } + #[test] + fn terminal_metrics_skip_flow_sink_merge_scan_regions() { + let query_ctx = flow_query_ctx_with_sink_table_id(42); + let plan = test_merge_scan_exec(42, query_ctx); + + assert!(terminal_recordbatch_metrics_from_plan(plan).is_none()); + } + + #[test] + fn terminal_metrics_keep_source_merge_scan_regions_with_sink_extension() { + let query_ctx = flow_query_ctx_with_sink_table_id(42); + let plan = test_merge_scan_exec(43, query_ctx); + + assert_eq!( + terminal_recordbatch_metrics_from_plan(plan) + .unwrap() + .region_watermarks, + vec![RegionWatermarkEntry { + region_id: RegionId::new(43, 0).as_u64(), + watermark: None, + }] + ); + } + #[test] fn merge_merge_scan_region_watermarks_marks_missing_watermarks_unproved() { let mut merged = BTreeMap::new(); @@ -503,4 +591,44 @@ mod tests { }] ); } + + #[test] + fn merge_merge_scan_region_watermarks_missing_branch_vetoes_proved_value() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks( + &mut merged, + [9], + [metrics_with_region_watermarks(&[(9, Some(21))])], + ); + merge_merge_scan_region_watermarks(&mut merged, [9], std::iter::empty()); + + assert_eq!( + finalize_region_watermarks(merged), + vec![RegionWatermarkEntry { + region_id: 9, + watermark: None, + }] + ); + } + + #[test] + fn merge_merge_scan_region_watermarks_missing_branch_vetoes_proved_value_regardless_of_order() { + let mut merged = BTreeMap::new(); + + merge_merge_scan_region_watermarks(&mut merged, [9], std::iter::empty()); + merge_merge_scan_region_watermarks( + &mut merged, + [9], + [metrics_with_region_watermarks(&[(9, Some(21))])], + ); + + assert_eq!( + finalize_region_watermarks(merged), + vec![RegionWatermarkEntry { + region_id: 9, + watermark: None, + }] + ); + } } diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 57b2ca8e88..b27119d881 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -124,6 +124,9 @@ pub struct ScanRequest { /// Optional constraint on the minimal sequence number in the SST files. /// If set, only the SST files that contain sequences greater than this value will be scanned. pub sst_min_sequence: Option, + /// Whether to skip all SST files. + /// This is stronger than `sst_min_sequence` and also skips SST files without sequence metadata. + pub skip_sst_files: bool, /// Whether to bind the effective snapshot upper bound when opening the scan. pub snapshot_on_scan: bool, /// Optional hint for the distribution of time-series data. @@ -211,6 +214,14 @@ impl Display for ScanRequest { sst_min_sequence )?; } + if self.skip_sst_files { + write!( + f, + "{}skip_sst_files: {}", + delimiter.as_str(), + self.skip_sst_files + )?; + } if self.snapshot_on_scan { write!( f, @@ -321,5 +332,11 @@ mod tests { request.to_string(), "ScanRequest { snapshot_on_scan: true }" ); + + let request = ScanRequest { + skip_sst_files: true, + ..Default::default() + }; + assert_eq!(request.to_string(), "ScanRequest { skip_sst_files: true }"); } } diff --git a/tests/cases/standalone/common/flow/flow_incremental_aggr.result b/tests/cases/standalone/common/flow/flow_incremental_aggr.result new file mode 100644 index 0000000000..bb66d5362c --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_incremental_aggr.result @@ -0,0 +1,119 @@ +CREATE TABLE incremental_aggr_input ( + host_id INT, + n INT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(host_id) +) WITH ( + append_mode = 'true' +); + +Affected Rows: 0 + +CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink AS +SELECT + sum(n) AS total, + min(n) AS min_n, + max(n) AS max_n, + date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window +FROM + incremental_aggr_input +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO incremental_aggr_input VALUES + (1, 10, '2024-01-01 00:00:00'), + (2, 20, '2024-01-01 00:00:30'); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('incremental_aggr_flow'); + ++-------------------------------------------+ +| ADMIN FLUSH_FLOW('incremental_aggr_flow') | ++-------------------------------------------+ +| FLOW_FLUSHED | ++-------------------------------------------+ + +SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window; + ++-------+-------+-------+---------------------+ +| total | min_n | max_n | time_window | ++-------+-------+-------+---------------------+ +| 30 | 10 | 20 | 2024-01-01T00:00:00 | ++-------+-------+-------+---------------------+ + +-- Move already checkpointed source rows into SST. The next incremental run +-- must still read only the memtable delta and must not merge these old SST +-- rows again. +ADMIN FLUSH_TABLE('incremental_aggr_input'); + ++---------------------------------------------+ +| ADMIN FLUSH_TABLE('incremental_aggr_input') | ++---------------------------------------------+ +| 0 | ++---------------------------------------------+ + +-- Insert more rows into the same time window. An incremental-safe flow should +-- merge the delta aggregate with the existing sink aggregate state. +INSERT INTO incremental_aggr_input VALUES + (3, 30, '2024-01-01 00:00:15'), + (4, 40, '2024-01-01 00:00:45'); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('incremental_aggr_flow'); + ++-------------------------------------------+ +| ADMIN FLUSH_FLOW('incremental_aggr_flow') | ++-------------------------------------------+ +| FLOW_FLUSHED | ++-------------------------------------------+ + +SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window; + ++-------+-------+-------+---------------------+ +| total | min_n | max_n | time_window | ++-------+-------+-------+---------------------+ +| 100 | 10 | 40 | 2024-01-01T00:00:00 | ++-------+-------+-------+---------------------+ + +-- Insert a row into a new time window to cover append of a new aggregate key. +INSERT INTO incremental_aggr_input VALUES + (5, 50, '2024-01-01 00:01:00'); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('incremental_aggr_flow'); + ++-------------------------------------------+ +| ADMIN FLUSH_FLOW('incremental_aggr_flow') | ++-------------------------------------------+ +| FLOW_FLUSHED | ++-------------------------------------------+ + +SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window; + ++-------+-------+-------+---------------------+ +| total | min_n | max_n | time_window | ++-------+-------+-------+---------------------+ +| 100 | 10 | 40 | 2024-01-01T00:00:00 | +| 50 | 50 | 50 | 2024-01-01T00:01:00 | ++-------+-------+-------+---------------------+ + +DROP FLOW incremental_aggr_flow; + +Affected Rows: 0 + +DROP TABLE incremental_aggr_input; + +Affected Rows: 0 + +DROP TABLE incremental_aggr_sink; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_incremental_aggr.sql b/tests/cases/standalone/common/flow/flow_incremental_aggr.sql new file mode 100644 index 0000000000..51dd431fef --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_incremental_aggr.sql @@ -0,0 +1,57 @@ +CREATE TABLE incremental_aggr_input ( + host_id INT, + n INT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(host_id) +) WITH ( + append_mode = 'true' +); + +CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink AS +SELECT + sum(n) AS total, + min(n) AS min_n, + max(n) AS max_n, + date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window +FROM + incremental_aggr_input +GROUP BY + time_window; + +INSERT INTO incremental_aggr_input VALUES + (1, 10, '2024-01-01 00:00:00'), + (2, 20, '2024-01-01 00:00:30'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('incremental_aggr_flow'); + +SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window; + +-- Move already checkpointed source rows into SST. The next incremental run +-- must still read only the memtable delta and must not merge these old SST +-- rows again. +ADMIN FLUSH_TABLE('incremental_aggr_input'); + +-- Insert more rows into the same time window. An incremental-safe flow should +-- merge the delta aggregate with the existing sink aggregate state. +INSERT INTO incremental_aggr_input VALUES + (3, 30, '2024-01-01 00:00:15'), + (4, 40, '2024-01-01 00:00:45'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('incremental_aggr_flow'); + +SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window; + +-- Insert a row into a new time window to cover append of a new aggregate key. +INSERT INTO incremental_aggr_input VALUES + (5, 50, '2024-01-01 00:01:00'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('incremental_aggr_flow'); + +SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window; + +DROP FLOW incremental_aggr_flow; +DROP TABLE incremental_aggr_input; +DROP TABLE incremental_aggr_sink; diff --git a/tests/cases/standalone/common/flow/flow_incremental_memtable.result b/tests/cases/standalone/common/flow/flow_incremental_memtable.result new file mode 100644 index 0000000000..1e452b21ad --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_incremental_memtable.result @@ -0,0 +1,132 @@ +-- Validate that a flow performing an incremental aggregate read only reads memtable +-- data and does NOT re-read source rows that have already been flushed to SST after +-- a previous checkpoint. +CREATE TABLE flow_incr_memtable_input ( + host_id INT, + n INT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(host_id) +) WITH ( + append_mode = 'true' +); + +Affected Rows: 0 + +CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink AS +SELECT + sum(n) AS total, + min(n) AS min_n, + max(n) AS max_n, + date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window +FROM + flow_incr_memtable_input +GROUP BY + time_window; + +Affected Rows: 0 + +-- ==== Phase 1: initial insert + checkpoint ==== +INSERT INTO flow_incr_memtable_input VALUES + (1, 10, '2024-01-01 00:00:00'), + (2, 20, '2024-01-01 00:00:30'); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_memtable'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('flow_incr_memtable') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window; + ++-------+-------+-------+---------------------+ +| total | min_n | max_n | time_window | ++-------+-------+-------+---------------------+ +| 30 | 10 | 20 | 2024-01-01T00:00:00 | ++-------+-------+-------+---------------------+ + +-- ==== Phase 2: flush sink and source tables to SST ==== +-- The next incremental run must still read the flushed sink aggregate state, +-- while skipping already-checkpointed source SST files. +ADMIN FLUSH_TABLE('flow_incr_memtable_sink'); + ++----------------------------------------------+ +| ADMIN FLUSH_TABLE('flow_incr_memtable_sink') | ++----------------------------------------------+ +| 0 | ++----------------------------------------------+ + +ADMIN FLUSH_TABLE('flow_incr_memtable_input'); + ++-----------------------------------------------+ +| ADMIN FLUSH_TABLE('flow_incr_memtable_input') | ++-----------------------------------------------+ +| 0 | ++-----------------------------------------------+ + +-- ==== Phase 3: empty incremental window ==== +-- Flush the flow without inserting any new source rows to verify that +-- the incremental read correctly handles the case where no new memtable +-- data exists. +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_memtable'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('flow_incr_memtable') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window; + ++-------+-------+-------+---------------------+ +| total | min_n | max_n | time_window | ++-------+-------+-------+---------------------+ +| 30 | 10 | 20 | 2024-01-01T00:00:00 | ++-------+-------+-------+---------------------+ + +-- ==== Phase 4: insert new delta within the same time window ==== +INSERT INTO flow_incr_memtable_input VALUES + (3, 30, '2024-01-01 00:00:15'), + (4, 40, '2024-01-01 00:00:45'); + +Affected Rows: 2 + +-- ==== Phase 5: flush flow again (incremental read) ==== +-- The flow must only read the new memtable delta and merge with the existing +-- sink aggregate. If it mistakenly re-reads the SST, the result will be +-- inflated (initial data counted twice). +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_memtable'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('flow_incr_memtable') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window; + ++-------+-------+-------+---------------------+ +| total | min_n | max_n | time_window | ++-------+-------+-------+---------------------+ +| 100 | 10 | 40 | 2024-01-01T00:00:00 | ++-------+-------+-------+---------------------+ + +-- Clean up +DROP FLOW flow_incr_memtable; + +Affected Rows: 0 + +DROP TABLE flow_incr_memtable_input; + +Affected Rows: 0 + +DROP TABLE flow_incr_memtable_sink; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_incremental_memtable.sql b/tests/cases/standalone/common/flow/flow_incremental_memtable.sql new file mode 100644 index 0000000000..66dccbb8b3 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_incremental_memtable.sql @@ -0,0 +1,66 @@ +-- Validate that a flow performing an incremental aggregate read only reads memtable +-- data and does NOT re-read source rows that have already been flushed to SST after +-- a previous checkpoint. +CREATE TABLE flow_incr_memtable_input ( + host_id INT, + n INT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(host_id) +) WITH ( + append_mode = 'true' +); + +CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink AS +SELECT + sum(n) AS total, + min(n) AS min_n, + max(n) AS max_n, + date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window +FROM + flow_incr_memtable_input +GROUP BY + time_window; + +-- ==== Phase 1: initial insert + checkpoint ==== +INSERT INTO flow_incr_memtable_input VALUES + (1, 10, '2024-01-01 00:00:00'), + (2, 20, '2024-01-01 00:00:30'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_memtable'); + +SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window; + +-- ==== Phase 2: flush sink and source tables to SST ==== +-- The next incremental run must still read the flushed sink aggregate state, +-- while skipping already-checkpointed source SST files. +ADMIN FLUSH_TABLE('flow_incr_memtable_sink'); +ADMIN FLUSH_TABLE('flow_incr_memtable_input'); + +-- ==== Phase 3: empty incremental window ==== +-- Flush the flow without inserting any new source rows to verify that +-- the incremental read correctly handles the case where no new memtable +-- data exists. +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_memtable'); + +SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window; + +-- ==== Phase 4: insert new delta within the same time window ==== +INSERT INTO flow_incr_memtable_input VALUES + (3, 30, '2024-01-01 00:00:15'), + (4, 40, '2024-01-01 00:00:45'); + +-- ==== Phase 5: flush flow again (incremental read) ==== +-- The flow must only read the new memtable delta and merge with the existing +-- sink aggregate. If it mistakenly re-reads the SST, the result will be +-- inflated (initial data counted twice). +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_memtable'); + +SELECT total, min_n, max_n, time_window FROM flow_incr_memtable_sink ORDER BY time_window; + +-- Clean up +DROP FLOW flow_incr_memtable; +DROP TABLE flow_incr_memtable_input; +DROP TABLE flow_incr_memtable_sink; diff --git a/tests/cases/standalone/common/flow/flow_incremental_partitioned.result b/tests/cases/standalone/common/flow/flow_incremental_partitioned.result new file mode 100644 index 0000000000..b56b390abd --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_incremental_partitioned.result @@ -0,0 +1,108 @@ +-- Validate that a flow performing an incremental aggregate read on a +-- partitioned source table (multiple regions) only reads memtable data +-- and does NOT re-read source rows that have already been flushed to SST. +CREATE TABLE flow_incr_part_input ( + host_id INT, + n INT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(host_id) +) +PARTITION ON COLUMNS (host_id) ( + host_id < 3, + host_id >= 3 +) +WITH ( + append_mode = 'true' +); + +Affected Rows: 0 + +CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink AS +SELECT + sum(n) AS total, + min(n) AS min_n, + max(n) AS max_n, + date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window +FROM + flow_incr_part_input +GROUP BY + time_window; + +Affected Rows: 0 + +-- ==== Phase 1: initial insert across both partitions ==== +INSERT INTO flow_incr_part_input VALUES + (1, 10, '2024-01-01 00:00:00'), + (4, 20, '2024-01-01 00:00:30'); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_part'); + ++------------------------------------+ +| ADMIN FLUSH_FLOW('flow_incr_part') | ++------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------+ + +SELECT total, min_n, max_n, time_window FROM flow_incr_part_sink ORDER BY time_window; + ++-------+-------+-------+---------------------+ +| total | min_n | max_n | time_window | ++-------+-------+-------+---------------------+ +| 30 | 10 | 20 | 2024-01-01T00:00:00 | ++-------+-------+-------+---------------------+ + +-- ==== Phase 2: flush source table to SST ==== +-- Move already checkpointed source rows into SST so the next incremental run +-- must skip them. +ADMIN FLUSH_TABLE('flow_incr_part_input'); + ++-------------------------------------------+ +| ADMIN FLUSH_TABLE('flow_incr_part_input') | ++-------------------------------------------+ +| 0 | ++-------------------------------------------+ + +-- ==== Phase 3: insert new delta across both partitions, same time window ==== +INSERT INTO flow_incr_part_input VALUES + (2, 30, '2024-01-01 00:00:15'), + (3, 40, '2024-01-01 00:00:45'); + +Affected Rows: 2 + +-- ==== Phase 4: flush flow again (incremental read) ==== +-- The flow must only read the new memtable delta from both regions and merge +-- with the existing sink aggregate. If it mistakenly re-reads the SST, the +-- result will be inflated (initial data counted twice). +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_part'); + ++------------------------------------+ +| ADMIN FLUSH_FLOW('flow_incr_part') | ++------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------+ + +SELECT total, min_n, max_n, time_window FROM flow_incr_part_sink ORDER BY time_window; + ++-------+-------+-------+---------------------+ +| total | min_n | max_n | time_window | ++-------+-------+-------+---------------------+ +| 100 | 10 | 40 | 2024-01-01T00:00:00 | ++-------+-------+-------+---------------------+ + +-- Clean up +DROP FLOW flow_incr_part; + +Affected Rows: 0 + +DROP TABLE flow_incr_part_input; + +Affected Rows: 0 + +DROP TABLE flow_incr_part_sink; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_incremental_partitioned.sql b/tests/cases/standalone/common/flow/flow_incremental_partitioned.sql new file mode 100644 index 0000000000..234c9b9085 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_incremental_partitioned.sql @@ -0,0 +1,61 @@ +-- Validate that a flow performing an incremental aggregate read on a +-- partitioned source table (multiple regions) only reads memtable data +-- and does NOT re-read source rows that have already been flushed to SST. +CREATE TABLE flow_incr_part_input ( + host_id INT, + n INT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(host_id) +) +PARTITION ON COLUMNS (host_id) ( + host_id < 3, + host_id >= 3 +) +WITH ( + append_mode = 'true' +); + +CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink AS +SELECT + sum(n) AS total, + min(n) AS min_n, + max(n) AS max_n, + date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window +FROM + flow_incr_part_input +GROUP BY + time_window; + +-- ==== Phase 1: initial insert across both partitions ==== +INSERT INTO flow_incr_part_input VALUES + (1, 10, '2024-01-01 00:00:00'), + (4, 20, '2024-01-01 00:00:30'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_part'); + +SELECT total, min_n, max_n, time_window FROM flow_incr_part_sink ORDER BY time_window; + +-- ==== Phase 2: flush source table to SST ==== +-- Move already checkpointed source rows into SST so the next incremental run +-- must skip them. +ADMIN FLUSH_TABLE('flow_incr_part_input'); + +-- ==== Phase 3: insert new delta across both partitions, same time window ==== +INSERT INTO flow_incr_part_input VALUES + (2, 30, '2024-01-01 00:00:15'), + (3, 40, '2024-01-01 00:00:45'); + +-- ==== Phase 4: flush flow again (incremental read) ==== +-- The flow must only read the new memtable delta from both regions and merge +-- with the existing sink aggregate. If it mistakenly re-reads the SST, the +-- result will be inflated (initial data counted twice). +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_incr_part'); + +SELECT total, min_n, max_n, time_window FROM flow_incr_part_sink ORDER BY time_window; + +-- Clean up +DROP FLOW flow_incr_part; +DROP TABLE flow_incr_part_input; +DROP TABLE flow_incr_part_sink;