From d290bc58adac9273af90a736c7ffdbcfe7b670f0 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 14 Apr 2026 15:21:19 +0800 Subject: [PATCH] feat: parse create flow with Signed-off-by: discord9 --- .../system_schema/information_schema/flows.rs | 13 +++ src/common/meta/src/ddl/activate_flow.rs | 27 +++--- src/query/src/sql.rs | 11 +++ src/sql/src/parsers/create_parser.rs | 86 +++++++++++++++++++ src/sql/src/statements/create.rs | 6 ++ 5 files changed, 129 insertions(+), 14 deletions(-) diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs index a751716620..8d153a1756 100644 --- a/src/catalog/src/system_schema/information_schema/flows.rs +++ b/src/catalog/src/system_schema/information_schema/flows.rs @@ -16,6 +16,7 @@ use std::sync::{Arc, Weak}; use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID; use common_error::ext::BoxedError; +use common_meta::ddl::create_flow::FlowType; use common_meta::key::FlowId; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::flow::flow_info::FlowInfoValue; @@ -54,6 +55,17 @@ use crate::system_schema::utils; const INIT_CAPACITY: usize = 42; +fn user_visible_flow_options( + options: &std::collections::HashMap, +) -> sql::statements::OptionMap { + sql::statements::OptionMap::from( + options + .iter() + .filter(|(key, _)| key.as_str() != FlowType::FLOW_TYPE_KEY) + .map(|(key, value)| (key.clone(), value.clone())), + ) +} + // rows of information_schema.flows // pk is (flow_name, flow_id, table_catalog) pub const FLOW_NAME: &str = "flow_name"; @@ -165,6 +177,7 @@ impl InformationSchemaFlows { expire_after: flow_info.expire_after(), eval_interval: flow_info.eval_interval(), comment, + flow_options: user_visible_flow_options(flow_info.options()), query, }; diff --git a/src/common/meta/src/ddl/activate_flow.rs b/src/common/meta/src/ddl/activate_flow.rs index e39bc72418..551e61750d 100644 --- a/src/common/meta/src/ddl/activate_flow.rs +++ b/src/common/meta/src/ddl/activate_flow.rs @@ -151,21 +151,20 @@ impl ActivatePendingFlowProcedure { return Ok(Status::done()); } - if get_flow_type(current_flow_info.get_inner_ref()) == FlowType::Batching { - if let Some(reason) = + if get_flow_type(current_flow_info.get_inner_ref()) == FlowType::Batching + && let Some(reason) = validate_batching_activation(&self.context, &resolution.resolved_table_ids).await? - { - update_pending_flow_metadata( - &self.context, - self.data.flow_id, - ¤t_flow_info, - resolution.resolved_table_ids, - vec![], - Some(reason), - ) - .await?; - return Ok(Status::done()); - } + { + update_pending_flow_metadata( + &self.context, + self.data.flow_id, + ¤t_flow_info, + resolution.resolved_table_ids, + vec![], + Some(reason), + ) + .await?; + return Ok(Status::done()); } self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?; diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 49e26c92ca..c46e9bdf8a 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -33,6 +33,7 @@ use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::build_backend; use common_datasource::util::find_dir_and_filename; use common_meta::SchemaOptions; +use common_meta::ddl::create_flow::FlowType; use common_meta::key::flow::flow_info::FlowInfoValue; use common_query::Output; use common_query::prelude::greptime_timestamp; @@ -78,6 +79,15 @@ const VIEWS_COLUMN: &str = "Views"; const FLOWS_COLUMN: &str = "Flows"; const FIELD_COLUMN: &str = "Field"; const TABLE_TYPE_COLUMN: &str = "Table_type"; + +fn user_visible_flow_options(options: &HashMap) -> OptionMap { + OptionMap::from( + options + .iter() + .filter(|(key, _)| key.as_str() != FlowType::FLOW_TYPE_KEY) + .map(|(key, value)| (key.clone(), value.clone())), + ) +} const COLUMN_NAME_COLUMN: &str = "Column"; const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type"; const COLUMN_TYPE_COLUMN: &str = "Type"; @@ -1062,6 +1072,7 @@ pub fn show_create_flow( expire_after: flow_val.expire_after(), eval_interval: flow_val.eval_interval(), comment, + flow_options: user_visible_flow_options(flow_val.options()), query, }; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index d69a1af61d..99b4625a50 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -339,6 +339,14 @@ impl<'a> ParserContext<'a> { None }; + let flow_options = self + .parser + .parse_options(Keyword::WITH) + .context(SyntaxSnafu)? + .into_iter() + .map(parse_option_string) + .collect::>>()?; + self.parser .expect_keyword(Keyword::AS) .context(SyntaxSnafu)?; @@ -353,6 +361,7 @@ impl<'a> ParserContext<'a> { expire_after, eval_interval, comment, + flow_options: OptionMap::new(flow_options), query, })) } @@ -1256,6 +1265,20 @@ mod tests { use crate::dialect::GreptimeDbDialect; use crate::parser::ParseOptions; + fn string_option_map( + entries: impl IntoIterator, + ) -> OptionMap { + OptionMap::new(entries.into_iter().map(|(key, value)| { + ( + key.to_string(), + OptionValue::try_new(Expr::Value( + Value::SingleQuotedString(value.to_string()).into(), + )) + .unwrap(), + ) + })) + } + #[test] fn test_parse_create_table_like() { let sql = "CREATE TABLE t1 LIKE t2"; @@ -1498,6 +1521,8 @@ mod tests { pub expire_after: Option, /// Comment string pub comment: Option, + /// Flow creation options + pub flow_options: OptionMap, } let testcases = vec![ ( @@ -1518,6 +1543,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: true, expire_after: Some(300), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1538,6 +1564,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: true, expire_after: Some(300), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1558,6 +1585,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: true, expire_after: Some(300), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1578,6 +1606,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: true, expire_after: Some(300), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1597,6 +1626,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: false, expire_after: Some(2 * 86400 + 3600 + 2 * 60), comment: None, + flow_options: OptionMap::default(), }, ), ( @@ -1616,6 +1646,7 @@ select max(c1), min(c2) from schema_2.table_2;", if_not_exists: false, expire_after: Some(600), // 10 minutes in seconds comment: None, + flow_options: OptionMap::default(), }, ), ( @@ -1636,6 +1667,27 @@ select max(c1), min(c2) from schema_2.table_2;", if_not_exists: true, expire_after: Some(7200), // 2 hours in seconds comment: Some("lowercase test".to_string()), + flow_options: OptionMap::default(), + }, + ), + ( + r" +CREATE FLOW task_5 +SINK TO schema_1.table_1 +WITH (defer_on_missing_source = 'true') +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName::from(vec![Ident::new("task_5")]), + sink_table_name: ObjectName::from(vec![ + Ident::new("schema_1"), + Ident::new("table_1"), + ]), + or_replace: false, + if_not_exists: false, + expire_after: None, + comment: None, + flow_options: string_option_map([("defer_on_missing_source", "true")]), }, ), ]; @@ -1651,6 +1703,7 @@ select max(c1), min(c2) from schema_2.table_2;", expire_after: expected.expire_after, eval_interval: None, comment: expected.comment, + flow_options: expected.flow_options, // ignore query parse result query: create_task.query.clone(), }; @@ -1696,6 +1749,8 @@ select max(c1), min(c2) from schema_2.table_2;", pub eval_interval: Option, /// Comment string pub comment: Option, + /// Flow creation options + pub flow_options: OptionMap, } // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT` @@ -1719,6 +1774,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(300), eval_interval: None, comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1740,6 +1796,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(300), eval_interval: None, comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1762,6 +1819,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(300), eval_interval: Some(10), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1784,6 +1842,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(300), eval_interval: Some(10), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1806,6 +1865,32 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(2 * 86400 + 3600 + 2 * 60), eval_interval: None, comment: None, + flow_options: OptionMap::default(), + }, + ), + ( + r" +CREATE FLOW task_3 +SINK TO schema_1.table_1 +EVAL INTERVAL '10 seconds' +WITH (defer_on_missing_source = 'true', foo = 'bar') +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]), + sink_table_name: ObjectName(vec![ + ObjectNamePart::Identifier(Ident::new("schema_1")), + ObjectNamePart::Identifier(Ident::new("table_1")), + ]), + or_replace: false, + if_not_exists: false, + expire_after: None, + eval_interval: Some(10), + comment: None, + flow_options: string_option_map([ + ("defer_on_missing_source", "true"), + ("foo", "bar"), + ]), }, ), ]; @@ -1821,6 +1906,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: expected.expire_after, eval_interval: expected.eval_interval, comment: expected.comment, + flow_options: expected.flow_options, // ignore query parse result query: create_task.query.clone(), }; diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 817b31518d..055a5984bd 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -612,6 +612,8 @@ pub struct CreateFlow { pub eval_interval: Option, /// Comment string pub comment: Option, + /// Flow creation options from `WITH (...)` + pub flow_options: OptionMap, /// SQL statement pub query: Box, } @@ -669,6 +671,10 @@ impl Display for CreateFlow { if let Some(comment) = &self.comment { writeln!(f, "COMMENT '{}'", comment)?; } + if !self.flow_options.is_empty() { + let options = self.flow_options.kv_pairs(); + writeln!(f, "WITH ({})", format_list_comma!(options))?; + } write!(f, "AS {}", &self.query) } }