diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 68fb3793e4..319ddcf2e7 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -630,8 +630,11 @@ impl BatchingEngine { let engine = self.query_engine.clone(); let frontend = self.frontend_client.clone(); - // check execute once first to detect any error early + // Create sink table if needed, then validate an existing/created sink schema before + // spawning the background task. This catches user-created sink schema mismatches at + // CREATE FLOW time instead of surfacing them later in the execution loop. task.check_or_create_sink_table(&engine, &frontend).await?; + task.validate_sink_table_schema(&engine).await?; let (start_tx, start_rx) = oneshot::channel(); diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index cbd6a05cc2..5bd328ad9a 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -265,6 +265,36 @@ impl BatchingTask { Ok(None) } + /// Validates that the sink table schema can accept this flow's output. + /// + /// This is a dry-run of the same schema matching logic used by runtime insert-plan + /// generation, but without adding dirty-window filters or executing the query. It is used + /// during CREATE FLOW to catch existing sink table mismatches early. + pub async fn validate_sink_table_schema(&self, engine: &QueryEngineRef) -> Result<(), Error> { + let (table, _) = get_table_info_df_schema( + self.config.catalog_manager.clone(), + self.config.sink_table_name.clone(), + ) + .await?; + + let table_meta = &table.table_info().meta; + let merge_mode_last_non_null = + is_merge_mode_last_non_null(&table_meta.options.extra_options); + let primary_key_indices = table_meta.primary_key_indices.clone(); + let query_ctx = self.state.read().unwrap().query_ctx.clone(); + + gen_plan_with_matching_schema( + &self.config.query, + query_ctx, + engine.clone(), + table_meta.schema.clone(), + &primary_key_indices, + merge_mode_last_non_null, + ) + .await + .map(|_| ()) + } + async fn is_table_exist(&self, table_name: &[String; 3]) -> Result { self.config .catalog_manager diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index e86b1ee3be..5e033c6ae7 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -33,9 +33,10 @@ use datafusion_common::{ }; use datafusion_expr::logical_plan::{Aggregate, TableScan}; use datafusion_expr::{ - Distinct, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, Projection, and, binary_expr, - bitwise_and, bitwise_or, bitwise_xor, is_null, or, when, + Distinct, ExprSchemable, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, Projection, and, + binary_expr, bitwise_and, bitwise_or, bitwise_xor, is_null, or, when, }; +use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaRef}; use query::QueryEngineRef; use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement}; @@ -955,7 +956,7 @@ pub(crate) async fn gen_plan_with_matching_schema( .clone() .rewrite(&mut add_auto_column) .with_context(|_| DatafusionSnafu { - context: format!("Failed to rewrite plan:\n {}\n", plan), + context: "Failed to rewrite plan".to_string(), })? .data; Ok(plan) @@ -1090,33 +1091,23 @@ impl ColumnMatcherRewriter { } /// modify the exprs in place so that it matches the schema and some auto columns are added - fn modify_project_exprs(&mut self, mut exprs: Vec) -> DfResult> { + fn modify_project_exprs( + &mut self, + mut exprs: Vec, + input_schema: &DFSchema, + ) -> DfResult> { if self.allow_partial { return self.modify_project_exprs_with_partial(exprs); } + let original_exprs = exprs.clone(); + let all_names = self .schema .column_schemas() .iter() .map(|c| c.name.clone()) .collect::>(); - // first match by position - for (idx, expr) in exprs.iter_mut().enumerate() { - if !all_names.contains(&expr.qualified_name().1) - && let Some(col_name) = self - .schema - .column_schemas() - .get(idx) - .map(|c| c.name.clone()) - { - // if the data type mismatched, later check_execute will error out - // hence no need to check it here, beside, optimize pass might be able to cast it - // so checking here is not necessary - *expr = expr.clone().alias(col_name); - } - } - // add columns if have different column count let query_col_cnt = exprs.len(); let table_col_cnt = self.schema.column_schemas().len(); @@ -1140,10 +1131,9 @@ impl ColumnMatcherRewriter { // is the update at column exprs.push(datafusion::prelude::now().alias(&last_col_schema.name)); } else { - // helpful error message - return Err(DataFusionError::Plan(format!( - "Expect the last column in table to be timestamp column, found column {} with type {:?}", - last_col_schema.name, last_col_schema.data_type + return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch( + &original_exprs, + self.schema.as_ref(), ))); } } else if query_col_cnt + 2 == table_col_cnt { @@ -1170,14 +1160,110 @@ impl ColumnMatcherRewriter { ))); } } else { - return Err(DataFusionError::Plan(format!( - "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}", - query_col_cnt, - exprs, - table_col_cnt, - self.schema.column_schemas() + return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch( + &original_exprs, + self.schema.as_ref(), ))); } + + self.match_extra_output_columns(exprs, input_schema, &original_exprs, &all_names) + } + + /// Match flow output columns whose names are not in the sink schema by the same position only. + /// + /// This keeps the legacy "omit output aliases and map by position" behavior, but only when the + /// sink column at the same index is actually missing from the flow output. If the extra output + /// would be aliased to a sink column that already exists elsewhere, report a schema mismatch + /// instead of guessing another sink column by type. + /// + /// In particular, this intentionally rejects cross-position remaps like + /// `record_time_window2 -> record_time_window`: they are easy to confuse with real schema + /// mismatches and should be fixed by giving the flow output the sink column name explicitly. + fn match_extra_output_columns( + &self, + mut exprs: Vec, + input_schema: &DFSchema, + original_exprs: &[Expr], + all_names: &BTreeSet, + ) -> DfResult> { + let mut output_names = exprs + .iter() + .map(|expr| expr.qualified_name().1) + .collect::>(); + let output_name_set = output_names.iter().cloned().collect::>(); + let extra_expr_indices = output_names + .iter() + .enumerate() + .filter_map(|(idx, name)| (!all_names.contains(name)).then_some(idx)) + .collect::>(); + let missing_sink_indices = self + .schema + .column_schemas() + .iter() + .enumerate() + .filter_map(|(idx, column)| (!output_name_set.contains(&column.name)).then_some(idx)) + .collect::>(); + + if extra_expr_indices.is_empty() && missing_sink_indices.is_empty() { + return Ok(exprs); + } + + if extra_expr_indices.len() != missing_sink_indices.len() { + return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch( + original_exprs, + self.schema.as_ref(), + ))); + } + + let mut positional_matches = Vec::new(); + for expr_idx in extra_expr_indices { + if !missing_sink_indices.contains(&expr_idx) { + return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch( + original_exprs, + self.schema.as_ref(), + ))); + } + + let target_col_schema = &self.schema.column_schemas()[expr_idx]; + let expr_type = + ConcreteDataType::from_arrow_type(&exprs[expr_idx].get_type(input_schema)?); + if is_obviously_incompatible_positional_match(&expr_type, &target_col_schema.data_type) + { + return Err(DataFusionError::Plan(format!( + "Cannot match flow output column '{}' to sink column '{}' by position: incompatible data types, flow output type is {:?}, sink column type is {:?}. {}", + output_names[expr_idx], + target_col_schema.name, + expr_type, + target_col_schema.data_type, + format_flow_sink_schema_mismatch(original_exprs, self.schema.as_ref()) + ))); + } + + let target_name = target_col_schema.name.clone(); + positional_matches.push(format!( + "{} -> {} (flow output type: {:?}, sink column type: {:?})", + output_names[expr_idx], target_name, expr_type, target_col_schema.data_type + )); + exprs[expr_idx] = exprs[expr_idx].clone().alias(target_name.clone()); + output_names[expr_idx] = target_name; + } + + if !positional_matches.is_empty() { + debug!( + "Matched flow output columns to sink columns by position: {:?}", + positional_matches + ); + } + + let duplicated_output_names = duplicate_names(&output_names); + if !duplicated_output_names.is_empty() { + return Err(DataFusionError::Plan(format!( + "Flow output schema contains duplicate column(s) after schema matching {:?}. {}", + duplicated_output_names, + format_flow_sink_schema_mismatch(&exprs, self.schema.as_ref()) + ))); + } + Ok(exprs) } @@ -1186,12 +1272,9 @@ impl ColumnMatcherRewriter { let query_col_cnt = exprs.len(); if query_col_cnt > table_col_cnt { - return Err(DataFusionError::Plan(format!( - "Expect query column count <= table column count, found {} query columns {:?}, {} table columns {:?}", - query_col_cnt, - exprs, - table_col_cnt, - self.schema.column_schemas() + return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch( + &exprs, + self.schema.as_ref(), ))); } @@ -1209,8 +1292,9 @@ impl ColumnMatcherRewriter { .collect(); if !missing.is_empty() { return Err(DataFusionError::Plan(format!( - "Column(s) {:?} required by sink table are missing from flow output when merge_mode=last_non_null", - missing + "Column(s) {:?} required by sink table are missing from flow output when merge_mode=last_non_null. {}", + missing, + format_flow_sink_schema_mismatch(&exprs, self.schema.as_ref()) ))); } @@ -1250,8 +1334,9 @@ impl ColumnMatcherRewriter { if !remap.is_empty() { let extra: Vec<_> = remap.keys().cloned().collect(); return Err(DataFusionError::Plan(format!( - "Flow output has extra column(s) {:?} not found in sink schema when merge_mode=last_non_null", - extra + "Flow output has extra column(s) {:?} not found in sink schema when merge_mode=last_non_null. {}", + extra, + format_flow_sink_schema_mismatch(&exprs, self.schema.as_ref()) ))); } @@ -1281,6 +1366,80 @@ impl ColumnMatcherRewriter { } } +fn is_obviously_incompatible_positional_match( + expr_type: &ConcreteDataType, + sink_type: &ConcreteDataType, +) -> bool { + // This is a coarse type-family guard for legacy positional aliasing, not a strict type equality + // check. For example, numeric width/sign differences are allowed here and left to downstream + // coercion, and untyped NULL can be coerced to any target type. Clearly different families such + // as timestamp vs string are rejected early. + if expr_type.is_null() || expr_type == sink_type { + return false; + } + + expr_type.is_timestamp() != sink_type.is_timestamp() + || expr_type.is_string() != sink_type.is_string() + || expr_type.is_boolean() != sink_type.is_boolean() + || expr_type.is_json() != sink_type.is_json() + || expr_type.is_vector() != sink_type.is_vector() +} + +fn duplicate_names(names: &[String]) -> Vec { + let mut seen = HashSet::new(); + let mut duplicated = BTreeSet::new(); + for name in names { + if !seen.insert(name.as_str()) { + duplicated.insert(name.as_str()); + } + } + duplicated.into_iter().map(str::to_string).collect() +} + +fn format_flow_sink_schema_mismatch( + query_exprs: &[Expr], + sink_schema: &datatypes::schema::Schema, +) -> String { + let flow_output_columns = query_exprs + .iter() + .map(|expr| expr.qualified_name().1) + .collect::>(); + let sink_table_columns = sink_schema + .column_schemas() + .iter() + .map(|col| col.name.clone()) + .collect::>(); + + let flow_output_set = flow_output_columns.iter().cloned().collect::>(); + let sink_table_set = sink_table_columns.iter().cloned().collect::>(); + + let mut extra_flow_columns = flow_output_columns + .iter() + .filter(|name| !sink_table_set.contains(*name)) + .cloned() + .collect::>(); + extra_flow_columns.sort(); + extra_flow_columns.dedup(); + + let mut missing_sink_columns = sink_table_columns + .iter() + .filter(|name| !flow_output_set.contains(*name)) + .cloned() + .collect::>(); + missing_sink_columns.sort(); + missing_sink_columns.dedup(); + + format!( + "Flow output schema does not match sink table schema: found {} flow output columns and {} sink table columns. flow output columns: {:?}, sink table columns: {:?}, extra flow columns not in sink: {:?}, missing sink columns from flow output: {:?}", + flow_output_columns.len(), + sink_table_columns.len(), + flow_output_columns, + sink_table_columns, + extra_flow_columns, + missing_sink_columns + ) +} + impl TreeNodeRewriter for ColumnMatcherRewriter { type Node = LogicalPlan; fn f_down(&mut self, mut node: Self::Node) -> DfResult> { @@ -1327,7 +1486,7 @@ impl TreeNodeRewriter for ColumnMatcherRewriter { // if not, wrap it in a projection if let LogicalPlan::Projection(project) = &node { let exprs = project.expr.clone(); - let exprs = self.modify_project_exprs(exprs)?; + let exprs = self.modify_project_exprs(exprs, project.input.schema())?; self.is_rewritten = true; let new_plan = @@ -1341,7 +1500,7 @@ impl TreeNodeRewriter for ColumnMatcherRewriter { field.name(), ))); } - let exprs = self.modify_project_exprs(exprs)?; + let exprs = self.modify_project_exprs(exprs, node.schema())?; self.is_rewritten = true; let new_plan = LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?); diff --git a/src/flow/src/batching_mode/utils/test.rs b/src/flow/src/batching_mode/utils/test.rs index 317b0a5475..9ca1186fb6 100644 --- a/src/flow/src/batching_mode/utils/test.rs +++ b/src/flow/src/batching_mode/utils/test.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use catalog::RegisterTableRequest; use common_recordbatch::RecordBatch; use common_time::Timestamp; use datafusion_common::tree_node::TreeNode as _; @@ -29,7 +30,9 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::test_util::MemTable; use super::*; +use crate::batching_mode::BatchingModeOptions; use crate::batching_mode::state::FilterExprInfo; +use crate::batching_mode::task::{BatchingTask, TaskArgs}; use crate::test_utils::create_test_query_engine; fn u32_table(table_name: &str, columns: Vec<&str>, rows: usize) -> TableRef { @@ -432,9 +435,7 @@ async fn test_add_auto_column_rewriter() { // error datatype mismatch ( "SELECT number, ts FROM numbers_with_ts", - Err( - "Expect the last column in table to be timestamp column, found column atat with type Int8", - ), + Err("missing sink columns from flow output: [\"atat\"]"), vec![ ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new( @@ -498,6 +499,383 @@ async fn test_add_auto_column_rewriter() { } } +#[tokio::test] +async fn test_gen_plan_with_matching_schema_reports_extra_flow_columns_before_positional_alias() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "max(numbers_with_ts.number)", + ConcreteDataType::uint32_datatype(), + true, + ), + ])); + + let err = gen_plan_with_matching_schema( + "SELECT number, number AS extra, ts, max(number) FROM numbers_with_ts GROUP BY number, ts", + ctx, + query_engine, + sink_schema, + &[], + false, + ) + .await + .unwrap_err() + .to_string(); + + assert!( + err.contains("Flow output schema does not match sink table schema"), + "{err}" + ); + assert!(err.contains("flow output columns"), "{err}"); + assert!(err.contains("sink table columns"), "{err}"); + assert!(err.contains("extra flow columns not in sink"), "{err}"); + assert!(err.contains("extra"), "{err}"); + assert!( + !err.contains("extra AS ts"), + "schema error should not primarily expose positional alias: {err}" + ); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_rejects_positional_alias_type_mismatch() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "event_time", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "max(numbers_with_ts.number)", + ConcreteDataType::uint32_datatype(), + true, + ), + ])); + + let err = gen_plan_with_matching_schema( + "SELECT number, number AS not_time, max(number) FROM numbers_with_ts GROUP BY number", + ctx, + query_engine, + sink_schema, + &[], + false, + ) + .await + .unwrap_err() + .to_string(); + + assert!( + err.contains( + "Cannot match flow output column 'not_time' to sink column 'event_time' by position" + ), + "{err}" + ); + assert!(err.contains("incompatible data types"), "{err}"); + assert!( + !err.contains("not_time AS event_time"), + "schema error should not expose an incompatible positional alias: {err}" + ); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_rejects_cross_position_extra_column_match() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "time_window", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ), + ])); + + let err = gen_plan_with_matching_schema( + "SELECT number, ts, date_bin('5 minutes', ts) AS time_window2 FROM numbers_with_ts GROUP BY number, ts, time_window2", + ctx, + query_engine, + sink_schema, + &[], + false, + ) + .await + .unwrap_err() + .to_string(); + + assert!( + err.contains("Flow output schema does not match sink table schema"), + "{err}" + ); + assert!(err.contains("time_window2"), "{err}"); + assert!(err.contains("time_window"), "{err}"); + assert!(!err.contains("DuplicateUnqualifiedField"), "{err}"); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_accepts_out_of_order_matching_names() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "time_window", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ), + ])); + + let plan = gen_plan_with_matching_schema( + "SELECT number, ts, date_bin('5 minutes', ts) AS time_window FROM numbers_with_ts GROUP BY number, ts, time_window", + ctx, + query_engine, + sink_schema, + &[], + false, + ) + .await + .unwrap(); + let output_names = plan + .schema() + .fields() + .iter() + .map(|field| field.name().clone()) + .collect::>(); + + assert_eq!( + output_names, + vec![ + "number".to_string(), + "ts".to_string(), + "time_window".to_string() + ] + ); + assert!(duplicate_names(&output_names).is_empty()); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_allows_numeric_positional_alias() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("renamed_number", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ])); + + let plan = gen_plan_with_matching_schema( + "SELECT number, ts FROM numbers_with_ts", + ctx, + query_engine, + sink_schema, + &[], + false, + ) + .await + .unwrap(); + let sql = df_plan_to_sql(&plan).unwrap(); + + assert_eq!( + "SELECT numbers_with_ts.number AS renamed_number, numbers_with_ts.ts FROM numbers_with_ts", + sql + ); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_allows_null_positional_alias() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new("label", ConcreteDataType::string_datatype(), true), + ])); + + let plan = gen_plan_with_matching_schema( + "SELECT number, NULL AS label_placeholder FROM numbers_with_ts", + ctx, + query_engine, + sink_schema, + &[], + false, + ) + .await + .unwrap(); + let output_names = plan + .schema() + .fields() + .iter() + .map(|field| field.name().clone()) + .collect::>(); + let sql = df_plan_to_sql(&plan).unwrap(); + + assert_eq!( + output_names, + vec!["number".to_string(), "label".to_string()] + ); + assert!(sql.contains("NULL AS label"), "{sql}"); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_accepts_matching_flow_schema() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new("extra", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "max(numbers_with_ts.number)", + ConcreteDataType::uint32_datatype(), + true, + ), + ])); + + let plan = gen_plan_with_matching_schema( + "SELECT number, number AS extra, ts, max(number) FROM numbers_with_ts GROUP BY number, ts", + ctx, + query_engine, + sink_schema, + &[], + false, + ) + .await + .unwrap(); + let sql = df_plan_to_sql(&plan).unwrap(); + + assert_eq!( + "SELECT numbers_with_ts.number, numbers_with_ts.number AS extra, numbers_with_ts.ts, max(numbers_with_ts.number) FROM numbers_with_ts GROUP BY numbers_with_ts.number, numbers_with_ts.ts", + sql + ); +} + +#[tokio::test] +async fn test_validate_sink_table_schema_rejects_existing_sink_missing_flow_column() { + let query_engine = create_test_query_engine(); + let query_ctx = QueryContext::arc(); + let sql = "SELECT number, number AS extra, max(number) FROM numbers_with_ts GROUP BY number"; + let plan = sql_to_df_plan(query_ctx.clone(), query_engine.clone(), sql, true) + .await + .unwrap(); + + let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap(); + let sink_table_name = [ + "greptime".to_string(), + "public".to_string(), + "existing_sink".to_string(), + ]; + let sink_table = u32_table( + "existing_sink", + vec!["number", "max(numbers_with_ts.number)"], + 0, + ); + catalog_manager + .register_table_sync(RegisterTableRequest { + catalog: sink_table_name[0].clone(), + schema: sink_table_name[1].clone(), + table_name: sink_table_name[2].clone(), + table_id: 4096, + table: sink_table, + }) + .unwrap(); + + let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); + let task = BatchingTask::try_new(TaskArgs { + flow_id: 1, + query: sql, + plan, + time_window_expr: None, + expire_after: None, + sink_table_name, + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx, + catalog_manager, + shutdown_rx, + batch_opts: Arc::new(BatchingModeOptions::default()), + flow_eval_interval: None, + }) + .unwrap(); + + let err = task + .validate_sink_table_schema(&query_engine) + .await + .unwrap_err() + .to_string(); + + assert!( + err.contains("Flow output schema does not match sink table schema"), + "{err}" + ); + assert!(err.contains("extra"), "{err}"); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_allow_partial_fills_nullable_columns() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("optional_value", ConcreteDataType::uint32_datatype(), true), + ])); + + let plan = gen_plan_with_matching_schema( + "SELECT number, ts FROM numbers_with_ts", + ctx, + query_engine, + sink_schema, + &[0], + true, + ) + .await + .unwrap(); + let sql = df_plan_to_sql(&plan).unwrap(); + + assert_eq!( + "SELECT numbers_with_ts.number, numbers_with_ts.ts, NULL AS optional_value FROM numbers_with_ts", + sql + ); +} + #[tokio::test] async fn test_find_group_by_exprs() { let testcases = vec![ @@ -1491,3 +1869,118 @@ async fn test_analyze_incremental_aggregate_plan_rejects_cast_wrapped_alias() { ); } } + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_last_non_null_rejects_missing_primary_key_column() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + // Sink table with primary_key_indices=[0] ("number"), time_index="ts", and merge_mode=last_non_null. + // The flow query omits "number", which is a required primary-key column. + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("optional_value", ConcreteDataType::uint32_datatype(), true), + ])); + + let err = gen_plan_with_matching_schema( + "SELECT ts FROM numbers_with_ts", + ctx, + query_engine, + sink_schema, + &[0], + true, + ) + .await + .unwrap_err() + .to_string(); + + assert!( + err.contains( + "required by sink table are missing from flow output when merge_mode=last_non_null" + ), + "{err}" + ); + assert!(err.contains("number"), "{err}"); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_last_non_null_rejects_missing_time_index_column() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + // Sink table with primary_key_indices=[0] ("number"), time_index="ts", and merge_mode=last_non_null. + // The flow query omits "ts", which is a required time-index column. + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("optional_value", ConcreteDataType::uint32_datatype(), true), + ])); + + let err = gen_plan_with_matching_schema( + "SELECT number FROM numbers_with_ts", + ctx, + query_engine, + sink_schema, + &[0], + true, + ) + .await + .unwrap_err() + .to_string(); + + assert!( + err.contains( + "required by sink table are missing from flow output when merge_mode=last_non_null" + ), + "{err}" + ); + assert!(err.contains("ts"), "{err}"); +} + +#[tokio::test] +async fn test_gen_plan_with_matching_schema_last_non_null_rejects_extra_flow_column() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + // Sink table with merge_mode=last_non_null. + // Sink has 3 columns: number (pk), ts (time_index), optional_value (nullable). + // Flow outputs: number, number AS extra, ts → "extra" is not in sink schema. + // query_col_cnt(3) <= table_col_cnt(3), so the extra branch is reached. + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("optional_value", ConcreteDataType::uint32_datatype(), true), + ])); + + let err = gen_plan_with_matching_schema( + "SELECT number, number AS extra, ts FROM numbers_with_ts", + ctx, + query_engine, + sink_schema, + &[0], + true, + ) + .await + .unwrap_err() + .to_string(); + + assert!(err.contains("extra column(s)"), "{err}"); + assert!(err.contains("extra"), "{err}"); + assert!( + err.contains("Flow output schema does not match sink table schema"), + "{err}" + ); +} diff --git a/tests/cases/standalone/common/flow/flow_last_non_null.result b/tests/cases/standalone/common/flow/flow_last_non_null.result index 50cb46faa3..0c03c19399 100644 --- a/tests/cases/standalone/common/flow/flow_last_non_null.result +++ b/tests/cases/standalone/common/flow/flow_last_non_null.result @@ -162,6 +162,8 @@ CREATE TABLE approx_rate ( Affected Rows: 0 +-- Without merge_mode=last_non_null, this partial output is rejected at CREATE FLOW time. +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan CREATE FLOW find_approx_rate SINK TO approx_rate AS SELECT (max(byte) - min(byte)) / 30.0 as rate, @@ -172,24 +174,7 @@ from GROUP BY time_window; -Affected Rows: 0 - -INSERT INTO - bytes_log -VALUES - (NULL, '2023-01-01 00:00:01'), - (300, '2023-01-01 00:00:31'); - -Affected Rows: 2 - --- should return error -ADMIN FLUSH_FLOW('find_approx_rate'); - -Error: 1002(Unexpected), Failed to execute admin function flush_flow: Execution error: Internal error: 1003 - -DROP FLOW find_approx_rate; - -Affected Rows: 0 +Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Flow output schema does not match sink table schema: found 3 flow output columns and 4 sink table columns. flow output columns: [\"rate\", \"time_window\", \"update_at\"], sink table columns: [\"rate\", \"time_window\", \"update_at\", \"bb\"], extra flow columns not in sink: [], missing sink columns from flow output: [\"bb\"]") in context: Failed to rewrite plan DROP TABLE bytes_log; diff --git a/tests/cases/standalone/common/flow/flow_last_non_null.sql b/tests/cases/standalone/common/flow/flow_last_non_null.sql index 95ebe4aaa6..29c5444f95 100644 --- a/tests/cases/standalone/common/flow/flow_last_non_null.sql +++ b/tests/cases/standalone/common/flow/flow_last_non_null.sql @@ -84,6 +84,8 @@ CREATE TABLE approx_rate ( TIME INDEX(time_window) ); +-- Without merge_mode=last_non_null, this partial output is rejected at CREATE FLOW time. +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan CREATE FLOW find_approx_rate SINK TO approx_rate AS SELECT (max(byte) - min(byte)) / 30.0 as rate, @@ -93,16 +95,5 @@ from bytes_log GROUP BY time_window; - -INSERT INTO - bytes_log -VALUES - (NULL, '2023-01-01 00:00:01'), - (300, '2023-01-01 00:00:31'); - --- should return error -ADMIN FLUSH_FLOW('find_approx_rate'); - -DROP FLOW find_approx_rate; DROP TABLE bytes_log; DROP TABLE approx_rate; diff --git a/tests/cases/standalone/common/flow/flow_sink_schema_mismatch.result b/tests/cases/standalone/common/flow/flow_sink_schema_mismatch.result new file mode 100644 index 0000000000..54fcba2285 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_sink_schema_mismatch.result @@ -0,0 +1,123 @@ +-- Verify that batching flow rejects CREATE FLOW when the pre-existing sink +-- table schema does not match the flow output (create-time validation, not runtime). +CREATE TABLE source_mm ( + "number" INT, + extra STRING, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +-- Pre-create a sink table that is intentionally missing the "extra" column. +-- This case validates batching mode at CREATE FLOW time, before any INSERT/FLUSH. +CREATE TABLE sink_mm ( + "number" INT, + time_window TIMESTAMP TIME INDEX, + cnt BIGINT +); + +Affected Rows: 0 + +-- This CREATE FLOW should fail immediately: the flow outputs (number, extra, time_window, cnt) +-- but sink_mm has only (number, time_window, cnt). +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan +CREATE FLOW mismatch_flow SINK TO sink_mm AS +SELECT + "number", + extra, + date_bin(INTERVAL '1 second', ts) as time_window, + count(*) as cnt +FROM + source_mm +GROUP BY + "number", extra, time_window; + +Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Flow output schema does not match sink table schema: found 4 flow output columns and 3 sink table columns. flow output columns: [\"number\", \"extra\", \"time_window\", \"cnt\"], sink table columns: [\"number\", \"time_window\", \"cnt\"], extra flow columns not in sink: [\"extra\"], missing sink columns from flow output: []") in context: Failed to rewrite plan + +DROP TABLE source_mm; + +Affected Rows: 0 + +DROP TABLE sink_mm; + +Affected Rows: 0 + +-- TQL/PromQL flows use the same create-time sink schema validation path. +CREATE TABLE tql_source_mm ( + `value` DOUBLE, + ts TIMESTAMP TIME INDEX, + sensor STRING, + loc STRING, + PRIMARY KEY (sensor, loc) +); + +Affected Rows: 0 + +-- Pre-create a TQL sink table that is intentionally missing the "sensor" tag column. +CREATE TABLE tql_sink_mm ( + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +-- This CREATE FLOW should fail immediately: the TQL output has (value, sensor, ts), +-- but tql_sink_mm has only (value, ts). +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan +CREATE FLOW tql_mismatch_flow +SINK TO tql_sink_mm +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '1m') +avg by(sensor) (tql_source_mm) AS value; + +Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Flow output schema does not match sink table schema: found 3 flow output columns and 2 sink table columns. flow output columns: [\"value\", \"sensor\", \"ts\"], sink table columns: [\"value\", \"ts\"], extra flow columns not in sink: [\"sensor\"], missing sink columns from flow output: []") in context: Failed to rewrite plan + +DROP TABLE tql_source_mm; + +Affected Rows: 0 + +DROP TABLE tql_sink_mm; + +Affected Rows: 0 + +-- Real merge_mode=last_non_null sink options should enable partial schema validation. +CREATE TABLE lnn_source_mm ( + device STRING, + val DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE TABLE lnn_sink_mm ( + device STRING, + time_window TIMESTAMP TIME INDEX, + cnt BIGINT, + PRIMARY KEY (device) +) WITH('merge_mode'='last_non_null'); + +Affected Rows: 0 + +-- This CREATE FLOW should fail through the last_non_null partial validator: the +-- sink primary key "device" is required but absent from the flow output. +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan +CREATE FLOW lnn_missing_pk_flow +SINK TO lnn_sink_mm AS +SELECT + date_bin(INTERVAL '1 second', ts) as time_window, + count(*) as cnt +FROM + lnn_source_mm +GROUP BY + time_window; + +Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Column(s) [\"device\"] required by sink table are missing from flow output when merge_mode=last_non_null. Flow output schema does not match sink table schema: found 2 flow output columns and 3 sink table columns. flow output columns: [\"time_window\", \"cnt\"], sink table columns: [\"device\", \"time_window\", \"cnt\"], extra flow columns not in sink: [], missing sink columns from flow output: [\"device\"]") in context: Failed to rewrite plan + +DROP TABLE lnn_source_mm; + +Affected Rows: 0 + +DROP TABLE lnn_sink_mm; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_sink_schema_mismatch.sql b/tests/cases/standalone/common/flow/flow_sink_schema_mismatch.sql new file mode 100644 index 0000000000..2d00799817 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_sink_schema_mismatch.sql @@ -0,0 +1,89 @@ +-- Verify that batching flow rejects CREATE FLOW when the pre-existing sink +-- table schema does not match the flow output (create-time validation, not runtime). +CREATE TABLE source_mm ( + "number" INT, + extra STRING, + ts TIMESTAMP TIME INDEX +); + +-- Pre-create a sink table that is intentionally missing the "extra" column. +-- This case validates batching mode at CREATE FLOW time, before any INSERT/FLUSH. +CREATE TABLE sink_mm ( + "number" INT, + time_window TIMESTAMP TIME INDEX, + cnt BIGINT +); + +-- This CREATE FLOW should fail immediately: the flow outputs (number, extra, time_window, cnt) +-- but sink_mm has only (number, time_window, cnt). +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan +CREATE FLOW mismatch_flow SINK TO sink_mm AS +SELECT + "number", + extra, + date_bin(INTERVAL '1 second', ts) as time_window, + count(*) as cnt +FROM + source_mm +GROUP BY + "number", extra, time_window; + +DROP TABLE source_mm; +DROP TABLE sink_mm; + +-- TQL/PromQL flows use the same create-time sink schema validation path. +CREATE TABLE tql_source_mm ( + `value` DOUBLE, + ts TIMESTAMP TIME INDEX, + sensor STRING, + loc STRING, + PRIMARY KEY (sensor, loc) +); + +-- Pre-create a TQL sink table that is intentionally missing the "sensor" tag column. +CREATE TABLE tql_sink_mm ( + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +-- This CREATE FLOW should fail immediately: the TQL output has (value, sensor, ts), +-- but tql_sink_mm has only (value, ts). +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan +CREATE FLOW tql_mismatch_flow +SINK TO tql_sink_mm +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '1m') +avg by(sensor) (tql_source_mm) AS value; + +DROP TABLE tql_source_mm; +DROP TABLE tql_sink_mm; + +-- Real merge_mode=last_non_null sink options should enable partial schema validation. +CREATE TABLE lnn_source_mm ( + device STRING, + val DOUBLE, + ts TIMESTAMP TIME INDEX +); + +CREATE TABLE lnn_sink_mm ( + device STRING, + time_window TIMESTAMP TIME INDEX, + cnt BIGINT, + PRIMARY KEY (device) +) WITH('merge_mode'='last_non_null'); + +-- This CREATE FLOW should fail through the last_non_null partial validator: the +-- sink primary key "device" is required but absent from the flow output. +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan +CREATE FLOW lnn_missing_pk_flow +SINK TO lnn_sink_mm AS +SELECT + date_bin(INTERVAL '1 second', ts) as time_window, + count(*) as cnt +FROM + lnn_source_mm +GROUP BY + time_window; + +DROP TABLE lnn_source_mm; +DROP TABLE lnn_sink_mm; diff --git a/tests/cases/standalone/flow-tql/flow_tql_missing_value_sink_schema.result b/tests/cases/standalone/flow-tql/flow_tql_missing_value_sink_schema.result new file mode 100644 index 0000000000..53df353078 --- /dev/null +++ b/tests/cases/standalone/flow-tql/flow_tql_missing_value_sink_schema.result @@ -0,0 +1,90 @@ +-- Regression for a TQL flow whose pre-created sink table is missing the value +-- output column. The labels are intentionally minimal and anonymous. +CREATE DATABASE source_schema; + +Affected Rows: 1 + +CREATE DATABASE sink_schema; + +Affected Rows: 1 + +USE source_schema; + +Affected Rows: 0 + +CREATE TABLE metric_input ( + namespace STRING NULL, + app STRING NULL, + greptime_timestamp TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (greptime_timestamp), + PRIMARY KEY (namespace, app) +); + +Affected Rows: 0 + +INSERT INTO metric_input VALUES + ('ns', 'app-a', '2026-01-23T03:40:00Z', 10.0), + ('ns', 'app-a', '2026-01-23T03:50:00Z', 20.0); + +Affected Rows: 2 + +USE sink_schema; + +Affected Rows: 0 + +-- Intentionally omit greptime_value DOUBLE from the pre-created sink table. +CREATE TABLE missing_value_sink ( + namespace STRING NULL, + app STRING NULL, + greptime_timestamp TIMESTAMP(3) NOT NULL, + TIME INDEX (greptime_timestamp), + PRIMARY KEY (namespace, app) +) +ENGINE=mito; + +Affected Rows: 0 + +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan +CREATE FLOW missing_value_flow +SINK TO sink_schema.missing_value_sink +EVAL INTERVAL '3600 s' +AS TQL EVAL ( + date_bin('2m'::interval, now() - '2m'::interval), + date_bin('2m'::interval, now() - '2m'::interval), + '1h' +) + avg by (namespace, app) ( + avg_over_time(metric_input{__schema__="source_schema"}[1h]) + ); + +Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Flow output schema does not match sink table schema: found 4 flow output columns and 3 sink table columns. flow output columns: [\"namespace\", \"app\", \"greptime_timestamp\", \"avg(prom_avg_over_time(greptime_timestamp_range,greptime_value))\"], sink table columns: [\"namespace\", \"app\", \"greptime_timestamp\"], extra flow columns not in sink: [\"avg(prom_avg_over_time(greptime_timestamp_range,greptime_value))\"], missing sink columns from flow output: []") in context: Failed to rewrite plan + +DROP FLOW IF EXISTS missing_value_flow; + +Affected Rows: 0 + +DROP TABLE missing_value_sink; + +Affected Rows: 0 + +USE source_schema; + +Affected Rows: 0 + +DROP TABLE metric_input; + +Affected Rows: 0 + +USE public; + +Affected Rows: 0 + +DROP DATABASE sink_schema; + +Affected Rows: 0 + +DROP DATABASE source_schema; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow-tql/flow_tql_missing_value_sink_schema.sql b/tests/cases/standalone/flow-tql/flow_tql_missing_value_sink_schema.sql new file mode 100644 index 0000000000..3693775800 --- /dev/null +++ b/tests/cases/standalone/flow-tql/flow_tql_missing_value_sink_schema.sql @@ -0,0 +1,55 @@ +-- Regression for a TQL flow whose pre-created sink table is missing the value +-- output column. The labels are intentionally minimal and anonymous. + +CREATE DATABASE source_schema; +CREATE DATABASE sink_schema; + +USE source_schema; + +CREATE TABLE metric_input ( + namespace STRING NULL, + app STRING NULL, + greptime_timestamp TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (greptime_timestamp), + PRIMARY KEY (namespace, app) +); + +INSERT INTO metric_input VALUES + ('ns', 'app-a', '2026-01-23T03:40:00Z', 10.0), + ('ns', 'app-a', '2026-01-23T03:50:00Z', 20.0); + +USE sink_schema; + +-- Intentionally omit greptime_value DOUBLE from the pre-created sink table. +CREATE TABLE missing_value_sink ( + namespace STRING NULL, + app STRING NULL, + greptime_timestamp TIMESTAMP(3) NOT NULL, + TIME INDEX (greptime_timestamp), + PRIMARY KEY (namespace, app) +) +ENGINE=mito; + +-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan +CREATE FLOW missing_value_flow +SINK TO sink_schema.missing_value_sink +EVAL INTERVAL '3600 s' +AS TQL EVAL ( + date_bin('2m'::interval, now() - '2m'::interval), + date_bin('2m'::interval, now() - '2m'::interval), + '1h' +) + avg by (namespace, app) ( + avg_over_time(metric_input{__schema__="source_schema"}[1h]) + ); + +DROP FLOW IF EXISTS missing_value_flow; +DROP TABLE missing_value_sink; + +USE source_schema; +DROP TABLE metric_input; + +USE public; +DROP DATABASE sink_schema; +DROP DATABASE source_schema;