fix(flow): harden incremental read correctness (#8196)

* fix(flow): harden incremental read correctness

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): propagate dirty window options

Signed-off-by: discord9 <discord9@163.com>

* test: more

Signed-off-by: discord9 <discord9@163.com>

* chore: test config api

Signed-off-by: discord9 <discord9@163.com>

* refactor: split gen

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* fix: allowlist key

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-01 10:48:00 +08:00
committed by GitHub
parent ed9312f8e3
commit 28fd796f4e
21 changed files with 873 additions and 437 deletions

View File

@@ -630,6 +630,7 @@
| `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Timeout for the flownode to wait for an available frontend.<br/>If no available frontend is found before frontend_scan_timeout elapses, an error is returned,<br/>which prevents the flownode from starting. |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `flow.batching_mode.experimental_enable_incremental_read` | Bool | `false` | Whether to enable experimental flow incremental source reads.<br/>When disabled, batching flows always execute full-snapshot queries.<br/>Can be overridden per flow with WITH (experimental_enable_incremental_read = 'true'). |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
| `flow.batching_mode.frontend_tls` | -- | -- | -- |
| `flow.batching_mode.frontend_tls.enabled` | Bool | `false` | Whether to enable TLS for client. |

View File

@@ -31,6 +31,10 @@ node_id = 14
#+experimental_max_filter_num_per_query=20
## Time window merge distance
#+experimental_time_window_merge_threshold=3
## Whether to enable experimental flow incremental source reads.
## When disabled, batching flows always execute full-snapshot queries.
## Can be overridden per flow with WITH (experimental_enable_incremental_read = 'true').
#+experimental_enable_incremental_read=false
## Read preference of the Frontend client.
#+read_preference="Leader"
[flow.batching_mode.frontend_tls]

View File

@@ -437,11 +437,13 @@ pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result<bool> {
pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
for key in flow_task.flow_options.keys() {
match key.as_str() {
DEFER_ON_MISSING_SOURCE_KEY | FlowType::FLOW_TYPE_KEY => {}
DEFER_ON_MISSING_SOURCE_KEY
| FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY
| FlowType::FLOW_TYPE_KEY => {}
unknown => {
return UnexpectedSnafu {
err_msg: format!(
"Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}"
"Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}, {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}"
),
}
.fail();
@@ -487,6 +489,9 @@ pub enum FlowType {
Streaming,
}
pub const FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY: &str =
"experimental_enable_incremental_read";
impl FlowType {
pub const BATCHING: &str = "batching";
pub const STREAMING: &str = "streaming";

View File

@@ -24,8 +24,9 @@ use table::table_name::TableName;
use crate::ddl::DdlContext;
use crate::ddl::create_flow::{
CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY, FlowType,
defer_on_missing_source,
CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY,
FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType, defer_on_missing_source,
validate_flow_options,
};
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
@@ -275,6 +276,22 @@ fn test_defer_on_missing_source_invalid_value() {
);
}
/// `validate_flow_options` must accept the per-flow incremental read option key.
#[test]
fn test_validate_flow_options_allows_incremental_read_option() {
    let sink_table = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table");
    let mut flow_task = test_create_flow_task("my_flow", vec![], sink_table, false);

    let option_key = FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_owned();
    flow_task.flow_options.insert(option_key, "true".to_owned());

    validate_flow_options(&flow_task).unwrap();
}
#[tokio::test]
async fn test_create_flow_rejects_unknown_option_in_meta_task() {
let mut task = test_create_flow_task(

View File

@@ -23,7 +23,6 @@ use session::ReadPreference;
mod checkpoint;
pub(crate) mod engine;
pub(crate) mod frontend_client;
mod incremental_filter;
mod state;
mod table_creator;
mod task;
@@ -55,6 +54,10 @@ pub struct BatchingModeOptions {
pub experimental_max_filter_num_per_query: usize,
/// Time window merge distance
pub experimental_time_window_merge_threshold: usize,
/// Whether to enable experimental flow incremental source reads.
///
/// When disabled, batching flows always execute full-snapshot queries.
pub experimental_enable_incremental_read: bool,
/// Read preference of the Frontend client.
pub read_preference: ReadPreference,
/// TLS option for client connections to frontends.
@@ -72,6 +75,7 @@ impl Default for BatchingModeOptions {
experimental_frontend_scan_timeout: Duration::from_secs(30),
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
experimental_enable_incremental_read: false,
read_preference: Default::default(),
frontend_tls: None,
}

View File

@@ -21,7 +21,7 @@ use std::time::Duration;
use api::v1::flow::DirtyWindowRequests;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::create_flow::{FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::flow::flow_state::FlowStat;
@@ -38,6 +38,7 @@ use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parsers::utils::is_tql;
use store_api::metric_engine_consts::is_metric_engine_internal_column;
use store_api::mito_engine_options::APPEND_MODE_KEY;
use store_api::storage::{RegionId, TableId};
use table::table_reference::TableReference;
use tokio::sync::{RwLock, oneshot};
@@ -428,6 +429,55 @@ async fn get_table_info(
}
impl BatchingEngine {
/// Resolves the effective batching options for a single flow.
///
/// The engine-wide options are used as the base; the per-flow
/// `experimental_enable_incremental_read` option, when present, overrides the
/// engine-wide switch.
///
/// The option value is validated before anything is copied, and the
/// engine-wide `Arc` is shared (not deep-cloned) whenever the flow does not
/// change the effective setting.
///
/// # Errors
///
/// Returns an invalid-query error when the option value cannot be parsed as a
/// `bool` (i.e. it is neither `true` nor `false`).
fn batch_opts_for_flow_options(
    &self,
    flow_options: &HashMap<String, String>,
) -> Result<Arc<BatchingModeOptions>, Error> {
    let Some(enable_incremental_read) =
        flow_options.get(FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY)
    else {
        // No per-flow override: reuse the shared engine-wide options.
        return Ok(Arc::clone(&self.batch_opts));
    };

    let enabled = enable_incremental_read.parse::<bool>().map_err(|_| {
        InvalidQuerySnafu {
            reason: format!(
                "Invalid flow option {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}: {enable_incremental_read}"
            ),
        }
        .build()
    })?;

    // The override matches the engine-wide value, so no private copy is needed.
    if enabled == self.batch_opts.experimental_enable_incremental_read {
        return Ok(Arc::clone(&self.batch_opts));
    }

    let mut batch_opts = (*self.batch_opts).clone();
    batch_opts.experimental_enable_incremental_read = enabled;
    Ok(Arc::new(batch_opts))
}
/// Returns `true` when the table's extra options contain the append-mode key
/// with a value equal to `"true"` (compared ASCII case-insensitively).
fn table_options_enable_append_mode(extra_options: &HashMap<String, String>) -> bool {
    match extra_options.get(APPEND_MODE_KEY) {
        Some(raw) => raw.eq_ignore_ascii_case("true"),
        None => false,
    }
}
fn ensure_incremental_source_append_only(
batch_opts: &BatchingModeOptions,
table_name: &[String; 3],
extra_options: &HashMap<String, String>,
) -> Result<(), Error> {
if batch_opts.experimental_enable_incremental_read {
ensure!(
Self::table_options_enable_append_mode(extra_options),
UnsupportedSnafu {
reason: format!(
"Flow incremental read requires append-only source table, but source table `{}` is not append-only. Consider setting append_mode='true' on the source table or disabling experimental_enable_incremental_read",
table_name.join(".")
),
}
);
}
Ok(())
}
pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
let CreateFlowArgs {
flow_id,
@@ -494,6 +544,8 @@ impl BatchingEngine {
}
);
let batch_opts = self.batch_opts_for_flow_options(&flow_options)?;
let mut source_table_names = Vec::with_capacity(2);
for src_id in source_table_ids {
// also check table option to see if ttl!=instant
@@ -509,6 +561,11 @@ impl BatchingEngine {
),
}
);
Self::ensure_incremental_source_append_only(
&batch_opts,
&table_name,
&table_info.table_info.meta.options.extra_options,
)?;
source_table_names.push(table_name);
}
@@ -563,7 +620,7 @@ impl BatchingEngine {
query_ctx,
catalog_manager: self.catalog_manager.clone(),
shutdown_rx: rx,
batch_opts: self.batch_opts.clone(),
batch_opts,
flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
};
@@ -808,7 +865,7 @@ impl BatchingEngine {
});
let res = task
.gen_exec_once(
.execute_once_serialized(
&self.query_engine,
&self.frontend_client,
cur_dirty_window_cnt,
@@ -946,6 +1003,76 @@ mod tests {
)
}
/// A per-flow option must be able to turn incremental read on even though the
/// test engine's resolved options leave it off.
#[tokio::test]
async fn test_flow_option_overrides_incremental_read_switch() {
    let engine = new_test_engine().await;

    // No per-flow option: the resolved options keep incremental read off.
    let no_override: HashMap<String, String> = HashMap::new();
    let resolved_default = engine.batch_opts_for_flow_options(&no_override).unwrap();
    assert!(!resolved_default.experimental_enable_incremental_read);

    // Per-flow option set to "true": the resolved options turn it on.
    let with_override = HashMap::from([(
        FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(),
        "true".to_string(),
    )]);
    let resolved_enabled = engine
        .batch_opts_for_flow_options(&with_override)
        .unwrap();
    assert!(resolved_enabled.experimental_enable_incremental_read);
}
/// Append mode is detected only for an explicit, case-insensitive `true`.
#[test]
fn test_table_options_enable_append_mode() {
    // Key absent.
    let missing: HashMap<String, String> = HashMap::new();
    assert!(!BatchingEngine::table_options_enable_append_mode(&missing));

    // Key present but explicitly disabled.
    let disabled = HashMap::from([(APPEND_MODE_KEY.to_string(), "false".to_string())]);
    assert!(!BatchingEngine::table_options_enable_append_mode(&disabled));

    // Upper-case value is still accepted.
    let upper_case = HashMap::from([(APPEND_MODE_KEY.to_string(), "TRUE".to_string())]);
    assert!(BatchingEngine::table_options_enable_append_mode(&upper_case));
}
/// Covers the three outcomes of the append-only check: skipped when
/// incremental read is off, accepted for append-only sources, rejected otherwise.
#[test]
fn test_incremental_source_append_only_enforcement() {
    let source_table = ["greptime", "public", "numbers"].map(String::from);
    let incremental_off = BatchingModeOptions::default();
    let incremental_on = BatchingModeOptions {
        experimental_enable_incremental_read: true,
        ..Default::default()
    };
    let plain_table_opts: HashMap<String, String> = HashMap::new();
    let append_table_opts =
        HashMap::from([(APPEND_MODE_KEY.to_string(), "true".to_string())]);

    let skipped = BatchingEngine::ensure_incremental_source_append_only(
        &incremental_off,
        &source_table,
        &plain_table_opts,
    );
    skipped.expect("disabled incremental read should not require append-only source");

    let accepted = BatchingEngine::ensure_incremental_source_append_only(
        &incremental_on,
        &source_table,
        &append_table_opts,
    );
    accepted.expect("append-only source should be accepted when incremental read is enabled");

    let rejected = BatchingEngine::ensure_incremental_source_append_only(
        &incremental_on,
        &source_table,
        &plain_table_opts,
    );
    let err = rejected
        .expect_err("non-append source should be rejected when incremental read is enabled");
    let message = err.to_string();
    assert!(
        message.contains("Flow incremental read requires append-only source table"),
        "{err}"
    );
}
async fn new_test_task(flow_id: FlowId) -> (BatchingTask, oneshot::Sender<()>) {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();

View File

@@ -1,222 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::tracing::debug;
use datafusion_expr::Expr;
use datatypes::schema::Schema;
use crate::batching_mode::state::FilterExprInfo;
use crate::batching_mode::utils::IncrementalAggregateAnalysis;
use crate::{Error, FlowId};
/// Builds the dirty-window predicate to apply on the sink table, if one can be derived.
///
/// Returns `Ok(None)` when there is no dirty filter or when no sink column can
/// be inferred for it; otherwise delegates to `FilterExprInfo::predicate_for_col`
/// with the inferred sink column.
pub(super) fn build_sink_dirty_time_window_filter_expr(
    flow_id: FlowId,
    analysis: &IncrementalAggregateAnalysis,
    sink_schema: &Schema,
    dirty_filter: Option<&FilterExprInfo>,
) -> Result<Option<Expr>, Error> {
    let resolved = dirty_filter.and_then(|filter| {
        infer_sink_time_window_filter_col(flow_id, analysis, sink_schema, filter)
            .map(|sink_col| (filter, sink_col))
    });

    match resolved {
        Some((filter, sink_col)) => filter.predicate_for_col(&sink_col),
        None => Ok(None),
    }
}
/// Picks the sink column that the dirty-window filter should be applied to.
///
/// Resolution order:
/// 1. no group keys -> `None`;
/// 2. the dirty filter's own column, if it is a group key with a timestamp
///    type in the sink schema;
/// 3. otherwise the single group key with a timestamp type in the sink schema;
/// 4. `None` (with a debug log) when there is no such key or more than one.
fn infer_sink_time_window_filter_col(
    flow_id: FlowId,
    analysis: &IncrementalAggregateAnalysis,
    sink_schema: &Schema,
    dirty_filter: &FilterExprInfo,
) -> Option<String> {
    let group_keys = &analysis.group_key_names;
    if group_keys.is_empty() {
        return None;
    }

    let is_timestamp_col = |name: &str| {
        sink_schema
            .column_schema_by_name(name)
            .is_some_and(|col| col.data_type.is_timestamp())
    };

    // Prefer the source filter column when it is itself a timestamp group key.
    let source_col = &dirty_filter.col_name;
    if group_keys.iter().any(|key| key == source_col) && is_timestamp_col(source_col) {
        return Some(source_col.clone());
    }

    let mut candidates: Vec<String> = group_keys
        .iter()
        .filter(|name| is_timestamp_col(name))
        .cloned()
        .collect();

    if candidates.len() == 1 {
        return candidates.pop();
    }

    if candidates.is_empty() {
        debug!(
            "Flow {} cannot infer sink dirty-window filter column: no timestamp group key in {:?}",
            flow_id, analysis.group_key_names
        );
    } else {
        debug!(
            "Flow {} cannot infer sink dirty-window filter column: ambiguous timestamp group keys {:?}",
            flow_id, candidates
        );
    }
    None
}
// Unit tests for `infer_sink_time_window_filter_col`, one per resolution outcome.
#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use pretty_assertions::assert_eq;
use super::*;
use crate::adapter::AUTO_CREATED_UPDATE_AT_TS_COL;
use crate::batching_mode::state::FilterExprInfo;
use crate::batching_mode::utils::IncrementalAggregateAnalysis;
/// Builds an analysis that only carries the given group key names; every
/// other field is left empty.
fn test_analysis_with_group_keys(group_key_names: Vec<&str>) -> IncrementalAggregateAnalysis {
IncrementalAggregateAnalysis {
group_key_names: group_key_names
.into_iter()
.map(|name| name.to_string())
.collect(),
merge_columns: vec![],
literal_columns: vec![],
output_field_names: vec![],
unsupported_exprs: vec![],
}
}
/// Builds a dirty filter on `col_name` with no time ranges and a 1-second window.
fn test_dirty_filter(col_name: &str) -> FilterExprInfo {
FilterExprInfo {
expr: datafusion_expr::col(col_name),
col_name: col_name.to_string(),
time_ranges: vec![],
window_size: chrono::Duration::seconds(1),
}
}
/// Builds a sink schema of nullable columns from `(name, data type)` pairs.
fn test_sink_schema(columns: Vec<(&str, ConcreteDataType)>) -> Schema {
Schema::new(
columns
.into_iter()
.map(|(name, data_type)| ColumnSchema::new(name, data_type, true))
.collect(),
)
}
// The dirty filter column is itself a timestamp group key, so it is used directly.
#[test]
fn test_infer_sink_time_window_filter_col_uses_matching_source_group_key() {
let analysis = test_analysis_with_group_keys(vec!["ts", "host"]);
let sink_schema = test_sink_schema(vec![
("ts", ConcreteDataType::timestamp_millisecond_datatype()),
("host", ConcreteDataType::string_datatype()),
]);
let dirty_filter = test_dirty_filter("ts");
assert_eq!(
Some("ts".to_string()),
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
// The dirty filter column is not a group key; the only timestamp group key is
// picked. The auto-created update-at column is a timestamp but not a group key,
// so it is not a candidate.
#[test]
fn test_infer_sink_time_window_filter_col_uses_unique_timestamp_group_key() {
let analysis = test_analysis_with_group_keys(vec!["host", "time_window"]);
let sink_schema = test_sink_schema(vec![
("host", ConcreteDataType::string_datatype()),
(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
),
(
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
),
]);
let dirty_filter = test_dirty_filter("ts");
assert_eq!(
Some("time_window".to_string()),
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
// No group keys at all: nothing can be inferred even if the sink has a timestamp column.
#[test]
fn test_infer_sink_time_window_filter_col_skips_global_aggregate() {
let analysis = test_analysis_with_group_keys(vec![]);
let sink_schema = test_sink_schema(vec![
("number", ConcreteDataType::uint32_datatype()),
(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
),
]);
let dirty_filter = test_dirty_filter("ts");
assert_eq!(
None,
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
// Group keys exist but none of them has a timestamp type in the sink schema.
#[test]
fn test_infer_sink_time_window_filter_col_skips_without_timestamp_group_key() {
let analysis = test_analysis_with_group_keys(vec!["host", "device"]);
let sink_schema = test_sink_schema(vec![
("host", ConcreteDataType::string_datatype()),
("device", ConcreteDataType::string_datatype()),
(
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
),
]);
let dirty_filter = test_dirty_filter("ts");
assert_eq!(
None,
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
// Two timestamp group keys and a filter column matching neither: ambiguous, so `None`.
#[test]
fn test_infer_sink_time_window_filter_col_skips_ambiguous_timestamp_group_keys() {
let analysis = test_analysis_with_group_keys(vec!["ts", "time_window"]);
let sink_schema = test_sink_schema(vec![
("ts", ConcreteDataType::timestamp_millisecond_datatype()),
(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
),
]);
let dirty_filter = test_dirty_filter("source_ts");
assert_eq!(
None,
infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
);
}
}

View File

@@ -66,12 +66,20 @@ pub struct TaskState {
}
impl TaskState {
/// Creates a task state that tracks dirty windows with `DirtyTimeWindows::default()`.
///
/// Delegates to `with_dirty_time_windows`.
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
Self::with_dirty_time_windows(query_ctx, shutdown_rx, DirtyTimeWindows::default())
}
pub fn with_dirty_time_windows(
query_ctx: QueryContextRef,
shutdown_rx: oneshot::Receiver<()>,
dirty_time_windows: DirtyTimeWindows,
) -> Self {
Self {
query_ctx,
last_update_time: Instant::now(),
last_query_duration: Duration::from_secs(0),
last_exec_time_millis: None,
dirty_time_windows: Default::default(),
dirty_time_windows,
checkpoint_mode: CheckpointMode::FullSnapshot,
checkpoints: Default::default(),
incremental_disabled: false,
@@ -264,6 +272,16 @@ impl DirtyTimeWindows {
time_window_merge_threshold,
}
}
/// Test-only accessor for the stored `max_filter_num_per_query` value.
#[cfg(test)]
pub(crate) fn max_filter_num_per_query(&self) -> usize {
self.max_filter_num_per_query
}
/// Test-only accessor for the stored `time_window_merge_threshold` value.
#[cfg(test)]
pub(crate) fn time_window_merge_threshold(&self) -> usize {
self.time_window_merge_threshold
}
}
impl Default for DirtyTimeWindows {
@@ -681,7 +699,7 @@ impl DirtyTimeWindows {
}
}
fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
pub(crate) fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
let value = Value::from(value);
let value = value
.try_to_scalar_value(&value.data_type())

View File

@@ -27,7 +27,7 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::DFSchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp, col, lit};
use datatypes::schema::Schema;
use query::QueryEngineRef;
use query::options::FLOW_INCREMENTAL_MODE;
@@ -38,14 +38,16 @@ use sql::parsers::utils::is_tql;
use store_api::mito_engine_options::MERGE_MODE_KEY;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{Mutex, oneshot};
use tokio::time::Instant;
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::checkpoint::checkpoint_mode_label;
use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
use crate::batching_mode::state::{CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState};
use crate::batching_mode::state::{
CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal,
};
use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
@@ -67,12 +69,6 @@ use crate::{Error, FlowId};
mod ckpt;
mod inc;
/// Maximum number of dirty time-window predicates attached to one incremental
/// SQL query. This keeps generated OR filters bounded so Substrait encoding and
/// downstream planning remain predictable; if the backlog is larger, the flow
/// drains one capped batch and postpones checkpoint advancement to a later run.
const MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS: usize = 4096;
/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
@@ -113,6 +109,10 @@ fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
pub struct BatchingTask {
pub config: Arc<TaskConfig>,
pub state: Arc<RwLock<TaskState>>,
/// Serializes plan generation, execution, checkpoint advancement, and dirty
/// window restoration for this flow. Without this, a manual flush and the
/// background loop can process the same checkpoint range concurrently.
execution_lock: Arc<Mutex<()>>,
}
/// Arguments for creating batching task
@@ -150,6 +150,16 @@ pub enum DirtyRestore {
Unscoped(DirtyTimeWindows),
}
/// Outcome of one flow evaluation attempt: the generated plan (if any)
/// together with the result of generating and executing it.
struct ExecuteOnceOutcome {
/// The plan that was generated for this attempt.
///
/// `None` when plan generation failed or when no query was generated.
new_query: Option<PlanInfo>,
/// Execution result of the generated insert plan.
///
/// `Ok(Some((affected_rows, elapsed)))` means a query was executed.
/// `Ok(None)` means no query was generated because there was no dirty signal.
/// `Err(_)` means plan generation or execution failed.
result: Result<Option<(usize, Duration)>, Error>,
}
impl BatchingTask {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
@@ -168,6 +178,18 @@ impl BatchingTask {
flow_eval_interval,
}: TaskArgs<'_>,
) -> Result<Self, Error> {
let mut state = TaskState::with_dirty_time_windows(
query_ctx.clone(),
shutdown_rx,
DirtyTimeWindows::new(
batch_opts.experimental_max_filter_num_per_query,
batch_opts.experimental_time_window_merge_threshold,
),
);
if !batch_opts.experimental_enable_incremental_read {
state.disable_incremental();
}
Ok(Self {
config: Arc::new(TaskConfig {
flow_id,
@@ -182,7 +204,8 @@ impl BatchingTask {
batch_opts,
flow_eval_interval,
}),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
state: Arc::new(RwLock::new(state)),
execution_lock: Arc::new(Mutex::new(())),
})
}
@@ -251,40 +274,75 @@ impl BatchingTask {
.context(ExternalSnafu)
}
pub async fn gen_exec_once(
pub(crate) async fn execute_once_serialized(
&self,
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
max_window_cnt: Option<usize>,
) -> Result<Option<(usize, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
let outcome = self
.execute_once_serialized_with_outcome(engine, frontend_client, max_window_cnt)
.await;
outcome.result
}
/// Executes one flow evaluation under `execution_lock` and keeps the
/// generated query context for the background loop's error logging/backoff.
///
/// Waits for `execution_lock`, then delegates to `execute_once_unlocked`.
/// Failures are reported through `ExecuteOnceOutcome::result` rather than
/// through this function's return type.
async fn execute_once_serialized_with_outcome(
&self,
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
max_window_cnt: Option<usize>,
) -> ExecuteOnceOutcome {
// Bound to a named variable so the guard is held until this function returns.
let _execution_guard = self.execution_lock.lock().await;
self.execute_once_unlocked(engine, frontend_client, max_window_cnt)
.await
}
/// Executes one flow evaluation. Caller must hold `execution_lock`.
async fn execute_once_unlocked(
&self,
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
max_window_cnt: Option<usize>,
) -> ExecuteOnceOutcome {
let new_query = match self.gen_insert_plan_unlocked(engine, max_window_cnt).await {
Ok(new_query) => new_query,
Err(err) => {
return ExecuteOnceOutcome {
new_query: None,
result: Err(err),
};
}
};
if let Some(new_query) = new_query {
debug!("Generate new query: {}", new_query.plan);
let dirty_filter = match &new_query.dirty_restore {
DirtyRestore::Scoped(f) => Some(f),
_ => None,
};
match self
.execute_logical_plan(
let res = self
.execute_logical_plan_unlocked(
frontend_client,
&new_query.plan,
dirty_filter,
new_query.can_advance_checkpoints,
)
.await
{
Ok(result) => Ok(result),
Err(err) => {
self.handle_executed_query_failure(Some(&new_query));
Err(err)
}
.await;
if res.is_err() {
self.handle_executed_query_failure(Some(&new_query));
}
ExecuteOnceOutcome {
new_query: Some(new_query),
result: res,
}
} else {
debug!("Generate no query");
Ok(None)
ExecuteOnceOutcome {
new_query: None,
result: Ok(None),
}
}
}
pub async fn gen_insert_plan(
/// Generates the insert plan. Caller must reach this through the serialized path.
async fn gen_insert_plan_unlocked(
&self,
engine: &QueryEngineRef,
max_window_cnt: Option<usize>,
@@ -388,11 +446,11 @@ impl BatchingTask {
Ok(())
}
pub async fn execute_logical_plan(
/// Executes the insert plan. Caller must reach this through the serialized path.
async fn execute_logical_plan_unlocked(
&self,
frontend_client: &Arc<FrontendClient>,
plan: &LogicalPlan,
dirty_filter: Option<&FilterExprInfo>,
can_advance_checkpoints: bool,
) -> Result<Option<(usize, Duration)>, Error> {
let instant = Instant::now();
@@ -426,8 +484,7 @@ impl BatchingTask {
// For incremental-mode SQL queries, attempt to rewrite the delta aggregate
// plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions.
let incremental_plan = if can_advance_checkpoints {
self.prepare_plan_for_incremental(&plan, dirty_filter)
.await?
self.prepare_plan_for_incremental(&plan).await?
} else {
None
};
@@ -580,6 +637,112 @@ impl BatchingTask {
})
}
/// Adds `dirty_windows` back into the task's dirty time windows.
fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) {
    let mut state = self.state.write().unwrap();
    state.dirty_time_windows.add_dirty_windows(dirty_windows);
}
/// Passes `result` through unchanged, restoring `dirty_windows` into the task
/// state first when it is an error.
fn restore_unscoped_dirty_windows_on_err<T>(
    &self,
    dirty_windows: &DirtyTimeWindows,
    result: Result<T, Error>,
) -> Result<T, Error> {
    if result.is_err() {
        self.restore_unscoped_dirty_windows(dirty_windows);
    }
    result
}
/// Cleans the task's dirty time windows and returns what was there.
///
/// Returns `(is_dirty, drained)`, where `drained` is a copy of the dirty
/// windows taken before cleaning and `is_dirty` tells whether that copy is
/// non-empty.
fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) {
    let mut guard = self.state.write().unwrap();
    let drained = guard.dirty_time_windows.clone();
    guard.dirty_time_windows.clean();
    let had_dirty_windows = !drained.is_empty();
    (had_dirty_windows, drained)
}
/// Generates a plan for the flow query without dirty-window predicates.
///
/// When `retention_filter` is `Some((col_name, lower_bound, context))`, a
/// `col_name >= lower_bound` filter is added to the plan; `context` only
/// labels the error message. If any step fails, `dirty_windows_to_restore`
/// is added back to the task state before the error is returned.
///
/// On success the windows are carried in the returned `PlanInfo` as
/// `DirtyRestore::Unscoped`, with `can_advance_checkpoints` set to `true`.
#[allow(clippy::too_many_arguments)]
async fn gen_unfiltered_plan_info(
    &self,
    engine: QueryEngineRef,
    query_ctx: QueryContextRef,
    sink_table_schema: Arc<Schema>,
    primary_key_indices: &[usize],
    allow_partial: bool,
    dirty_windows_to_restore: DirtyTimeWindows,
    retention_filter: Option<(&str, Timestamp, &'static str)>,
) -> Result<PlanInfo, Error> {
    let generated = gen_plan_with_matching_schema(
        &self.config.query,
        query_ctx,
        engine,
        sink_table_schema,
        primary_key_indices,
        allow_partial,
    )
    .await;
    let mut plan =
        self.restore_unscoped_dirty_windows_on_err(&dirty_windows_to_restore, generated)?;

    if let Some((col_name, lower_bound, context)) = retention_filter {
        let lower_literal = to_df_literal(lower_bound);
        let lower = self
            .restore_unscoped_dirty_windows_on_err(&dirty_windows_to_restore, lower_literal)?;

        let mut add_filter = AddFilterRewriter::new(col(col_name).gt_eq(lit(lower)));
        // Rewrite a clone so the original plan is still available for the error context.
        let rewritten = plan
            .clone()
            .rewrite(&mut add_filter)
            .with_context(|_| DatafusionSnafu {
                context: format!(
                    "Failed to apply {context} expire_after filter to plan:\n {}\n",
                    plan
                ),
            })
            .map(|rewrite| rewrite.data);
        plan = self
            .restore_unscoped_dirty_windows_on_err(&dirty_windows_to_restore, rewritten)?;
    }

    let dirty_restore = DirtyRestore::Unscoped(dirty_windows_to_restore);
    Ok(PlanInfo {
        plan,
        dirty_restore,
        can_advance_checkpoints: true,
    })
}
/// Drains the dirty-window signal and, only if something was dirty, generates
/// an unfiltered plan via `gen_unfiltered_plan_info`.
///
/// Returns `Ok(None)` when there were no dirty windows.
async fn gen_unfiltered_plan_info_if_dirty(
    &self,
    engine: QueryEngineRef,
    query_ctx: QueryContextRef,
    sink_table_schema: Arc<Schema>,
    primary_key_indices: &[usize],
    allow_partial: bool,
    retention_filter: Option<(&str, Timestamp, &'static str)>,
) -> Result<Option<PlanInfo>, Error> {
    match self.drain_dirty_windows_signal() {
        (false, _) => {
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            Ok(None)
        }
        (true, dirty_windows_to_restore) => {
            let plan_info = self
                .gen_unfiltered_plan_info(
                    engine,
                    query_ctx,
                    sink_table_schema,
                    primary_key_indices,
                    allow_partial,
                    dirty_windows_to_restore,
                    retention_filter,
                )
                .await?;
            Ok(Some(plan_info))
        }
    }
}
fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) {
if let Some(query) = query {
self.restore_dirty_windows_after_failure(query);
@@ -626,33 +789,11 @@ impl BatchingTask {
let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
Ok(new_query) => new_query,
Err(err) => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
// also sleep for a little while before try again to prevent flooding logs
tokio::time::sleep(min_refresh).await;
continue;
}
};
let outcome = self
.execute_once_serialized_with_outcome(&engine, &frontend_client, max_window_cnt)
.await;
let res = if let Some(new_query) = &new_query {
let dirty_filter = match &new_query.dirty_restore {
DirtyRestore::Scoped(f) => Some(f),
_ => None,
};
self.execute_logical_plan(
&frontend_client,
&new_query.plan,
dirty_filter,
new_query.can_advance_checkpoints,
)
.await
} else {
Ok(None)
};
match res {
match outcome.result {
// normal execute, sleep for some time before doing next query
Ok(Some(_)) => {
// can increase max_window_cnt to query more windows next time
@@ -703,11 +844,10 @@ impl BatchingTask {
}
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
Err(err) => {
self.handle_executed_query_failure(new_query.as_ref());
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
match new_query {
match outcome.new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
// TODO(discord9): add some backoff here? half the query time window or what
@@ -743,6 +883,20 @@ impl BatchingTask {
create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
}
/// Returns `true` when the task is in incremental checkpoint mode, incremental
/// read has not been disabled, and the flow query type is SQL.
fn should_use_unfiltered_incremental_delta(&self) -> bool {
    let state = self.state.read().unwrap();
    if state.checkpoint_mode() != CheckpointMode::Incremental {
        return false;
    }
    if state.is_incremental_disabled() {
        return false;
    }
    matches!(self.config.query_type, QueryType::Sql)
}
/// Returns `true` when the task is in full-snapshot checkpoint mode, incremental
/// read has not been disabled, and the flow query type is SQL.
fn should_use_unfiltered_full_snapshot_seeding(&self) -> bool {
    let state = self.state.read().unwrap();
    if state.checkpoint_mode() != CheckpointMode::FullSnapshot {
        return false;
    }
    if state.is_incremental_disabled() {
        return false;
    }
    matches!(self.config.query_type, QueryType::Sql)
}
/// will merge and use the first ten time window in query
async fn gen_query_with_time_window(
&self,
@@ -783,83 +937,35 @@ impl BatchingTask {
self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
let (is_dirty, dirty_windows_to_restore) = {
let mut state = self.state.write().unwrap();
let dirty_windows_to_restore = state.dirty_time_windows.clone();
let is_dirty = !dirty_windows_to_restore.is_empty();
state.dirty_time_windows.clean();
(is_dirty, dirty_windows_to_restore)
};
if !is_dirty {
// no dirty data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
}
let plan = match gen_plan_with_matching_schema(
&self.config.query,
query_ctx,
engine,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
)
.await
{
Ok(plan) => plan,
Err(err) => {
self.state
.write()
.unwrap()
.dirty_time_windows
.add_dirty_windows(&dirty_windows_to_restore);
return Err(err);
}
};
return Ok(Some(PlanInfo {
plan,
dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
can_advance_checkpoints: true,
}));
return self
.gen_unfiltered_plan_info_if_dirty(
engine,
query_ctx,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
None,
)
.await;
}
_ => {
// Clean dirty windows for full-query/non-scoped paths,
// such as TQL, that cannot use a time-window filter.
let dirty_windows_to_restore = {
let mut state = self.state.write().unwrap();
let dirty_windows_to_restore = state.dirty_time_windows.clone();
state.dirty_time_windows.clean();
dirty_windows_to_restore
};
let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
let plan = match gen_plan_with_matching_schema(
&self.config.query,
query_ctx,
engine,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
)
.await
{
Ok(plan) => plan,
Err(err) => {
self.state
.write()
.unwrap()
.dirty_time_windows
.add_dirty_windows(&dirty_windows_to_restore);
return Err(err);
}
};
let plan_info = self
.gen_unfiltered_plan_info(
engine,
query_ctx,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
dirty_windows_to_restore,
None,
)
.await?;
return Ok(Some(PlanInfo {
plan,
dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
can_advance_checkpoints: true,
}));
return Ok(Some(plan_info));
}
};
@@ -889,22 +995,61 @@ impl BatchingTask {
),
})?;
if self.should_use_unfiltered_full_snapshot_seeding() {
// A full-snapshot query that can seed/refresh incremental
// checkpoints must not use dirty-window predicates. Rows can be
// written after dirty windows are drained but before the source scan
// snapshot opens; a stale dirty-window filter could exclude those
// rows while the returned watermark includes them, causing the next
// incremental read to skip them forever. Execute an unfiltered full
// snapshot instead, and keep dirty windows only as the scheduling and
// failure-restoration signal.
let retention_filter = self
.config
.expire_after
.map(|_| (col_name.as_str(), expire_lower_bound, "full-snapshot"));
return self
.gen_unfiltered_plan_info_if_dirty(
engine,
query_ctx,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
retention_filter,
)
.await;
}
if self.should_use_unfiltered_incremental_delta() {
// In incremental mode, source correctness is defined by the
// per-region sequence range `(checkpoint, scan-open snapshot]`, not
// by dirty-window predicates. Dirty windows are only a scheduling
// signal here. Applying a stale dirty-window filter to the source can
// exclude rows that are inside the returned watermark and make a
// checkpoint advance skip them forever. The sink side is also left
// unfiltered by dirty windows; the incremental rewrite joins the
// delta groups with the full sink state for correctness. Future
// dynamic filters can prune sink reads as a pure optimization.
let retention_filter = self
.config
.expire_after
.map(|_| (col_name.as_str(), expire_lower_bound, "incremental"));
return self
.gen_unfiltered_plan_info_if_dirty(
engine,
query_ctx,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
retention_filter,
)
.await;
}
let (expr, can_advance_checkpoints) = {
let mut state = self.state.write().unwrap();
let window_cnt = if state.checkpoint_mode() == CheckpointMode::Incremental
&& !state.is_incremental_disabled()
&& matches!(self.config.query_type, QueryType::Sql)
{
// Incremental scans are bounded by region sequence checkpoints,
// so the dirty-window filter only narrows sink-side/time-window
// work. Drain more windows than normal, but keep a hard cap to
// avoid building a huge OR filter after a long downtime. If
// windows remain, checkpoints won't advance this round.
MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS
} else {
max_window_cnt
.unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query)
};
let window_cnt = max_window_cnt
.unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query);
let expr = state.dirty_time_windows.gen_filter_exprs(
&col_name,
Some(expire_lower_bound),

View File

@@ -26,8 +26,7 @@ use snafu::ResultExt;
use table::metadata::TableId;
use crate::Error;
use crate::batching_mode::incremental_filter::build_sink_dirty_time_window_filter_expr;
use crate::batching_mode::state::{CheckpointMode, FilterExprInfo};
use crate::batching_mode::state::CheckpointMode;
use crate::batching_mode::table_creator::QueryType;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::utils::{
@@ -74,7 +73,6 @@ impl BatchingTask {
pub(super) async fn prepare_plan_for_incremental(
&self,
plan: &LogicalPlan,
dirty_filter: Option<&FilterExprInfo>,
) -> Result<Option<LogicalPlan>, Error> {
let is_incremental_sql = {
let state = self.state.read().unwrap();
@@ -152,31 +150,12 @@ impl BatchingTask {
return Ok(None);
}
};
let sink_schema = sink_table.table_info().meta.schema.clone();
let sink_dirty_filter = match build_sink_dirty_time_window_filter_expr(
self.config.flow_id,
&analysis,
&sink_schema,
dirty_filter,
) {
Ok(filter) => filter,
Err(err) => {
warn!(
"Flow {} failed to build sink dirty time window filter; \
falling back to full snapshot for this round: {:?}",
self.config.flow_id, err
);
self.state.write().unwrap().mark_full_snapshot();
return Ok(None);
}
};
let rewritten_inner = match rewrite_incremental_aggregate_with_sink_merge(
&inner_plan,
&analysis,
sink_table,
&self.config.sink_table_name,
sink_dirty_filter,
None,
)
.await
{

View File

@@ -25,7 +25,9 @@ use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{TimestampMillisecondVector, UInt32Vector, VectorRef};
use pretty_assertions::assert_eq;
use query::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY};
use query::options::{
FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY, QueryOptions,
};
use session::context::QueryContext;
use table::test_util::MemTable;
@@ -38,6 +40,13 @@ use crate::batching_mode::state::CheckpointMode;
use crate::batching_mode::time_window::find_time_window_expr;
use crate::test_utils::create_test_query_engine;
fn incremental_batch_opts() -> Arc<BatchingModeOptions> {
Arc::new(BatchingModeOptions {
experimental_enable_incremental_read: true,
..Default::default()
})
}
async fn new_test_task_and_plan_with_missing_sink() -> (BatchingTask, LogicalPlan) {
new_test_task_engine_and_plan_with_query(
"SELECT number, ts FROM numbers_with_ts",
@@ -60,6 +69,15 @@ impl TestTaskParts {
}
/// Convenience wrapper around
/// `new_test_task_engine_and_plan_with_query_and_opts` that always passes the
/// options produced by `incremental_batch_opts()`.
async fn new_test_task_engine_and_plan_with_query(query: &str, sink_table: &str) -> TestTaskParts {
    let batch_opts = incremental_batch_opts();
    new_test_task_engine_and_plan_with_query_and_opts(query, sink_table, batch_opts).await
}
async fn new_test_task_engine_and_plan_with_query_and_opts(
query: &str,
sink_table: &str,
batch_opts: Arc<BatchingModeOptions>,
) -> TestTaskParts {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let plan = sql_to_df_plan(
@@ -91,7 +109,7 @@ async fn new_test_task_engine_and_plan_with_query(query: &str, sink_table: &str)
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
batch_opts,
flow_eval_interval: None,
})
.unwrap();
@@ -103,6 +121,75 @@ async fn new_test_task_engine_and_plan_with_query(query: &str, sink_table: &str)
}
}
// A task built from `BatchingModeOptions::default()` must report incremental
// reads as disabled.
#[tokio::test]
async fn test_incremental_read_is_disabled_by_default() {
    let default_opts = Arc::new(BatchingModeOptions::default());
    let task = new_test_task_engine_and_plan_with_query_and_opts(
        "SELECT number, ts FROM numbers_with_ts",
        "numbers_with_ts",
        default_opts,
    )
    .await
    .task;

    let incremental_disabled = task.state.read().unwrap().is_incremental_disabled();
    assert!(incremental_disabled);
}
// The dirty-time-window tracker of a new task must pick up the filter-count
// and merge-threshold values supplied through the batching options.
#[tokio::test]
async fn test_dirty_time_windows_uses_batch_opts() {
    let custom_opts = Arc::new(BatchingModeOptions {
        experimental_max_filter_num_per_query: 7,
        experimental_time_window_merge_threshold: 11,
        ..Default::default()
    });
    let task = new_test_task_engine_and_plan_with_query_and_opts(
        "SELECT number, ts FROM numbers_with_ts",
        "numbers_with_ts",
        custom_opts,
    )
    .await
    .task;

    // Keep the read guard alive while both accessors are checked.
    let state = task.state.read().unwrap();
    let dirty_windows = &state.dirty_time_windows;
    assert_eq!(7, dirty_windows.max_filter_num_per_query());
    assert_eq!(11, dirty_windows.time_window_merge_threshold());
}
// Verifies that `execute_once_serialized` does not make progress while another
// holder owns the task's `execution_lock`, and that it runs to completion
// (failing on the missing sink) once the lock is released.
#[tokio::test]
async fn test_execute_once_serialized_waits_for_execution_lock() {
let TestTaskParts {
task, query_engine, ..
} = new_test_task_engine_and_plan_with_query(
"SELECT number, ts FROM numbers_with_ts",
"missing_sink",
)
.await;
let (frontend_client, _handler) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
let frontend_client = Arc::new(frontend_client);
// Take the execution lock up front so the spawned execution below has to wait.
let guard = task.execution_lock.clone().lock_owned().await;
// Clone everything the spawned future needs so it can own its inputs.
let task_to_run = task.clone();
let query_engine_to_run = query_engine.clone();
let frontend_client_to_run = frontend_client.clone();
let exec = tokio::spawn(async move {
task_to_run
.execute_once_serialized(&query_engine_to_run, &frontend_client_to_run, None)
.await
});
// Give the spawned task a short window to run; it must still be pending
// because the lock guard has not been dropped yet.
tokio::time::sleep(Duration::from_millis(20)).await;
assert!(
!exec.is_finished(),
"execute_once_serialized should wait for execution_lock"
);
// Release the lock; the execution should now finish within the timeout.
drop(guard);
// Three layers are unwrapped in order: the timeout, the join handle, and
// finally the execution result, which is expected to be an error.
tokio::time::timeout(Duration::from_secs(1), exec)
.await
.expect("execute_once_serialized should finish once execution_lock is released")
.expect("execute_once_serialized task should not panic")
.expect_err("missing sink should fail after acquiring execution_lock");
}
async fn new_time_window_test_task_with_query(query: &str) -> TestTaskParts {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
@@ -147,7 +234,7 @@ async fn new_time_window_test_task_with_query(query: &str) -> TestTaskParts {
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
})
.unwrap();
@@ -226,6 +313,14 @@ fn dirty_range(start: i64, end: i64) -> DirtyTimeWindows {
dirty
}
/// Returns the current Unix time in whole seconds minus ten, as an `i64`.
///
/// Tests use the result as an `expire_after` value for retention-filter checks.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn expire_after_for_retention_filter_test() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let now_secs = since_epoch.as_secs();
    (now_secs - 10) as i64
}
async fn assert_unscoped_failure_restore(
consumed_dirty_windows: DirtyTimeWindows,
current_dirty_windows: DirtyTimeWindows,
@@ -626,6 +721,7 @@ async fn test_full_snapshot_scoped_plan_marks_checkpoint_advance_safe_only_after
.await;
{
let mut state = task.state.write().unwrap();
state.disable_incremental();
state
.dirty_time_windows
.add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5)));
@@ -657,7 +753,7 @@ async fn test_full_snapshot_scoped_plan_marks_checkpoint_advance_safe_only_after
}
#[tokio::test]
async fn test_incremental_scoped_plan_consumes_all_dirty_windows_for_checkpoint_safety() {
async fn test_incremental_plan_consumes_dirty_signal_for_checkpoint_safety() {
let TestTaskParts {
task,
query_engine,
@@ -692,6 +788,192 @@ async fn test_incremental_scoped_plan_consumes_all_dirty_windows_for_checkpoint_
assert!(task.state.read().unwrap().dirty_time_windows.is_empty());
}
// While the task is still in full-snapshot mode with incremental reads
// enabled, the generated plan must drain the dirty windows, allow checkpoint
// advancement, and contain no filter node.
#[tokio::test]
async fn test_full_snapshot_seeding_for_incremental_does_not_add_dirty_window_filter() {
    let query = "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window";
    let TestTaskParts {
        task,
        query_engine,
        ..
    } = new_time_window_test_task_with_query(query).await;

    {
        let mut state = task.state.write().unwrap();
        assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
        assert!(!state.is_incremental_disabled());
        // Two disjoint dirty windows, in seconds.
        for (start, end) in [(0, 5), (30, 35)] {
            state.dirty_time_windows.add_window(
                Timestamp::new_second(start),
                Some(Timestamp::new_second(end)),
            );
        }
    }

    let sink_columns = vec![
        ColumnSchema::new("number", CDT::uint32_datatype(), false),
        ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false)
            .with_time_index(true),
    ];
    let sink_schema = Arc::new(Schema::new(sink_columns));

    let generated = task
        .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1))
        .await
        .unwrap();
    let plan = generated.unwrap();
    let plan_text = plan.plan.to_string();

    assert!(plan.can_advance_checkpoints);
    assert!(task.state.read().unwrap().dirty_time_windows.is_empty());
    assert!(!plan_text.contains("Filter:"), "{plan_text}");
}
// With `expire_after` configured, the full-snapshot seeding plan must still
// carry a lower-bound filter on `ts`, while the dirty windows are drained and
// checkpoints may advance.
#[tokio::test]
async fn test_full_snapshot_seeding_applies_expire_after_retention_filter() {
    let query = "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window";
    let TestTaskParts {
        mut task,
        query_engine,
        ..
    } = new_time_window_test_task_with_query(query).await;

    {
        let mut state = task.state.write().unwrap();
        assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
        assert!(!state.is_incremental_disabled());
        let window_start = Timestamp::new_second(0);
        let window_end = Timestamp::new_second(5);
        state
            .dirty_time_windows
            .add_window(window_start, Some(window_end));
    }

    let sink_columns = vec![
        ColumnSchema::new("number", CDT::uint32_datatype(), false),
        ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false)
            .with_time_index(true),
    ];
    let sink_schema = Arc::new(Schema::new(sink_columns));

    let expire_after = expire_after_for_retention_filter_test();
    let config = Arc::get_mut(&mut task.config).expect("test task config should be uniquely owned");
    config.expire_after = Some(expire_after);

    let generated = task
        .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1))
        .await
        .unwrap();
    let plan = generated.unwrap();

    assert!(plan.can_advance_checkpoints);
    assert!(task.state.read().unwrap().dirty_time_windows.is_empty());
    let plan_text = plan.plan.to_string();
    assert!(
        plan_text.contains("Filter: ts >= TimestampMillisecond("),
        "{plan_text}"
    );
}
// After checkpoints have been advanced, the generated plan must not contain
// any filter node even though a dirty window was recorded.
#[tokio::test]
async fn test_incremental_plan_does_not_add_dirty_window_filter() {
    let query = "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window";
    let TestTaskParts {
        task,
        query_engine,
        ..
    } = new_time_window_test_task_with_query(query).await;

    {
        let mut state = task.state.write().unwrap();
        let checkpoints = HashMap::from([(1_u64, 10_u64)]);
        state.advance_checkpoints(checkpoints);
        let window_start = Timestamp::new_second(0);
        let window_end = Timestamp::new_second(5);
        state
            .dirty_time_windows
            .add_window(window_start, Some(window_end));
    }

    let sink_columns = vec![
        ColumnSchema::new("number", CDT::uint32_datatype(), false),
        ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false)
            .with_time_index(true),
    ];
    let sink_schema = Arc::new(Schema::new(sink_columns));

    let generated = task
        .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1))
        .await
        .unwrap();
    let plan = generated.unwrap();
    let plan_text = plan.plan.to_string();

    assert!(plan.can_advance_checkpoints);
    assert!(!plan_text.contains("Filter:"), "{plan_text}");
}
// After checkpoints have been advanced and `expire_after` is configured, the
// generated plan must carry a lower-bound filter on `ts`, drain the dirty
// windows, and allow checkpoint advancement.
#[tokio::test]
async fn test_incremental_delta_applies_expire_after_retention_filter() {
    let query = "SELECT max(number) AS number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window";
    let TestTaskParts {
        mut task,
        query_engine,
        ..
    } = new_time_window_test_task_with_query(query).await;

    {
        let mut state = task.state.write().unwrap();
        let checkpoints = HashMap::from([(1_u64, 10_u64)]);
        state.advance_checkpoints(checkpoints);
        let window_start = Timestamp::new_second(0);
        let window_end = Timestamp::new_second(5);
        state
            .dirty_time_windows
            .add_window(window_start, Some(window_end));
    }

    let sink_columns = vec![
        ColumnSchema::new("number", CDT::uint32_datatype(), false),
        ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false)
            .with_time_index(true),
    ];
    let sink_schema = Arc::new(Schema::new(sink_columns));

    let expire_after = expire_after_for_retention_filter_test();
    let config = Arc::get_mut(&mut task.config).expect("test task config should be uniquely owned");
    config.expire_after = Some(expire_after);

    let generated = task
        .gen_query_with_time_window(query_engine, &sink_schema, &[], false, Some(1))
        .await
        .unwrap();
    let plan = generated.unwrap();

    assert!(plan.can_advance_checkpoints);
    assert!(task.state.read().unwrap().dirty_time_windows.is_empty());
    let plan_text = plan.plan.to_string();
    assert!(
        plan_text.contains("Filter: ts >= TimestampMillisecond("),
        "{plan_text}"
    );
}
// For a TQL query type with no dirty windows recorded, plan generation must
// still return a plan that allows checkpoint advancement and leaves the dirty
// windows empty.
#[tokio::test]
async fn test_non_scoped_path_generates_plan_with_empty_dirty_signal() {
    let TestTaskParts {
        mut task,
        query_engine,
        ..
    } = new_test_task_engine_and_plan_with_query(
        "SELECT number, ts FROM numbers_with_ts",
        "missing_sink",
    )
    .await;

    // Switch the task to the TQL query type for this test.
    let config = Arc::get_mut(&mut task.config).expect("test task config should be uniquely owned");
    config.query_type = QueryType::Tql;

    // Start from an explicitly empty dirty signal.
    task.state.write().unwrap().dirty_time_windows.clean();

    let sink_columns = vec![
        ColumnSchema::new("number", CDT::uint32_datatype(), false),
        ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true),
    ];
    let sink_schema = Arc::new(Schema::new(sink_columns));

    let generated = task
        .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None)
        .await
        .unwrap();
    let plan = generated
        .expect("non-scoped path should generate a plan even with an empty dirty signal");

    assert!(plan.can_advance_checkpoints);
    assert!(task.state.read().unwrap().dirty_time_windows.is_empty());
}
#[tokio::test]
async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_path() {
let (task, plan) = new_test_task_and_plan_with_missing_sink().await;
@@ -773,7 +1055,7 @@ async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() {
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
})
.unwrap();
@@ -788,10 +1070,7 @@ async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() {
CheckpointMode::Incremental
);
let incremental_plan = task
.prepare_plan_for_incremental(&dml_plan, None)
.await
.unwrap();
let incremental_plan = task.prepare_plan_for_incremental(&dml_plan).await.unwrap();
assert!(incremental_plan.is_none());
let state = task.state.read().unwrap();
assert!(state.is_incremental_disabled());
@@ -852,7 +1131,7 @@ async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
})
.unwrap();
@@ -866,10 +1145,7 @@ async fn test_prepare_plan_for_incremental_falls_back_without_disable_on_rewrite
CheckpointMode::Incremental
);
let incremental_plan = task
.prepare_plan_for_incremental(&dml_plan, None)
.await
.unwrap();
let incremental_plan = task.prepare_plan_for_incremental(&dml_plan).await.unwrap();
assert!(incremental_plan.is_none());
let state = task.state.read().unwrap();
assert!(!state.is_incremental_disabled());
@@ -928,7 +1204,7 @@ async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_o
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
})
.unwrap();
@@ -939,7 +1215,7 @@ async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_o
.advance_checkpoints(HashMap::from([(1_u64, 10_u64)]));
let incremental_plan = task
.prepare_plan_for_incremental(&dml_plan, None)
.prepare_plan_for_incremental(&dml_plan)
.await
.unwrap()
.expect("plain GROUP BY is incremental-safe without a rewrite");
@@ -962,7 +1238,7 @@ async fn test_auto_created_sql_aggregate_sink_reaches_incremental_safe() {
task.state.write().unwrap().dirty_time_windows.set_dirty();
let plan_info = task
.gen_insert_plan(&query_engine, None)
.gen_insert_plan_unlocked(&query_engine, None)
.await
.unwrap()
.unwrap();
@@ -973,7 +1249,7 @@ async fn test_auto_created_sql_aggregate_sink_reaches_incremental_safe() {
.unwrap()
.advance_checkpoints(HashMap::from([(1_u64, 10_u64)]));
let incremental_plan = task
.prepare_plan_for_incremental(&plan_info.plan, None)
.prepare_plan_for_incremental(&plan_info.plan)
.await
.unwrap();
let incremental_safe = incremental_plan.is_some();
@@ -1078,11 +1354,11 @@ async fn test_insert_plan_matching_failure_restores_consumed_dirty_marker() {
register_number_only_sink(&query_engine, sink_table);
task.state.write().unwrap().dirty_time_windows.set_dirty();
let result = task.gen_insert_plan(&query_engine, None).await;
let result = task.gen_insert_plan_unlocked(&query_engine, None).await;
assert!(result.is_err());
let _err = match result {
Ok(_) => panic!("gen_insert_plan should fail with a sink column mismatch"),
Ok(_) => panic!("gen_insert_plan_unlocked should fail with a sink column mismatch"),
Err(err) => err,
};
let state = task.state.read().unwrap();

View File

@@ -1288,9 +1288,10 @@ async fn test_rewrite_incremental_aggregate_with_left_join() {
#[tokio::test]
async fn test_rewrite_incremental_aggregate_filters_sink_dirty_time_window() {
// This verifies the rewrite placement when callers supply an already
// inferred sink dirty-window predicate. The task-level inference rules are
// covered by `infer_sink_time_window_filter_col` tests in task.rs.
// This verifies the rewrite placement when callers supply a sink predicate.
// The production incremental flow path currently leaves sink scans
// unfiltered for correctness and relies on future dynamic filters for
// pruning.
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql = "SELECT max(number) AS number, date_bin(INTERVAL '1 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window";

View File

@@ -35,7 +35,9 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_reado
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::{DEFER_ON_MISSING_SOURCE_KEY, FlowType};
use common_meta::ddl::create_flow::{
DEFER_ON_MISSING_SOURCE_KEY, FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType,
};
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::procedure_executor::ExecutorContext;
@@ -114,7 +116,10 @@ struct DdlSubmitOptions {
timeout: Duration,
}
const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY];
const ALLOWED_FLOW_OPTIONS: [&str; 2] = [
DEFER_ON_MISSING_SOURCE_KEY,
FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY,
];
fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
@@ -187,7 +192,9 @@ fn validate_and_normalize_flow_options(
}
let normalized_value = match key.as_str() {
DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?,
DEFER_ON_MISSING_SOURCE_KEY | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY => {
normalize_flow_bool_option(&key, &value)?
}
_ => {
return InvalidSqlSnafu {
err_msg: format!(
@@ -2478,12 +2485,23 @@ mod test {
#[test]
fn test_validate_and_normalize_flow_options_valid() {
let options =
HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string())]);
let options = HashMap::from([
(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string()),
(
FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(),
"FALSE".to_string(),
),
]);
assert_eq!(
validate_and_normalize_flow_options(options).unwrap(),
HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),)])
HashMap::from([
(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),),
(
FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(),
"false".to_string(),
)
])
);
}
@@ -2497,7 +2515,7 @@ mod test {
assert!(
err.to_string()
.contains("unknown flow option 'foo', supported options: defer_on_missing_source")
.contains("unknown flow option 'foo', supported options: defer_on_missing_source, experimental_enable_incremental_read")
);
}

View File

@@ -1602,6 +1602,7 @@ experimental_grpc_max_retries = 3
experimental_frontend_scan_timeout = "30s"
experimental_max_filter_num_per_query = 20
experimental_time_window_merge_threshold = 3
experimental_enable_incremental_read = false
read_preference = "Leader"
[logging]

View File

@@ -1,3 +1,31 @@
-- Incremental aggregate reads only support append-only source tables because
-- update/upsert sources need old-value compensation.
CREATE TABLE incremental_non_append_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
);
Affected Rows: 0
CREATE FLOW incremental_non_append_flow SINK TO incremental_non_append_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
incremental_non_append_input
GROUP BY
time_window;
Error: 3001(EngineExecuteQuery), Unsupported: Flow incremental read requires append-only source table, but source table `greptime.public.incremental_non_append_input` is not append-only. Consider setting append_mode='true' on the source table or disabling experimental_enable_incremental_read
DROP TABLE incremental_non_append_input;
Affected Rows: 0
CREATE TABLE incremental_aggr_input (
host_id INT,
n INT,
@@ -9,7 +37,9 @@ CREATE TABLE incremental_aggr_input (
Affected Rows: 0
CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink AS
CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
min(n) AS min_n,

View File

@@ -1,3 +1,25 @@
-- Incremental aggregate reads only support append-only source tables because
-- update/upsert sources need old-value compensation.
CREATE TABLE incremental_non_append_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
);
CREATE FLOW incremental_non_append_flow SINK TO incremental_non_append_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
incremental_non_append_input
GROUP BY
time_window;
DROP TABLE incremental_non_append_input;
CREATE TABLE incremental_aggr_input (
host_id INT,
n INT,
@@ -7,7 +29,9 @@ CREATE TABLE incremental_aggr_input (
append_mode = 'true'
);
CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink AS
CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
min(n) AS min_n,

View File

@@ -12,7 +12,9 @@ CREATE TABLE flow_incr_memtable_input (
Affected Rows: 0
CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink AS
CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
min(n) AS min_n,

View File

@@ -10,7 +10,9 @@ CREATE TABLE flow_incr_memtable_input (
append_mode = 'true'
);
CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink AS
CREATE FLOW flow_incr_memtable SINK TO flow_incr_memtable_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
min(n) AS min_n,

View File

@@ -17,7 +17,9 @@ WITH (
Affected Rows: 0
CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink AS
CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
min(n) AS min_n,

View File

@@ -15,7 +15,9 @@ WITH (
append_mode = 'true'
);
CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink AS
CREATE FLOW flow_incr_part SINK TO flow_incr_part_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
min(n) AS min_n,

View File

@@ -476,7 +476,7 @@ SINK TO out_num_cnt_show
WITH (access_key_id = [true])
AS SELECT number AS n1 FROM numbers_input_show where number > 10;
Error: 1004(InvalidArguments), Invalid SQL, error: unknown flow option 'access_key_id', supported options: defer_on_missing_source
Error: 1004(InvalidArguments), Invalid SQL, error: unknown flow option 'access_key_id', supported options: defer_on_missing_source, experimental_enable_incremental_read
DROP FLOW filter_numbers_show;