From f6f56e6bc587b25309fa72ae7133151f651fb75e Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 12 Jun 2026 21:27:12 +0800 Subject: [PATCH] fix!: fence scoped flow repair snapshots (#8277) * fix: fence scoped flow repair snapshots Signed-off-by: discord9 * test: trim duplicate flow task tests Signed-off-by: discord9 * test: moreless Signed-off-by: discord9 * refactor: simplify flow query failure fallback Signed-off-by: discord9 * test: update need eval interval Signed-off-by: discord9 * docs: for helper fn Signed-off-by: discord9 * chore: less test Signed-off-by: discord9 * refactor: per review Signed-off-by: discord9 * test: update for eval interval Signed-off-by: discord9 * fix: consume dirty windows after successful query Signed-off-by: discord9 * test: rm useless tests Signed-off-by: discord9 * test: standalone seq&rm dead if Signed-off-by: discord9 * chore: move to pending window instead Signed-off-by: discord9 * chore: mark full also call abandon_fenced_repair Signed-off-by: discord9 * chore: instant for pending fenced repair Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/flow/src/batching_mode/checkpoint.rs | 26 + src/flow/src/batching_mode/engine.rs | 76 ++ src/flow/src/batching_mode/frontend_client.rs | 46 +- src/flow/src/batching_mode/state.rs | 246 ++++- src/flow/src/batching_mode/task.rs | 254 ++--- src/flow/src/batching_mode/task/ckpt.rs | 198 +++- src/flow/src/batching_mode/task/inc.rs | 26 +- src/flow/src/batching_mode/task/test.rs | 867 +++++++++++++++--- tests-integration/src/grpc/flight.rs | 21 + .../common/flow/flow_advance_ttl.result | 4 +- .../common/flow/flow_advance_ttl.sql | 6 +- .../standalone/common/flow/flow_basic.result | 18 +- .../standalone/common/flow/flow_basic.sql | 18 +- .../common/flow/flow_rebuild.result | 20 +- .../standalone/common/flow/flow_rebuild.sql | 20 +- .../common/flow/flow_user_guide.result | 6 +- .../common/flow/flow_user_guide.sql | 8 +- .../standalone/common/flow/flow_view.result | 4 +- .../standalone/common/flow/flow_view.sql | 4 +- 19 files changed, 1499 insertions(+), 369 deletions(-) diff --git a/src/flow/src/batching_mode/checkpoint.rs b/src/flow/src/batching_mode/checkpoint.rs index 7341d3d9e7..11be951a9a 100644 --- a/src/flow/src/batching_mode/checkpoint.rs +++ b/src/flow/src/batching_mode/checkpoint.rs @@ -16,6 +16,7 @@ use crate::batching_mode::state::CheckpointMode; pub(super) const CHECKPOINT_DECISION_ADVANCE: &str = "advance"; pub(super) const CHECKPOINT_DECISION_FALLBACK: &str = "fallback"; +pub(super) const CHECKPOINT_DECISION_CONTINUE_REPAIR: &str = "continue_repair"; pub(super) const CHECKPOINT_REASON_NONE: &str = "none"; /// Why the task fell back to full snapshot mode. @@ -34,9 +35,16 @@ pub(super) enum FlowQueryFallbackReason { /// The datanode detected a stale incremental cursor and the Flow /// must recompute from scratch. StaleCursor, + /// A fenced repair chunk tried to use a snapshot upper bound that the + /// storage engine can no longer enforce, so the current repair high must + /// be abandoned and rebound by a fresh scoped full snapshot repair. + SnapshotFenceExpired, /// A non-stale-cursor query failure; the Flow resets to full snapshot /// to avoid cascading errors. IncrementalQueryFailure, + /// A non-incremental query failed while the task was already in full + /// snapshot or scoped repair mode. + QueryFailure, /// Incremental mode has been permanently disabled for this Flow /// (e.g. because the query shape is not incrementally safe). IncrementalDisabled, @@ -49,7 +57,9 @@ impl FlowQueryFallbackReason { Self::IncompleteRegionWatermark => "incomplete_region_watermark", Self::DirtyBacklogPending => "dirty_backlog_pending", Self::StaleCursor => "stale_cursor", + Self::SnapshotFenceExpired => "snapshot_fence_expired", Self::IncrementalQueryFailure => "incremental_query_failure", + Self::QueryFailure => "query_failure", Self::IncrementalDisabled => "incremental_disabled", } } @@ -77,6 +87,14 @@ pub(super) enum FlowCheckpointDecision { participating_regions: usize, watermarks: usize, }, + /// FullSnapshot stayed in full snapshot mode because a scoped base repair + /// found additional dirty windows that may be concurrent with the returned + /// high watermark. These windows must be repaired under the fixed high + /// before checkpoints can advance. + ContinuedFencedRepair { + pending_windows: usize, + watermarks: usize, + }, /// Any mode → FullSnapshot. /// /// Watermark information was incomplete, a participating region was @@ -96,6 +114,13 @@ impl FlowCheckpointDecision { checkpoint_mode_label(CheckpointMode::FullSnapshot) } Self::AdvancedIncremental { .. } => checkpoint_mode_label(CheckpointMode::Incremental), + // Fenced repair is intentionally a FullSnapshot sub-state, not a + // third top-level checkpoint mode, so metrics keep the + // `full_snapshot` mode label while the decision label carries + // `continue_repair`. + Self::ContinuedFencedRepair { .. } => { + checkpoint_mode_label(CheckpointMode::FullSnapshot) + } Self::FallbackToFullSnapshot { previous_mode, .. } => { checkpoint_mode_label(previous_mode) } @@ -107,6 +132,7 @@ impl FlowCheckpointDecision { Self::AdvancedFromFullSnapshot { .. } | Self::AdvancedIncremental { .. } => { CHECKPOINT_DECISION_ADVANCE } + Self::ContinuedFencedRepair { .. } => CHECKPOINT_DECISION_CONTINUE_REPAIR, Self::FallbackToFullSnapshot { .. } => CHECKPOINT_DECISION_FALLBACK, } } diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 319ddcf2e7..bd15654b5e 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -458,6 +458,22 @@ impl BatchingEngine { .is_some_and(|value| value.eq_ignore_ascii_case("true")) } + /// SQL flows without a usable time-window expression can only run as an + /// explicit full-query flow, so require `EVAL INTERVAL` at creation time. + fn ensure_sql_flow_has_twe_or_eval_interval( + eval_interval: Option, + has_time_window_expr: bool, + ) -> Result<(), Error> { + ensure!( + eval_interval.is_some() || has_time_window_expr, + InvalidQuerySnafu { + reason: "SQL batching flow without a time-window expression must specify EVAL INTERVAL to run as an explicit full-query flow" + .to_string(), + } + ); + Ok(()) + } + fn ensure_incremental_source_append_only( batch_opts: &BatchingModeOptions, table_name: &[String; 3], @@ -609,6 +625,10 @@ impl BatchingEngine { .unwrap_or("None".to_string()) ); + if !is_tql { + Self::ensure_sql_flow_has_twe_or_eval_interval(eval_interval, phy_expr.is_some())?; + } + let task_args = TaskArgs { flow_id, query: &sql, @@ -1035,6 +1055,62 @@ mod tests { )); } + #[test] + fn test_sql_flow_requires_time_window_or_eval_interval() { + BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(None, true) + .expect("SQL flow with a time-window expression should be accepted"); + BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(Some(10), false).expect( + "SQL flow with EVAL INTERVAL should be accepted as an explicit full-query flow", + ); + + let err = BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(None, false) + .expect_err("SQL flow without a time-window expression or EVAL INTERVAL should fail"); + assert!(matches!(err, Error::InvalidQuery { .. }), "{err}"); + assert!( + err.to_string().contains("must specify EVAL INTERVAL"), + "{err}" + ); + } + + #[tokio::test] + async fn test_complex_sql_without_eval_interval_is_rejected_as_no_twe() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let plan = sql_to_df_plan( + ctx.clone(), + query_engine.clone(), + r#" +SELECT + l.number, + date_bin('5 minutes', l.ts) AS time_window +FROM numbers_with_ts l +JOIN numbers_with_ts r ON l.number = r.number +GROUP BY l.number, time_window +"#, + true, + ) + .await + .unwrap(); + + let (_, time_window_expr, _, _) = find_time_window_expr( + &plan, + query_engine.engine_state().catalog_manager().clone(), + ctx, + ) + .await + .unwrap(); + assert!( + time_window_expr.is_none(), + "complex SQL should be classified as having no safe TWE" + ); + + BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(Some(10), false) + .expect("complex SQL can run as an explicit full-query flow when EVAL INTERVAL is set"); + let err = BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(None, false) + .expect_err("complex SQL without EVAL INTERVAL should fail creation"); + assert!(matches!(err, Error::InvalidQuery { .. }), "{err}"); + } + #[test] fn test_incremental_source_append_only_enforcement() { let table_name = [ diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 2f1e4b73b4..aa94b91b2a 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -15,7 +15,7 @@ //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user use std::collections::HashMap; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; @@ -25,7 +25,6 @@ use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config}; use common_meta::peer::{Peer, PeerDiscovery}; use common_query::{Output, OutputData}; -use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use common_telemetry::warn; use futures::stream::{FuturesUnordered, StreamExt}; use meta_client::client::MetaClient; @@ -388,12 +387,15 @@ impl FrontendClient { } } + /// Execute a flow query and return terminal metrics. `snapshot_seqs` are + /// optional read upper bounds used only by snapshot-fenced repair chunks. pub(crate) async fn query_with_terminal_metrics( &self, catalog: &str, schema: &str, request: QueryRequest, extensions: &[(&str, &str)], + snapshot_seqs: &HashMap, peer_desc: &mut Option, ) -> Result { let flow_extensions = build_flow_extensions(extensions)?; @@ -415,7 +417,7 @@ impl FrontendClient { request, &hints, extensions, - &Default::default(), + snapshot_seqs, ) .await .map_err(BoxedError::new) @@ -437,6 +439,7 @@ impl FrontendClient { .current_catalog(catalog.to_string()) .current_schema(schema.to_string()) .extensions(extensions_map) + .snapshot_seqs(Arc::new(RwLock::new(snapshot_seqs.clone()))) .build(); let ctx = Arc::new(ctx); let database_client = { @@ -462,7 +465,7 @@ impl FrontendClient { .do_query(Request::Query(request), ctx.clone()) .await .map(|output| { - wrap_standalone_output_with_terminal_metrics(output, &flow_extensions, &ctx) + wrap_standalone_output_with_terminal_metrics(output, &flow_extensions) }) .map_err(BoxedError::new) .context(ExternalSnafu) @@ -570,7 +573,6 @@ fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result OutputWithMetrics { let should_collect_region_watermark = flow_extensions.should_collect_region_watermark(); let terminal_metrics = @@ -580,7 +582,6 @@ fn wrap_standalone_output_with_terminal_metrics( .plan .clone() .and_then(terminal_recordbatch_metrics_from_plan) - .or_else(|| terminal_recordbatch_metrics_from_snapshots(query_ctx)) } else { None }; @@ -591,28 +592,6 @@ fn wrap_standalone_output_with_terminal_metrics( result } -fn terminal_recordbatch_metrics_from_snapshots( - query_ctx: &QueryContextRef, -) -> Option { - let mut region_watermarks = query_ctx - .snapshots() - .into_iter() - .map(|(region_id, watermark)| RegionWatermarkEntry { - region_id, - watermark: Some(watermark), - }) - .collect::>(); - if region_watermarks.is_empty() { - return None; - } - - region_watermarks.sort_by_key(|entry| entry.region_id); - Some(RecordBatchMetrics { - region_watermarks, - ..Default::default() - }) -} - /// Describe a peer of frontend #[derive(Debug, Default, Clone)] pub(crate) enum PeerDesc { @@ -790,6 +769,8 @@ mod tests { ctx: QueryContextRef, ) -> std::result::Result { assert_eq!(ctx.extension("flow.return_region_seq"), Some("true")); + assert_eq!(ctx.get_snapshot(1), Some(10)); + assert_eq!(ctx.get_snapshot(2), Some(20)); ctx.set_snapshot(42, 99); Ok(Output::new_with_affected_rows(1)) } @@ -903,6 +884,7 @@ mod tests { query: Some(Query::Sql("select 1".to_string())), }, &[], + &HashMap::new(), &mut peer_desc, ) .await @@ -940,6 +922,7 @@ mod tests { query: Some(Query::Sql("insert into t select 1".to_string())), }, &[("flow.return_region_seq", "true")], + &HashMap::new(), &mut peer_desc, ) .await @@ -965,6 +948,7 @@ mod tests { query: Some(Query::Sql("insert into t select * from src".to_string())), }, &[("flow.return_region_seq", "true")], + &HashMap::from([(1, 10), (2, 20)]), &mut peer_desc, ) .await @@ -972,10 +956,7 @@ mod tests { assert!(matches!(peer_desc, Some(PeerDesc::Standalone))); assert!(result.metrics.is_ready()); - assert_eq!( - result.region_watermark_map(), - Some(HashMap::from([(42, 99)])) - ); + assert_eq!(result.region_watermark_map(), None); } #[tokio::test] @@ -993,6 +974,7 @@ mod tests { query: Some(Query::Sql("select 1".to_string())), }, &[("flow.return_region_seq", "not-a-bool")], + &HashMap::new(), &mut peer_desc, ) .await diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index c5fcc74143..6401adc07e 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -51,6 +51,7 @@ pub struct TaskState { /// mapping of `start -> end` and non-overlapping pub(crate) dirty_time_windows: DirtyTimeWindows, checkpoint_mode: CheckpointMode, + pending_fenced_repair: Option, /// Region id -> last consumed watermark sequence. Incremental scans use /// this as the next lower sequence bound for each source region. checkpoints: BTreeMap, @@ -81,6 +82,7 @@ impl TaskState { last_exec_time_millis: None, dirty_time_windows, checkpoint_mode: CheckpointMode::FullSnapshot, + pending_fenced_repair: None, checkpoints: Default::default(), incremental_disabled: false, exec_state: ExecState::Idle, @@ -112,6 +114,12 @@ impl TaskState { &self.checkpoints } + /// Returns the in-progress fenced repair, if the task is repairing dirty + /// windows under a frozen full-snapshot high watermark. + pub fn pending_fenced_repair(&self) -> Option<&FencedRepair> { + self.pending_fenced_repair.as_ref() + } + pub fn is_incremental_disabled(&self) -> bool { self.incremental_disabled } @@ -123,17 +131,25 @@ impl TaskState { self.mark_full_snapshot(); } + /// Move back to top-level FullSnapshot mode. If a fenced repair is active, + /// restore its not-yet-in-flight pending windows to the live dirty queue so + /// the moved backlog is not lost. pub fn mark_full_snapshot(&mut self) { - self.checkpoint_mode = CheckpointMode::FullSnapshot; + self.abandon_fenced_repair(); } + /// Replace full-snapshot checkpoints with a complete watermark proof. + /// Clears fenced repair state and enters Incremental unless disabled. pub fn advance_checkpoints(&mut self, watermark_map: HashMap) { self.checkpoints = watermark_map.into_iter().collect(); + self.pending_fenced_repair = None; if !self.incremental_disabled { self.checkpoint_mode = CheckpointMode::Incremental; } } + /// Advance only the participating regions for an incremental delta query. + /// This also clears any stale fenced repair sub-state. pub fn advance_incremental_checkpoints_with_participation( &mut self, participating_regions: &BTreeSet, @@ -147,8 +163,140 @@ impl TaskState { if !self.incremental_disabled { self.checkpoint_mode = CheckpointMode::Incremental; } + self.pending_fenced_repair = None; } + /// Start repairing the current live dirty windows under a frozen high `H`. + /// The current live backlog is moved into the fenced repair so successful + /// chunks are consumed from that backlog. New post-`H` dirty signals can + /// still arrive in the live queue while the fenced repair is active. + pub fn start_fenced_repair(&mut self, high: BTreeMap) -> Option<&FencedRepair> { + if self.dirty_time_windows.is_empty() { + self.pending_fenced_repair = None; + return None; + } + + let pending_windows = self.dirty_time_windows.clone(); + self.dirty_time_windows.clean(); + self.pending_fenced_repair = Some(FencedRepair { + high, + pending_windows, + }); + self.checkpoint_mode = CheckpointMode::FullSnapshot; + self.pending_fenced_repair.as_ref() + } + + /// Finish the fenced repair and promote the frozen high watermark to the + /// checkpoint map. Incremental-disabled flows stay in FullSnapshot mode. + pub fn finish_fenced_repair(&mut self) -> Option> { + let repair = self.pending_fenced_repair.take()?; + self.checkpoints = repair.high; + if !self.incremental_disabled { + self.checkpoint_mode = CheckpointMode::Incremental; + } + Some(self.checkpoints.clone()) + } + + /// Abandon the current fenced repair and restore all not-yet-in-flight + /// pending windows to the live dirty queue for a fresh scoped repair. + pub fn abandon_fenced_repair(&mut self) -> bool { + self.checkpoint_mode = CheckpointMode::FullSnapshot; + let Some(repair) = self.pending_fenced_repair.take() else { + return false; + }; + + self.dirty_time_windows + .add_dirty_windows(&repair.pending_windows); + true + } + + /// Restore a scoped query's windows after a failed or unproven run. During + /// an active fenced repair this requeues into `pending_windows`; otherwise + /// it restores to the live dirty queue. + pub fn restore_scoped_windows(&mut self, filter: &FilterExprInfo) { + if let Some(repair) = self.pending_fenced_repair.as_mut() { + repair + .pending_windows + .add_windows(filter.time_ranges.clone()); + return; + } + + self.dirty_time_windows + .add_windows(filter.time_ranges.clone()); + } + + /// Generate the next scoped filter from the fenced-repair queue when active; + /// otherwise consume windows from the live dirty queue. + pub fn gen_scoped_filter_exprs( + &mut self, + col_name: &str, + expire_lower_bound: Option, + window_size: chrono::Duration, + window_cnt: usize, + flow_id: FlowId, + task_ctx: Option<&BatchingTask>, + ) -> Result, Error> { + if let Some(repair) = self.pending_fenced_repair.as_mut() { + let expr = repair.pending_windows.gen_filter_exprs( + col_name, + expire_lower_bound, + window_size, + window_cnt, + flow_id, + task_ctx, + )?; + if expr.is_some() || !repair.pending_windows.is_empty() { + return Ok(expr); + } + + // All pending repair windows may have expired during merge. Clear + // the empty repair so this call can fall back to live dirty windows + // instead of routing future executions to an empty queue forever. + self.pending_fenced_repair = None; + } + + self.dirty_time_windows.gen_filter_exprs( + col_name, + expire_lower_bound, + window_size, + window_cnt, + flow_id, + task_ctx, + ) + } + + /// Returns true only when the query result's participating regions and + /// terminal watermarks exactly match the fenced repair's frozen high `H`. + pub fn fenced_repair_watermarks_match_high( + &self, + participating_regions: &BTreeSet, + watermark_map: &HashMap, + ) -> bool { + let Some(repair) = self.pending_fenced_repair.as_ref() else { + return false; + }; + + !participating_regions.is_empty() + && participating_regions.len() == repair.high.len() + && watermark_map.len() == repair.high.len() + && participating_regions.iter().all(|region_id| { + repair + .high + .get(region_id) + .zip(watermark_map.get(region_id)) + .is_some_and(|(high, watermark)| high == watermark) + }) + } + + /// Whether the active fenced repair has drained all pending windows. + pub fn fenced_repair_pending_is_empty(&self) -> bool { + self.pending_fenced_repair + .as_ref() + .is_some_and(|repair| repair.pending_windows.is_empty()) + } + + /// Full-snapshot checkpoint advances require a watermark for every region + /// that participated in the query. pub fn can_advance_full_snapshot_checkpoints( &self, participating_regions: &BTreeSet, @@ -161,6 +309,8 @@ impl TaskState { .all(|region_id| watermark_map.contains_key(region_id)) } + /// Incremental advances are limited to participating regions whose returned + /// watermark is not older than the stored checkpoint. pub fn can_advance_incremental_checkpoints_with_participation( &self, participating_regions: &BTreeSet, @@ -191,6 +341,9 @@ impl TaskState { /// /// if current the dirty time range is longer than one query can handle, /// execute immediately to faster clean up dirty time windows. + /// Active fenced repairs also execute immediately while pending windows + /// remain: the current backlog has moved out of live dirty windows and into + /// `pending_fenced_repair.pending_windows`. /// /// If `prefer_short_incremental_cadence` is true, run incremental queries /// more often when there is no large dirty backlog. This only reduces the @@ -214,6 +367,18 @@ impl TaskState { next_duration }; + if self + .pending_fenced_repair + .as_ref() + .is_some_and(|repair| !repair.pending_windows().is_empty()) + { + debug!( + "Flow id = {}, active fenced repair still has pending windows, execute immediately", + flow_id, + ); + return Instant::now(); + } + let cur_dirty_window_size = self.dirty_time_windows.window_size(); // compute how much time range can be handled in one query let max_query_update_range = (*time_window_size) @@ -721,6 +886,26 @@ pub enum CheckpointMode { Incremental, } +/// Dirty windows that must be repaired under a frozen full-snapshot watermark. +/// This is a FullSnapshot sub-state, not a separate checkpoint mode. +#[derive(Debug, Clone)] +pub struct FencedRepair { + high: BTreeMap, + pending_windows: DirtyTimeWindows, +} + +impl FencedRepair { + /// Frozen high watermark `H` used as the snapshot upper bound for chunks. + pub fn high(&self) -> &BTreeMap { + &self.high + } + + /// Dirty windows still waiting to be repaired under `high`. + pub fn pending_windows(&self) -> &DirtyTimeWindows { + &self.pending_windows + } +} + /// Filter Expression's information #[derive(Debug, Clone)] pub struct FilterExprInfo { @@ -1029,6 +1214,38 @@ mod test { ); } + #[test] + fn test_mark_full_snapshot_restores_pending_fenced_repair_windows() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105))); + + state + .start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)])) + .unwrap(); + assert!(state.dirty_time_windows.is_empty()); + assert_eq!( + state + .pending_fenced_repair() + .unwrap() + .pending_windows() + .len(), + 2 + ); + + state.mark_full_snapshot(); + + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.pending_fenced_repair().is_none()); + assert_eq!(state.dirty_time_windows.len(), 2); + } + #[test] fn test_disable_incremental_persists_full_snapshot_mode() { let query_ctx = QueryContext::arc(); @@ -1339,6 +1556,33 @@ mod test { ); } + #[test] + fn test_pending_fenced_repair_schedules_immediately() { + let mut state = state_with_past_update(Duration::from_secs(10)); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + state + .start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)])) + .unwrap(); + assert!(state.dirty_time_windows.is_empty()); + assert!(!state.fenced_repair_pending_is_empty()); + + let result = state.get_next_start_query_time( + 1, + &Some(Duration::from_secs(60)), + Duration::from_secs(5), + None, + 20, + false, + ); + + assert!( + result <= Instant::now(), + "pending fenced repair backlog should schedule immediately" + ); + } + #[test] fn test_incremental_disabled_ignores_short_cadence() { // When prefer_short_incremental_cadence is true but the dirty backlog is diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 3be41e0786..7ffcded1b1 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -161,7 +161,42 @@ pub struct TaskArgs<'a> { pub struct PlanInfo { pub plan: LogicalPlan, pub dirty_restore: DirtyRestore, - pub can_advance_checkpoints: bool, + pub coverage: QueryCoverage, +} + +#[derive(Clone)] +pub enum QueryCoverage { + /// Explicit full-query snapshot coverage, e.g. TQL or evaluation-interval + /// SQL flows whose plan shape cannot be safely dirty-window pruned. This + /// must not be used as an implicit recovery path for scoped repair or an + /// unsafe incremental rewrite fallback. + UnfilteredFull, + /// Scoped full-snapshot repair over the current dirty windows. A successful + /// result may start a fenced repair if new dirty windows appeared meanwhile. + ScopedBaseRepair, + /// A chunk of windows being repaired under the frozen high-watermark `H`. + /// The `high` map is sent as snapshot read bounds and must be matched by + /// the returned terminal watermarks before checkpoints can advance. + FencedRepairChunk { high: BTreeMap }, + /// Incremental delta query over `(checkpoint, scan-open snapshot]`. + IncrementalDelta, +} + +impl QueryCoverage { + /// Whether this query should use incremental scan extensions and + /// incremental checkpoint advancement rules. + fn is_incremental_delta(&self) -> bool { + matches!(self, Self::IncrementalDelta) + } + + /// Snapshot upper bounds requested from the storage layer. Only fenced + /// repair chunks carry bounds; all other coverage relies on normal scans. + fn snapshot_seqs(&self) -> HashMap { + match self { + Self::FencedRepairChunk { high } => high.iter().map(|(k, v)| (*k, *v)).collect(), + _ => HashMap::new(), + } + } } pub enum DirtyRestore { @@ -379,7 +414,8 @@ impl BatchingTask { .execute_logical_plan_unlocked( frontend_client, &new_query.plan, - new_query.can_advance_checkpoints, + &new_query.dirty_restore, + &new_query.coverage, ) .await; if res.is_err() { @@ -466,7 +502,7 @@ impl BatchingTask { let insert_into_info = PlanInfo { plan, dirty_restore: new_query.dirty_restore, - can_advance_checkpoints: new_query.can_advance_checkpoints, + coverage: new_query.coverage, }; let insert_into = match insert_into_info @@ -486,7 +522,7 @@ impl BatchingTask { Ok(Some(PlanInfo { plan: insert_into, dirty_restore: insert_into_info.dirty_restore, - can_advance_checkpoints: insert_into_info.can_advance_checkpoints, + coverage: insert_into_info.coverage, })) } @@ -508,7 +544,8 @@ impl BatchingTask { &self, frontend_client: &Arc, plan: &LogicalPlan, - can_advance_checkpoints: bool, + dirty_restore: &DirtyRestore, + coverage: &QueryCoverage, ) -> Result, Error> { let instant = Instant::now(); let flow_id = self.config.flow_id; @@ -540,16 +577,24 @@ impl BatchingTask { // For incremental-mode SQL queries, attempt to rewrite the delta aggregate // plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions. - let incremental_plan = if can_advance_checkpoints { + let incremental_plan = if coverage.is_incremental_delta() { self.prepare_plan_for_incremental(&plan).await? } else { None }; let incremental_safe = incremental_plan.is_some(); + if coverage.is_incremental_delta() && !incremental_safe { + warn!( + "Flow {flow_id} skipped unsafe incremental delta fallback; \ + restored dirty signal instead of executing an unfiltered full snapshot" + ); + self.restore_dirty_windows(dirty_restore); + return Ok(None); + } let plan = incremental_plan.unwrap_or_else(|| plan.clone()); let extensions = self - .build_flow_query_extensions(incremental_safe, can_advance_checkpoints) + .build_flow_query_extensions(incremental_safe, coverage.is_incremental_delta()) .await?; let extension_refs = extensions .iter() @@ -633,8 +678,16 @@ impl BatchingTask { } }; + let snapshot_seqs = coverage.snapshot_seqs(); frontend_client - .query_with_terminal_metrics(catalog, schema, req, &extension_refs, &mut peer_desc) + .query_with_terminal_metrics( + catalog, + schema, + req, + &extension_refs, + &snapshot_seqs, + &mut peer_desc, + ) .await }; @@ -650,8 +703,8 @@ impl BatchingTask { ); let decision = { let mut state = self.state.write().unwrap(); - let reason = Self::query_failure_reason(err); - Self::apply_query_failure_to_state(&mut state, elapsed, reason) + let reason = Self::query_failure_reason(err, coverage); + Self::apply_query_failure_to_state(&mut state, elapsed, coverage, reason) }; if let Some(decision) = decision { Self::record_checkpoint_decision(flow_id, decision); @@ -680,16 +733,11 @@ impl BatchingTask { METRIC_FLOW_ROWS .with_label_values(&[format!("{}-out-batching", flow_id).as_str()]) .inc_by(affected_rows as _); - { + let decision = { let mut state = self.state.write().unwrap(); - let decision = Self::apply_query_result_to_state( - &mut state, - &res, - elapsed, - can_advance_checkpoints, - ); - Self::record_checkpoint_decision(flow_id, decision); - } + Self::apply_query_result_to_state(&mut state, &res, elapsed, coverage) + }; + Self::record_checkpoint_decision(flow_id, decision); Ok(Some((affected_rows, elapsed))) } @@ -697,8 +745,8 @@ impl BatchingTask { /// Restore dirty windows consumed by a failed query so they are retried on /// the next execution. /// - fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) { - match &query.dirty_restore { + fn restore_dirty_windows(&self, dirty_restore: &DirtyRestore) { + match dirty_restore { DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter), DirtyRestore::Unscoped(dirty_windows) => self .state @@ -709,14 +757,20 @@ impl BatchingTask { } } - fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) { - self.state - .write() - .unwrap() - .dirty_time_windows - .add_windows(filter.time_ranges.clone()); + /// Restore the dirty signal for a plan that was generated but failed before + /// it could prove any checkpoint advancement. + fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) { + self.restore_dirty_windows(&query.dirty_restore); } + /// Restore scoped windows through `TaskState` so fenced repair can decide + /// whether they go back to pending repair or live dirty state. + fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) { + self.state.write().unwrap().restore_scoped_windows(filter); + } + + /// Run a fallible scoped operation and restore its consumed windows if plan + /// generation/rewrite fails before execution. fn restore_scoped_dirty_windows_on_err( &self, filter: &FilterExprInfo, @@ -727,6 +781,8 @@ impl BatchingTask { }) } + /// Restore an unscoped dirty signal consumed by an explicit full-query or + /// incremental-delta plan. fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) { self.state .write() @@ -735,6 +791,8 @@ impl BatchingTask { .add_dirty_windows(dirty_windows); } + /// Run a fallible unscoped operation and restore the dirty signal if it + /// fails before a query is executed. fn restore_unscoped_dirty_windows_on_err( &self, dirty_windows: &DirtyTimeWindows, @@ -745,6 +803,8 @@ impl BatchingTask { }) } + /// Consume the live dirty signal for an unscoped query while keeping a copy + /// that can be restored if planning or execution fails. fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) { let mut state = self.state.write().unwrap(); let dirty_windows_to_restore = state.dirty_time_windows.clone(); @@ -754,6 +814,8 @@ impl BatchingTask { } #[allow(clippy::too_many_arguments)] + /// Build an unfiltered plan for explicit full-query or incremental-delta + /// coverage. Callers pass the consumed dirty signal for failure restoration. async fn gen_unfiltered_plan_info( &self, engine: QueryEngineRef, @@ -763,6 +825,7 @@ impl BatchingTask { allow_partial: bool, dirty_windows_to_restore: DirtyTimeWindows, retention_filter: Option<(&str, Timestamp, &'static str)>, + coverage: QueryCoverage, ) -> Result { let mut plan = self.restore_unscoped_dirty_windows_on_err( &dirty_windows_to_restore, @@ -801,10 +864,13 @@ impl BatchingTask { Ok(PlanInfo { plan, dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore), - can_advance_checkpoints: true, + coverage, }) } + #[allow(clippy::too_many_arguments)] + /// Build an unfiltered plan only when the live dirty signal was present; + /// otherwise skip this round without querying. async fn gen_unfiltered_plan_info_if_dirty( &self, engine: QueryEngineRef, @@ -813,6 +879,7 @@ impl BatchingTask { primary_key_indices: &[usize], allow_partial: bool, retention_filter: Option<(&str, Timestamp, &'static str)>, + coverage: QueryCoverage, ) -> Result, Error> { let (is_dirty, dirty_windows_to_restore) = self.drain_dirty_windows_signal(); if !is_dirty { @@ -828,6 +895,7 @@ impl BatchingTask { allow_partial, dirty_windows_to_restore, retention_filter, + coverage, ) .await .map(Some) @@ -973,6 +1041,8 @@ impl BatchingTask { create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type) } + /// Incremental delta scans are unfiltered by dirty windows; the sequence + /// range, not a time predicate, defines source correctness. fn should_use_unfiltered_incremental_delta(&self) -> bool { let state = self.state.read().unwrap(); state.checkpoint_mode() == CheckpointMode::Incremental @@ -980,14 +1050,8 @@ impl BatchingTask { && matches!(self.config.query_type, QueryType::Sql) } - fn should_use_unfiltered_full_snapshot_seeding(&self) -> bool { - let state = self.state.read().unwrap(); - state.checkpoint_mode() == CheckpointMode::FullSnapshot - && !state.is_incremental_disabled() - && matches!(self.config.query_type, QueryType::Sql) - } - - /// will merge and use the first ten time window in query + /// Generate the next plan and classify its coverage so checkpoint handling + /// knows whether it is full-query, scoped repair, fenced repair, or delta. async fn gen_query_with_time_window( &self, engine: QueryEngineRef, @@ -1016,49 +1080,44 @@ impl BatchingTask { .map(|expr| expr.eval(low_bound)) .transpose()?; - let (expire_lower_bound, expire_upper_bound) = - match (expire_time_window_bound, &self.config.query_type) { - (Some((Some(l), Some(u))), QueryType::Sql) => (l, u), - (None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => { - // if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns) - // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason - debug!( - "Flow id = {:?}, no time window, using the same query", + let (expire_lower_bound, expire_upper_bound) = match ( + expire_time_window_bound, + &self.config.query_type, + ) { + (Some((Some(l), Some(u))), QueryType::Sql) => (l, u), + (None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => { + return UnexpectedSnafu { + reason: format!( + "Flow id={} reached runtime without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it", self.config.flow_id - ); - // clean dirty time window too, this could be from create flow's check_execute - return self - .gen_unfiltered_plan_info_if_dirty( - engine, - query_ctx, - sink_table_schema.clone(), - primary_key_indices, - allow_partial, - None, - ) - .await; + ), } - _ => { - // Clean dirty windows for full-query/non-scoped paths, - // such as TQL or evaluation-interval SQL without a recognized - // time-window expression, that cannot use a time-window filter. - let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal(); + .fail(); + } + _ => { + // Explicit full-query flows (TQL and evaluation-interval SQL + // plans whose shape cannot be safely dirty-window pruned) are + // allowed to run as unfiltered full snapshots. This is distinct + // from using unfiltered full as a fallback after scoped repair or + // incremental rewrite failed. + let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal(); - let plan_info = self - .gen_unfiltered_plan_info( - engine, - query_ctx, - sink_table_schema.clone(), - primary_key_indices, - allow_partial, - dirty_windows_to_restore, - None, - ) - .await?; + let plan_info = self + .gen_unfiltered_plan_info( + engine, + query_ctx, + sink_table_schema.clone(), + primary_key_indices, + allow_partial, + dirty_windows_to_restore, + None, + QueryCoverage::UnfilteredFull, + ) + .await?; - return Ok(Some(plan_info)); - } - }; + return Ok(Some(plan_info)); + } + }; debug!( "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}", @@ -1086,31 +1145,6 @@ impl BatchingTask { ), })?; - if self.should_use_unfiltered_full_snapshot_seeding() { - // A full-snapshot query that can seed/refresh incremental - // checkpoints must not use dirty-window predicates. Rows can be - // written after dirty windows are drained but before the source scan - // snapshot opens; a stale dirty-window filter could exclude those - // rows while the returned watermark includes them, causing the next - // incremental read to skip them forever. Execute an unfiltered full - // snapshot instead, and keep dirty windows only as the scheduling and - // failure-restoration signal. - let retention_filter = self - .config - .expire_after - .map(|_| (col_name.as_str(), expire_lower_bound, "full-snapshot")); - return self - .gen_unfiltered_plan_info_if_dirty( - engine, - query_ctx, - sink_table_schema.clone(), - primary_key_indices, - allow_partial, - retention_filter, - ) - .await; - } - if self.should_use_unfiltered_incremental_delta() { // In incremental mode, source correctness is defined by the // per-region sequence range `(checkpoint, scan-open snapshot]`, not @@ -1133,15 +1167,16 @@ impl BatchingTask { primary_key_indices, allow_partial, retention_filter, + QueryCoverage::IncrementalDelta, ) .await; } - let (expr, can_advance_checkpoints) = { + let (expr, coverage) = { let mut state = self.state.write().unwrap(); let window_cnt = max_window_cnt .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query); - let expr = state.dirty_time_windows.gen_filter_exprs( + let expr = state.gen_scoped_filter_exprs( &col_name, Some(expire_lower_bound), window_size, @@ -1149,8 +1184,15 @@ impl BatchingTask { self.config.flow_id, Some(self), )?; - let can_advance_checkpoints = state.dirty_time_windows.is_empty(); - (expr, can_advance_checkpoints) + let repair_high = state + .pending_fenced_repair() + .map(|repair| repair.high().clone()); + let coverage = if let Some(high) = repair_high { + QueryCoverage::FencedRepairChunk { high } + } else { + QueryCoverage::ScopedBaseRepair + }; + (expr, coverage) }; let Some(expr) = expr else { @@ -1198,7 +1240,7 @@ impl BatchingTask { let info = PlanInfo { plan: new_plan.clone(), dirty_restore: DirtyRestore::Scoped(expr), - can_advance_checkpoints, + coverage, }; Ok(Some(info)) diff --git a/src/flow/src/batching_mode/task/ckpt.rs b/src/flow/src/batching_mode/task/ckpt.rs index 035d30a079..7966d314e1 100644 --- a/src/flow/src/batching_mode/task/ckpt.rs +++ b/src/flow/src/batching_mode/task/ckpt.rs @@ -24,73 +24,169 @@ use crate::batching_mode::checkpoint::{ FlowCheckpointDecision, FlowQueryFallbackReason, checkpoint_mode_label, }; use crate::batching_mode::state::{CheckpointMode, TaskState}; -use crate::batching_mode::task::BatchingTask; +use crate::batching_mode::task::{BatchingTask, QueryCoverage}; use crate::metrics::{ METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT, }; use crate::{Error, FlowId}; impl BatchingTask { - pub(super) fn query_failure_reason(err: &Error) -> FlowQueryFallbackReason { + /// Classify execution errors into checkpoint fallback reasons. A stale + /// snapshot fence is special only for fenced repair chunks. + pub(super) fn query_failure_reason( + err: &Error, + coverage: &QueryCoverage, + ) -> FlowQueryFallbackReason { if err.status_code() == StatusCode::RequestOutdated { - FlowQueryFallbackReason::StaleCursor - } else { + if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) { + FlowQueryFallbackReason::SnapshotFenceExpired + } else { + FlowQueryFallbackReason::StaleCursor + } + } else if matches!(coverage, QueryCoverage::IncrementalDelta) { FlowQueryFallbackReason::IncrementalQueryFailure + } else { + FlowQueryFallbackReason::QueryFailure } } + /// Apply the conservative state transition for a failed query. This never + /// advances checkpoints; it only decides whether to abandon repair state or + /// fall back from incremental mode. pub(super) fn apply_query_failure_to_state( state: &mut TaskState, elapsed: Duration, + coverage: &QueryCoverage, reason: FlowQueryFallbackReason, ) -> Option { state.after_query_exec(elapsed, false); let checkpoint_mode = state.checkpoint_mode(); - if checkpoint_mode == CheckpointMode::Incremental { - state.mark_full_snapshot(); - Some(FlowCheckpointDecision::FallbackToFullSnapshot { + if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) + && matches!(reason, FlowQueryFallbackReason::SnapshotFenceExpired) + { + // `abandon_fenced_repair()` restores the not-yet-in-flight pending + // windows from the stale repair back to live dirty windows. The + // currently executing chunk is restored separately by the outer + // failure path (`handle_executed_query_failure`), after this state + // transition has removed `pending_fenced_repair`, so it also lands + // in live dirty windows for the next scoped base repair. + state.abandon_fenced_repair(); + return Some(FlowCheckpointDecision::FallbackToFullSnapshot { previous_mode: checkpoint_mode, reason, - }) - } else { - None + }); } + + if checkpoint_mode == CheckpointMode::Incremental { + state.mark_full_snapshot(); + } + Some(FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason, + }) } + /// Apply checkpoint transitions for a successfully executed query using its + /// terminal watermark proof and declared coverage. pub(super) fn apply_query_result_to_state( state: &mut TaskState, res: &OutputWithMetrics, elapsed: Duration, - can_advance_checkpoints: bool, + coverage: &QueryCoverage, ) -> FlowCheckpointDecision { state.after_query_exec(elapsed, true); let checkpoint_mode = state.checkpoint_mode(); - if !can_advance_checkpoints { - state.mark_full_snapshot(); - return FlowCheckpointDecision::FallbackToFullSnapshot { - previous_mode: checkpoint_mode, - reason: FlowQueryFallbackReason::DirtyBacklogPending, - }; - } - if let (Some(participating_regions), Some(watermark_map)) = (res.participating_regions(), res.region_watermark_map()) { - let can_advance = match checkpoint_mode { - CheckpointMode::FullSnapshot => state - .can_advance_full_snapshot_checkpoints(&participating_regions, &watermark_map), - CheckpointMode::Incremental => state - .can_advance_incremental_checkpoints_with_participation( + let participating_region_count = participating_regions.len(); + let watermark_count = watermark_map.len(); + match coverage { + QueryCoverage::ScopedBaseRepair => { + if !state.can_advance_full_snapshot_checkpoints( &participating_regions, &watermark_map, - ), - }; + ) { + return FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, + }; + } - if can_advance { - let participating_region_count = participating_regions.len(); - let watermark_count = watermark_map.len(); - match checkpoint_mode { - CheckpointMode::FullSnapshot => { + if state.is_incremental_disabled() { + return FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::IncrementalDisabled, + }; + } + + if state.dirty_time_windows.is_empty() { + state.advance_checkpoints(watermark_map); + FlowCheckpointDecision::AdvancedFromFullSnapshot { + participating_regions: participating_region_count, + watermarks: watermark_count, + } + } else if let Some(repair) = + state.start_fenced_repair(watermark_map.into_iter().collect()) + { + FlowCheckpointDecision::ContinuedFencedRepair { + pending_windows: repair.pending_windows().len(), + watermarks: repair.high().len(), + } + } else { + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason: FlowQueryFallbackReason::DirtyBacklogPending, + } + } + } + QueryCoverage::FencedRepairChunk { .. } => { + if !state + .fenced_repair_watermarks_match_high(&participating_regions, &watermark_map) + { + // A successful repair chunk whose terminal watermark + // differs from the frozen fence `H` cannot prove that + // continuing the same repair is safe. The pre-`H` + // repair work item already executed under the snapshot + // fence, so do not requeue it; restore only + // not-yet-in-flight pending windows, then start over + // with a fresh H. Later/post-`H` writes still rely on + // live dirty notifications. + state.abandon_fenced_repair(); + return FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, + }; + } + + if state.fenced_repair_pending_is_empty() { + state.finish_fenced_repair(); + if state.is_incremental_disabled() { + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::IncrementalDisabled, + } + } else { + FlowCheckpointDecision::AdvancedFromFullSnapshot { + participating_regions: participating_region_count, + watermarks: watermark_count, + } + } + } else { + let repair = state + .pending_fenced_repair() + .expect("fenced repair exists after matching repair chunk watermark"); + FlowCheckpointDecision::ContinuedFencedRepair { + pending_windows: repair.pending_windows().len(), + watermarks: repair.high().len(), + } + } + } + QueryCoverage::UnfilteredFull => { + if state.can_advance_full_snapshot_checkpoints( + &participating_regions, + &watermark_map, + ) { state.advance_checkpoints(watermark_map); if state.is_incremental_disabled() { FlowCheckpointDecision::FallbackToFullSnapshot { @@ -103,8 +199,19 @@ impl BatchingTask { watermarks: watermark_count, } } + } else { + debug_assert_ne!(checkpoint_mode, CheckpointMode::Incremental); + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, + } } - CheckpointMode::Incremental => { + } + QueryCoverage::IncrementalDelta => { + if state.can_advance_incremental_checkpoints_with_participation( + &participating_regions, + &watermark_map, + ) { state.advance_incremental_checkpoints_with_participation( &participating_regions, watermark_map, @@ -113,17 +220,22 @@ impl BatchingTask { participating_regions: participating_region_count, watermarks: watermark_count, } + } else { + state.mark_full_snapshot(); + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: checkpoint_mode, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, + } } } - } else { - state.mark_full_snapshot(); - FlowCheckpointDecision::FallbackToFullSnapshot { - previous_mode: checkpoint_mode, - reason: FlowQueryFallbackReason::IncompleteRegionWatermark, - } } } else { - state.mark_full_snapshot(); + if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) { + state.abandon_fenced_repair(); + } + if matches!(checkpoint_mode, CheckpointMode::Incremental) { + state.mark_full_snapshot(); + } FlowCheckpointDecision::FallbackToFullSnapshot { previous_mode: checkpoint_mode, reason: FlowQueryFallbackReason::MissingRegionWatermark, @@ -159,6 +271,14 @@ impl BatchingTask { "Flow {flow_id} advanced incremental checkpoints, participating_regions={participating_regions}, watermarks={watermarks}" ); } + FlowCheckpointDecision::ContinuedFencedRepair { + pending_windows, + watermarks, + } => { + debug!( + "Flow {flow_id} continued fenced repair, pending_windows={pending_windows}, watermarks={watermarks}" + ); + } FlowCheckpointDecision::FallbackToFullSnapshot { previous_mode, reason, diff --git a/src/flow/src/batching_mode/task/inc.rs b/src/flow/src/batching_mode/task/inc.rs index 9af54c1ba7..fbea3abd25 100644 --- a/src/flow/src/batching_mode/task/inc.rs +++ b/src/flow/src/batching_mode/task/inc.rs @@ -64,12 +64,14 @@ impl BatchingTask { /// For incremental-mode SQL queries, attempt to prepare an executable plan /// that is safe for incremental scan extensions. /// - /// Returns `Some(plan)` when incremental extensions are safe, and `None` - /// when the caller should execute the original plan without incremental - /// extensions. The returned plan may be either a rewritten - /// delta-LEFT-JOIN-sink merge plan or the original plan. In particular, - /// plain GROUP BY queries with no aggregate merge columns are incremental - /// safe without a rewrite, so they return `Some(original_plan)`. + /// Returns `Some(plan)` when incremental extensions are safe. For an + /// incremental-delta query, `None` means the caller must not execute the + /// original plan without incremental extensions, because that would become + /// an unfiltered full snapshot; the caller should restore the dirty signal + /// and skip the current round instead. The returned plan may be either a + /// rewritten delta-LEFT-JOIN-sink merge plan or the original plan. In + /// particular, plain GROUP BY queries with no aggregate merge columns are + /// incremental safe without a rewrite, so they return `Some(original_plan)`. pub(super) async fn prepare_plan_for_incremental( &self, plan: &LogicalPlan, @@ -131,8 +133,10 @@ impl BatchingTask { // Fetch sink table for the merge rewrite. // Transient errors (catalog, schema, filter, or rewrite) should not - // permanently disable incremental mode. Instead, we fall back to a - // full-snapshot plan for this round while keeping incremental retryable. + // permanently disable incremental mode. They also must not execute the + // original plan without incremental extensions, because that would be an + // unfiltered full snapshot. The caller will restore the dirty signal and + // skip this round while keeping incremental retryable. let sink_table = match get_table_info_df_schema( self.config.catalog_manager.clone(), self.config.sink_table_name.clone(), @@ -143,10 +147,9 @@ impl BatchingTask { Err(err) => { warn!( "Flow {} failed to fetch sink table for incremental rewrite; \ - falling back to full snapshot for this round: {:?}", + skipping this round to avoid unfiltered full snapshot: {:?}", self.config.flow_id, err ); - self.state.write().unwrap().mark_full_snapshot(); return Ok(None); } }; @@ -163,10 +166,9 @@ impl BatchingTask { Err(err) => { warn!( "Flow {} failed to rewrite incremental aggregate with sink merge; \ - falling back to full snapshot for this round: {:?}", + skipping this round to avoid unfiltered full snapshot: {:?}", self.config.flow_id, err ); - self.state.write().unwrap().mark_full_snapshot(); return Ok(None); } }; diff --git a/src/flow/src/batching_mode/task/test.rs b/src/flow/src/batching_mode/task/test.rs index c42d564ce2..db773ed4f0 100644 --- a/src/flow/src/batching_mode/task/test.rs +++ b/src/flow/src/batching_mode/task/test.rs @@ -12,12 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use catalog::RegisterTableRequest; use catalog::memory::MemoryCatalogManager; use client::OutputWithMetrics; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::ext::BoxedError; +use common_error::mock::MockError; +use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::RecordBatch; use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; @@ -29,6 +32,7 @@ use query::options::{ FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY, QueryOptions, }; use session::context::QueryContext; +use snafu::ResultExt; use table::test_util::MemTable; use super::*; @@ -304,6 +308,12 @@ fn dirty_marker() -> DirtyTimeWindows { dirty } +fn flow_error_with_status(status_code: StatusCode) -> Error { + Err::<(), _>(BoxedError::new(MockError::new(status_code))) + .context(crate::error::ExternalSnafu) + .unwrap_err() +} + fn dirty_range(start: i64, end: i64) -> DirtyTimeWindows { let mut dirty = DirtyTimeWindows::default(); dirty.add_window( @@ -321,6 +331,14 @@ fn expire_after_for_retention_filter_test() -> i64 { (now_secs - 10) as i64 } +fn aggregate_time_window_sink_schema() -> Arc { + Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])) +} + async fn assert_unscoped_failure_restore( consumed_dirty_windows: DirtyTimeWindows, current_dirty_windows: DirtyTimeWindows, @@ -338,7 +356,7 @@ async fn assert_unscoped_failure_restore( let unscoped_query = PlanInfo { plan, dirty_restore: DirtyRestore::Unscoped(consumed_dirty_windows), - can_advance_checkpoints: true, + coverage: QueryCoverage::UnfilteredFull, }; task.handle_executed_query_failure(Some(&unscoped_query)); @@ -380,7 +398,7 @@ fn test_apply_query_result_to_state_advances_full_snapshot_to_incremental() { &mut state, &result, std::time::Duration::from_millis(1), - true, + &QueryCoverage::UnfilteredFull, ); assert_eq!( @@ -411,7 +429,7 @@ fn test_apply_query_result_to_state_stays_full_snapshot_when_incremental_disable &mut state, &result, std::time::Duration::from_millis(1), - true, + &QueryCoverage::UnfilteredFull, ); // Should NOT claim advancement to incremental; should fallback with correct reason. @@ -442,7 +460,7 @@ fn test_apply_query_result_to_state_rejects_unproved_watermark() { &mut state, &result, std::time::Duration::from_millis(1), - true, + &QueryCoverage::UnfilteredFull, ); assert_eq!( @@ -467,7 +485,7 @@ fn test_apply_query_result_to_state_reports_missing_watermark() { &mut state, &result, std::time::Duration::from_millis(1), - true, + &QueryCoverage::UnfilteredFull, ); assert_eq!( @@ -497,7 +515,7 @@ fn test_apply_query_result_to_state_advances_incremental_subset() { &mut state, &result, std::time::Duration::from_millis(1), - true, + &QueryCoverage::IncrementalDelta, ); assert_eq!( @@ -515,56 +533,323 @@ fn test_apply_query_result_to_state_advances_incremental_subset() { } #[test] -fn test_apply_query_result_to_state_blocks_full_snapshot_when_dirty_backlog_pending() { +fn test_scoped_base_repair_with_dirty_backlog_starts_fenced_repair_from_full_snapshot() { let query_ctx = QueryContext::arc(); let (_tx, rx) = tokio::sync::oneshot::channel(); let mut state = TaskState::new(query_ctx, rx); + // Set a dirty window so that ScopedBaseRepair enters fenced repair instead + // of advancing directly; coverage type plus live dirty-window presence now + // determines this transition. + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(20))); + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); let result = output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]); let decision = BatchingTask::apply_query_result_to_state( &mut state, &result, std::time::Duration::from_millis(1), - false, + &QueryCoverage::ScopedBaseRepair, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::ContinuedFencedRepair { + pending_windows: 1, + watermarks: 2, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.dirty_time_windows.is_empty()); + let repair = state.pending_fenced_repair().unwrap(); + assert_eq!(repair.high(), &high); + assert_eq!(repair.pending_windows().len(), 1); +} + +fn next_fenced_repair_filter(state: &mut TaskState, window_cnt: usize) -> FilterExprInfo { + state + .gen_scoped_filter_exprs( + "ts", + None, + chrono::Duration::seconds(10), + window_cnt, + 1, + None, + ) + .unwrap() + .unwrap() +} + +#[test] +fn test_fenced_repair_chunk_with_pending_windows_stays_full_snapshot() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(20))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(110))); + + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); + state.start_fenced_repair(high.clone()).unwrap(); + let _filter = next_fenced_repair_filter(&mut state, 1); + assert_eq!( + state + .pending_fenced_repair() + .unwrap() + .pending_windows() + .len(), + 1 + ); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]), + std::time::Duration::from_millis(1), + &QueryCoverage::FencedRepairChunk { high }, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::ContinuedFencedRepair { + pending_windows: 1, + watermarks: 2, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.checkpoints().is_empty()); + assert_eq!( + state + .pending_fenced_repair() + .unwrap() + .pending_windows() + .len(), + 1 + ); + assert!(state.dirty_time_windows.is_empty()); +} + +#[test] +fn test_continued_fenced_repair_uses_pending_snapshot_not_later_live_dirty() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105))); + + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); + state.start_fenced_repair(high.clone()).unwrap(); + + // Make the two queues distinguishable: the fenced repair should keep using + // the moved pending backlog captured above, not this later live dirty window. + state.dirty_time_windows.add_window( + Timestamp::new_second(1000), + Some(Timestamp::new_second(1005)), + ); + assert_eq!(state.dirty_time_windows.len(), 1); + + let first_filter = next_fenced_repair_filter(&mut state, 1); + assert_eq!( + first_filter.time_ranges, + vec![(Timestamp::new_second(10), Timestamp::new_second(15))] + ); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]), + std::time::Duration::from_millis(1), + &QueryCoverage::FencedRepairChunk { high }, + ); + assert_eq!( + decision, + FlowCheckpointDecision::ContinuedFencedRepair { + pending_windows: 1, + watermarks: 2, + } + ); + + let second_filter = next_fenced_repair_filter(&mut state, 1); + assert_eq!( + second_filter.time_ranges, + vec![(Timestamp::new_second(100), Timestamp::new_second(105))] + ); + assert!(state.fenced_repair_pending_is_empty()); + assert_eq!(state.dirty_time_windows.len(), 1); +} + +#[test] +fn test_final_fenced_repair_chunk_advances_to_high() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(20))); + + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); + state.start_fenced_repair(high.clone()).unwrap(); + let _filter = next_fenced_repair_filter(&mut state, 1); + assert!(state.fenced_repair_pending_is_empty()); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &output_with_region_watermarks([(1_u64, Some(10_u64)), (2_u64, Some(20_u64))]), + std::time::Duration::from_millis(1), + &QueryCoverage::FencedRepairChunk { high: high.clone() }, + ); + + assert_eq!( + decision, + FlowCheckpointDecision::AdvancedFromFullSnapshot { + participating_regions: 2, + watermarks: 2, + } + ); + assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental); + assert_eq!(state.checkpoints(), &high); + assert!(state.pending_fenced_repair().is_none()); + assert!(state.dirty_time_windows.is_empty()); +} + +#[test] +fn test_fenced_repair_watermarks_require_exact_high() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(20))); + + state + .start_fenced_repair(BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])) + .unwrap(); + let participating_regions = BTreeSet::from([1_u64, 2_u64]); + + assert!(state.fenced_repair_watermarks_match_high( + &participating_regions, + &HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + )); + assert!(!state.fenced_repair_watermarks_match_high( + &participating_regions, + &HashMap::from([(1_u64, 11_u64), (2_u64, 20_u64)]) + )); + assert!(!state.fenced_repair_watermarks_match_high( + &participating_regions, + &HashMap::from([(1_u64, 10_u64)]) + )); +} + +#[test] +fn test_fenced_repair_chunk_watermark_mismatch_restores_pending_but_consumes_inflight() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105))); + + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); + state.start_fenced_repair(high.clone()).unwrap(); + + let _filter = next_fenced_repair_filter(&mut state, 1); + assert_eq!( + state + .pending_fenced_repair() + .unwrap() + .pending_windows() + .len(), + 1 + ); + assert!(state.dirty_time_windows.is_empty()); + + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &output_with_region_watermarks([(1_u64, Some(11_u64)), (2_u64, Some(20_u64))]), + std::time::Duration::from_millis(1), + &QueryCoverage::FencedRepairChunk { high }, ); assert_eq!( decision, FlowCheckpointDecision::FallbackToFullSnapshot { previous_mode: CheckpointMode::FullSnapshot, - reason: FlowQueryFallbackReason::DirtyBacklogPending, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, } ); - assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); - assert!(state.checkpoints().is_empty()); + assert!(state.pending_fenced_repair().is_none()); + assert_eq!(state.dirty_time_windows.len(), 1); } -#[test] -fn test_apply_query_result_to_state_blocks_incremental_when_dirty_backlog_pending() { - let query_ctx = QueryContext::arc(); - let (_tx, rx) = tokio::sync::oneshot::channel(); - let mut state = TaskState::new(query_ctx, rx); - state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])); - let result = output_with_region_watermarks([(1_u64, Some(12_u64)), (2_u64, Some(25_u64))]); +#[tokio::test] +async fn test_fenced_repair_mismatch_next_plan_is_scoped_base_repair() { + let TestTaskParts { + task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number", + ) + .await; + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); + let _filter = { + let mut state = task.state.write().unwrap(); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105))); + state.start_fenced_repair(high.clone()).unwrap(); + next_fenced_repair_filter(&mut state, 1) + }; - let decision = BatchingTask::apply_query_result_to_state( - &mut state, - &result, - std::time::Duration::from_millis(1), - false, - ); + { + let mut state = task.state.write().unwrap(); + let decision = BatchingTask::apply_query_result_to_state( + &mut state, + &output_with_region_watermarks([(1_u64, Some(11_u64)), (2_u64, Some(20_u64))]), + std::time::Duration::from_millis(1), + &QueryCoverage::FencedRepairChunk { high }, + ); + assert_eq!( + decision, + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, + } + ); + assert!(state.pending_fenced_repair().is_none()); + } + let plan = task + .gen_query_with_time_window( + query_engine, + &aggregate_time_window_sink_schema(), + &[], + false, + Some(1), + ) + .await + .unwrap() + .expect("mismatch should keep live dirty backlog for a fresh scoped repair"); + assert!(matches!(plan.coverage, QueryCoverage::ScopedBaseRepair)); + let DirtyRestore::Scoped(filter) = &plan.dirty_restore else { + panic!("scoped base repair should carry scoped dirty restore info"); + }; assert_eq!( - decision, - FlowCheckpointDecision::FallbackToFullSnapshot { - previous_mode: CheckpointMode::Incremental, - reason: FlowQueryFallbackReason::DirtyBacklogPending, - } - ); - assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); - assert_eq!( - state.checkpoints(), - &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + filter.time_ranges, + vec![(Timestamp::new_second(100), Timestamp::new_second(105))], + "executed pre-H repair item should not be requeued; only remaining pending window is retried" ); } @@ -579,6 +864,7 @@ fn test_apply_query_failure_to_state_falls_back_from_incremental() { let decision = BatchingTask::apply_query_failure_to_state( &mut state, std::time::Duration::from_millis(1), + &QueryCoverage::IncrementalDelta, FlowQueryFallbackReason::IncrementalQueryFailure, ); @@ -597,7 +883,7 @@ fn test_apply_query_failure_to_state_falls_back_from_incremental() { } #[test] -fn test_apply_query_failure_to_state_keeps_full_snapshot_without_decision() { +fn test_apply_query_failure_to_state_records_full_snapshot_failure() { let query_ctx = QueryContext::arc(); let (_tx, rx) = tokio::sync::oneshot::channel(); let mut state = TaskState::new(query_ctx, rx); @@ -605,14 +891,197 @@ fn test_apply_query_failure_to_state_keeps_full_snapshot_without_decision() { let decision = BatchingTask::apply_query_failure_to_state( &mut state, std::time::Duration::from_millis(1), - FlowQueryFallbackReason::StaleCursor, + &QueryCoverage::UnfilteredFull, + FlowQueryFallbackReason::QueryFailure, ); - assert_eq!(decision, None); + assert_eq!( + decision, + Some(FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::QueryFailure, + }) + ); assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); assert!(state.checkpoints().is_empty()); } +#[test] +fn test_query_failure_reason_distinguishes_fenced_repair_stale_fence() { + let err = flow_error_with_status(StatusCode::RequestOutdated); + + assert_eq!( + BatchingTask::query_failure_reason( + &err, + &QueryCoverage::FencedRepairChunk { + high: BTreeMap::new(), + }, + ), + FlowQueryFallbackReason::SnapshotFenceExpired + ); + assert_eq!( + BatchingTask::query_failure_reason(&err, &QueryCoverage::IncrementalDelta), + FlowQueryFallbackReason::StaleCursor + ); + + let generic_err = flow_error_with_status(StatusCode::Unexpected); + assert_eq!( + BatchingTask::query_failure_reason(&generic_err, &QueryCoverage::ScopedBaseRepair), + FlowQueryFallbackReason::QueryFailure + ); + assert_eq!( + BatchingTask::query_failure_reason(&generic_err, &QueryCoverage::IncrementalDelta), + FlowQueryFallbackReason::IncrementalQueryFailure + ); +} + +#[test] +fn test_fenced_repair_coverage_produces_snapshot_seq_map_for_distributed_metadata_path() { + // Covers the metadata boundary between QueryCoverage and the + // frontend/distributed client API: only FencedRepairChunk carries a + // non-empty snapshot_seqs map so the datanode can bind per-region + // snapshot upper bounds against the frozen high H. Other coverage + // variants must produce an empty map. + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); + let coverage = QueryCoverage::FencedRepairChunk { high: high.clone() }; + assert_eq!( + coverage.snapshot_seqs(), + HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]) + ); + + assert!(QueryCoverage::UnfilteredFull.snapshot_seqs().is_empty()); + assert!(QueryCoverage::ScopedBaseRepair.snapshot_seqs().is_empty()); + assert!(QueryCoverage::IncrementalDelta.snapshot_seqs().is_empty()); +} + +#[tokio::test] +async fn test_fenced_repair_stale_fence_next_plan_is_scoped_base_repair() { + let TestTaskParts { + task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number", + ) + .await; + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); + let filter = { + let mut state = task.state.write().unwrap(); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105))); + state.start_fenced_repair(high.clone()).unwrap(); + next_fenced_repair_filter(&mut state, 1) + }; + + { + let mut state = task.state.write().unwrap(); + let decision = BatchingTask::apply_query_failure_to_state( + &mut state, + std::time::Duration::from_millis(1), + &QueryCoverage::FencedRepairChunk { high }, + FlowQueryFallbackReason::SnapshotFenceExpired, + ); + assert_eq!( + decision, + Some(FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::SnapshotFenceExpired, + }) + ); + assert!(state.pending_fenced_repair().is_none()); + + // Simulate the outer execution failure restore for the in-flight chunk. + state.restore_scoped_windows(&filter); + } + + let plan = task + .gen_query_with_time_window( + query_engine, + &aggregate_time_window_sink_schema(), + &[], + false, + Some(1), + ) + .await + .unwrap() + .expect("stale fence should restore dirty windows for a fresh scoped repair"); + assert!(matches!(plan.coverage, QueryCoverage::ScopedBaseRepair)); +} + +#[test] +fn test_fenced_repair_transient_non_stale_failure_retries_same_high() { + // Opposite of stale-fence abandon: a non-RequestOutdated failure on a + // fenced repair chunk should NOT abandon the pending repair. The same + // high H is retained, the failed in-flight window goes back into + // pending_windows (not live dirty_time_windows), and the next execution + // can re-attempt the same fenced repair chunk. + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + state + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105))); + + let high = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]); + state.start_fenced_repair(high.clone()).unwrap(); + let filter = next_fenced_repair_filter(&mut state, 1); + assert_eq!( + state + .pending_fenced_repair() + .unwrap() + .pending_windows() + .len(), + 1 + ); + + let decision = BatchingTask::apply_query_failure_to_state( + &mut state, + std::time::Duration::from_millis(1), + &QueryCoverage::FencedRepairChunk { high: high.clone() }, + FlowQueryFallbackReason::QueryFailure, + ); + + assert_eq!( + decision, + Some(FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::FullSnapshot, + reason: FlowQueryFallbackReason::QueryFailure, + }) + ); + // Pending repair is NOT abandoned: high H is unchanged. + let repair = state.pending_fenced_repair().unwrap(); + assert_eq!(repair.high(), &high); + assert_eq!(repair.pending_windows().len(), 1); + + // Simulate the outer execution failure restore for the in-flight chunk. + state.restore_scoped_windows(&filter); + + // After restore, the in-flight chunk goes back into pending_windows + // (because pending_fenced_repair is still Some), NOT into live + // dirty_time_windows. + assert_eq!( + state + .pending_fenced_repair() + .unwrap() + .pending_windows() + .len(), + 2, + "in-flight window restored into pending_windows" + ); + assert_eq!( + state.dirty_time_windows.len(), + 0, + "live dirty windows unchanged (not where in-flight was restored)" + ); +} + #[test] fn test_checkpoint_decision_labels_are_stable() { let advance = FlowCheckpointDecision::AdvancedIncremental { @@ -630,10 +1099,18 @@ fn test_checkpoint_decision_labels_are_stable() { assert_eq!(fallback.mode_label(), "incremental"); assert_eq!(fallback.decision_label(), CHECKPOINT_DECISION_FALLBACK); assert_eq!(fallback.reason_label(), "stale_cursor"); + assert_eq!( + FlowQueryFallbackReason::SnapshotFenceExpired.as_label(), + "snapshot_fence_expired" + ); assert_eq!( FlowQueryFallbackReason::DirtyBacklogPending.as_label(), "dirty_backlog_pending" ); + assert_eq!( + FlowQueryFallbackReason::QueryFailure.as_label(), + "query_failure" + ); } #[tokio::test] @@ -740,7 +1217,7 @@ async fn test_full_snapshot_scoped_plan_marks_checkpoint_advance_safe_only_after .await .unwrap() .unwrap(); - assert!(!first.can_advance_checkpoints); + assert!(matches!(first.coverage, QueryCoverage::ScopedBaseRepair)); assert_eq!(task.state.read().unwrap().dirty_time_windows.len(), 1); let second = task @@ -748,10 +1225,56 @@ async fn test_full_snapshot_scoped_plan_marks_checkpoint_advance_safe_only_after .await .unwrap() .unwrap(); - assert!(second.can_advance_checkpoints); + assert!(matches!(second.coverage, QueryCoverage::ScopedBaseRepair)); assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); } +#[tokio::test] +async fn test_expired_empty_fenced_repair_generates_scoped_base_repair_plan() { + let TestTaskParts { + mut task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number", + ) + .await; + Arc::get_mut(&mut task.config) + .expect("test task config should be uniquely owned") + .expire_after = Some(expire_after_for_retention_filter_test()); + + { + let mut state = task.state.write().unwrap(); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + state + .start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)])) + .unwrap(); + + state.dirty_time_windows.clean(); + state + .dirty_time_windows + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105))); + } + + let plan = task + .gen_query_with_time_window( + query_engine, + &aggregate_time_window_sink_schema(), + &[], + false, + Some(1), + ) + .await + .unwrap() + .expect("expired empty repair should fall back to live dirty"); + + assert!(matches!(plan.coverage, QueryCoverage::ScopedBaseRepair)); + assert!(plan.coverage.snapshot_seqs().is_empty()); + assert!(task.state.read().unwrap().pending_fenced_repair().is_none()); +} + #[tokio::test] async fn test_incremental_plan_consumes_dirty_signal_for_checkpoint_safety() { let TestTaskParts { @@ -784,12 +1307,12 @@ async fn test_incremental_plan_consumes_dirty_signal_for_checkpoint_safety() { .unwrap() .unwrap(); - assert!(plan.can_advance_checkpoints); + assert!(matches!(plan.coverage, QueryCoverage::IncrementalDelta)); assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); } #[tokio::test] -async fn test_full_snapshot_seeding_for_incremental_does_not_add_dirty_window_filter() { +async fn test_scoped_base_repair_plan_applies_dirty_window_filter() { let TestTaskParts { task, query_engine, @@ -822,9 +1345,9 @@ async fn test_full_snapshot_seeding_for_incremental_does_not_add_dirty_window_fi .unwrap(); let plan_text = plan.plan.to_string(); - assert!(plan.can_advance_checkpoints); - assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); - assert!(!plan_text.contains("Filter:"), "{plan_text}"); + assert!(matches!(plan.coverage, QueryCoverage::ScopedBaseRepair)); + assert_eq!(task.state.read().unwrap().dirty_time_windows.len(), 1); + assert!(plan_text.contains("Filter:"), "{plan_text}"); } #[tokio::test] @@ -843,7 +1366,7 @@ async fn test_full_snapshot_seeding_applies_expire_after_retention_filter() { assert!(!state.is_incremental_disabled()); state .dirty_time_windows - .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105))); } let sink_schema = Arc::new(Schema::new(vec![ ColumnSchema::new("number", CDT::uint32_datatype(), false), @@ -860,7 +1383,7 @@ async fn test_full_snapshot_seeding_applies_expire_after_retention_filter() { .unwrap() .unwrap(); - assert!(plan.can_advance_checkpoints); + assert!(matches!(plan.coverage, QueryCoverage::ScopedBaseRepair)); assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); let plan_text = plan.plan.to_string(); assert!( @@ -899,7 +1422,7 @@ async fn test_incremental_plan_does_not_add_dirty_window_filter() { .unwrap(); let plan_text = plan.plan.to_string(); - assert!(plan.can_advance_checkpoints); + assert!(matches!(plan.coverage, QueryCoverage::IncrementalDelta)); assert!(!plan_text.contains("Filter:"), "{plan_text}"); } @@ -935,7 +1458,7 @@ async fn test_incremental_delta_applies_expire_after_retention_filter() { .unwrap() .unwrap(); - assert!(plan.can_advance_checkpoints); + assert!(matches!(plan.coverage, QueryCoverage::IncrementalDelta)); assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); let plan_text = plan.plan.to_string(); assert!( @@ -945,65 +1468,121 @@ async fn test_incremental_delta_applies_expire_after_retention_filter() { } #[tokio::test] -async fn test_non_scoped_path_generates_plan_with_empty_dirty_signal() { +async fn test_successful_incremental_checkpoint_fallback_consumes_unscoped_dirty_signal() { let TestTaskParts { - mut task, + task, query_engine, .. - } = new_test_task_engine_and_plan_with_query( - "SELECT number, ts FROM numbers_with_ts", - "missing_sink", + } = new_time_window_test_task_with_query( + "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window", ) .await; - Arc::get_mut(&mut task.config) - .expect("test task config should be uniquely owned") - .query_type = QueryType::Tql; - task.state.write().unwrap().dirty_time_windows.clean(); - let sink_schema = Arc::new(Schema::new(vec![ - ColumnSchema::new("number", CDT::uint32_datatype(), false), - ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true), - ])); + { + let mut state = task.state.write().unwrap(); + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + } + let sink_schema = aggregate_time_window_sink_schema(); - let plan = task - .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None) + let plan_info = task + .gen_query_with_time_window(query_engine.clone(), &sink_schema, &[], false, Some(1)) .await .unwrap() - .expect("non-scoped path should generate a plan even with an empty dirty signal"); + .unwrap(); - assert!(plan.can_advance_checkpoints); + assert!(matches!( + plan_info.coverage, + QueryCoverage::IncrementalDelta + )); + assert!(matches!( + &plan_info.dirty_restore, + DirtyRestore::Unscoped(_) + )); assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); + + let result = output_with_region_watermarks([(1_u64, Some(12_u64)), (2_u64, None)]); + let decision = { + let mut state = task.state.write().unwrap(); + BatchingTask::apply_query_result_to_state( + &mut state, + &result, + std::time::Duration::from_millis(1), + &plan_info.coverage, + ) + }; + assert_eq!( + decision, + FlowCheckpointDecision::FallbackToFullSnapshot { + previous_mode: CheckpointMode::Incremental, + reason: FlowQueryFallbackReason::IncompleteRegionWatermark, + } + ); + + { + let state = task.state.read().unwrap(); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(state.dirty_time_windows.is_empty()); + } + + let followup = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1)) + .await + .unwrap(); + assert!( + followup.is_none(), + "successful fallback consumes the dirty signal instead of re-running it" + ); } #[tokio::test] -async fn test_no_time_window_sql_with_eval_interval_generates_plan_without_dirty_signal() { - let TestTaskParts { - mut task, - query_engine, - .. - } = new_test_task_engine_and_plan_with_query( - "SELECT number, ts FROM numbers_with_ts", - "missing_sink", - ) - .await; - Arc::get_mut(&mut task.config) - .expect("test task config should be uniquely owned") - .flow_eval_interval = Some(Duration::from_secs(60)); - task.state.write().unwrap().dirty_time_windows.clean(); - let sink_schema = Arc::new(Schema::new(vec![ - ColumnSchema::new("number", CDT::uint32_datatype(), false), - ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true), - ])); +async fn test_explicit_full_query_paths_generate_unfiltered_full() { + for (case_name, query_type, flow_eval_interval) in [ + ("TQL", QueryType::Tql, None), + ( + "eval-interval SQL", + QueryType::Sql, + Some(Duration::from_secs(60)), + ), + ] { + let TestTaskParts { + mut task, + query_engine, + .. + } = new_test_task_engine_and_plan_with_query( + "SELECT number, ts FROM numbers_with_ts", + "missing_sink", + ) + .await; + { + let config = + Arc::get_mut(&mut task.config).expect("test task config should be uniquely owned"); + config.query_type = query_type; + config.flow_eval_interval = flow_eval_interval; + } + task.state.write().unwrap().dirty_time_windows.set_dirty(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); - let plan = task - .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None) - .await - .unwrap() - .expect( - "eval-interval SQL without a time-window expr should run by interval, not dirty signal", + let plan = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None) + .await + .unwrap() + .unwrap_or_else(|| panic!("{case_name} full-query path should generate a plan")); + + assert!( + matches!(plan.coverage, QueryCoverage::UnfilteredFull), + "{case_name} should use UnfilteredFull" ); - - assert!(plan.can_advance_checkpoints); - assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); + assert!( + task.state.read().unwrap().dirty_time_windows.is_empty(), + "{case_name} should consume the dirty signal" + ); + } } #[tokio::test] @@ -1021,7 +1600,7 @@ async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_pat time_ranges: vec![(Timestamp::new_second(10), Timestamp::new_second(20))], window_size: chrono::Duration::seconds(10), }), - can_advance_checkpoints: true, + coverage: QueryCoverage::ScopedBaseRepair, }; task.handle_executed_query_failure(Some(&scoped_query)); @@ -1110,7 +1689,7 @@ async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() { } #[tokio::test] -async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite_error() { +async fn test_unsafe_incremental_plan_skip_restores_dirty_without_query() { let query_engine = create_test_query_engine(); let ctx = QueryContext::arc(); let plan = sql_to_df_plan( @@ -1149,7 +1728,8 @@ async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite time_window_expr: None, expire_after: None, // The sink table exists, but does not have the rewritten aggregate - // output column `total`, so the rewrite fails deterministically. + // output column `total`, so incremental rewrite fails before any + // frontend query should be sent. sink_table_name: [ "greptime".to_string(), "public".to_string(), @@ -1172,16 +1752,31 @@ async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite .write() .unwrap() .advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); - assert_eq!( - task.state.read().unwrap().checkpoint_mode(), - CheckpointMode::Incremental - ); + let dirty_restore = DirtyRestore::Unscoped(dirty_range(10, 15)); + let (frontend_client, _) = FrontendClient::from_empty_grpc_handler(QueryOptions::default()); - let incremental_plan = task.prepare_plan_for_incremental(&dml_plan).await.unwrap(); - assert!(incremental_plan.is_none()); + let result = task + .execute_logical_plan_unlocked( + &Arc::new(frontend_client), + &dml_plan, + &dirty_restore, + &QueryCoverage::IncrementalDelta, + ) + .await + .unwrap(); + + assert!( + result.is_none(), + "unsafe incremental fallback must skip query" + ); let state = task.state.read().unwrap(); + assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental); assert!(!state.is_incremental_disabled()); - assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert_eq!(state.dirty_time_windows.len(), 1); + assert_eq!( + state.dirty_time_windows.window_size(), + std::time::Duration::from_secs(5) + ); } #[tokio::test] @@ -1259,38 +1854,47 @@ async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_o #[tokio::test] async fn test_auto_created_sql_aggregate_sink_reaches_incremental_safe() { let sink_table = "auto_created_aggregate_sink"; + let query = "SELECT max(number) AS number, ts FROM numbers_with_ts GROUP BY ts"; let TestTaskParts { task, query_engine, .. - } = new_test_task_engine_and_plan_with_query( - "SELECT max(number) AS number, ts FROM numbers_with_ts GROUP BY ts", - sink_table, - ) - .await; + } = new_test_task_engine_and_plan_with_query(query, sink_table).await; register_auto_created_aggregate_sink(&query_engine, sink_table); - task.state.write().unwrap().dirty_time_windows.set_dirty(); - let plan_info = task - .gen_insert_plan_unlocked(&query_engine, None) + let ctx = task.state.read().unwrap().query_ctx.clone(); + let plan = sql_to_df_plan(ctx, query_engine.clone(), query, true) .await - .unwrap() .unwrap(); - assert!(plan_info.can_advance_checkpoints); + let (sink_table, _) = get_table_info_df_schema( + query_engine.engine_state().catalog_manager().clone(), + [ + "greptime".to_string(), + "public".to_string(), + sink_table.to_string(), + ], + ) + .await + .unwrap(); + let table_provider = Arc::new(DfTableProviderAdapter::new(sink_table)); + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + let dml_plan = LogicalPlan::Dml(DmlStatement::new( + datafusion_common::TableReference::bare("test"), + table_source, + WriteOp::Insert(datafusion_expr::dml::InsertOp::Append), + Arc::new(plan), + )); task.state .write() .unwrap() .advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); - let incremental_plan = task - .prepare_plan_for_incremental(&plan_info.plan) - .await - .unwrap(); + let incremental_plan = task.prepare_plan_for_incremental(&dml_plan).await.unwrap(); let incremental_safe = incremental_plan.is_some(); assert!(incremental_safe); assert!(!task.state.read().unwrap().is_incremental_disabled()); let extensions = task - .build_flow_query_extensions(incremental_safe, plan_info.can_advance_checkpoints) + .build_flow_query_extensions(incremental_safe, true) .await .unwrap(); assert!(extensions.contains(&( @@ -1312,7 +1916,7 @@ async fn test_unscoped_failure_restores_consumed_dirty_signal() { } #[tokio::test] -async fn test_unscoped_plan_generation_failure_restores_consumed_dirty_signal() { +async fn test_unscoped_runtime_invariant_error_preserves_dirty_signal() { let TestTaskParts { task, query_engine, .. } = new_test_task_engine_and_plan_with_query( @@ -1330,7 +1934,16 @@ async fn test_unscoped_plan_generation_failure_restores_consumed_dirty_signal() .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None) .await; - assert!(result.is_err()); + let err = match result { + Err(err) => err, + Ok(_) => panic!("runtime should reject SQL without TWE or EVAL INTERVAL"), + }; + assert!(matches!(err, Error::Unexpected { .. }), "{err}"); + assert!( + err.to_string() + .contains("create-flow validation should have rejected it"), + "{err}" + ); let state = task.state.read().unwrap(); assert_eq!(state.dirty_time_windows.len(), 1); assert_eq!( @@ -1377,12 +1990,16 @@ async fn test_scoped_plan_generation_failure_restores_consumed_dirty_windows() { async fn test_insert_plan_matching_failure_restores_consumed_dirty_marker() { let sink_table = "partial_sink"; let TestTaskParts { - task, query_engine, .. - } = new_test_task_engine_and_plan_with_query( - "SELECT number, ts FROM numbers_with_ts", - sink_table, + mut task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number", ) .await; + Arc::get_mut(&mut task.config) + .expect("test task config should be uniquely owned") + .sink_table_name[2] = sink_table.to_string(); register_number_only_sink(&query_engine, sink_table); task.state.write().unwrap().dirty_time_windows.set_dirty(); @@ -1397,6 +2014,6 @@ async fn test_insert_plan_matching_failure_restores_consumed_dirty_marker() { assert_eq!(state.dirty_time_windows.len(), 1); assert_eq!( state.dirty_time_windows.window_size(), - std::time::Duration::from_secs(0) + std::time::Duration::from_secs(5) ); } diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index d0765a46cf..068cb98142 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -77,6 +77,27 @@ mod test { | 1970-01-01T00:00:00.009 | -9 | s9 | +-------------------------+----+----+"; query_and_expect(db.frontend().as_ref(), sql, expected).await; + + create_table_named(&client, "bar").await; + let result = client + .sql_with_terminal_metrics( + "insert into bar select ts, a, `B` from foo", + &[("flow.return_region_seq", "true")], + ) + .await + .unwrap(); + let OutputData::AffectedRows(affected_rows) = result.output.data else { + panic!("expected affected rows output"); + }; + assert_eq!(affected_rows, 9); + assert!(result.metrics.is_ready()); + let region_watermark_map = result + .region_watermark_map() + .expect("standalone affected-rows output should carry terminal region watermarks"); + assert!( + !region_watermark_map.is_empty(), + "standalone affected-rows output should contain at least one region watermark" + ); } #[tokio::test(flavor = "multi_thread")] diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.result b/tests/cases/standalone/common/flow/flow_advance_ttl.result index 4102519634..fd109043a0 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.result +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.result @@ -10,7 +10,7 @@ Affected Rows: 0 -- should fallback to streaming mode -- SQLNESS REPLACE id=\d+ id=REDACTED -CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS SELECT DISTINCT number as dis FROM @@ -167,7 +167,7 @@ CREATE TABLE distinct_basic ( Affected Rows: 0 -CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS SELECT DISTINCT number as dis FROM diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.sql b/tests/cases/standalone/common/flow/flow_advance_ttl.sql index d4c1014642..7693ce672d 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.sql +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.sql @@ -8,7 +8,7 @@ CREATE TABLE distinct_basic ( -- should fallback to streaming mode -- SQLNESS REPLACE id=\d+ id=REDACTED -CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS SELECT DISTINCT number as dis FROM @@ -71,7 +71,7 @@ CREATE TABLE distinct_basic ( TIME INDEX(ts) )WITH ('ttl' = '5s'); -CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS SELECT DISTINCT number as dis FROM @@ -126,4 +126,4 @@ SELECT number FROM distinct_basic; DROP FLOW test_distinct_basic; DROP TABLE distinct_basic; -DROP TABLE out_distinct_basic; \ No newline at end of file +DROP TABLE out_distinct_basic; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index ea8d907279..5144c459ad 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -166,7 +166,7 @@ CREATE TABLE input_basic ( Affected Rows: 0 -CREATE FLOW test_wildcard_basic SiNk TO out_basic AS +CREATE FLOW test_wildcard_basic SiNk TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -196,7 +196,7 @@ DROP FLOW test_wildcard_basic; Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -297,7 +297,7 @@ CREATE TABLE distinct_basic ( Affected Rows: 0 -CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS SELECT DISTINCT number as dis FROM @@ -641,7 +641,7 @@ CREATE TABLE ngx_access_log ( Affected Rows: 0 -- create flow task to calculate the distinct country -CREATE FLOW calc_ngx_country SINK TO ngx_country AS +CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS SELECT DISTINCT country, FROM @@ -790,7 +790,7 @@ CREATE TABLE ngx_access_log ( Affected Rows: 0 -CREATE FLOW calc_ngx_country SINK TO ngx_country AS +CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS SELECT DISTINCT country, -- this distinct is not necessary, but it's a good test to see if it works @@ -960,7 +960,7 @@ CREATE TABLE temp_alerts ( Affected Rows: 0 -CREATE FLOW temp_monitoring SINK TO temp_alerts AS +CREATE FLOW temp_monitoring SINK TO temp_alerts EVAL INTERVAL '1m' AS SELECT sensor_id, loc, @@ -1255,7 +1255,7 @@ CREATE TABLE requests_without_ip ( Affected Rows: 0 -CREATE FLOW requests_long_term SINK TO requests_without_ip AS +CREATE FLOW requests_long_term SINK TO requests_without_ip EVAL INTERVAL '1m' AS SELECT service_name, val, @@ -1685,7 +1685,7 @@ CREATE TABLE numbers_input_basic ( Affected Rows: 0 -CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic EVAL INTERVAL '1m' AS SELECT sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num FROM @@ -1883,7 +1883,7 @@ SELECT device_model, +--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ -- Create a flow with same source and sink table -CREATE FLOW same_source_and_sink_table SINK TO live_connection_log AS +CREATE FLOW same_source_and_sink_table SINK TO live_connection_log EVAL INTERVAL '1m' AS SELECT * FROM diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index e154779b86..ef95ad88a9 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -75,7 +75,7 @@ CREATE TABLE input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_wildcard_basic SiNk TO out_basic AS +CREATE FLOW test_wildcard_basic SiNk TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -85,7 +85,7 @@ SHOW CREATE TABLE out_basic; DROP FLOW test_wildcard_basic; -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -122,7 +122,7 @@ CREATE TABLE distinct_basic ( TIME INDEX(ts) ); -CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS SELECT DISTINCT number as dis FROM @@ -284,7 +284,7 @@ CREATE TABLE ngx_access_log ( ); -- create flow task to calculate the distinct country -CREATE FLOW calc_ngx_country SINK TO ngx_country AS +CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS SELECT DISTINCT country, FROM @@ -346,7 +346,7 @@ CREATE TABLE ngx_access_log ( access_time TIMESTAMP TIME INDEX ); -CREATE FLOW calc_ngx_country SINK TO ngx_country AS +CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS SELECT DISTINCT country, -- this distinct is not necessary, but it's a good test to see if it works @@ -427,7 +427,7 @@ CREATE TABLE temp_alerts ( update_at TIMESTAMP ); -CREATE FLOW temp_monitoring SINK TO temp_alerts AS +CREATE FLOW temp_monitoring SINK TO temp_alerts EVAL INTERVAL '1m' AS SELECT sensor_id, loc, @@ -586,7 +586,7 @@ CREATE TABLE requests_without_ip ( PRIMARY KEY(service_name) ); -CREATE FLOW requests_long_term SINK TO requests_without_ip AS +CREATE FLOW requests_long_term SINK TO requests_without_ip EVAL INTERVAL '1m' AS SELECT service_name, val, @@ -790,7 +790,7 @@ CREATE TABLE numbers_input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic EVAL INTERVAL '1m' AS SELECT sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num FROM @@ -908,7 +908,7 @@ SELECT device_model, record_time_window FROM live_connection_statistics_detail; -- Create a flow with same source and sink table -CREATE FLOW same_source_and_sink_table SINK TO live_connection_log AS +CREATE FLOW same_source_and_sink_table SINK TO live_connection_log EVAL INTERVAL '1m' AS SELECT * FROM diff --git a/tests/cases/standalone/common/flow/flow_rebuild.result b/tests/cases/standalone/common/flow/flow_rebuild.result index bd2bf9c892..79c31fc8b1 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.result +++ b/tests/cases/standalone/common/flow/flow_rebuild.result @@ -9,7 +9,7 @@ CREATE TABLE input_basic ( Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -64,7 +64,7 @@ CREATE TABLE input_basic ( Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -142,7 +142,7 @@ DROP FLOW test_wildcard_basic; Affected Rows: 0 -- recreate flow so that it use new table id -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -206,7 +206,7 @@ CREATE TABLE input_basic ( Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -247,7 +247,7 @@ DROP TABLE out_basic; Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -304,7 +304,7 @@ CREATE TABLE input_basic ( Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -371,7 +371,7 @@ CREATE TABLE input_basic ( Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -474,7 +474,7 @@ DROP FLOW test_wildcard_basic; Affected Rows: 0 -- recreate flow so that it use new table id -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -549,7 +549,7 @@ CREATE TABLE input_basic ( Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -602,7 +602,7 @@ DROP TABLE out_basic; Affected Rows: 0 -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM diff --git a/tests/cases/standalone/common/flow/flow_rebuild.sql b/tests/cases/standalone/common/flow/flow_rebuild.sql index 4f30c80ea2..d7d4f43749 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.sql +++ b/tests/cases/standalone/common/flow/flow_rebuild.sql @@ -7,7 +7,7 @@ CREATE TABLE input_basic ( append_mode = 'true' ); -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -39,7 +39,7 @@ CREATE TABLE input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -81,7 +81,7 @@ SELECT wildcard FROM out_basic; DROP FLOW test_wildcard_basic; -- recreate flow so that it use new table id -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -113,7 +113,7 @@ CREATE TABLE input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -135,7 +135,7 @@ DROP FLOW test_wildcard_basic; DROP TABLE out_basic; -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -166,7 +166,7 @@ CREATE TABLE input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -204,7 +204,7 @@ CREATE TABLE input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -259,7 +259,7 @@ SELECT wildcard FROM out_basic; DROP FLOW test_wildcard_basic; -- recreate flow so that it use new table id -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -296,7 +296,7 @@ CREATE TABLE input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM @@ -323,7 +323,7 @@ DROP FLOW test_wildcard_basic; DROP TABLE out_basic; -CREATE FLOW test_wildcard_basic sink TO out_basic AS +CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS SELECT COUNT(*) as wildcard FROM diff --git a/tests/cases/standalone/common/flow/flow_user_guide.result b/tests/cases/standalone/common/flow/flow_user_guide.result index 67fc92c959..8bd9cab926 100644 --- a/tests/cases/standalone/common/flow/flow_user_guide.result +++ b/tests/cases/standalone/common/flow/flow_user_guide.result @@ -232,7 +232,7 @@ CREATE TABLE ngx_country ( Affected Rows: 0 /* create flow task to calculate the distinct country */ -CREATE FLOW calc_ngx_country SINK TO ngx_country AS +CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS SELECT DISTINCT country, FROM @@ -314,7 +314,7 @@ CREATE TABLE ngx_country ( Affected Rows: 0 -CREATE FLOW calc_ngx_country SINK TO ngx_country AS +CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS SELECT DISTINCT country, date_bin(INTERVAL '1 hour', access_time) as time_window, @@ -403,7 +403,7 @@ CREATE TABLE temp_alerts ( Affected Rows: 0 -CREATE FLOW temp_monitoring SINK TO temp_alerts AS +CREATE FLOW temp_monitoring SINK TO temp_alerts EVAL INTERVAL '1m' AS SELECT sensor_id, loc, diff --git a/tests/cases/standalone/common/flow/flow_user_guide.sql b/tests/cases/standalone/common/flow/flow_user_guide.sql index cc5049cfc8..e66c8d8688 100644 --- a/tests/cases/standalone/common/flow/flow_user_guide.sql +++ b/tests/cases/standalone/common/flow/flow_user_guide.sql @@ -184,7 +184,7 @@ CREATE TABLE ngx_country ( ); /* create flow task to calculate the distinct country */ -CREATE FLOW calc_ngx_country SINK TO ngx_country AS +CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS SELECT DISTINCT country, FROM @@ -236,7 +236,7 @@ CREATE TABLE ngx_country ( PRIMARY KEY(country) ); -CREATE FLOW calc_ngx_country SINK TO ngx_country AS +CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS SELECT DISTINCT country, date_bin(INTERVAL '1 hour', access_time) as time_window, @@ -295,7 +295,7 @@ CREATE TABLE temp_alerts ( PRIMARY KEY(sensor_id, loc) ); -CREATE FLOW temp_monitoring SINK TO temp_alerts AS +CREATE FLOW temp_monitoring SINK TO temp_alerts EVAL INTERVAL '1m' AS SELECT sensor_id, loc, @@ -411,4 +411,4 @@ DROP FLOW calc_ngx_distribution; DROP TABLE ngx_distribution; -DROP TABLE ngx_access_log; \ No newline at end of file +DROP TABLE ngx_access_log; diff --git a/tests/cases/standalone/common/flow/flow_view.result b/tests/cases/standalone/common/flow/flow_view.result index ec54a12aa8..f823328d50 100644 --- a/tests/cases/standalone/common/flow/flow_view.result +++ b/tests/cases/standalone/common/flow/flow_view.result @@ -20,7 +20,7 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512 Affected Rows: 4 -CREATE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent; +CREATE FLOW user_agent_flow SINK TO user_agent_statistics EVAL INTERVAL '1m' AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent; Affected Rows: 0 @@ -32,7 +32,7 @@ SELECT created_time <= updated_time, created_time IS NOT NULL, source_table_name | true | true | greptime.public.ngx_access_log | +--------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------+ -CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent; +CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics EVAL INTERVAL '1m' AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/flow_view.sql b/tests/cases/standalone/common/flow/flow_view.sql index 28e5e2608e..420b9e84f4 100644 --- a/tests/cases/standalone/common/flow/flow_view.sql +++ b/tests/cases/standalone/common/flow/flow_view.sql @@ -10,11 +10,11 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512 -- SQLNESS SLEEP 1s INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); -CREATE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent; +CREATE FLOW user_agent_flow SINK TO user_agent_statistics EVAL INTERVAL '1m' AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent; SELECT created_time <= updated_time, created_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow'; -CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent; +CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics EVAL INTERVAL '1m' AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent; SELECT created_time < updated_time, created_time IS NOT NULL, updated_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';