From 28fd796f4eeeb056483d963cd01626600d89fb2a Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 1 Jun 2026 10:48:00 +0800 Subject: [PATCH] fix(flow): harden incremental read correctness (#8196) * fix(flow): harden incremental read correctness Signed-off-by: discord9 * fix(flow): propagate dirty window options Signed-off-by: discord9 * test: more Signed-off-by: discord9 * chore: test config api Signed-off-by: discord9 * refactor: split gen Signed-off-by: discord9 * chore: per review Signed-off-by: discord9 * fix: allowlist key Signed-off-by: discord9 --------- Signed-off-by: discord9 --- config/config.md | 1 + config/flownode.example.toml | 4 + src/common/meta/src/ddl/create_flow.rs | 9 +- src/common/meta/src/ddl/tests/create_flow.rs | 21 +- src/flow/src/batching_mode.rs | 6 +- src/flow/src/batching_mode/engine.rs | 133 +++++- .../src/batching_mode/incremental_filter.rs | 222 --------- src/flow/src/batching_mode/state.rs | 22 +- src/flow/src/batching_mode/task.rs | 435 ++++++++++++------ src/flow/src/batching_mode/task/inc.rs | 25 +- src/flow/src/batching_mode/task/test.rs | 316 ++++++++++++- src/flow/src/batching_mode/utils/test.rs | 7 +- src/operator/src/statement/ddl.rs | 32 +- tests-integration/tests/http.rs | 1 + .../common/flow/flow_incremental_aggr.result | 32 +- .../common/flow/flow_incremental_aggr.sql | 26 +- .../flow/flow_incremental_memtable.result | 4 +- .../common/flow/flow_incremental_memtable.sql | 4 +- .../flow/flow_incremental_partitioned.result | 4 +- .../flow/flow_incremental_partitioned.sql | 4 +- .../common/flow/show_create_flow.result | 2 +- 21 files changed, 873 insertions(+), 437 deletions(-) delete mode 100644 src/flow/src/batching_mode/incremental_filter.rs diff --git a/config/config.md b/config/config.md index aa0c6701a0..d9cffaf122 100644 --- a/config/config.md +++ b/config/config.md @@ -630,6 +630,7 @@ | `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Flow wait for available frontend timeout,
if failed to find available frontend after frontend_scan_timeout elapsed, return error
which prevent flownode from starting | | `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query | | `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance | +| `flow.batching_mode.experimental_enable_incremental_read` | Bool | `false` | Whether to enable experimental flow incremental source reads.
When disabled, batching flows always execute full-snapshot queries.
Can be overridden per flow with WITH (experimental_enable_incremental_read = 'true'). | | `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. | | `flow.batching_mode.frontend_tls` | -- | -- | -- | | `flow.batching_mode.frontend_tls.enabled` | Bool | `false` | Whether to enable TLS for client. | diff --git a/config/flownode.example.toml b/config/flownode.example.toml index 2c053e6e8c..ff8a9e4a50 100644 --- a/config/flownode.example.toml +++ b/config/flownode.example.toml @@ -31,6 +31,10 @@ node_id = 14 #+experimental_max_filter_num_per_query=20 ## Time window merge distance #+experimental_time_window_merge_threshold=3 +## Whether to enable experimental flow incremental source reads. +## When disabled, batching flows always execute full-snapshot queries. +## Can be overridden per flow with WITH (experimental_enable_incremental_read = 'true'). +#+experimental_enable_incremental_read=false ## Read preference of the Frontend client. #+read_preference="Leader" [flow.batching_mode.frontend_tls] diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index ddfb0c0759..8a419176c9 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -437,11 +437,13 @@ pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result { pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> { for key in flow_task.flow_options.keys() { match key.as_str() { - DEFER_ON_MISSING_SOURCE_KEY | FlowType::FLOW_TYPE_KEY => {} + DEFER_ON_MISSING_SOURCE_KEY + | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY + | FlowType::FLOW_TYPE_KEY => {} unknown => { return UnexpectedSnafu { err_msg: format!( - "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}" + "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}, {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}" ), } .fail(); @@ -487,6 +489,9 @@ pub enum FlowType { Streaming, } +pub const FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY: &str = + "experimental_enable_incremental_read"; + impl FlowType { pub const BATCHING: &str = "batching"; pub const STREAMING: &str = "streaming"; diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index a1a6c040f1..7150be39cb 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -24,8 +24,9 @@ use table::table_name::TableName; use crate::ddl::DdlContext; use crate::ddl::create_flow::{ - CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY, FlowType, - defer_on_missing_source, + CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY, + FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType, defer_on_missing_source, + validate_flow_options, }; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler; @@ -275,6 +276,22 @@ fn test_defer_on_missing_source_invalid_value() { ); } +#[test] +fn test_validate_flow_options_allows_incremental_read_option() { + let mut task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + task.flow_options.insert( + FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(), + "true".to_string(), + ); + + validate_flow_options(&task).unwrap(); +} + #[tokio::test] async fn test_create_flow_rejects_unknown_option_in_meta_task() { let mut task = test_create_flow_task( diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs index 580762a142..a8bd139d98 100644 --- a/src/flow/src/batching_mode.rs +++ b/src/flow/src/batching_mode.rs @@ -23,7 +23,6 @@ use session::ReadPreference; mod checkpoint; pub(crate) mod engine; pub(crate) mod frontend_client; -mod incremental_filter; mod state; mod table_creator; mod task; @@ -55,6 +54,10 @@ pub struct BatchingModeOptions { pub experimental_max_filter_num_per_query: usize, /// Time window merge distance pub experimental_time_window_merge_threshold: usize, + /// Whether to enable experimental flow incremental source reads. + /// + /// When disabled, batching flows always execute full-snapshot queries. + pub experimental_enable_incremental_read: bool, /// Read preference of the Frontend client. pub read_preference: ReadPreference, /// TLS option for client connections to frontends. @@ -72,6 +75,7 @@ impl Default for BatchingModeOptions { experimental_frontend_scan_timeout: Duration::from_secs(30), experimental_max_filter_num_per_query: 20, experimental_time_window_merge_threshold: 3, + experimental_enable_incremental_read: false, read_preference: Default::default(), frontend_tls: None, } diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index f37e54d80b..68fb3793e4 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -21,7 +21,7 @@ use std::time::Duration; use api::v1::flow::DirtyWindowRequests; use catalog::CatalogManagerRef; use common_error::ext::BoxedError; -use common_meta::ddl::create_flow::FlowType; +use common_meta::ddl::create_flow::{FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType}; use common_meta::key::TableMetadataManagerRef; use common_meta::key::flow::FlowMetadataManagerRef; use common_meta::key::flow::flow_state::FlowStat; @@ -38,6 +38,7 @@ use session::context::QueryContext; use snafu::{OptionExt, ResultExt, ensure}; use sql::parsers::utils::is_tql; use store_api::metric_engine_consts::is_metric_engine_internal_column; +use store_api::mito_engine_options::APPEND_MODE_KEY; use store_api::storage::{RegionId, TableId}; use table::table_reference::TableReference; use tokio::sync::{RwLock, oneshot}; @@ -428,6 +429,55 @@ async fn get_table_info( } impl BatchingEngine { + fn batch_opts_for_flow_options( + &self, + flow_options: &HashMap, + ) -> Result, Error> { + let mut batch_opts = (*self.batch_opts).clone(); + if let Some(enable_incremental_read) = + flow_options.get(FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY) + { + batch_opts.experimental_enable_incremental_read = enable_incremental_read + .parse::() + .map_err(|_| { + InvalidQuerySnafu { + reason: format!( + "Invalid flow option {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}: {enable_incremental_read}" + ), + } + .build() + })?; + } + + Ok(Arc::new(batch_opts)) + } + + fn table_options_enable_append_mode(extra_options: &HashMap) -> bool { + extra_options + .get(APPEND_MODE_KEY) + .is_some_and(|value| value.eq_ignore_ascii_case("true")) + } + + fn ensure_incremental_source_append_only( + batch_opts: &BatchingModeOptions, + table_name: &[String; 3], + extra_options: &HashMap, + ) -> Result<(), Error> { + if batch_opts.experimental_enable_incremental_read { + ensure!( + Self::table_options_enable_append_mode(extra_options), + UnsupportedSnafu { + reason: format!( + "Flow incremental read requires append-only source table, but source table `{}` is not append-only. Consider setting append_mode='true' on the source table or disabling experimental_enable_incremental_read", + table_name.join(".") + ), + } + ); + } + + Ok(()) + } + pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result, Error> { let CreateFlowArgs { flow_id, @@ -494,6 +544,8 @@ impl BatchingEngine { } ); + let batch_opts = self.batch_opts_for_flow_options(&flow_options)?; + let mut source_table_names = Vec::with_capacity(2); for src_id in source_table_ids { // also check table option to see if ttl!=instant @@ -509,6 +561,11 @@ impl BatchingEngine { ), } ); + Self::ensure_incremental_source_append_only( + &batch_opts, + &table_name, + &table_info.table_info.meta.options.extra_options, + )?; source_table_names.push(table_name); } @@ -563,7 +620,7 @@ impl BatchingEngine { query_ctx, catalog_manager: self.catalog_manager.clone(), shutdown_rx: rx, - batch_opts: self.batch_opts.clone(), + batch_opts, flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)), }; @@ -808,7 +865,7 @@ impl BatchingEngine { }); let res = task - .gen_exec_once( + .execute_once_serialized( &self.query_engine, &self.frontend_client, cur_dirty_window_cnt, @@ -946,6 +1003,76 @@ mod tests { ) } + #[tokio::test] + async fn test_flow_option_overrides_incremental_read_switch() { + let engine = new_test_engine().await; + + let default_opts = engine.batch_opts_for_flow_options(&HashMap::new()).unwrap(); + assert!(!default_opts.experimental_enable_incremental_read); + + let enabled_opts = engine + .batch_opts_for_flow_options(&HashMap::from([( + FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(), + "true".to_string(), + )])) + .unwrap(); + assert!(enabled_opts.experimental_enable_incremental_read); + } + + #[test] + fn test_table_options_enable_append_mode() { + assert!(!BatchingEngine::table_options_enable_append_mode( + &HashMap::new() + )); + assert!(!BatchingEngine::table_options_enable_append_mode( + &HashMap::from([(APPEND_MODE_KEY.to_string(), "false".to_string())]) + )); + assert!(BatchingEngine::table_options_enable_append_mode( + &HashMap::from([(APPEND_MODE_KEY.to_string(), "TRUE".to_string())]) + )); + } + + #[test] + fn test_incremental_source_append_only_enforcement() { + let table_name = [ + "greptime".to_string(), + "public".to_string(), + "numbers".to_string(), + ]; + let disabled_opts = BatchingModeOptions::default(); + let enabled_opts = BatchingModeOptions { + experimental_enable_incremental_read: true, + ..Default::default() + }; + let non_append_options = HashMap::new(); + let append_options = HashMap::from([(APPEND_MODE_KEY.to_string(), "true".to_string())]); + + BatchingEngine::ensure_incremental_source_append_only( + &disabled_opts, + &table_name, + &non_append_options, + ) + .expect("disabled incremental read should not require append-only source"); + BatchingEngine::ensure_incremental_source_append_only( + &enabled_opts, + &table_name, + &append_options, + ) + .expect("append-only source should be accepted when incremental read is enabled"); + + let err = BatchingEngine::ensure_incremental_source_append_only( + &enabled_opts, + &table_name, + &non_append_options, + ) + .expect_err("non-append source should be rejected when incremental read is enabled"); + assert!( + err.to_string() + .contains("Flow incremental read requires append-only source table"), + "{err}" + ); + } + async fn new_test_task(flow_id: FlowId) -> (BatchingTask, oneshot::Sender<()>) { let query_engine = create_test_query_engine(); let ctx = QueryContext::arc(); diff --git a/src/flow/src/batching_mode/incremental_filter.rs b/src/flow/src/batching_mode/incremental_filter.rs deleted file mode 100644 index ddc58d0378..0000000000 --- a/src/flow/src/batching_mode/incremental_filter.rs +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_telemetry::tracing::debug; -use datafusion_expr::Expr; -use datatypes::schema::Schema; - -use crate::batching_mode::state::FilterExprInfo; -use crate::batching_mode::utils::IncrementalAggregateAnalysis; -use crate::{Error, FlowId}; - -pub(super) fn build_sink_dirty_time_window_filter_expr( - flow_id: FlowId, - analysis: &IncrementalAggregateAnalysis, - sink_schema: &Schema, - dirty_filter: Option<&FilterExprInfo>, -) -> Result, Error> { - let Some(dirty_filter) = dirty_filter else { - return Ok(None); - }; - - let Some(sink_filter_col) = - infer_sink_time_window_filter_col(flow_id, analysis, sink_schema, dirty_filter) - else { - return Ok(None); - }; - - dirty_filter.predicate_for_col(&sink_filter_col) -} - -fn infer_sink_time_window_filter_col( - flow_id: FlowId, - analysis: &IncrementalAggregateAnalysis, - sink_schema: &Schema, - dirty_filter: &FilterExprInfo, -) -> Option { - if analysis.group_key_names.is_empty() { - return None; - } - - let is_timestamp_group_key = |name: &str| { - analysis.group_key_names.iter().any(|key| key == name) - && sink_schema - .column_schema_by_name(name) - .is_some_and(|col| col.data_type.is_timestamp()) - }; - - if is_timestamp_group_key(&dirty_filter.col_name) { - return Some(dirty_filter.col_name.clone()); - } - - let candidates = analysis - .group_key_names - .iter() - .filter(|name| is_timestamp_group_key(name)) - .cloned() - .collect::>(); - - match candidates.as_slice() { - [name] => Some(name.clone()), - [] => { - debug!( - "Flow {} cannot infer sink dirty-window filter column: no timestamp group key in {:?}", - flow_id, analysis.group_key_names - ); - None - } - _ => { - debug!( - "Flow {} cannot infer sink dirty-window filter column: ambiguous timestamp group keys {:?}", - flow_id, candidates - ); - None - } - } -} - -#[cfg(test)] -mod test { - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::ColumnSchema; - use pretty_assertions::assert_eq; - - use super::*; - use crate::adapter::AUTO_CREATED_UPDATE_AT_TS_COL; - use crate::batching_mode::state::FilterExprInfo; - use crate::batching_mode::utils::IncrementalAggregateAnalysis; - - fn test_analysis_with_group_keys(group_key_names: Vec<&str>) -> IncrementalAggregateAnalysis { - IncrementalAggregateAnalysis { - group_key_names: group_key_names - .into_iter() - .map(|name| name.to_string()) - .collect(), - merge_columns: vec![], - literal_columns: vec![], - output_field_names: vec![], - unsupported_exprs: vec![], - } - } - - fn test_dirty_filter(col_name: &str) -> FilterExprInfo { - FilterExprInfo { - expr: datafusion_expr::col(col_name), - col_name: col_name.to_string(), - time_ranges: vec![], - window_size: chrono::Duration::seconds(1), - } - } - - fn test_sink_schema(columns: Vec<(&str, ConcreteDataType)>) -> Schema { - Schema::new( - columns - .into_iter() - .map(|(name, data_type)| ColumnSchema::new(name, data_type, true)) - .collect(), - ) - } - - #[test] - fn test_infer_sink_time_window_filter_col_uses_matching_source_group_key() { - let analysis = test_analysis_with_group_keys(vec!["ts", "host"]); - let sink_schema = test_sink_schema(vec![ - ("ts", ConcreteDataType::timestamp_millisecond_datatype()), - ("host", ConcreteDataType::string_datatype()), - ]); - let dirty_filter = test_dirty_filter("ts"); - - assert_eq!( - Some("ts".to_string()), - infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) - ); - } - - #[test] - fn test_infer_sink_time_window_filter_col_uses_unique_timestamp_group_key() { - let analysis = test_analysis_with_group_keys(vec!["host", "time_window"]); - let sink_schema = test_sink_schema(vec![ - ("host", ConcreteDataType::string_datatype()), - ( - "time_window", - ConcreteDataType::timestamp_millisecond_datatype(), - ), - ( - AUTO_CREATED_UPDATE_AT_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - ]); - let dirty_filter = test_dirty_filter("ts"); - - assert_eq!( - Some("time_window".to_string()), - infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) - ); - } - - #[test] - fn test_infer_sink_time_window_filter_col_skips_global_aggregate() { - let analysis = test_analysis_with_group_keys(vec![]); - let sink_schema = test_sink_schema(vec![ - ("number", ConcreteDataType::uint32_datatype()), - ( - "time_window", - ConcreteDataType::timestamp_millisecond_datatype(), - ), - ]); - let dirty_filter = test_dirty_filter("ts"); - - assert_eq!( - None, - infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) - ); - } - - #[test] - fn test_infer_sink_time_window_filter_col_skips_without_timestamp_group_key() { - let analysis = test_analysis_with_group_keys(vec!["host", "device"]); - let sink_schema = test_sink_schema(vec![ - ("host", ConcreteDataType::string_datatype()), - ("device", ConcreteDataType::string_datatype()), - ( - AUTO_CREATED_UPDATE_AT_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - ]); - let dirty_filter = test_dirty_filter("ts"); - - assert_eq!( - None, - infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) - ); - } - - #[test] - fn test_infer_sink_time_window_filter_col_skips_ambiguous_timestamp_group_keys() { - let analysis = test_analysis_with_group_keys(vec!["ts", "time_window"]); - let sink_schema = test_sink_schema(vec![ - ("ts", ConcreteDataType::timestamp_millisecond_datatype()), - ( - "time_window", - ConcreteDataType::timestamp_millisecond_datatype(), - ), - ]); - let dirty_filter = test_dirty_filter("source_ts"); - - assert_eq!( - None, - infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter) - ); - } -} diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index 42b71a4ec7..c5fcc74143 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -66,12 +66,20 @@ pub struct TaskState { } impl TaskState { pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self { + Self::with_dirty_time_windows(query_ctx, shutdown_rx, DirtyTimeWindows::default()) + } + + pub fn with_dirty_time_windows( + query_ctx: QueryContextRef, + shutdown_rx: oneshot::Receiver<()>, + dirty_time_windows: DirtyTimeWindows, + ) -> Self { Self { query_ctx, last_update_time: Instant::now(), last_query_duration: Duration::from_secs(0), last_exec_time_millis: None, - dirty_time_windows: Default::default(), + dirty_time_windows, checkpoint_mode: CheckpointMode::FullSnapshot, checkpoints: Default::default(), incremental_disabled: false, @@ -264,6 +272,16 @@ impl DirtyTimeWindows { time_window_merge_threshold, } } + + #[cfg(test)] + pub(crate) fn max_filter_num_per_query(&self) -> usize { + self.max_filter_num_per_query + } + + #[cfg(test)] + pub(crate) fn time_window_merge_threshold(&self) -> usize { + self.time_window_merge_threshold + } } impl Default for DirtyTimeWindows { @@ -681,7 +699,7 @@ impl DirtyTimeWindows { } } -fn to_df_literal(value: Timestamp) -> Result { +pub(crate) fn to_df_literal(value: Timestamp) -> Result { let value = Value::from(value); let value = value .try_to_scalar_value(&value.data_type()) diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 3cdf7899a6..cbd6a05cc2 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -27,7 +27,7 @@ use datafusion::datasource::DefaultTableSource; use datafusion::sql::unparser::expr_to_sql; use datafusion_common::DFSchemaRef; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp}; +use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp, col, lit}; use datatypes::schema::Schema; use query::QueryEngineRef; use query::options::FLOW_INCREMENTAL_MODE; @@ -38,14 +38,16 @@ use sql::parsers::utils::is_tql; use store_api::mito_engine_options::MERGE_MODE_KEY; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::adapter::DfTableProviderAdapter; -use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; +use tokio::sync::{Mutex, oneshot}; use tokio::time::Instant; use crate::batching_mode::BatchingModeOptions; use crate::batching_mode::checkpoint::checkpoint_mode_label; use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc}; -use crate::batching_mode::state::{CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState}; +use crate::batching_mode::state::{ + CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal, +}; use crate::batching_mode::table_creator::{QueryType, create_table_with_expr}; use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::utils::{ @@ -67,12 +69,6 @@ use crate::{Error, FlowId}; mod ckpt; mod inc; -/// Maximum number of dirty time-window predicates attached to one incremental -/// SQL query. This keeps generated OR filters bounded so Substrait encoding and -/// downstream planning remain predictable; if the backlog is larger, the flow -/// drains one capped batch and postpones checkpoint advancement to a later run. -const MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS: usize = 4096; - /// The task's config, immutable once created #[derive(Clone)] pub struct TaskConfig { @@ -113,6 +109,10 @@ fn is_merge_mode_last_non_null(options: &HashMap) -> bool { pub struct BatchingTask { pub config: Arc, pub state: Arc>, + /// Serializes plan generation, execution, checkpoint advancement, and dirty + /// window restoration for this flow. Without this, a manual flush and the + /// background loop can process the same checkpoint range concurrently. + execution_lock: Arc>, } /// Arguments for creating batching task @@ -150,6 +150,16 @@ pub enum DirtyRestore { Unscoped(DirtyTimeWindows), } +struct ExecuteOnceOutcome { + new_query: Option, + /// Execution result of the generated insert plan. + /// + /// `Ok(Some((affected_rows, elapsed)))` means a query was executed. + /// `Ok(None)` means no query was generated because there was no dirty signal. + /// `Err(_)` means plan generation or execution failed. + result: Result, Error>, +} + impl BatchingTask { #[allow(clippy::too_many_arguments)] pub fn try_new( @@ -168,6 +178,18 @@ impl BatchingTask { flow_eval_interval, }: TaskArgs<'_>, ) -> Result { + let mut state = TaskState::with_dirty_time_windows( + query_ctx.clone(), + shutdown_rx, + DirtyTimeWindows::new( + batch_opts.experimental_max_filter_num_per_query, + batch_opts.experimental_time_window_merge_threshold, + ), + ); + if !batch_opts.experimental_enable_incremental_read { + state.disable_incremental(); + } + Ok(Self { config: Arc::new(TaskConfig { flow_id, @@ -182,7 +204,8 @@ impl BatchingTask { batch_opts, flow_eval_interval, }), - state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))), + state: Arc::new(RwLock::new(state)), + execution_lock: Arc::new(Mutex::new(())), }) } @@ -251,40 +274,75 @@ impl BatchingTask { .context(ExternalSnafu) } - pub async fn gen_exec_once( + pub(crate) async fn execute_once_serialized( &self, engine: &QueryEngineRef, frontend_client: &Arc, max_window_cnt: Option, ) -> Result, Error> { - if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? { + let outcome = self + .execute_once_serialized_with_outcome(engine, frontend_client, max_window_cnt) + .await; + outcome.result + } + + /// Executes one flow evaluation under `execution_lock` and keeps the + /// generated query context for the background loop's error logging/backoff. + async fn execute_once_serialized_with_outcome( + &self, + engine: &QueryEngineRef, + frontend_client: &Arc, + max_window_cnt: Option, + ) -> ExecuteOnceOutcome { + let _execution_guard = self.execution_lock.lock().await; + self.execute_once_unlocked(engine, frontend_client, max_window_cnt) + .await + } + + /// Executes one flow evaluation. Caller must hold `execution_lock`. + async fn execute_once_unlocked( + &self, + engine: &QueryEngineRef, + frontend_client: &Arc, + max_window_cnt: Option, + ) -> ExecuteOnceOutcome { + let new_query = match self.gen_insert_plan_unlocked(engine, max_window_cnt).await { + Ok(new_query) => new_query, + Err(err) => { + return ExecuteOnceOutcome { + new_query: None, + result: Err(err), + }; + } + }; + + if let Some(new_query) = new_query { debug!("Generate new query: {}", new_query.plan); - let dirty_filter = match &new_query.dirty_restore { - DirtyRestore::Scoped(f) => Some(f), - _ => None, - }; - match self - .execute_logical_plan( + let res = self + .execute_logical_plan_unlocked( frontend_client, &new_query.plan, - dirty_filter, new_query.can_advance_checkpoints, ) - .await - { - Ok(result) => Ok(result), - Err(err) => { - self.handle_executed_query_failure(Some(&new_query)); - Err(err) - } + .await; + if res.is_err() { + self.handle_executed_query_failure(Some(&new_query)); + } + ExecuteOnceOutcome { + new_query: Some(new_query), + result: res, } } else { debug!("Generate no query"); - Ok(None) + ExecuteOnceOutcome { + new_query: None, + result: Ok(None), + } } } - pub async fn gen_insert_plan( + /// Generates the insert plan. Caller must reach this through the serialized path. + async fn gen_insert_plan_unlocked( &self, engine: &QueryEngineRef, max_window_cnt: Option, @@ -388,11 +446,11 @@ impl BatchingTask { Ok(()) } - pub async fn execute_logical_plan( + /// Executes the insert plan. Caller must reach this through the serialized path. + async fn execute_logical_plan_unlocked( &self, frontend_client: &Arc, plan: &LogicalPlan, - dirty_filter: Option<&FilterExprInfo>, can_advance_checkpoints: bool, ) -> Result, Error> { let instant = Instant::now(); @@ -426,8 +484,7 @@ impl BatchingTask { // For incremental-mode SQL queries, attempt to rewrite the delta aggregate // plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions. let incremental_plan = if can_advance_checkpoints { - self.prepare_plan_for_incremental(&plan, dirty_filter) - .await? + self.prepare_plan_for_incremental(&plan).await? } else { None }; @@ -580,6 +637,112 @@ impl BatchingTask { }) } + fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) { + self.state + .write() + .unwrap() + .dirty_time_windows + .add_dirty_windows(dirty_windows); + } + + fn restore_unscoped_dirty_windows_on_err( + &self, + dirty_windows: &DirtyTimeWindows, + result: Result, + ) -> Result { + result.inspect_err(|_| { + self.restore_unscoped_dirty_windows(dirty_windows); + }) + } + + fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) { + let mut state = self.state.write().unwrap(); + let dirty_windows_to_restore = state.dirty_time_windows.clone(); + let is_dirty = !dirty_windows_to_restore.is_empty(); + state.dirty_time_windows.clean(); + (is_dirty, dirty_windows_to_restore) + } + + #[allow(clippy::too_many_arguments)] + async fn gen_unfiltered_plan_info( + &self, + engine: QueryEngineRef, + query_ctx: QueryContextRef, + sink_table_schema: Arc, + primary_key_indices: &[usize], + allow_partial: bool, + dirty_windows_to_restore: DirtyTimeWindows, + retention_filter: Option<(&str, Timestamp, &'static str)>, + ) -> Result { + let mut plan = self.restore_unscoped_dirty_windows_on_err( + &dirty_windows_to_restore, + gen_plan_with_matching_schema( + &self.config.query, + query_ctx, + engine, + sink_table_schema, + primary_key_indices, + allow_partial, + ) + .await, + )?; + + if let Some((col_name, lower_bound, context)) = retention_filter { + let lower = self.restore_unscoped_dirty_windows_on_err( + &dirty_windows_to_restore, + to_df_literal(lower_bound), + )?; + let retention_filter = col(col_name).gt_eq(lit(lower)); + let mut add_filter = AddFilterRewriter::new(retention_filter); + plan = self.restore_unscoped_dirty_windows_on_err( + &dirty_windows_to_restore, + plan.clone() + .rewrite(&mut add_filter) + .with_context(|_| DatafusionSnafu { + context: format!( + "Failed to apply {context} expire_after filter to plan:\n {}\n", + plan + ), + }) + .map(|rewrite| rewrite.data), + )?; + } + + Ok(PlanInfo { + plan, + dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore), + can_advance_checkpoints: true, + }) + } + + async fn gen_unfiltered_plan_info_if_dirty( + &self, + engine: QueryEngineRef, + query_ctx: QueryContextRef, + sink_table_schema: Arc, + primary_key_indices: &[usize], + allow_partial: bool, + retention_filter: Option<(&str, Timestamp, &'static str)>, + ) -> Result, Error> { + let (is_dirty, dirty_windows_to_restore) = self.drain_dirty_windows_signal(); + if !is_dirty { + debug!("Flow id={:?}, no new data, not update", self.config.flow_id); + return Ok(None); + } + + self.gen_unfiltered_plan_info( + engine, + query_ctx, + sink_table_schema, + primary_key_indices, + allow_partial, + dirty_windows_to_restore, + retention_filter, + ) + .await + .map(Some) + } + fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) { if let Some(query) = query { self.restore_dirty_windows_after_failure(query); @@ -626,33 +789,11 @@ impl BatchingTask { let min_refresh = self.config.batch_opts.experimental_min_refresh_duration; - let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await { - Ok(new_query) => new_query, - Err(err) => { - common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id); - // also sleep for a little while before try again to prevent flooding logs - tokio::time::sleep(min_refresh).await; - continue; - } - }; + let outcome = self + .execute_once_serialized_with_outcome(&engine, &frontend_client, max_window_cnt) + .await; - let res = if let Some(new_query) = &new_query { - let dirty_filter = match &new_query.dirty_restore { - DirtyRestore::Scoped(f) => Some(f), - _ => None, - }; - self.execute_logical_plan( - &frontend_client, - &new_query.plan, - dirty_filter, - new_query.can_advance_checkpoints, - ) - .await - } else { - Ok(None) - }; - - match res { + match outcome.result { // normal execute, sleep for some time before doing next query Ok(Some(_)) => { // can increase max_window_cnt to query more windows next time @@ -703,11 +844,10 @@ impl BatchingTask { } // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed Err(err) => { - self.handle_executed_query_failure(new_query.as_ref()); METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT .with_label_values(&[&flow_id_str]) .inc(); - match new_query { + match outcome.new_query { Some(query) => { common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan); // TODO(discord9): add some backoff here? half the query time window or what @@ -743,6 +883,20 @@ impl BatchingTask { create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type) } + fn should_use_unfiltered_incremental_delta(&self) -> bool { + let state = self.state.read().unwrap(); + state.checkpoint_mode() == CheckpointMode::Incremental + && !state.is_incremental_disabled() + && matches!(self.config.query_type, QueryType::Sql) + } + + fn should_use_unfiltered_full_snapshot_seeding(&self) -> bool { + let state = self.state.read().unwrap(); + state.checkpoint_mode() == CheckpointMode::FullSnapshot + && !state.is_incremental_disabled() + && matches!(self.config.query_type, QueryType::Sql) + } + /// will merge and use the first ten time window in query async fn gen_query_with_time_window( &self, @@ -783,83 +937,35 @@ impl BatchingTask { self.config.flow_id ); // clean dirty time window too, this could be from create flow's check_execute - let (is_dirty, dirty_windows_to_restore) = { - let mut state = self.state.write().unwrap(); - let dirty_windows_to_restore = state.dirty_time_windows.clone(); - let is_dirty = !dirty_windows_to_restore.is_empty(); - state.dirty_time_windows.clean(); - (is_dirty, dirty_windows_to_restore) - }; - - if !is_dirty { - // no dirty data, hence no need to update - debug!("Flow id={:?}, no new data, not update", self.config.flow_id); - return Ok(None); - } - - let plan = match gen_plan_with_matching_schema( - &self.config.query, - query_ctx, - engine, - sink_table_schema.clone(), - primary_key_indices, - allow_partial, - ) - .await - { - Ok(plan) => plan, - Err(err) => { - self.state - .write() - .unwrap() - .dirty_time_windows - .add_dirty_windows(&dirty_windows_to_restore); - return Err(err); - } - }; - - return Ok(Some(PlanInfo { - plan, - dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore), - can_advance_checkpoints: true, - })); + return self + .gen_unfiltered_plan_info_if_dirty( + engine, + query_ctx, + sink_table_schema.clone(), + primary_key_indices, + allow_partial, + None, + ) + .await; } _ => { // Clean dirty windows for full-query/non-scoped paths, // such as TQL, that cannot use a time-window filter. - let dirty_windows_to_restore = { - let mut state = self.state.write().unwrap(); - let dirty_windows_to_restore = state.dirty_time_windows.clone(); - state.dirty_time_windows.clean(); - dirty_windows_to_restore - }; + let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal(); - let plan = match gen_plan_with_matching_schema( - &self.config.query, - query_ctx, - engine, - sink_table_schema.clone(), - primary_key_indices, - allow_partial, - ) - .await - { - Ok(plan) => plan, - Err(err) => { - self.state - .write() - .unwrap() - .dirty_time_windows - .add_dirty_windows(&dirty_windows_to_restore); - return Err(err); - } - }; + let plan_info = self + .gen_unfiltered_plan_info( + engine, + query_ctx, + sink_table_schema.clone(), + primary_key_indices, + allow_partial, + dirty_windows_to_restore, + None, + ) + .await?; - return Ok(Some(PlanInfo { - plan, - dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore), - can_advance_checkpoints: true, - })); + return Ok(Some(plan_info)); } }; @@ -889,22 +995,61 @@ impl BatchingTask { ), })?; + if self.should_use_unfiltered_full_snapshot_seeding() { + // A full-snapshot query that can seed/refresh incremental + // checkpoints must not use dirty-window predicates. Rows can be + // written after dirty windows are drained but before the source scan + // snapshot opens; a stale dirty-window filter could exclude those + // rows while the returned watermark includes them, causing the next + // incremental read to skip them forever. Execute an unfiltered full + // snapshot instead, and keep dirty windows only as the scheduling and + // failure-restoration signal. + let retention_filter = self + .config + .expire_after + .map(|_| (col_name.as_str(), expire_lower_bound, "full-snapshot")); + return self + .gen_unfiltered_plan_info_if_dirty( + engine, + query_ctx, + sink_table_schema.clone(), + primary_key_indices, + allow_partial, + retention_filter, + ) + .await; + } + + if self.should_use_unfiltered_incremental_delta() { + // In incremental mode, source correctness is defined by the + // per-region sequence range `(checkpoint, scan-open snapshot]`, not + // by dirty-window predicates. Dirty windows are only a scheduling + // signal here. Applying a stale dirty-window filter to the source can + // exclude rows that are inside the returned watermark and make a + // checkpoint advance skip them forever. The sink side is also left + // unfiltered by dirty windows; the incremental rewrite joins the + // delta groups with the full sink state for correctness. Future + // dynamic filters can prune sink reads as a pure optimization. + let retention_filter = self + .config + .expire_after + .map(|_| (col_name.as_str(), expire_lower_bound, "incremental")); + return self + .gen_unfiltered_plan_info_if_dirty( + engine, + query_ctx, + sink_table_schema.clone(), + primary_key_indices, + allow_partial, + retention_filter, + ) + .await; + } + let (expr, can_advance_checkpoints) = { let mut state = self.state.write().unwrap(); - let window_cnt = if state.checkpoint_mode() == CheckpointMode::Incremental - && !state.is_incremental_disabled() - && matches!(self.config.query_type, QueryType::Sql) - { - // Incremental scans are bounded by region sequence checkpoints, - // so the dirty-window filter only narrows sink-side/time-window - // work. Drain more windows than normal, but keep a hard cap to - // avoid building a huge OR filter after a long downtime. If - // windows remain, checkpoints won't advance this round. - MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS - } else { - max_window_cnt - .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query) - }; + let window_cnt = max_window_cnt + .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query); let expr = state.dirty_time_windows.gen_filter_exprs( &col_name, Some(expire_lower_bound), diff --git a/src/flow/src/batching_mode/task/inc.rs b/src/flow/src/batching_mode/task/inc.rs index 4fb64a676e..9af54c1ba7 100644 --- a/src/flow/src/batching_mode/task/inc.rs +++ b/src/flow/src/batching_mode/task/inc.rs @@ -26,8 +26,7 @@ use snafu::ResultExt; use table::metadata::TableId; use crate::Error; -use crate::batching_mode::incremental_filter::build_sink_dirty_time_window_filter_expr; -use crate::batching_mode::state::{CheckpointMode, FilterExprInfo}; +use crate::batching_mode::state::CheckpointMode; use crate::batching_mode::table_creator::QueryType; use crate::batching_mode::task::BatchingTask; use crate::batching_mode::utils::{ @@ -74,7 +73,6 @@ impl BatchingTask { pub(super) async fn prepare_plan_for_incremental( &self, plan: &LogicalPlan, - dirty_filter: Option<&FilterExprInfo>, ) -> Result, Error> { let is_incremental_sql = { let state = self.state.read().unwrap(); @@ -152,31 +150,12 @@ impl BatchingTask { return Ok(None); } }; - let sink_schema = sink_table.table_info().meta.schema.clone(); - let sink_dirty_filter = match build_sink_dirty_time_window_filter_expr( - self.config.flow_id, - &analysis, - &sink_schema, - dirty_filter, - ) { - Ok(filter) => filter, - Err(err) => { - warn!( - "Flow {} failed to build sink dirty time window filter; \ - falling back to full snapshot for this round: {:?}", - self.config.flow_id, err - ); - self.state.write().unwrap().mark_full_snapshot(); - return Ok(None); - } - }; - let rewritten_inner = match rewrite_incremental_aggregate_with_sink_merge( &inner_plan, &analysis, sink_table, &self.config.sink_table_name, - sink_dirty_filter, + None, ) .await { diff --git a/src/flow/src/batching_mode/task/test.rs b/src/flow/src/batching_mode/task/test.rs index 959aeb00c9..d64b4ef1b9 100644 --- a/src/flow/src/batching_mode/task/test.rs +++ b/src/flow/src/batching_mode/task/test.rs @@ -25,7 +25,9 @@ use datatypes::data_type::ConcreteDataType as CDT; use datatypes::schema::ColumnSchema; use datatypes::vectors::{TimestampMillisecondVector, UInt32Vector, VectorRef}; use pretty_assertions::assert_eq; -use query::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY}; +use query::options::{ + FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY, QueryOptions, +}; use session::context::QueryContext; use table::test_util::MemTable; @@ -38,6 +40,13 @@ use crate::batching_mode::state::CheckpointMode; use crate::batching_mode::time_window::find_time_window_expr; use crate::test_utils::create_test_query_engine; +fn incremental_batch_opts() -> Arc { + Arc::new(BatchingModeOptions { + experimental_enable_incremental_read: true, + ..Default::default() + }) +} + async fn new_test_task_and_plan_with_missing_sink() -> (BatchingTask, LogicalPlan) { new_test_task_engine_and_plan_with_query( "SELECT number, ts FROM numbers_with_ts", @@ -60,6 +69,15 @@ impl TestTaskParts { } async fn new_test_task_engine_and_plan_with_query(query: &str, sink_table: &str) -> TestTaskParts { + new_test_task_engine_and_plan_with_query_and_opts(query, sink_table, incremental_batch_opts()) + .await +} + +async fn new_test_task_engine_and_plan_with_query_and_opts( + query: &str, + sink_table: &str, + batch_opts: Arc, +) -> TestTaskParts { let query_engine = create_test_query_engine(); let ctx = QueryContext::arc(); let plan = sql_to_df_plan( @@ -91,7 +109,7 @@ async fn new_test_task_engine_and_plan_with_query(query: &str, sink_table: &str) query_ctx: ctx, catalog_manager: query_engine.engine_state().catalog_manager().clone(), shutdown_rx: rx, - batch_opts: Arc::new(BatchingModeOptions::default()), + batch_opts, flow_eval_interval: None, }) .unwrap(); @@ -103,6 +121,75 @@ async fn new_test_task_engine_and_plan_with_query(query: &str, sink_table: &str) } } +#[tokio::test] +async fn test_incremental_read_is_disabled_by_default() { + let task = new_test_task_engine_and_plan_with_query_and_opts( + "SELECT number, ts FROM numbers_with_ts", + "numbers_with_ts", + Arc::new(BatchingModeOptions::default()), + ) + .await + .task; + + assert!(task.state.read().unwrap().is_incremental_disabled()); +} + +#[tokio::test] +async fn test_dirty_time_windows_uses_batch_opts() { + let task = new_test_task_engine_and_plan_with_query_and_opts( + "SELECT number, ts FROM numbers_with_ts", + "numbers_with_ts", + Arc::new(BatchingModeOptions { + experimental_max_filter_num_per_query: 7, + experimental_time_window_merge_threshold: 11, + ..Default::default() + }), + ) + .await + .task; + + let state = task.state.read().unwrap(); + assert_eq!(7, state.dirty_time_windows.max_filter_num_per_query()); + assert_eq!(11, state.dirty_time_windows.time_window_merge_threshold()); +} + +#[tokio::test] +async fn test_execute_once_serialized_waits_for_execution_lock() { + let TestTaskParts { + task, query_engine, .. + } = new_test_task_engine_and_plan_with_query( + "SELECT number, ts FROM numbers_with_ts", + "missing_sink", + ) + .await; + let (frontend_client, _handler) = + FrontendClient::from_empty_grpc_handler(QueryOptions::default()); + let frontend_client = Arc::new(frontend_client); + + let guard = task.execution_lock.clone().lock_owned().await; + let task_to_run = task.clone(); + let query_engine_to_run = query_engine.clone(); + let frontend_client_to_run = frontend_client.clone(); + let exec = tokio::spawn(async move { + task_to_run + .execute_once_serialized(&query_engine_to_run, &frontend_client_to_run, None) + .await + }); + + tokio::time::sleep(Duration::from_millis(20)).await; + assert!( + !exec.is_finished(), + "execute_once_serialized should wait for execution_lock" + ); + + drop(guard); + tokio::time::timeout(Duration::from_secs(1), exec) + .await + .expect("execute_once_serialized should finish once execution_lock is released") + .expect("execute_once_serialized task should not panic") + .expect_err("missing sink should fail after acquiring execution_lock"); +} + async fn new_time_window_test_task_with_query(query: &str) -> TestTaskParts { let query_engine = create_test_query_engine(); let ctx = QueryContext::arc(); @@ -147,7 +234,7 @@ async fn new_time_window_test_task_with_query(query: &str) -> TestTaskParts { query_ctx: ctx, catalog_manager: query_engine.engine_state().catalog_manager().clone(), shutdown_rx: rx, - batch_opts: Arc::new(BatchingModeOptions::default()), + batch_opts: incremental_batch_opts(), flow_eval_interval: None, }) .unwrap(); @@ -226,6 +313,14 @@ fn dirty_range(start: i64, end: i64) -> DirtyTimeWindows { dirty } +fn expire_after_for_retention_filter_test() -> i64 { + let now_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + (now_secs - 10) as i64 +} + async fn assert_unscoped_failure_restore( consumed_dirty_windows: DirtyTimeWindows, current_dirty_windows: DirtyTimeWindows, @@ -626,6 +721,7 @@ async fn test_full_snapshot_scoped_plan_marks_checkpoint_advance_safe_only_after .await; { let mut state = task.state.write().unwrap(); + state.disable_incremental(); state .dirty_time_windows .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); @@ -657,7 +753,7 @@ async fn test_full_snapshot_scoped_plan_marks_checkpoint_advance_safe_only_after } #[tokio::test] -async fn test_incremental_scoped_plan_consumes_all_dirty_windows_for_checkpoint_safety() { +async fn test_incremental_plan_consumes_dirty_signal_for_checkpoint_safety() { let TestTaskParts { task, query_engine, @@ -692,6 +788,192 @@ async fn test_incremental_scoped_plan_consumes_all_dirty_windows_for_checkpoint_ assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); } +#[tokio::test] +async fn test_full_snapshot_seeding_for_incremental_does_not_add_dirty_window_filter() { + let TestTaskParts { + task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window", + ) + .await; + { + let mut state = task.state.write().unwrap(); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(!state.is_incremental_disabled()); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + state + .dirty_time_windows + .add_window(Timestamp::new_second(30), Some(Timestamp::new_second(35))); + } + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); + + let plan = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1)) + .await + .unwrap() + .unwrap(); + + let plan_text = plan.plan.to_string(); + assert!(plan.can_advance_checkpoints); + assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); + assert!(!plan_text.contains("Filter:"), "{plan_text}"); +} + +#[tokio::test] +async fn test_full_snapshot_seeding_applies_expire_after_retention_filter() { + let TestTaskParts { + mut task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window", + ) + .await; + { + let mut state = task.state.write().unwrap(); + assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot); + assert!(!state.is_incremental_disabled()); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + } + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); + + Arc::get_mut(&mut task.config) + .expect("test task config should be uniquely owned") + .expire_after = Some(expire_after_for_retention_filter_test()); + let plan = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1)) + .await + .unwrap() + .unwrap(); + + assert!(plan.can_advance_checkpoints); + assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); + let plan_text = plan.plan.to_string(); + assert!( + plan_text.contains("Filter: ts >= TimestampMillisecond("), + "{plan_text}" + ); +} + +#[tokio::test] +async fn test_incremental_plan_does_not_add_dirty_window_filter() { + let TestTaskParts { + task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window", + ) + .await; + { + let mut state = task.state.write().unwrap(); + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + } + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); + + let plan = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1)) + .await + .unwrap() + .unwrap(); + + let plan_text = plan.plan.to_string(); + assert!(plan.can_advance_checkpoints); + assert!(!plan_text.contains("Filter:"), "{plan_text}"); +} + +#[tokio::test] +async fn test_incremental_delta_applies_expire_after_retention_filter() { + let TestTaskParts { + mut task, + query_engine, + .. + } = new_time_window_test_task_with_query( + "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window", + ) + .await; + { + let mut state = task.state.write().unwrap(); + state.advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); + state + .dirty_time_windows + .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5))); + } + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); + + Arc::get_mut(&mut task.config) + .expect("test task config should be uniquely owned") + .expire_after = Some(expire_after_for_retention_filter_test()); + let plan = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1)) + .await + .unwrap() + .unwrap(); + + assert!(plan.can_advance_checkpoints); + assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); + let plan_text = plan.plan.to_string(); + assert!( + plan_text.contains("Filter: ts >= TimestampMillisecond("), + "{plan_text}" + ); +} + +#[tokio::test] +async fn test_non_scoped_path_generates_plan_with_empty_dirty_signal() { + let TestTaskParts { + mut task, + query_engine, + .. + } = new_test_task_engine_and_plan_with_query( + "SELECT number, ts FROM numbers_with_ts", + "missing_sink", + ) + .await; + Arc::get_mut(&mut task.config) + .expect("test task config should be uniquely owned") + .query_type = QueryType::Tql; + task.state.write().unwrap().dirty_time_windows.clean(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true), + ])); + + let plan = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None) + .await + .unwrap() + .expect("non-scoped path should generate a plan even with an empty dirty signal"); + + assert!(plan.can_advance_checkpoints); + assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); +} + #[tokio::test] async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_path() { let (task, plan) = new_test_task_and_plan_with_missing_sink().await; @@ -773,7 +1055,7 @@ async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() { query_ctx: ctx, catalog_manager: query_engine.engine_state().catalog_manager().clone(), shutdown_rx: rx, - batch_opts: Arc::new(BatchingModeOptions::default()), + batch_opts: incremental_batch_opts(), flow_eval_interval: None, }) .unwrap(); @@ -788,10 +1070,7 @@ async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() { CheckpointMode::Incremental ); - let incremental_plan = task - .prepare_plan_for_incremental(&dml_plan, None) - .await - .unwrap(); + let incremental_plan = task.prepare_plan_for_incremental(&dml_plan).await.unwrap(); assert!(incremental_plan.is_none()); let state = task.state.read().unwrap(); assert!(state.is_incremental_disabled()); @@ -852,7 +1131,7 @@ async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite query_ctx: ctx, catalog_manager: query_engine.engine_state().catalog_manager().clone(), shutdown_rx: rx, - batch_opts: Arc::new(BatchingModeOptions::default()), + batch_opts: incremental_batch_opts(), flow_eval_interval: None, }) .unwrap(); @@ -866,10 +1145,7 @@ async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite CheckpointMode::Incremental ); - let incremental_plan = task - .prepare_plan_for_incremental(&dml_plan, None) - .await - .unwrap(); + let incremental_plan = task.prepare_plan_for_incremental(&dml_plan).await.unwrap(); assert!(incremental_plan.is_none()); let state = task.state.read().unwrap(); assert!(!state.is_incremental_disabled()); @@ -928,7 +1204,7 @@ async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_o query_ctx: ctx, catalog_manager: query_engine.engine_state().catalog_manager().clone(), shutdown_rx: rx, - batch_opts: Arc::new(BatchingModeOptions::default()), + batch_opts: incremental_batch_opts(), flow_eval_interval: None, }) .unwrap(); @@ -939,7 +1215,7 @@ async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_o .advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); let incremental_plan = task - .prepare_plan_for_incremental(&dml_plan, None) + .prepare_plan_for_incremental(&dml_plan) .await .unwrap() .expect("plain GROUP BY is incremental-safe without a rewrite"); @@ -962,7 +1238,7 @@ async fn test_auto_created_sql_aggregate_sink_reaches_incremental_safe() { task.state.write().unwrap().dirty_time_windows.set_dirty(); let plan_info = task - .gen_insert_plan(&query_engine, None) + .gen_insert_plan_unlocked(&query_engine, None) .await .unwrap() .unwrap(); @@ -973,7 +1249,7 @@ async fn test_auto_created_sql_aggregate_sink_reaches_incremental_safe() { .unwrap() .advance_checkpoints(HashMap::from([(1_u64, 10_u64)])); let incremental_plan = task - .prepare_plan_for_incremental(&plan_info.plan, None) + .prepare_plan_for_incremental(&plan_info.plan) .await .unwrap(); let incremental_safe = incremental_plan.is_some(); @@ -1078,11 +1354,11 @@ async fn test_insert_plan_matching_failure_restores_consumed_dirty_marker() { register_number_only_sink(&query_engine, sink_table); task.state.write().unwrap().dirty_time_windows.set_dirty(); - let result = task.gen_insert_plan(&query_engine, None).await; + let result = task.gen_insert_plan_unlocked(&query_engine, None).await; assert!(result.is_err()); let _err = match result { - Ok(_) => panic!("gen_insert_plan should fail with a sink column mismatch"), + Ok(_) => panic!("gen_insert_plan_unlocked should fail with a sink column mismatch"), Err(err) => err, }; let state = task.state.read().unwrap(); diff --git a/src/flow/src/batching_mode/utils/test.rs b/src/flow/src/batching_mode/utils/test.rs index 5b9cf7f507..317b0a5475 100644 --- a/src/flow/src/batching_mode/utils/test.rs +++ b/src/flow/src/batching_mode/utils/test.rs @@ -1288,9 +1288,10 @@ async fn test_rewrite_incremental_aggregate_with_left_join() { #[tokio::test] async fn test_rewrite_incremental_aggregate_filters_sink_dirty_time_window() { - // This verifies the rewrite placement when callers supply an already - // inferred sink dirty-window predicate. The task-level inference rules are - // covered by `infer_sink_time_window_filter_col` tests in task.rs. + // This verifies the rewrite placement when callers supply a sink predicate. + // The production incremental flow path currently leaves sink scans + // unfiltered for correctness and relies on future dynamic filters for + // pruning. let query_engine = create_test_query_engine(); let ctx = QueryContext::arc(); let sql = "SELECT max(number) AS number, date_bin(INTERVAL '1 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window"; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index aef164c6bb..cadb1cde66 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -35,7 +35,9 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_reado use common_catalog::{format_full_flow_name, format_full_table_name}; use common_error::ext::BoxedError; use common_meta::cache_invalidator::Context; -use common_meta::ddl::create_flow::{DEFER_ON_MISSING_SOURCE_KEY, FlowType}; +use common_meta::ddl::create_flow::{ + DEFER_ON_MISSING_SOURCE_KEY, FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType, +}; use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::procedure_executor::ExecutorContext; @@ -114,7 +116,10 @@ struct DdlSubmitOptions { timeout: Duration, } -const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY]; +const ALLOWED_FLOW_OPTIONS: [&str; 2] = [ + DEFER_ON_MISSING_SOURCE_KEY, + FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, +]; fn build_procedure_id_output(procedure_id: Vec) -> Result { let procedure_id = String::from_utf8_lossy(&procedure_id).to_string(); @@ -187,7 +192,9 @@ fn validate_and_normalize_flow_options( } let normalized_value = match key.as_str() { - DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?, + DEFER_ON_MISSING_SOURCE_KEY | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY => { + normalize_flow_bool_option(&key, &value)? + } _ => { return InvalidSqlSnafu { err_msg: format!( @@ -2478,12 +2485,23 @@ mod test { #[test] fn test_validate_and_normalize_flow_options_valid() { - let options = - HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string())]); + let options = HashMap::from([ + (DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string()), + ( + FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(), + "FALSE".to_string(), + ), + ]); assert_eq!( validate_and_normalize_flow_options(options).unwrap(), - HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),)]) + HashMap::from([ + (DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),), + ( + FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(), + "false".to_string(), + ) + ]) ); } @@ -2497,7 +2515,7 @@ mod test { assert!( err.to_string() - .contains("unknown flow option 'foo', supported options: defer_on_missing_source") + .contains("unknown flow option 'foo', supported options: defer_on_missing_source, experimental_enable_incremental_read") ); } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 6e35ed7656..5fe22359f3 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1602,6 +1602,7 @@ experimental_grpc_max_retries = 3 experimental_frontend_scan_timeout = "30s" experimental_max_filter_num_per_query = 20 experimental_time_window_merge_threshold = 3 +experimental_enable_incremental_read = false read_preference = "Leader" [logging] diff --git a/tests/cases/standalone/common/flow/flow_incremental_aggr.result b/tests/cases/standalone/common/flow/flow_incremental_aggr.result index bb66d5362c..2273e0e821 100644 --- a/tests/cases/standalone/common/flow/flow_incremental_aggr.result +++ b/tests/cases/standalone/common/flow/flow_incremental_aggr.result @@ -1,3 +1,31 @@ +-- Incremental aggregate reads only support append-only source tables because +-- update/upsert sources need old-value compensation. +CREATE TABLE incremental_non_append_input ( + host_id INT, + n INT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(host_id) +); + +Affected Rows: 0 + +CREATE FLOW incremental_non_append_flow SINK TO incremental_non_append_sink +WITH (experimental_enable_incremental_read = 'true') +AS +SELECT + sum(n) AS total, + date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window +FROM + incremental_non_append_input +GROUP BY + time_window; + +Error: 3001(EngineExecuteQuery), Unsupported: Flow incremental read requires append-only source table, but source table `greptime.public.incremental_non_append_input` is not append-only. Consider setting append_mode='true' on the source table or disabling experimental_enable_incremental_read + +DROP TABLE incremental_non_append_input; + +Affected Rows: 0 + CREATE TABLE incremental_aggr_input ( host_id INT, n INT, @@ -9,7 +37,9 @@ CREATE TABLE incremental_aggr_input ( Affected Rows: 0 -CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink AS +CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink +WITH (experimental_enable_incremental_read = 'true') +AS SELECT sum(n) AS total, min(n) AS min_n, diff --git a/tests/cases/standalone/common/flow/flow_incremental_aggr.sql b/tests/cases/standalone/common/flow/flow_incremental_aggr.sql index 51dd431fef..4c012aef23 100644 --- a/tests/cases/standalone/common/flow/flow_incremental_aggr.sql +++ b/tests/cases/standalone/common/flow/flow_incremental_aggr.sql @@ -1,3 +1,25 @@ +-- Incremental aggregate reads only support append-only source tables because +-- update/upsert sources need old-value compensation. +CREATE TABLE incremental_non_append_input ( + host_id INT, + n INT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(host_id) +); + +CREATE FLOW incremental_non_append_flow SINK TO incremental_non_append_sink +WITH (experimental_enable_incremental_read = 'true') +AS +SELECT + sum(n) AS total, + date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window +FROM + incremental_non_append_input +GROUP BY + time_window; + +DROP TABLE incremental_non_append_input; + CREATE TABLE incremental_aggr_input ( host_id INT, n INT, @@ -7,7 +29,9 @@ CREATE TABLE incremental_aggr_input ( append_mode = 'true' ); -CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink AS +CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink +WITH (experimental_enable_incremental_read = 'true') +AS SELECT sum(n) AS total, min(n) AS min_n, diff --git a/tests/cases/standalone/common/flow/flow_incremental_memtable.result b/tests/cases/standalone/common/flow/flow_incremental_memtable.result index 1e452b21ad..67326e1261 100644 --- a/tests/cases/standalone/common/flow/flow_incremental_memtable.result +++ b/tests/cases/standalone/common/flow/flow_incremental_memtable.result @@ -12,7 +12,9 @@ CREATE TABLE flow_incr_memtable_input ( Affected Rows: 0 -CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink AS +CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink +WITH (experimental_enable_incremental_read = 'true') +AS SELECT sum(n) AS total, min(n) AS min_n, diff --git a/tests/cases/standalone/common/flow/flow_incremental_memtable.sql b/tests/cases/standalone/common/flow/flow_incremental_memtable.sql index 66dccbb8b3..6dbbf6064f 100644 --- a/tests/cases/standalone/common/flow/flow_incremental_memtable.sql +++ b/tests/cases/standalone/common/flow/flow_incremental_memtable.sql @@ -10,7 +10,9 @@ CREATE TABLE flow_incr_memtable_input ( append_mode = 'true' ); -CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink AS +CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink +WITH (experimental_enable_incremental_read = 'true') +AS SELECT sum(n) AS total, min(n) AS min_n, diff --git a/tests/cases/standalone/common/flow/flow_incremental_partitioned.result b/tests/cases/standalone/common/flow/flow_incremental_partitioned.result index b56b390abd..0899d4acb0 100644 --- a/tests/cases/standalone/common/flow/flow_incremental_partitioned.result +++ b/tests/cases/standalone/common/flow/flow_incremental_partitioned.result @@ -17,7 +17,9 @@ WITH ( Affected Rows: 0 -CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink AS +CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink +WITH (experimental_enable_incremental_read = 'true') +AS SELECT sum(n) AS total, min(n) AS min_n, diff --git a/tests/cases/standalone/common/flow/flow_incremental_partitioned.sql b/tests/cases/standalone/common/flow/flow_incremental_partitioned.sql index 234c9b9085..18cece1889 100644 --- a/tests/cases/standalone/common/flow/flow_incremental_partitioned.sql +++ b/tests/cases/standalone/common/flow/flow_incremental_partitioned.sql @@ -15,7 +15,9 @@ WITH ( append_mode = 'true' ); -CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink AS +CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink +WITH (experimental_enable_incremental_read = 'true') +AS SELECT sum(n) AS total, min(n) AS min_n, diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 113822cd18..431d1dfbb5 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -476,7 +476,7 @@ SINK TO out_num_cnt_show WITH (access_key_id = [true]) AS SELECT number AS n1 FROM numbers_input_show where number > 10; -Error: 1004(InvalidArguments), Invalid SQL, error: unknown flow option 'access_key_id', supported options: defer_on_missing_source +Error: 1004(InvalidArguments), Invalid SQL, error: unknown flow option 'access_key_id', supported options: defer_on_missing_source, experimental_enable_incremental_read DROP FLOW filter_numbers_show;