From a6e69bc180955e78bf65a33a1c56ab11e0ad09dd Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 30 Jan 2026 20:34:33 +0800 Subject: [PATCH] feat: flow last non null (#7646) * feat: flow last non null support Signed-off-by: discord9 * clippy Signed-off-by: discord9 * test: report error when sink is not last non null Signed-off-by: discord9 * fix: error if query column matches nothing if partial Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/flow/src/batching_mode/task.rs | 29 ++- src/flow/src/batching_mode/utils.rs | 126 ++++++++++- .../common/flow/flow_last_non_null.result | 201 ++++++++++++++++++ .../common/flow/flow_last_non_null.sql | 108 ++++++++++ 4 files changed, 456 insertions(+), 8 deletions(-) create mode 100644 tests/cases/standalone/common/flow/flow_last_non_null.result create mode 100644 tests/cases/standalone/common/flow/flow_last_non_null.sql diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 2d0ae40bd3..252735cdec 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -37,6 +37,7 @@ use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt, ensure}; use sql::parser::{ParseOptions, ParserContext}; use sql::statements::statement::Statement; +use store_api::mito_engine_options::MERGE_MODE_KEY; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::adapter::DfTableProviderAdapter; use tokio::sync::oneshot; @@ -101,6 +102,13 @@ fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result) -> bool { + options + .get(MERGE_MODE_KEY) + .map(|mode| mode.eq_ignore_ascii_case("last_non_null")) + .unwrap_or(false) +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum QueryType { /// query is a tql query @@ -260,10 +268,17 @@ impl BatchingTask { ) .await?; + let table_meta = &table.table_info().meta; + let merge_mode_last_non_null = + is_merge_mode_last_non_null(&table_meta.options.extra_options); + let primary_key_indices = table_meta.primary_key_indices.clone(); + let new_query = self .gen_query_with_time_window( engine.clone(), &table.table_info().meta.schema, + &primary_key_indices, + merge_mode_last_non_null, max_window_cnt, ) .await?; @@ -596,6 +611,8 @@ impl BatchingTask { &self, engine: QueryEngineRef, sink_table_schema: &Arc, + primary_key_indices: &[usize], + allow_partial: bool, max_window_cnt: Option, ) -> Result, Error> { let query_ctx = self.state.read().unwrap().query_ctx.clone(); @@ -643,6 +660,8 @@ impl BatchingTask { query_ctx, engine, sink_table_schema.clone(), + primary_key_indices, + allow_partial, ) .await?; @@ -657,6 +676,8 @@ impl BatchingTask { query_ctx, engine, sink_table_schema.clone(), + primary_key_indices, + allow_partial, ) .await?; @@ -725,7 +746,11 @@ impl BatchingTask { }; let mut add_filter = AddFilterRewriter::new(expr.expr.clone()); - let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema.clone()); + let mut add_auto_column = ColumnMatcherRewriter::new( + sink_table_schema.clone(), + primary_key_indices.to_vec(), + allow_partial, + ); let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?; diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index e40e0c5e49..dfbadbfc72 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -14,7 +14,7 @@ //! some utils for helping with batching mode -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use catalog::CatalogManagerRef; @@ -28,7 +28,7 @@ use datafusion_common::tree_node::{ }; use datafusion_common::{DFSchema, DataFusionError, ScalarValue}; use datafusion_expr::{Distinct, LogicalPlan, Projection}; -use datatypes::schema::SchemaRef; +use datatypes::schema::{ColumnSchema, SchemaRef}; use query::QueryEngineRef; use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement}; use session::context::QueryContextRef; @@ -38,7 +38,7 @@ use sql::statements::statement::Statement; use sql::statements::tql::Tql; use table::TableRef; -use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL; +use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL}; use crate::df_optimizer::apply_df_optimizer; use crate::error::{DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, TableNotFoundSnafu}; use crate::{Error, TableName}; @@ -143,10 +143,16 @@ pub(crate) async fn gen_plan_with_matching_schema( query_ctx: QueryContextRef, engine: QueryEngineRef, sink_table_schema: SchemaRef, + primary_key_indices: &[usize], + allow_partial: bool, ) -> Result { let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), sql, false).await?; - let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema); + let mut add_auto_column = ColumnMatcherRewriter::new( + sink_table_schema, + primary_key_indices.to_vec(), + allow_partial, + ); let plan = plan .clone() .rewrite(&mut add_auto_column) @@ -271,18 +277,26 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName { pub struct ColumnMatcherRewriter { pub schema: SchemaRef, pub is_rewritten: bool, + pub primary_key_indices: Vec, + pub allow_partial: bool, } impl ColumnMatcherRewriter { - pub fn new(schema: SchemaRef) -> Self { + pub fn new(schema: SchemaRef, primary_key_indices: Vec, allow_partial: bool) -> Self { Self { schema, is_rewritten: false, + primary_key_indices, + allow_partial, } } /// modify the exprs in place so that it matches the schema and some auto columns are added fn modify_project_exprs(&mut self, mut exprs: Vec) -> DfResult> { + if self.allow_partial { + return self.modify_project_exprs_with_partial(exprs); + } + let all_names = self .schema .column_schemas() @@ -368,6 +382,105 @@ impl ColumnMatcherRewriter { } Ok(exprs) } + + fn modify_project_exprs_with_partial(&mut self, exprs: Vec) -> DfResult> { + let table_col_cnt = self.schema.column_schemas().len(); + let query_col_cnt = exprs.len(); + + if query_col_cnt > table_col_cnt { + return Err(DataFusionError::Plan(format!( + "Expect query column count <= table column count, found {} query columns {:?}, {} table columns {:?}", + query_col_cnt, + exprs, + table_col_cnt, + self.schema.column_schemas() + ))); + } + + let name_to_expr: HashMap = exprs + .clone() + .into_iter() + .map(|e| (e.qualified_name().1, e)) + .collect(); + + let required_columns = self.required_columns_for_partial(); + let missing: Vec<_> = required_columns + .iter() + .filter(|name| !name_to_expr.contains_key(*name)) + .cloned() + .collect(); + if !missing.is_empty() { + return Err(DataFusionError::Plan(format!( + "Column(s) {:?} required by sink table are missing from flow output when merge_mode=last_non_null", + missing + ))); + } + + let placeholder_ts_expr = + datafusion::logical_expr::lit(ScalarValue::TimestampMillisecond(Some(0), None)) + .alias(AUTO_CREATED_PLACEHOLDER_TS_COL); + + let timestamp_index = self.schema.timestamp_index(); + let mut remap = name_to_expr; + let mut new_exprs = Vec::with_capacity(table_col_cnt); + + for (idx, col_schema) in self.schema.column_schemas().iter().enumerate() { + let col_name = col_schema.name.clone(); + if let Some(expr) = remap.remove(&col_name) { + let expr = if expr.qualified_name().1 == col_name { + expr + } else { + expr.alias(col_name.clone()) + }; + new_exprs.push(expr); + continue; + } + + if col_name == AUTO_CREATED_PLACEHOLDER_TS_COL && timestamp_index == Some(idx) { + new_exprs.push(placeholder_ts_expr.clone()); + continue; + } + + if col_name == AUTO_CREATED_UPDATE_AT_TS_COL && col_schema.data_type.is_timestamp() { + new_exprs.push(datafusion::prelude::now().alias(&col_name)); + continue; + } + + new_exprs.push(Self::null_expr(col_schema)); + } + + if !remap.is_empty() { + let extra: Vec<_> = remap.keys().cloned().collect(); + return Err(DataFusionError::Plan(format!( + "Flow output has extra column(s) {:?} not found in sink schema when merge_mode=last_non_null", + extra + ))); + } + + Ok(new_exprs) + } + + fn null_expr(col_schema: &ColumnSchema) -> Expr { + Expr::Literal(ScalarValue::Null, None).alias(col_schema.name.clone()) + } + + fn required_columns_for_partial(&self) -> HashSet { + let mut required = HashSet::new(); + for idx in &self.primary_key_indices { + if let Some(col) = self.schema.column_schemas().get(*idx) { + required.insert(col.name.clone()); + } + } + + if let Some(ts_idx) = self.schema.timestamp_index() + && let Some(col) = self.schema.column_schemas().get(ts_idx) + && col.name != AUTO_CREATED_PLACEHOLDER_TS_COL + { + required.insert(col.name.clone()); + } + + required + } } impl TreeNodeRewriter for ColumnMatcherRewriter { @@ -721,7 +834,8 @@ mod test { let ctx = QueryContext::arc(); for (before, after, column_schemas) in testcases { let schema = Arc::new(Schema::new(column_schemas)); - let mut add_auto_column_rewriter = ColumnMatcherRewriter::new(schema); + let mut add_auto_column_rewriter = + ColumnMatcherRewriter::new(schema, Vec::new(), false); let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false) .await diff --git a/tests/cases/standalone/common/flow/flow_last_non_null.result b/tests/cases/standalone/common/flow/flow_last_non_null.result new file mode 100644 index 0000000000..64cc8d7eed --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_last_non_null.result @@ -0,0 +1,201 @@ +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time + TIME INDEX(ts) +); + +Affected Rows: 0 + +-- TODO(discord9): remove this after auto infer table's time index is impl +CREATE TABLE approx_rate ( + rate DOUBLE NULL, + time_window TIMESTAMP, + update_at TIMESTAMP, + bb DOUBLE NULL, + TIME INDEX(time_window) +)with('merge_mode'='last_non_null'); + +Affected Rows: 0 + +INSERT INTO approx_rate(rate, time_window, update_at) VALUES (0.0, '2023-01-01 00:00:00', TIMESTAMP '2023-01-01 00:00:00.100'); + +Affected Rows: 1 + +INSERT INTO approx_rate(time_window, update_at, bb) VALUES ('2023-01-01 00:00:00', TIMESTAMP '2023-01-01 00:00:00.200', 50.0); + +Affected Rows: 1 + +select * from approx_rate; + ++------+---------------------+-------------------------+------+ +| rate | time_window | update_at | bb | ++------+---------------------+-------------------------+------+ +| 0.0 | 2023-01-01T00:00:00 | 2023-01-01T00:00:00.200 | 50.0 | ++------+---------------------+-------------------------+------+ + +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window, + TIMESTAMP '2023-01-01 00:00:10' as update_at +from + bytes_log +GROUP BY + time_window; + +Affected Rows: 0 + +SHOW CREATE TABLE approx_rate; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( | +| | "rate" DOUBLE NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "bb" DOUBLE NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | merge_mode = 'last_non_null' | +| | ) | ++-------------+--------------------------------------------+ + +INSERT INTO + bytes_log +VALUES + (NULL, '2023-01-01 00:00:01'), + (300, '2023-01-01 00:00:31'); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('find_approx_rate') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT * FROM approx_rate; + ++------+---------------------+---------------------+------+ +| rate | time_window | update_at | bb | ++------+---------------------+---------------------+------+ +| 0.0 | 2023-01-01T00:00:00 | 2023-01-01T00:00:10 | 50.0 | +| 0.0 | 2023-01-01T00:00:30 | 2023-01-01T00:00:10 | | ++------+---------------------+---------------------+------+ + +CREATE FLOW find_bb_only SINK TO approx_rate AS +SELECT + date_bin(INTERVAL '30 second', ts) as time_window, + TIMESTAMP '2023-01-01 00:00:59' as update_at, + CAST(max(byte) AS DOUBLE) as bb +from + bytes_log +GROUP BY + time_window; + +Affected Rows: 0 + +-- make new windows dirty after flow is created +INSERT INTO bytes_log VALUES (600, '2023-01-01 00:00:10'), (320, '2023-01-01 00:00:35'); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_bb_only'); + ++----------------------------------+ +| ADMIN FLUSH_FLOW('find_bb_only') | ++----------------------------------+ +| FLOW_FLUSHED | ++----------------------------------+ + +SELECT * FROM approx_rate; + ++------+---------------------+---------------------+-------+ +| rate | time_window | update_at | bb | ++------+---------------------+---------------------+-------+ +| 0.0 | 2023-01-01T00:00:00 | 2023-01-01T00:00:59 | 600.0 | +| 0.0 | 2023-01-01T00:00:30 | 2023-01-01T00:00:59 | 320.0 | ++------+---------------------+---------------------+-------+ + +DROP FLOW find_approx_rate; + +Affected Rows: 0 + +DROP FLOW find_bb_only; + +Affected Rows: 0 + +DROP TABLE bytes_log; + +Affected Rows: 0 + +DROP TABLE approx_rate; + +Affected Rows: 0 + +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time + TIME INDEX(ts) +); + +Affected Rows: 0 + +-- TODO(discord9): remove this after auto infer table's time index is impl +CREATE TABLE approx_rate ( + rate DOUBLE NULL, + time_window TIMESTAMP, + update_at TIMESTAMP, + bb DOUBLE NULL, + TIME INDEX(time_window) +); + +Affected Rows: 0 + +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window, + TIMESTAMP '2023-01-01 00:00:10' as update_at +from + bytes_log +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO + bytes_log +VALUES + (NULL, '2023-01-01 00:00:01'), + (300, '2023-01-01 00:00:31'); + +Affected Rows: 2 + +-- should return error +ADMIN FLUSH_FLOW('find_approx_rate'); + +Error: 1002(Unexpected), Failed to execute admin function: Failed to execute admin function flush_flow, error: Execution error: Function execution error: Internal error: 1003: Execution error: Function execution error: Internal error: 1003 + +DROP FLOW find_approx_rate; + +Affected Rows: 0 + +DROP TABLE bytes_log; + +Affected Rows: 0 + +DROP TABLE approx_rate; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_last_non_null.sql b/tests/cases/standalone/common/flow/flow_last_non_null.sql new file mode 100644 index 0000000000..95ebe4aaa6 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_last_non_null.sql @@ -0,0 +1,108 @@ + +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time + TIME INDEX(ts) +); + +-- TODO(discord9): remove this after auto infer table's time index is impl +CREATE TABLE approx_rate ( + rate DOUBLE NULL, + time_window TIMESTAMP, + update_at TIMESTAMP, + bb DOUBLE NULL, + TIME INDEX(time_window) +)with('merge_mode'='last_non_null'); + +INSERT INTO approx_rate(rate, time_window, update_at) VALUES (0.0, '2023-01-01 00:00:00', TIMESTAMP '2023-01-01 00:00:00.100'); + +INSERT INTO approx_rate(time_window, update_at, bb) VALUES ('2023-01-01 00:00:00', TIMESTAMP '2023-01-01 00:00:00.200', 50.0); + +select * from approx_rate; + +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window, + TIMESTAMP '2023-01-01 00:00:10' as update_at +from + bytes_log +GROUP BY + time_window; + +SHOW CREATE TABLE approx_rate; + +INSERT INTO + bytes_log +VALUES + (NULL, '2023-01-01 00:00:01'), + (300, '2023-01-01 00:00:31'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); + +SELECT * FROM approx_rate; + +CREATE FLOW find_bb_only SINK TO approx_rate AS +SELECT + date_bin(INTERVAL '30 second', ts) as time_window, + TIMESTAMP '2023-01-01 00:00:59' as update_at, + CAST(max(byte) AS DOUBLE) as bb +from + bytes_log +GROUP BY + time_window; + +-- make new windows dirty after flow is created +INSERT INTO bytes_log VALUES (600, '2023-01-01 00:00:10'), (320, '2023-01-01 00:00:35'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_bb_only'); + +SELECT * FROM approx_rate; + +DROP FLOW find_approx_rate; +DROP FLOW find_bb_only; +DROP TABLE bytes_log; +DROP TABLE approx_rate; + + +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time + TIME INDEX(ts) +); + +-- TODO(discord9): remove this after auto infer table's time index is impl +CREATE TABLE approx_rate ( + rate DOUBLE NULL, + time_window TIMESTAMP, + update_at TIMESTAMP, + bb DOUBLE NULL, + TIME INDEX(time_window) +); + +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window, + TIMESTAMP '2023-01-01 00:00:10' as update_at +from + bytes_log +GROUP BY + time_window; + +INSERT INTO + bytes_log +VALUES + (NULL, '2023-01-01 00:00:01'), + (300, '2023-01-01 00:00:31'); + +-- should return error +ADMIN FLUSH_FLOW('find_approx_rate'); + +DROP FLOW find_approx_rate; +DROP TABLE bytes_log; +DROP TABLE approx_rate;