diff --git a/config/config.md b/config/config.md
index aa0c6701a0..d9cffaf122 100644
--- a/config/config.md
+++ b/config/config.md
@@ -630,6 +630,7 @@
| `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Flow wait for available frontend timeout,<br/>if failed to find available frontend after frontend_scan_timeout elapsed, return error<br/>which prevent flownode from starting |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
+| `flow.batching_mode.experimental_enable_incremental_read` | Bool | `false` | Whether to enable experimental flow incremental source reads.<br/>When disabled, batching flows always execute full-snapshot queries.<br/>Can be overridden per flow with WITH (experimental_enable_incremental_read = 'true'). |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
| `flow.batching_mode.frontend_tls` | -- | -- | -- |
| `flow.batching_mode.frontend_tls.enabled` | Bool | `false` | Whether to enable TLS for client. |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index 2c053e6e8c..ff8a9e4a50 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -31,6 +31,10 @@ node_id = 14
#+experimental_max_filter_num_per_query=20
## Time window merge distance
#+experimental_time_window_merge_threshold=3
+## Whether to enable experimental flow incremental source reads.
+## When disabled, batching flows always execute full-snapshot queries.
+## Can be overridden per flow with WITH (experimental_enable_incremental_read = 'true').
+#+experimental_enable_incremental_read=false
## Read preference of the Frontend client.
#+read_preference="Leader"
[flow.batching_mode.frontend_tls]
diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs
index ddfb0c0759..8a419176c9 100644
--- a/src/common/meta/src/ddl/create_flow.rs
+++ b/src/common/meta/src/ddl/create_flow.rs
@@ -437,11 +437,13 @@ pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result {
pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
for key in flow_task.flow_options.keys() {
match key.as_str() {
- DEFER_ON_MISSING_SOURCE_KEY | FlowType::FLOW_TYPE_KEY => {}
+ DEFER_ON_MISSING_SOURCE_KEY
+ | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY
+ | FlowType::FLOW_TYPE_KEY => {}
unknown => {
return UnexpectedSnafu {
err_msg: format!(
- "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}"
+ "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}, {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}"
),
}
.fail();
@@ -487,6 +489,9 @@ pub enum FlowType {
Streaming,
}
+/// Flow option key that toggles experimental incremental source reads for a
+/// single flow, e.g. `WITH (experimental_enable_incremental_read = 'true')`.
+///
+/// `validate_flow_options` accepts this key as a supported user option.
+pub const FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY: &str =
+    "experimental_enable_incremental_read";
+
impl FlowType {
pub const BATCHING: &str = "batching";
pub const STREAMING: &str = "streaming";
diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs
index a1a6c040f1..7150be39cb 100644
--- a/src/common/meta/src/ddl/tests/create_flow.rs
+++ b/src/common/meta/src/ddl/tests/create_flow.rs
@@ -24,8 +24,9 @@ use table::table_name::TableName;
use crate::ddl::DdlContext;
use crate::ddl::create_flow::{
- CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY, FlowType,
- defer_on_missing_source,
+ CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY,
+ FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType, defer_on_missing_source,
+ validate_flow_options,
};
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
@@ -275,6 +276,22 @@ fn test_defer_on_missing_source_invalid_value() {
);
}
+// `validate_flow_options` must accept the per-flow incremental-read option
+// instead of rejecting it as an unknown key.
+#[test]
+fn test_validate_flow_options_allows_incremental_read_option() {
+    let sink_table = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table");
+    let mut flow_task = test_create_flow_task("my_flow", vec![], sink_table, false);
+
+    let option_key = FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string();
+    let option_value = "true".to_string();
+    flow_task.flow_options.insert(option_key, option_value);
+
+    validate_flow_options(&flow_task).unwrap();
+}
+
#[tokio::test]
async fn test_create_flow_rejects_unknown_option_in_meta_task() {
let mut task = test_create_flow_task(
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs
index 580762a142..a8bd139d98 100644
--- a/src/flow/src/batching_mode.rs
+++ b/src/flow/src/batching_mode.rs
@@ -23,7 +23,6 @@ use session::ReadPreference;
mod checkpoint;
pub(crate) mod engine;
pub(crate) mod frontend_client;
-mod incremental_filter;
mod state;
mod table_creator;
mod task;
@@ -55,6 +54,10 @@ pub struct BatchingModeOptions {
pub experimental_max_filter_num_per_query: usize,
/// Time window merge distance
pub experimental_time_window_merge_threshold: usize,
+ /// Whether to enable experimental flow incremental source reads.
+ ///
+ /// When disabled, batching flows always execute full-snapshot queries.
+ pub experimental_enable_incremental_read: bool,
/// Read preference of the Frontend client.
pub read_preference: ReadPreference,
/// TLS option for client connections to frontends.
@@ -72,6 +75,7 @@ impl Default for BatchingModeOptions {
experimental_frontend_scan_timeout: Duration::from_secs(30),
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
+ experimental_enable_incremental_read: false,
read_preference: Default::default(),
frontend_tls: None,
}
diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index f37e54d80b..68fb3793e4 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -21,7 +21,7 @@ use std::time::Duration;
use api::v1::flow::DirtyWindowRequests;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
-use common_meta::ddl::create_flow::FlowType;
+use common_meta::ddl::create_flow::{FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::flow::flow_state::FlowStat;
@@ -38,6 +38,7 @@ use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parsers::utils::is_tql;
use store_api::metric_engine_consts::is_metric_engine_internal_column;
+use store_api::mito_engine_options::APPEND_MODE_KEY;
use store_api::storage::{RegionId, TableId};
use table::table_reference::TableReference;
use tokio::sync::{RwLock, oneshot};
@@ -428,6 +429,55 @@ async fn get_table_info(
}
impl BatchingEngine {
+    /// Builds the effective batching options for a single flow.
+    ///
+    /// Starts from a copy of the engine-wide options and, when `flow_options`
+    /// contains [`FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY`], overrides
+    /// `experimental_enable_incremental_read` with the parsed value.
+    ///
+    /// # Errors
+    ///
+    /// Returns the error built from `InvalidQuerySnafu` when the option value
+    /// does not parse as a `bool` (`str::parse::<bool>` accepts exactly
+    /// `true` or `false`).
+    fn batch_opts_for_flow_options(
+        &self,
+        flow_options: &HashMap<String, String>,
+    ) -> Result<Arc<BatchingModeOptions>, Error> {
+        // Work on a private copy; the engine-wide options are left untouched.
+        let mut batch_opts = (*self.batch_opts).clone();
+        if let Some(enable_incremental_read) =
+            flow_options.get(FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY)
+        {
+            batch_opts.experimental_enable_incremental_read = enable_incremental_read
+                .parse::<bool>()
+                .map_err(|_| {
+                    InvalidQuerySnafu {
+                        reason: format!(
+                            "Invalid flow option {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}: {enable_incremental_read}"
+                        ),
+                    }
+                    .build()
+                })?;
+        }
+
+        Ok(Arc::new(batch_opts))
+    }
+
+    /// Returns `true` when `extra_options` contains [`APPEND_MODE_KEY`] with a
+    /// value equal to `"true"`, compared ASCII case-insensitively.
+    ///
+    /// A missing key or any other value yields `false`.
+    fn table_options_enable_append_mode(extra_options: &HashMap<String, String>) -> bool {
+        extra_options
+            .get(APPEND_MODE_KEY)
+            .is_some_and(|value| value.eq_ignore_ascii_case("true"))
+    }
+
+    /// Checks that a source table may be used with incremental reads.
+    ///
+    /// When `batch_opts.experimental_enable_incremental_read` is `false` this
+    /// is a no-op. Otherwise the table's `extra_options` must enable append
+    /// mode (see `table_options_enable_append_mode`).
+    ///
+    /// `table_name` is the three-part table name; its parts are joined with
+    /// `.` in the error message.
+    ///
+    /// # Errors
+    ///
+    /// Returns the error built from `UnsupportedSnafu` when incremental read
+    /// is enabled but the source table is not append-only.
+    fn ensure_incremental_source_append_only(
+        batch_opts: &BatchingModeOptions,
+        table_name: &[String; 3],
+        extra_options: &HashMap<String, String>,
+    ) -> Result<(), Error> {
+        if batch_opts.experimental_enable_incremental_read {
+            ensure!(
+                Self::table_options_enable_append_mode(extra_options),
+                UnsupportedSnafu {
+                    reason: format!(
+                        "Flow incremental read requires append-only source table, but source table `{}` is not append-only. Consider setting append_mode='true' on the source table or disabling experimental_enable_incremental_read",
+                        table_name.join(".")
+                    ),
+                }
+            );
+        }
+
+        Ok(())
+    }
+
pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result