From acee8a8ec55c8971e0ca9ea387e2e2760862ed1a Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 14 Apr 2026 17:24:53 +0800 Subject: [PATCH] feat: validate after parse Signed-off-by: discord9 --- src/operator/src/expr_helper.rs | 50 +++++++++++- src/operator/src/statement/ddl.rs | 125 ++++++++++++++++++++++++++++-- 2 files changed, 168 insertions(+), 7 deletions(-) diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index b2ec96a76b..a409063501 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -1047,7 +1047,7 @@ pub fn to_create_flow_task_expr( eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }), comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - flow_options: Default::default(), + flow_options: create_flow.flow_options.into_map(), }) } @@ -1065,6 +1065,8 @@ fn sanitize_flow_name(mut flow_name: ObjectName) -> Result { #[cfg(test)] mod tests { + use std::collections::HashMap; + use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions}; use datatypes::value::Value; use session::context::{QueryContext, QueryContextBuilder}; @@ -1327,6 +1329,52 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; to_dot_sep(expr.source_table_names[0].clone()) ); assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql); + assert!(expr.flow_options.is_empty()); + + let sql = r" +CREATE FLOW task_3 +SINK TO schema_1.table_1 +WITH (defer_on_missing_source = 'true', foo = 'bar') +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + assert_eq!( + expr.flow_options, + HashMap::from([ + ("defer_on_missing_source".to_string(), "true".to_string()), + ("foo".to_string(), "bar".to_string()), + ]) + ); + + let sql = r" +CREATE FLOW task_4 +SINK TO schema_1.table_1 +WITH (defer_on_missing_source = true) +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + assert_eq!( + expr.flow_options, + HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)]) + ); let sql = r" CREATE FLOW abc.`task_2` diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 0a24eb3713..91d4896004 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -113,6 +113,9 @@ struct DdlSubmitOptions { timeout: Duration, } +const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source"; +const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY]; + fn build_procedure_id_output(procedure_id: Vec) -> Result { let procedure_id = String::from_utf8_lossy(&procedure_id).to_string(); let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id])); @@ -152,6 +155,55 @@ fn parse_ddl_options(options: &OptionMap) -> Result { Ok(DdlSubmitOptions { wait, timeout }) } +fn supported_flow_options() -> String { + ALLOWED_FLOW_OPTIONS.join(", ") +} + +fn normalize_flow_bool_option(key: &str, value: &str) -> Result { + value + .trim() + .to_ascii_lowercase() + .parse::() + .map(|value| value.to_string()) + .map_err(|_| { + InvalidSqlSnafu { + err_msg: format!("invalid flow option '{key}': '{value}'"), + } + .build() + }) +} + +fn validate_and_normalize_flow_options( + options: HashMap, +) -> Result> { + options + .into_iter() + .map(|(key, value)| { + if key == FlowType::FLOW_TYPE_KEY { + return InvalidSqlSnafu { + err_msg: format!("flow option '{key}' is reserved for internal use"), + } + .fail(); + } + + let normalized_value = match key.as_str() { + DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?, + _ => { + return InvalidSqlSnafu { + err_msg: format!( + "unknown flow option '{key}', supported options: {}", + supported_flow_options() + ), + } + .fail(); + } + }; + + Ok((key, normalized_value)) + }) + .collect() +} + impl StatementExecutor { pub fn catalog_manager(&self) -> CatalogManagerRef { self.catalog_manager.clone() @@ -629,17 +681,16 @@ impl StatementExecutor { expr: CreateFlowExpr, query_context: QueryContextRef, ) -> Result { + let mut expr = expr; + expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?; + let flow_type = self .determine_flow_type(&expr, query_context.clone()) .await?; info!("determined flow={} type: {:#?}", expr.flow_name, flow_type); - let expr = { - let mut expr = expr; - expr.flow_options - .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string()); - expr - }; + expr.flow_options + .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string()); let task = CreateFlowTask::try_from(PbCreateFlowTask { create_flow: Some(expr), @@ -2334,6 +2385,68 @@ mod test { assert_eq!(Duration::from_secs(300), ddl_options.timeout); } + #[test] + fn test_validate_and_normalize_flow_options_empty() { + assert!( + validate_and_normalize_flow_options(HashMap::new()) + .unwrap() + .is_empty() + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_valid() { + let options = + HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string())]); + + assert_eq!( + validate_and_normalize_flow_options(options).unwrap(), + HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),)]) + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_unknown_option() { + let err = validate_and_normalize_flow_options(HashMap::from([( + "foo".to_string(), + "bar".to_string(), + )])) + .unwrap_err(); + + assert!( + err.to_string() + .contains("unknown flow option 'foo', supported options: defer_on_missing_source") + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_reserved_option() { + let err = validate_and_normalize_flow_options(HashMap::from([( + FlowType::FLOW_TYPE_KEY.to_string(), + FlowType::BATCHING.to_string(), + )])) + .unwrap_err(); + + assert!( + err.to_string() + .contains("flow option 'flow_type' is reserved for internal use") + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_invalid_bool() { + let err = validate_and_normalize_flow_options(HashMap::from([( + DEFER_ON_MISSING_SOURCE_KEY.to_string(), + "not-a-bool".to_string(), + )])) + .unwrap_err(); + + assert!( + err.to_string() + .contains("invalid flow option 'defer_on_missing_source': 'not-a-bool'") + ); + } + #[test] fn test_name_is_match() { assert!(!NAME_PATTERN_REG.is_match("/adaf"));