feat: validate batching flow sink schema on create (#8176)

* feat: check schema on create

Signed-off-by: discord9 <discord9@163.com>

* chore: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): avoid duplicate fields when matching sink schema

Signed-off-by: discord9 <discord9@163.com>

* fix: null handling

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: debug log

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-02 16:24:50 +08:00
committed by GitHub
parent 50b1a07232
commit 7f76ad5439
10 changed files with 1093 additions and 75 deletions

View File

@@ -630,8 +630,11 @@ impl BatchingEngine {
let engine = self.query_engine.clone();
let frontend = self.frontend_client.clone();
// check execute once first to detect any error early
// Create sink table if needed, then validate an existing/created sink schema before
// spawning the background task. This catches user-created sink schema mismatches at
// CREATE FLOW time instead of surfacing them later in the execution loop.
task.check_or_create_sink_table(&engine, &frontend).await?;
task.validate_sink_table_schema(&engine).await?;
let (start_tx, start_rx) = oneshot::channel();

View File

@@ -265,6 +265,36 @@ impl BatchingTask {
Ok(None)
}
/// Validates that the sink table schema can accept this flow's output.
///
/// This is a dry-run of the same schema matching logic used by runtime insert-plan
/// generation, but without adding dirty-window filters or executing the query. It is used
/// during CREATE FLOW to catch existing sink table mismatches early.
pub async fn validate_sink_table_schema(&self, engine: &QueryEngineRef) -> Result<(), Error> {
let (table, _) = get_table_info_df_schema(
self.config.catalog_manager.clone(),
self.config.sink_table_name.clone(),
)
.await?;
let table_meta = &table.table_info().meta;
let merge_mode_last_non_null =
is_merge_mode_last_non_null(&table_meta.options.extra_options);
let primary_key_indices = table_meta.primary_key_indices.clone();
let query_ctx = self.state.read().unwrap().query_ctx.clone();
gen_plan_with_matching_schema(
&self.config.query,
query_ctx,
engine.clone(),
table_meta.schema.clone(),
&primary_key_indices,
merge_mode_last_non_null,
)
.await
.map(|_| ())
}
async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
self.config
.catalog_manager

View File

@@ -33,9 +33,10 @@ use datafusion_common::{
};
use datafusion_expr::logical_plan::{Aggregate, TableScan};
use datafusion_expr::{
Distinct, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, Projection, and, binary_expr,
bitwise_and, bitwise_or, bitwise_xor, is_null, or, when,
Distinct, ExprSchemable, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, Projection, and,
binary_expr, bitwise_and, bitwise_or, bitwise_xor, is_null, or, when,
};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use query::QueryEngineRef;
use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
@@ -955,7 +956,7 @@ pub(crate) async fn gen_plan_with_matching_schema(
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
context: "Failed to rewrite plan".to_string(),
})?
.data;
Ok(plan)
@@ -1090,33 +1091,23 @@ impl ColumnMatcherRewriter {
}
/// modify the exprs in place so that it matches the schema and some auto columns are added
fn modify_project_exprs(&mut self, mut exprs: Vec<Expr>) -> DfResult<Vec<Expr>> {
fn modify_project_exprs(
&mut self,
mut exprs: Vec<Expr>,
input_schema: &DFSchema,
) -> DfResult<Vec<Expr>> {
if self.allow_partial {
return self.modify_project_exprs_with_partial(exprs);
}
let original_exprs = exprs.clone();
let all_names = self
.schema
.column_schemas()
.iter()
.map(|c| c.name.clone())
.collect::<BTreeSet<_>>();
// first match by position
for (idx, expr) in exprs.iter_mut().enumerate() {
if !all_names.contains(&expr.qualified_name().1)
&& let Some(col_name) = self
.schema
.column_schemas()
.get(idx)
.map(|c| c.name.clone())
{
// if the data type mismatched, later check_execute will error out
// hence no need to check it here, beside, optimize pass might be able to cast it
// so checking here is not necessary
*expr = expr.clone().alias(col_name);
}
}
// add columns if have different column count
let query_col_cnt = exprs.len();
let table_col_cnt = self.schema.column_schemas().len();
@@ -1140,10 +1131,9 @@ impl ColumnMatcherRewriter {
// is the update at column
exprs.push(datafusion::prelude::now().alias(&last_col_schema.name));
} else {
// helpful error message
return Err(DataFusionError::Plan(format!(
"Expect the last column in table to be timestamp column, found column {} with type {:?}",
last_col_schema.name, last_col_schema.data_type
return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch(
&original_exprs,
self.schema.as_ref(),
)));
}
} else if query_col_cnt + 2 == table_col_cnt {
@@ -1170,14 +1160,110 @@ impl ColumnMatcherRewriter {
)));
}
} else {
return Err(DataFusionError::Plan(format!(
"Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}",
query_col_cnt,
exprs,
table_col_cnt,
self.schema.column_schemas()
return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch(
&original_exprs,
self.schema.as_ref(),
)));
}
self.match_extra_output_columns(exprs, input_schema, &original_exprs, &all_names)
}
/// Match flow output columns whose names are not in the sink schema by the same position only.
///
/// This keeps the legacy "omit output aliases and map by position" behavior, but only when the
/// sink column at the same index is actually missing from the flow output. If the extra output
/// would be aliased to a sink column that already exists elsewhere, report a schema mismatch
/// instead of guessing another sink column by type.
///
/// In particular, this intentionally rejects cross-position remaps like
/// `record_time_window2 -> record_time_window`: they are easy to confuse with real schema
/// mismatches and should be fixed by giving the flow output the sink column name explicitly.
fn match_extra_output_columns(
&self,
mut exprs: Vec<Expr>,
input_schema: &DFSchema,
original_exprs: &[Expr],
all_names: &BTreeSet<String>,
) -> DfResult<Vec<Expr>> {
let mut output_names = exprs
.iter()
.map(|expr| expr.qualified_name().1)
.collect::<Vec<_>>();
let output_name_set = output_names.iter().cloned().collect::<BTreeSet<_>>();
let extra_expr_indices = output_names
.iter()
.enumerate()
.filter_map(|(idx, name)| (!all_names.contains(name)).then_some(idx))
.collect::<Vec<_>>();
let missing_sink_indices = self
.schema
.column_schemas()
.iter()
.enumerate()
.filter_map(|(idx, column)| (!output_name_set.contains(&column.name)).then_some(idx))
.collect::<Vec<_>>();
if extra_expr_indices.is_empty() && missing_sink_indices.is_empty() {
return Ok(exprs);
}
if extra_expr_indices.len() != missing_sink_indices.len() {
return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch(
original_exprs,
self.schema.as_ref(),
)));
}
let mut positional_matches = Vec::new();
for expr_idx in extra_expr_indices {
if !missing_sink_indices.contains(&expr_idx) {
return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch(
original_exprs,
self.schema.as_ref(),
)));
}
let target_col_schema = &self.schema.column_schemas()[expr_idx];
let expr_type =
ConcreteDataType::from_arrow_type(&exprs[expr_idx].get_type(input_schema)?);
if is_obviously_incompatible_positional_match(&expr_type, &target_col_schema.data_type)
{
return Err(DataFusionError::Plan(format!(
"Cannot match flow output column '{}' to sink column '{}' by position: incompatible data types, flow output type is {:?}, sink column type is {:?}. {}",
output_names[expr_idx],
target_col_schema.name,
expr_type,
target_col_schema.data_type,
format_flow_sink_schema_mismatch(original_exprs, self.schema.as_ref())
)));
}
let target_name = target_col_schema.name.clone();
positional_matches.push(format!(
"{} -> {} (flow output type: {:?}, sink column type: {:?})",
output_names[expr_idx], target_name, expr_type, target_col_schema.data_type
));
exprs[expr_idx] = exprs[expr_idx].clone().alias(target_name.clone());
output_names[expr_idx] = target_name;
}
if !positional_matches.is_empty() {
debug!(
"Matched flow output columns to sink columns by position: {:?}",
positional_matches
);
}
let duplicated_output_names = duplicate_names(&output_names);
if !duplicated_output_names.is_empty() {
return Err(DataFusionError::Plan(format!(
"Flow output schema contains duplicate column(s) after schema matching {:?}. {}",
duplicated_output_names,
format_flow_sink_schema_mismatch(&exprs, self.schema.as_ref())
)));
}
Ok(exprs)
}
@@ -1186,12 +1272,9 @@ impl ColumnMatcherRewriter {
let query_col_cnt = exprs.len();
if query_col_cnt > table_col_cnt {
return Err(DataFusionError::Plan(format!(
"Expect query column count <= table column count, found {} query columns {:?}, {} table columns {:?}",
query_col_cnt,
exprs,
table_col_cnt,
self.schema.column_schemas()
return Err(DataFusionError::Plan(format_flow_sink_schema_mismatch(
&exprs,
self.schema.as_ref(),
)));
}
@@ -1209,8 +1292,9 @@ impl ColumnMatcherRewriter {
.collect();
if !missing.is_empty() {
return Err(DataFusionError::Plan(format!(
"Column(s) {:?} required by sink table are missing from flow output when merge_mode=last_non_null",
missing
"Column(s) {:?} required by sink table are missing from flow output when merge_mode=last_non_null. {}",
missing,
format_flow_sink_schema_mismatch(&exprs, self.schema.as_ref())
)));
}
@@ -1250,8 +1334,9 @@ impl ColumnMatcherRewriter {
if !remap.is_empty() {
let extra: Vec<_> = remap.keys().cloned().collect();
return Err(DataFusionError::Plan(format!(
"Flow output has extra column(s) {:?} not found in sink schema when merge_mode=last_non_null",
extra
"Flow output has extra column(s) {:?} not found in sink schema when merge_mode=last_non_null. {}",
extra,
format_flow_sink_schema_mismatch(&exprs, self.schema.as_ref())
)));
}
@@ -1281,6 +1366,80 @@ impl ColumnMatcherRewriter {
}
}
fn is_obviously_incompatible_positional_match(
expr_type: &ConcreteDataType,
sink_type: &ConcreteDataType,
) -> bool {
// This is a coarse type-family guard for legacy positional aliasing, not a strict type equality
// check. For example, numeric width/sign differences are allowed here and left to downstream
// coercion, and untyped NULL can be coerced to any target type. Clearly different families such
// as timestamp vs string are rejected early.
if expr_type.is_null() || expr_type == sink_type {
return false;
}
expr_type.is_timestamp() != sink_type.is_timestamp()
|| expr_type.is_string() != sink_type.is_string()
|| expr_type.is_boolean() != sink_type.is_boolean()
|| expr_type.is_json() != sink_type.is_json()
|| expr_type.is_vector() != sink_type.is_vector()
}
fn duplicate_names(names: &[String]) -> Vec<String> {
let mut seen = HashSet::new();
let mut duplicated = BTreeSet::new();
for name in names {
if !seen.insert(name.as_str()) {
duplicated.insert(name.as_str());
}
}
duplicated.into_iter().map(str::to_string).collect()
}
fn format_flow_sink_schema_mismatch(
query_exprs: &[Expr],
sink_schema: &datatypes::schema::Schema,
) -> String {
let flow_output_columns = query_exprs
.iter()
.map(|expr| expr.qualified_name().1)
.collect::<Vec<_>>();
let sink_table_columns = sink_schema
.column_schemas()
.iter()
.map(|col| col.name.clone())
.collect::<Vec<_>>();
let flow_output_set = flow_output_columns.iter().cloned().collect::<HashSet<_>>();
let sink_table_set = sink_table_columns.iter().cloned().collect::<HashSet<_>>();
let mut extra_flow_columns = flow_output_columns
.iter()
.filter(|name| !sink_table_set.contains(*name))
.cloned()
.collect::<Vec<_>>();
extra_flow_columns.sort();
extra_flow_columns.dedup();
let mut missing_sink_columns = sink_table_columns
.iter()
.filter(|name| !flow_output_set.contains(*name))
.cloned()
.collect::<Vec<_>>();
missing_sink_columns.sort();
missing_sink_columns.dedup();
format!(
"Flow output schema does not match sink table schema: found {} flow output columns and {} sink table columns. flow output columns: {:?}, sink table columns: {:?}, extra flow columns not in sink: {:?}, missing sink columns from flow output: {:?}",
flow_output_columns.len(),
sink_table_columns.len(),
flow_output_columns,
sink_table_columns,
extra_flow_columns,
missing_sink_columns
)
}
impl TreeNodeRewriter for ColumnMatcherRewriter {
type Node = LogicalPlan;
fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
@@ -1327,7 +1486,7 @@ impl TreeNodeRewriter for ColumnMatcherRewriter {
// if not, wrap it in a projection
if let LogicalPlan::Projection(project) = &node {
let exprs = project.expr.clone();
let exprs = self.modify_project_exprs(exprs)?;
let exprs = self.modify_project_exprs(exprs, project.input.schema())?;
self.is_rewritten = true;
let new_plan =
@@ -1341,7 +1500,7 @@ impl TreeNodeRewriter for ColumnMatcherRewriter {
field.name(),
)));
}
let exprs = self.modify_project_exprs(exprs)?;
let exprs = self.modify_project_exprs(exprs, node.schema())?;
self.is_rewritten = true;
let new_plan =
LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?);

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use catalog::RegisterTableRequest;
use common_recordbatch::RecordBatch;
use common_time::Timestamp;
use datafusion_common::tree_node::TreeNode as _;
@@ -29,7 +30,9 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::test_util::MemTable;
use super::*;
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::state::FilterExprInfo;
use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::test_utils::create_test_query_engine;
fn u32_table(table_name: &str, columns: Vec<&str>, rows: usize) -> TableRef {
@@ -432,9 +435,7 @@ async fn test_add_auto_column_rewriter() {
// error datatype mismatch
(
"SELECT number, ts FROM numbers_with_ts",
Err(
"Expect the last column in table to be timestamp column, found column atat with type Int8",
),
Err("missing sink columns from flow output: [\"atat\"]"),
vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
@@ -498,6 +499,383 @@ async fn test_add_auto_column_rewriter() {
}
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_reports_extra_flow_columns_before_positional_alias() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"max(numbers_with_ts.number)",
ConcreteDataType::uint32_datatype(),
true,
),
]));
let err = gen_plan_with_matching_schema(
"SELECT number, number AS extra, ts, max(number) FROM numbers_with_ts GROUP BY number, ts",
ctx,
query_engine,
sink_schema,
&[],
false,
)
.await
.unwrap_err()
.to_string();
assert!(
err.contains("Flow output schema does not match sink table schema"),
"{err}"
);
assert!(err.contains("flow output columns"), "{err}");
assert!(err.contains("sink table columns"), "{err}");
assert!(err.contains("extra flow columns not in sink"), "{err}");
assert!(err.contains("extra"), "{err}");
assert!(
!err.contains("extra AS ts"),
"schema error should not primarily expose positional alias: {err}"
);
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_rejects_positional_alias_type_mismatch() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"event_time",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"max(numbers_with_ts.number)",
ConcreteDataType::uint32_datatype(),
true,
),
]));
let err = gen_plan_with_matching_schema(
"SELECT number, number AS not_time, max(number) FROM numbers_with_ts GROUP BY number",
ctx,
query_engine,
sink_schema,
&[],
false,
)
.await
.unwrap_err()
.to_string();
assert!(
err.contains(
"Cannot match flow output column 'not_time' to sink column 'event_time' by position"
),
"{err}"
);
assert!(err.contains("incompatible data types"), "{err}");
assert!(
!err.contains("not_time AS event_time"),
"schema error should not expose an incompatible positional alias: {err}"
);
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_rejects_cross_position_extra_column_match() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
]));
let err = gen_plan_with_matching_schema(
"SELECT number, ts, date_bin('5 minutes', ts) AS time_window2 FROM numbers_with_ts GROUP BY number, ts, time_window2",
ctx,
query_engine,
sink_schema,
&[],
false,
)
.await
.unwrap_err()
.to_string();
assert!(
err.contains("Flow output schema does not match sink table schema"),
"{err}"
);
assert!(err.contains("time_window2"), "{err}");
assert!(err.contains("time_window"), "{err}");
assert!(!err.contains("DuplicateUnqualifiedField"), "{err}");
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_accepts_out_of_order_matching_names() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"time_window",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
]));
let plan = gen_plan_with_matching_schema(
"SELECT number, ts, date_bin('5 minutes', ts) AS time_window FROM numbers_with_ts GROUP BY number, ts, time_window",
ctx,
query_engine,
sink_schema,
&[],
false,
)
.await
.unwrap();
let output_names = plan
.schema()
.fields()
.iter()
.map(|field| field.name().clone())
.collect::<Vec<_>>();
assert_eq!(
output_names,
vec![
"number".to_string(),
"ts".to_string(),
"time_window".to_string()
]
);
assert!(duplicate_names(&output_names).is_empty());
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_allows_numeric_positional_alias() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("renamed_number", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
]));
let plan = gen_plan_with_matching_schema(
"SELECT number, ts FROM numbers_with_ts",
ctx,
query_engine,
sink_schema,
&[],
false,
)
.await
.unwrap();
let sql = df_plan_to_sql(&plan).unwrap();
assert_eq!(
"SELECT numbers_with_ts.number AS renamed_number, numbers_with_ts.ts FROM numbers_with_ts",
sql
);
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_allows_null_positional_alias() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new("label", ConcreteDataType::string_datatype(), true),
]));
let plan = gen_plan_with_matching_schema(
"SELECT number, NULL AS label_placeholder FROM numbers_with_ts",
ctx,
query_engine,
sink_schema,
&[],
false,
)
.await
.unwrap();
let output_names = plan
.schema()
.fields()
.iter()
.map(|field| field.name().clone())
.collect::<Vec<_>>();
let sql = df_plan_to_sql(&plan).unwrap();
assert_eq!(
output_names,
vec!["number".to_string(), "label".to_string()]
);
assert!(sql.contains("NULL AS label"), "{sql}");
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_accepts_matching_flow_schema() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new("extra", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"max(numbers_with_ts.number)",
ConcreteDataType::uint32_datatype(),
true,
),
]));
let plan = gen_plan_with_matching_schema(
"SELECT number, number AS extra, ts, max(number) FROM numbers_with_ts GROUP BY number, ts",
ctx,
query_engine,
sink_schema,
&[],
false,
)
.await
.unwrap();
let sql = df_plan_to_sql(&plan).unwrap();
assert_eq!(
"SELECT numbers_with_ts.number, numbers_with_ts.number AS extra, numbers_with_ts.ts, max(numbers_with_ts.number) FROM numbers_with_ts GROUP BY numbers_with_ts.number, numbers_with_ts.ts",
sql
);
}
#[tokio::test]
async fn test_validate_sink_table_schema_rejects_existing_sink_missing_flow_column() {
let query_engine = create_test_query_engine();
let query_ctx = QueryContext::arc();
let sql = "SELECT number, number AS extra, max(number) FROM numbers_with_ts GROUP BY number";
let plan = sql_to_df_plan(query_ctx.clone(), query_engine.clone(), sql, true)
.await
.unwrap();
let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
let sink_table_name = [
"greptime".to_string(),
"public".to_string(),
"existing_sink".to_string(),
];
let sink_table = u32_table(
"existing_sink",
vec!["number", "max(numbers_with_ts.number)"],
0,
);
catalog_manager
.register_table_sync(RegisterTableRequest {
catalog: sink_table_name[0].clone(),
schema: sink_table_name[1].clone(),
table_name: sink_table_name[2].clone(),
table_id: 4096,
table: sink_table,
})
.unwrap();
let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let task = BatchingTask::try_new(TaskArgs {
flow_id: 1,
query: sql,
plan,
time_window_expr: None,
expire_after: None,
sink_table_name,
source_table_names: vec![[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
]],
query_ctx,
catalog_manager,
shutdown_rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
flow_eval_interval: None,
})
.unwrap();
let err = task
.validate_sink_table_schema(&query_engine)
.await
.unwrap_err()
.to_string();
assert!(
err.contains("Flow output schema does not match sink table schema"),
"{err}"
);
assert!(err.contains("extra"), "{err}");
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_allow_partial_fills_nullable_columns() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("optional_value", ConcreteDataType::uint32_datatype(), true),
]));
let plan = gen_plan_with_matching_schema(
"SELECT number, ts FROM numbers_with_ts",
ctx,
query_engine,
sink_schema,
&[0],
true,
)
.await
.unwrap();
let sql = df_plan_to_sql(&plan).unwrap();
assert_eq!(
"SELECT numbers_with_ts.number, numbers_with_ts.ts, NULL AS optional_value FROM numbers_with_ts",
sql
);
}
#[tokio::test]
async fn test_find_group_by_exprs() {
let testcases = vec![
@@ -1491,3 +1869,118 @@ async fn test_analyze_incremental_aggregate_plan_rejects_cast_wrapped_alias() {
);
}
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_last_non_null_rejects_missing_primary_key_column() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
// Sink table with primary_key_indices=[0] ("number"), time_index="ts", and merge_mode=last_non_null.
// The flow query omits "number", which is a required primary-key column.
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("optional_value", ConcreteDataType::uint32_datatype(), true),
]));
let err = gen_plan_with_matching_schema(
"SELECT ts FROM numbers_with_ts",
ctx,
query_engine,
sink_schema,
&[0],
true,
)
.await
.unwrap_err()
.to_string();
assert!(
err.contains(
"required by sink table are missing from flow output when merge_mode=last_non_null"
),
"{err}"
);
assert!(err.contains("number"), "{err}");
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_last_non_null_rejects_missing_time_index_column() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
// Sink table with primary_key_indices=[0] ("number"), time_index="ts", and merge_mode=last_non_null.
// The flow query omits "ts", which is a required time-index column.
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("optional_value", ConcreteDataType::uint32_datatype(), true),
]));
let err = gen_plan_with_matching_schema(
"SELECT number FROM numbers_with_ts",
ctx,
query_engine,
sink_schema,
&[0],
true,
)
.await
.unwrap_err()
.to_string();
assert!(
err.contains(
"required by sink table are missing from flow output when merge_mode=last_non_null"
),
"{err}"
);
assert!(err.contains("ts"), "{err}");
}
#[tokio::test]
async fn test_gen_plan_with_matching_schema_last_non_null_rejects_extra_flow_column() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
// Sink table with merge_mode=last_non_null.
// Sink has 3 columns: number (pk), ts (time_index), optional_value (nullable).
// Flow outputs: number, number AS extra, ts → "extra" is not in sink schema.
// query_col_cnt(3) <= table_col_cnt(3), so the extra branch is reached.
let sink_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("optional_value", ConcreteDataType::uint32_datatype(), true),
]));
let err = gen_plan_with_matching_schema(
"SELECT number, number AS extra, ts FROM numbers_with_ts",
ctx,
query_engine,
sink_schema,
&[0],
true,
)
.await
.unwrap_err()
.to_string();
assert!(err.contains("extra column(s)"), "{err}");
assert!(err.contains("extra"), "{err}");
assert!(
err.contains("Flow output schema does not match sink table schema"),
"{err}"
);
}

