From 7ecf3eb35e4a4ee496a18d13ed449f0e61b60e73 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 23 Apr 2026 13:03:25 +0800 Subject: [PATCH] pcr Signed-off-by: discord9 --- .../system_schema/information_schema/flows.rs | 16 ++------ src/operator/src/expr_helper.rs | 37 ++++++++++++++++++- src/query/src/sql.rs | 13 ++----- src/sql/src/statements/option_map.rs | 28 ++++++++++++++ 4 files changed, 72 insertions(+), 22 deletions(-) diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs index 8d153a1756..7e29c7b1f5 100644 --- a/src/catalog/src/system_schema/information_schema/flows.rs +++ b/src/catalog/src/system_schema/information_schema/flows.rs @@ -55,17 +55,6 @@ use crate::system_schema::utils; const INIT_CAPACITY: usize = 42; -fn user_visible_flow_options( - options: &std::collections::HashMap, -) -> sql::statements::OptionMap { - sql::statements::OptionMap::from( - options - .iter() - .filter(|(key, _)| key.as_str() != FlowType::FLOW_TYPE_KEY) - .map(|(key, value)| (key.clone(), value.clone())), - ) -} - // rows of information_schema.flows // pk is (flow_name, flow_id, table_catalog) pub const FLOW_NAME: &str = "flow_name"; @@ -177,7 +166,10 @@ impl InformationSchemaFlows { expire_after: flow_info.expire_after(), eval_interval: flow_info.eval_interval(), comment, - flow_options: user_visible_flow_options(flow_info.options()), + flow_options: sql::statements::OptionMap::from_filtered_string_map( + flow_info.options(), + &[FlowType::FLOW_TYPE_KEY], + ), query, }; diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index a409063501..378122030c 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -1047,10 +1047,22 @@ pub fn to_create_flow_task_expr( eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }), comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - flow_options: create_flow.flow_options.into_map(), + flow_options: stringify_flow_options(create_flow.flow_options)?, }) } +fn stringify_flow_options(flow_options: OptionMap) -> Result> { + let options_len = flow_options.len(); + let flow_options = flow_options.into_map(); + ensure!( + flow_options.len() == options_len, + InvalidSqlSnafu { + err_msg: "flow options only support scalar string-compatible values".to_string(), + } + ); + Ok(flow_options) +} + /// sanitize the flow name, remove possible quotes fn sanitize_flow_name(mut flow_name: ObjectName) -> Result { ensure!( @@ -1376,6 +1388,29 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)]) ); + let sql = r" +CREATE FLOW task_5 +SINK TO schema_1.table_1 +WITH (defer_on_missing_source = [true]) +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let res = to_create_flow_task_expr(create_flow, &QueryContext::arc()); + assert!(res.is_err()); + assert!( + res.unwrap_err() + .to_string() + .contains("flow options only support scalar string-compatible values") + ); + let sql = r" CREATE FLOW abc.`task_2` SINK TO schema_1.table_1 diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 5e906b44fd..d7d7399a5a 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -80,14 +80,6 @@ const FLOWS_COLUMN: &str = "Flows"; const FIELD_COLUMN: &str = "Field"; const TABLE_TYPE_COLUMN: &str = "Table_type"; -fn user_visible_flow_options(options: &HashMap) -> OptionMap { - OptionMap::from( - options - .iter() - .filter(|(key, _)| key.as_str() != FlowType::FLOW_TYPE_KEY) - .map(|(key, value)| (key.clone(), value.clone())), - ) -} const COLUMN_NAME_COLUMN: &str = "Column"; const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type"; const COLUMN_TYPE_COLUMN: &str = "Type"; @@ -1066,7 +1058,10 @@ pub fn show_create_flow( expire_after: flow_val.expire_after(), eval_interval: flow_val.eval_interval(), comment, - flow_options: user_visible_flow_options(flow_val.options()), + flow_options: OptionMap::from_filtered_string_map( + flow_val.options(), + &[FlowType::FLOW_TYPE_KEY], + ), query, }; diff --git a/src/sql/src/statements/option_map.rs b/src/sql/src/statements/option_map.rs index 61f8badd0a..d562a61ce0 100644 --- a/src/sql/src/statements/option_map.rs +++ b/src/sql/src/statements/option_map.rs @@ -49,6 +49,18 @@ impl OptionMap { } } + pub fn from_filtered_string_map( + options: &HashMap, + hidden_keys: &[&str], + ) -> Self { + Self::from( + options + .iter() + .filter(|(key, _)| !hidden_keys.contains(&key.as_str())) + .map(|(key, value)| (key.clone(), value.clone())), + ) + } + pub fn insert(&mut self, k: String, v: String) { if REDACTED_OPTIONS.contains(&k.as_str()) { self.secrets.insert(k, SecretString::new(Box::new(v))); @@ -221,6 +233,8 @@ impl VisitMut for OptionMap { #[cfg(test)] mod tests { + use std::collections::HashMap; + use crate::statements::OptionMap; #[test] @@ -237,4 +251,18 @@ mod tests { map.insert("a.b".to_string(), "中文comment\n".to_string()); assert_eq!("'a.b' = '中文comment\\n'", map.kv_pairs()[0]); } + + #[test] + fn test_from_filtered_string_map() { + let map = OptionMap::from_filtered_string_map( + &HashMap::from([ + ("visible".to_string(), "1".to_string()), + ("hidden".to_string(), "2".to_string()), + ]), + &["hidden"], + ); + + assert_eq!(map.get("visible"), Some("1")); + assert_eq!(map.get("hidden"), None); + } }