View File

@@ -162,6 +162,8 @@ CREATE TABLE approx_rate (
Affected Rows: 0
-- Without merge_mode=last_non_null, this partial output is rejected at CREATE FLOW time.
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
@@ -172,24 +174,7 @@ from
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO
bytes_log
VALUES
(NULL, '2023-01-01 00:00:01'),
(300, '2023-01-01 00:00:31');
Affected Rows: 2
-- should return error
ADMIN FLUSH_FLOW('find_approx_rate');
Error: 1002(Unexpected), Failed to execute admin function flush_flow: Execution error: Internal error: 1003
DROP FLOW find_approx_rate;
Affected Rows: 0
Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Flow output schema does not match sink table schema: found 3 flow output columns and 4 sink table columns. flow output columns: [\"rate\", \"time_window\", \"update_at\"], sink table columns: [\"rate\", \"time_window\", \"update_at\", \"bb\"], extra flow columns not in sink: [], missing sink columns from flow output: [\"bb\"]") in context: Failed to rewrite plan
DROP TABLE bytes_log;

View File

@@ -84,6 +84,8 @@ CREATE TABLE approx_rate (
TIME INDEX(time_window)
);
-- Without merge_mode=last_non_null, this partial output is rejected at CREATE FLOW time.
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
@@ -93,16 +95,5 @@ from
bytes_log
GROUP BY
time_window;
INSERT INTO
bytes_log
VALUES
(NULL, '2023-01-01 00:00:01'),
(300, '2023-01-01 00:00:31');
-- should return error
ADMIN FLUSH_FLOW('find_approx_rate');
DROP FLOW find_approx_rate;
DROP TABLE bytes_log;
DROP TABLE approx_rate;

View File

@@ -0,0 +1,123 @@
-- Verify that batching flow rejects CREATE FLOW when the pre-existing sink
-- table schema does not match the flow output (create-time validation, not runtime).
CREATE TABLE source_mm (
"number" INT,
extra STRING,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- Pre-create a sink table that is intentionally missing the "extra" column.
-- This case validates batching mode at CREATE FLOW time, before any INSERT/FLUSH.
CREATE TABLE sink_mm (
"number" INT,
time_window TIMESTAMP TIME INDEX,
cnt BIGINT
);
Affected Rows: 0
-- This CREATE FLOW should fail immediately: the flow outputs (number, extra, time_window, cnt)
-- but sink_mm has only (number, time_window, cnt).
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW mismatch_flow SINK TO sink_mm AS
SELECT
"number",
extra,
date_bin(INTERVAL '1 second', ts) as time_window,
count(*) as cnt
FROM
source_mm
GROUP BY
"number", extra, time_window;
Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Flow output schema does not match sink table schema: found 4 flow output columns and 3 sink table columns. flow output columns: [\"number\", \"extra\", \"time_window\", \"cnt\"], sink table columns: [\"number\", \"time_window\", \"cnt\"], extra flow columns not in sink: [\"extra\"], missing sink columns from flow output: []") in context: Failed to rewrite plan
DROP TABLE source_mm;
Affected Rows: 0
DROP TABLE sink_mm;
Affected Rows: 0
-- TQL/PromQL flows use the same create-time sink schema validation path.
CREATE TABLE tql_source_mm (
`value` DOUBLE,
ts TIMESTAMP TIME INDEX,
sensor STRING,
loc STRING,
PRIMARY KEY (sensor, loc)
);
Affected Rows: 0
-- Pre-create a TQL sink table that is intentionally missing the "sensor" tag column.
CREATE TABLE tql_sink_mm (
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- This CREATE FLOW should fail immediately: the TQL output has (value, sensor, ts),
-- but tql_sink_mm has only (value, ts).
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW tql_mismatch_flow
SINK TO tql_sink_mm
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '1m')
avg by(sensor) (tql_source_mm) AS value;
Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Flow output schema does not match sink table schema: found 3 flow output columns and 2 sink table columns. flow output columns: [\"value\", \"sensor\", \"ts\"], sink table columns: [\"value\", \"ts\"], extra flow columns not in sink: [\"sensor\"], missing sink columns from flow output: []") in context: Failed to rewrite plan
DROP TABLE tql_source_mm;
Affected Rows: 0
DROP TABLE tql_sink_mm;
Affected Rows: 0
-- Real merge_mode=last_non_null sink options should enable partial schema validation.
CREATE TABLE lnn_source_mm (
device STRING,
val DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE TABLE lnn_sink_mm (
device STRING,
time_window TIMESTAMP TIME INDEX,
cnt BIGINT,
PRIMARY KEY (device)
) WITH('merge_mode'='last_non_null');
Affected Rows: 0
-- This CREATE FLOW should fail through the last_non_null partial validator: the
-- sink primary key "device" is required but absent from the flow output.
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW lnn_missing_pk_flow
SINK TO lnn_sink_mm AS
SELECT
date_bin(INTERVAL '1 second', ts) as time_window,
count(*) as cnt
FROM
lnn_source_mm
GROUP BY
time_window;
Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Column(s) [\"device\"] required by sink table are missing from flow output when merge_mode=last_non_null. Flow output schema does not match sink table schema: found 2 flow output columns and 3 sink table columns. flow output columns: [\"time_window\", \"cnt\"], sink table columns: [\"device\", \"time_window\", \"cnt\"], extra flow columns not in sink: [], missing sink columns from flow output: [\"device\"]") in context: Failed to rewrite plan
DROP TABLE lnn_source_mm;
Affected Rows: 0
DROP TABLE lnn_sink_mm;
Affected Rows: 0

View File

@@ -0,0 +1,89 @@
-- Verify that batching flow rejects CREATE FLOW when the pre-existing sink
-- table schema does not match the flow output (create-time validation, not runtime).
CREATE TABLE source_mm (
"number" INT,
extra STRING,
ts TIMESTAMP TIME INDEX
);
-- Pre-create a sink table that is intentionally missing the "extra" column.
-- This case validates batching mode at CREATE FLOW time, before any INSERT/FLUSH.
CREATE TABLE sink_mm (
"number" INT,
time_window TIMESTAMP TIME INDEX,
cnt BIGINT
);
-- This CREATE FLOW should fail immediately: the flow outputs (number, extra, time_window, cnt)
-- but sink_mm has only (number, time_window, cnt).
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW mismatch_flow SINK TO sink_mm AS
SELECT
"number",
extra,
date_bin(INTERVAL '1 second', ts) as time_window,
count(*) as cnt
FROM
source_mm
GROUP BY
"number", extra, time_window;
DROP TABLE source_mm;
DROP TABLE sink_mm;
-- TQL/PromQL flows use the same create-time sink schema validation path.
CREATE TABLE tql_source_mm (
`value` DOUBLE,
ts TIMESTAMP TIME INDEX,
sensor STRING,
loc STRING,
PRIMARY KEY (sensor, loc)
);
-- Pre-create a TQL sink table that is intentionally missing the "sensor" tag column.
CREATE TABLE tql_sink_mm (
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
-- This CREATE FLOW should fail immediately: the TQL output has (value, sensor, ts),
-- but tql_sink_mm has only (value, ts).
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW tql_mismatch_flow
SINK TO tql_sink_mm
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '1m')
avg by(sensor) (tql_source_mm) AS value;
DROP TABLE tql_source_mm;
DROP TABLE tql_sink_mm;
-- Real merge_mode=last_non_null sink options should enable partial schema validation.
CREATE TABLE lnn_source_mm (
device STRING,
val DOUBLE,
ts TIMESTAMP TIME INDEX
);
CREATE TABLE lnn_sink_mm (
device STRING,
time_window TIMESTAMP TIME INDEX,
cnt BIGINT,
PRIMARY KEY (device)
) WITH('merge_mode'='last_non_null');
-- This CREATE FLOW should fail through the last_non_null partial validator: the
-- sink primary key "device" is required but absent from the flow output.
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW lnn_missing_pk_flow
SINK TO lnn_sink_mm AS
SELECT
date_bin(INTERVAL '1 second', ts) as time_window,
count(*) as cnt
FROM
lnn_source_mm
GROUP BY
time_window;
DROP TABLE lnn_source_mm;
DROP TABLE lnn_sink_mm;

View File

@@ -0,0 +1,90 @@
-- Regression for a TQL flow whose pre-created sink table is missing the value
-- output column. The labels are intentionally minimal and anonymous.
CREATE DATABASE source_schema;
Affected Rows: 1
CREATE DATABASE sink_schema;
Affected Rows: 1
USE source_schema;
Affected Rows: 0
CREATE TABLE metric_input (
namespace STRING NULL,
app STRING NULL,
greptime_timestamp TIMESTAMP(3) NOT NULL,
greptime_value DOUBLE NULL,
TIME INDEX (greptime_timestamp),
PRIMARY KEY (namespace, app)
);
Affected Rows: 0
INSERT INTO metric_input VALUES
('ns', 'app-a', '2026-01-23T03:40:00Z', 10.0),
('ns', 'app-a', '2026-01-23T03:50:00Z', 20.0);
Affected Rows: 2
USE sink_schema;
Affected Rows: 0
-- Intentionally omit greptime_value DOUBLE from the pre-created sink table.
CREATE TABLE missing_value_sink (
namespace STRING NULL,
app STRING NULL,
greptime_timestamp TIMESTAMP(3) NOT NULL,
TIME INDEX (greptime_timestamp),
PRIMARY KEY (namespace, app)
)
ENGINE=mito;
Affected Rows: 0
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW missing_value_flow
SINK TO sink_schema.missing_value_sink
EVAL INTERVAL '3600 s'
AS TQL EVAL (
date_bin('2m'::interval, now() - '2m'::interval),
date_bin('2m'::interval, now() - '2m'::interval),
'1h'
)
avg by (namespace, app) (
avg_over_time(metric_input{__schema__="source_schema"}[1h])
);
Error: 3001(EngineExecuteQuery), Datafusion error: Plan("Flow output schema does not match sink table schema: found 4 flow output columns and 3 sink table columns. flow output columns: [\"namespace\", \"app\", \"greptime_timestamp\", \"avg(prom_avg_over_time(greptime_timestamp_range,greptime_value))\"], sink table columns: [\"namespace\", \"app\", \"greptime_timestamp\"], extra flow columns not in sink: [\"avg(prom_avg_over_time(greptime_timestamp_range,greptime_value))\"], missing sink columns from flow output: []") in context: Failed to rewrite plan
DROP FLOW IF EXISTS missing_value_flow;
Affected Rows: 0
DROP TABLE missing_value_sink;
Affected Rows: 0
USE source_schema;
Affected Rows: 0
DROP TABLE metric_input;
Affected Rows: 0
USE public;
Affected Rows: 0
DROP DATABASE sink_schema;
Affected Rows: 0
DROP DATABASE source_schema;
Affected Rows: 0

View File

@@ -0,0 +1,55 @@
-- Regression for a TQL flow whose pre-created sink table is missing the value
-- output column. The labels are intentionally minimal and anonymous.
CREATE DATABASE source_schema;
CREATE DATABASE sink_schema;
USE source_schema;
CREATE TABLE metric_input (
namespace STRING NULL,
app STRING NULL,
greptime_timestamp TIMESTAMP(3) NOT NULL,
greptime_value DOUBLE NULL,
TIME INDEX (greptime_timestamp),
PRIMARY KEY (namespace, app)
);
INSERT INTO metric_input VALUES
('ns', 'app-a', '2026-01-23T03:40:00Z', 10.0),
('ns', 'app-a', '2026-01-23T03:50:00Z', 20.0);
USE sink_schema;
-- Intentionally omit greptime_value DOUBLE from the pre-created sink table.
CREATE TABLE missing_value_sink (
namespace STRING NULL,
app STRING NULL,
greptime_timestamp TIMESTAMP(3) NOT NULL,
TIME INDEX (greptime_timestamp),
PRIMARY KEY (namespace, app)
)
ENGINE=mito;
-- SQLNESS REPLACE (in\scontext:\sFailed\sto\srewrite\splan:\sError\sduring\splanning:.*) in context: Failed to rewrite plan
CREATE FLOW missing_value_flow
SINK TO sink_schema.missing_value_sink
EVAL INTERVAL '3600 s'
AS TQL EVAL (
date_bin('2m'::interval, now() - '2m'::interval),
date_bin('2m'::interval, now() - '2m'::interval),
'1h'
)
avg by (namespace, app) (
avg_over_time(metric_input{__schema__="source_schema"}[1h])
);
DROP FLOW IF EXISTS missing_value_flow;
DROP TABLE missing_value_sink;
USE source_schema;
DROP TABLE metric_input;
USE public;
DROP DATABASE sink_schema;
DROP DATABASE source_schema;