diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs index 219a405e76..8002b17888 100644 --- a/src/catalog/src/system_schema/information_schema/flows.rs +++ b/src/catalog/src/system_schema/information_schema/flows.rs @@ -16,6 +16,7 @@ use std::sync::{Arc, Weak}; use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID; use common_error::ext::BoxedError; +use common_meta::ddl::create_flow::FlowType; use common_meta::key::FlowId; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::flow::flow_info::FlowInfoValue; @@ -168,6 +169,10 @@ impl InformationSchemaFlows { expire_after: flow_info.expire_after(), eval_interval: flow_info.eval_interval(), comment, + flow_options: sql::statements::OptionMap::from_filtered_string_map( + flow_info.options(), + &[FlowType::FLOW_TYPE_KEY], + ), query, }; diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index b2ec96a76b..378122030c 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -1047,10 +1047,22 @@ pub fn to_create_flow_task_expr( eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }), comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - flow_options: Default::default(), + flow_options: stringify_flow_options(create_flow.flow_options)?, }) } +fn stringify_flow_options(flow_options: OptionMap) -> Result> { + let options_len = flow_options.len(); + let flow_options = flow_options.into_map(); + ensure!( + flow_options.len() == options_len, + InvalidSqlSnafu { + err_msg: "flow options only support scalar string-compatible values".to_string(), + } + ); + Ok(flow_options) +} + /// sanitize the flow name, remove possible quotes fn sanitize_flow_name(mut flow_name: ObjectName) -> Result { ensure!( @@ -1065,6 +1077,8 @@ fn sanitize_flow_name(mut flow_name: ObjectName) -> Result { #[cfg(test)] mod tests { + use std::collections::HashMap; + use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions}; use datatypes::value::Value; use session::context::{QueryContext, QueryContextBuilder}; @@ -1327,6 +1341,75 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; to_dot_sep(expr.source_table_names[0].clone()) ); assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql); + assert!(expr.flow_options.is_empty()); + + let sql = r" +CREATE FLOW task_3 +SINK TO schema_1.table_1 +WITH (defer_on_missing_source = 'true', foo = 'bar') +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + assert_eq!( + expr.flow_options, + HashMap::from([ + ("defer_on_missing_source".to_string(), "true".to_string()), + ("foo".to_string(), "bar".to_string()), + ]) + ); + + let sql = r" +CREATE FLOW task_4 +SINK TO schema_1.table_1 +WITH (defer_on_missing_source = true) +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + assert_eq!( + expr.flow_options, + HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)]) + ); + + let sql = r" +CREATE FLOW task_5 +SINK TO schema_1.table_1 +WITH (defer_on_missing_source = [true]) +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let res = to_create_flow_task_expr(create_flow, &QueryContext::arc()); + assert!(res.is_err()); + assert!( + res.unwrap_err() + .to_string() + .contains("flow options only support scalar string-compatible values") + ); let sql = r" CREATE FLOW abc.`task_2` diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 0a24eb3713..6fd6e4adb4 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -113,6 +113,9 @@ struct DdlSubmitOptions { timeout: Duration, } +const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source"; +const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY]; + fn build_procedure_id_output(procedure_id: Vec) -> Result { let procedure_id = String::from_utf8_lossy(&procedure_id).to_string(); let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id])); @@ -152,6 +155,55 @@ fn parse_ddl_options(options: &OptionMap) -> Result { Ok(DdlSubmitOptions { wait, timeout }) } +fn supported_flow_options() -> String { + ALLOWED_FLOW_OPTIONS.join(", ") +} + +fn normalize_flow_bool_option(key: &str, value: &str) -> Result { + value + .trim() + .to_ascii_lowercase() + .parse::() + .map(|value| value.to_string()) + .map_err(|_| { + InvalidSqlSnafu { + err_msg: format!("invalid flow option '{key}': '{value}'"), + } + .build() + }) +} + +fn validate_and_normalize_flow_options( + options: HashMap, +) -> Result> { + options + .into_iter() + .map(|(key, value)| { + if key == FlowType::FLOW_TYPE_KEY { + return InvalidSqlSnafu { + err_msg: format!("flow option '{key}' is reserved for internal use"), + } + .fail(); + } + + let normalized_value = match key.as_str() { + DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?, + _ => { + return InvalidSqlSnafu { + err_msg: format!( + "unknown flow option '{key}', supported options: {}", + supported_flow_options() + ), + } + .fail(); + } + }; + + Ok((key, normalized_value)) + }) + .collect() +} + impl StatementExecutor { pub fn catalog_manager(&self) -> CatalogManagerRef { self.catalog_manager.clone() @@ -626,20 +678,18 @@ impl StatementExecutor { async fn create_flow_procedure( &self, - expr: CreateFlowExpr, + mut expr: CreateFlowExpr, query_context: QueryContextRef, ) -> Result { + expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?; + let flow_type = self .determine_flow_type(&expr, query_context.clone()) .await?; info!("determined flow={} type: {:#?}", expr.flow_name, flow_type); - let expr = { - let mut expr = expr; - expr.flow_options - .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string()); - expr - }; + expr.flow_options + .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string()); let task = CreateFlowTask::try_from(PbCreateFlowTask { create_flow: Some(expr), @@ -2334,6 +2384,94 @@ mod test { assert_eq!(Duration::from_secs(300), ddl_options.timeout); } + #[test] + fn test_validate_and_normalize_flow_options_empty() { + assert!( + validate_and_normalize_flow_options(HashMap::new()) + .unwrap() + .is_empty() + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_valid() { + let options = + HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string())]); + + assert_eq!( + validate_and_normalize_flow_options(options).unwrap(), + HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),)]) + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_unknown_option() { + let err = validate_and_normalize_flow_options(HashMap::from([( + "foo".to_string(), + "bar".to_string(), + )])) + .unwrap_err(); + + assert!( + err.to_string() + .contains("unknown flow option 'foo', supported options: defer_on_missing_source") + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_reserved_option() { + let err = validate_and_normalize_flow_options(HashMap::from([( + FlowType::FLOW_TYPE_KEY.to_string(), + FlowType::BATCHING.to_string(), + )])) + .unwrap_err(); + + assert!( + err.to_string() + .contains("flow option 'flow_type' is reserved for internal use") + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_invalid_bool() { + let err = validate_and_normalize_flow_options(HashMap::from([( + DEFER_ON_MISSING_SOURCE_KEY.to_string(), + "not-a-bool".to_string(), + )])) + .unwrap_err(); + + assert!( + err.to_string() + .contains("invalid flow option 'defer_on_missing_source': 'not-a-bool'") + ); + } + + #[test] + fn test_validate_and_normalize_flow_options_rejects_redacted_invalid_input() { + let sql = r" +CREATE FLOW task_6 +SINK TO schema_1.table_1 +WITH (access_key_id = [true]) +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = + expr_helper::to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + let err = validate_and_normalize_flow_options(expr.flow_options).unwrap_err(); + + assert!(err.to_string().contains( + "unknown flow option 'access_key_id', supported options: defer_on_missing_source" + )); + } + #[test] fn test_name_is_match() { assert!(!NAME_PATTERN_REG.is_match("/adaf")); diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 74f8b13fea..d7d7399a5a 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -33,6 +33,7 @@ use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::build_backend; use common_datasource::util::find_dir_and_filename; use common_meta::SchemaOptions; +use common_meta::ddl::create_flow::FlowType; use common_meta::key::flow::flow_info::FlowInfoValue; use common_query::Output; use common_query::prelude::greptime_timestamp; @@ -78,6 +79,7 @@ const VIEWS_COLUMN: &str = "Views"; const FLOWS_COLUMN: &str = "Flows"; const FIELD_COLUMN: &str = "Field"; const TABLE_TYPE_COLUMN: &str = "Table_type"; + const COLUMN_NAME_COLUMN: &str = "Column"; const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type"; const COLUMN_TYPE_COLUMN: &str = "Type"; @@ -1056,6 +1058,10 @@ pub fn show_create_flow( expire_after: flow_val.expire_after(), eval_interval: flow_val.eval_interval(), comment, + flow_options: OptionMap::from_filtered_string_map( + flow_val.options(), + &[FlowType::FLOW_TYPE_KEY], + ), query, }; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index b83c2032db..a82590c603 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -68,6 +68,17 @@ pub const VECTOR: &str = "VECTOR"; pub type RawIntervalExpr = String; +// Preserve raw CREATE FLOW option entries until operator-side validation. +// Do not use `OptionMap::new()` here: it can drop non-string values for +// redacted keys before the flow option allowlist rejects them. +fn flow_option_map(options: HashMap) -> OptionMap { + let mut flow_options = OptionMap::default(); + for (key, value) in options { + flow_options.insert_options(&key, value); + } + flow_options +} + /// Parses create [table] statement impl<'a> ParserContext<'a> { pub(crate) fn parse_create(&mut self) -> Result { @@ -339,6 +350,14 @@ impl<'a> ParserContext<'a> { None }; + let flow_options = self + .parser + .parse_options(Keyword::WITH) + .context(SyntaxSnafu)? + .into_iter() + .map(parse_option_string) + .collect::>>()?; + self.parser .expect_keyword(Keyword::AS) .context(SyntaxSnafu)?; @@ -353,6 +372,7 @@ impl<'a> ParserContext<'a> { expire_after, eval_interval, comment, + flow_options: flow_option_map(flow_options), query, })) } @@ -1256,6 +1276,20 @@ mod tests { use crate::dialect::GreptimeDbDialect; use crate::parser::ParseOptions; + fn string_option_map( + entries: impl IntoIterator, + ) -> OptionMap { + OptionMap::new(entries.into_iter().map(|(key, value)| { + ( + key.to_string(), + OptionValue::try_new(Expr::Value( + Value::SingleQuotedString(value.to_string()).into(), + )) + .unwrap(), + ) + })) + } + #[test] fn test_parse_create_table_like() { let sql = "CREATE TABLE t1 LIKE t2"; @@ -1498,6 +1532,8 @@ mod tests { pub expire_after: Option, /// Comment string pub comment: Option, + /// Flow creation options + pub flow_options: OptionMap, } let testcases = vec![ ( @@ -1518,6 +1554,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: true, expire_after: Some(300), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1538,6 +1575,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: true, expire_after: Some(300), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1558,6 +1596,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: true, expire_after: Some(300), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1578,6 +1617,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: true, expire_after: Some(300), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1597,6 +1637,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", if_not_exists: false, expire_after: Some(2 * 86400 + 3600 + 2 * 60), comment: None, + flow_options: OptionMap::default(), }, ), ( @@ -1616,6 +1657,7 @@ select max(c1), min(c2) from schema_2.table_2;", if_not_exists: false, expire_after: Some(600), // 10 minutes in seconds comment: None, + flow_options: OptionMap::default(), }, ), ( @@ -1636,6 +1678,27 @@ select max(c1), min(c2) from schema_2.table_2;", if_not_exists: true, expire_after: Some(7200), // 2 hours in seconds comment: Some("lowercase test".to_string()), + flow_options: OptionMap::default(), + }, + ), + ( + r" +CREATE FLOW task_5 +SINK TO schema_1.table_1 +WITH (defer_on_missing_source = 'true') +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName::from(vec![Ident::new("task_5")]), + sink_table_name: ObjectName::from(vec![ + Ident::new("schema_1"), + Ident::new("table_1"), + ]), + or_replace: false, + if_not_exists: false, + expire_after: None, + comment: None, + flow_options: string_option_map([("defer_on_missing_source", "true")]), }, ), ]; @@ -1651,6 +1714,7 @@ select max(c1), min(c2) from schema_2.table_2;", expire_after: expected.expire_after, eval_interval: None, comment: expected.comment, + flow_options: expected.flow_options, // ignore query parse result query: create_task.query.clone(), }; @@ -1696,6 +1760,8 @@ select max(c1), min(c2) from schema_2.table_2;", pub eval_interval: Option, /// Comment string pub comment: Option, + /// Flow creation options + pub flow_options: OptionMap, } // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT` @@ -1719,6 +1785,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(300), eval_interval: None, comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1740,6 +1807,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(300), eval_interval: None, comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1762,6 +1830,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(300), eval_interval: Some(10), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1784,6 +1853,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(300), eval_interval: Some(10), comment: Some("test comment".to_string()), + flow_options: OptionMap::default(), }, ), ( @@ -1806,6 +1876,32 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: Some(2 * 86400 + 3600 + 2 * 60), eval_interval: None, comment: None, + flow_options: OptionMap::default(), + }, + ), + ( + r" +CREATE FLOW task_3 +SINK TO schema_1.table_1 +EVAL INTERVAL '10 seconds' +WITH (defer_on_missing_source = 'true', foo = 'bar') +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]), + sink_table_name: ObjectName(vec![ + ObjectNamePart::Identifier(Ident::new("schema_1")), + ObjectNamePart::Identifier(Ident::new("table_1")), + ]), + or_replace: false, + if_not_exists: false, + expire_after: None, + eval_interval: Some(10), + comment: None, + flow_options: string_option_map([ + ("defer_on_missing_source", "true"), + ("foo", "bar"), + ]), }, ), ]; @@ -1821,6 +1917,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", expire_after: expected.expire_after, eval_interval: expected.eval_interval, comment: expected.comment, + flow_options: expected.flow_options, // ignore query parse result query: create_task.query.clone(), }; diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index f54fee1844..74ab8aee18 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -615,6 +615,8 @@ pub struct CreateFlow { pub eval_interval: Option, /// Comment string pub comment: Option, + /// Flow creation options from `WITH (...)` + pub flow_options: OptionMap, /// SQL statement pub query: Box, } @@ -672,6 +674,10 @@ impl Display for CreateFlow { if let Some(comment) = &self.comment { writeln!(f, "COMMENT '{}'", comment)?; } + if !self.flow_options.is_empty() { + let options = self.flow_options.kv_pairs(); + writeln!(f, "WITH ({})", format_list_comma!(options))?; + } write!(f, "AS {}", &self.query) } } diff --git a/src/sql/src/statements/option_map.rs b/src/sql/src/statements/option_map.rs index 61f8badd0a..d562a61ce0 100644 --- a/src/sql/src/statements/option_map.rs +++ b/src/sql/src/statements/option_map.rs @@ -49,6 +49,18 @@ impl OptionMap { } } + pub fn from_filtered_string_map( + options: &HashMap, + hidden_keys: &[&str], + ) -> Self { + Self::from( + options + .iter() + .filter(|(key, _)| !hidden_keys.contains(&key.as_str())) + .map(|(key, value)| (key.clone(), value.clone())), + ) + } + pub fn insert(&mut self, k: String, v: String) { if REDACTED_OPTIONS.contains(&k.as_str()) { self.secrets.insert(k, SecretString::new(Box::new(v))); @@ -221,6 +233,8 @@ impl VisitMut for OptionMap { #[cfg(test)] mod tests { + use std::collections::HashMap; + use crate::statements::OptionMap; #[test] @@ -237,4 +251,18 @@ mod tests { map.insert("a.b".to_string(), "中文comment\n".to_string()); assert_eq!("'a.b' = '中文comment\\n'", map.kv_pairs()[0]); } + + #[test] + fn test_from_filtered_string_map() { + let map = OptionMap::from_filtered_string_map( + &HashMap::from([ + ("visible".to_string(), "1".to_string()), + ("hidden".to_string(), "2".to_string()), + ]), + &["hidden"], + ); + + assert_eq!(map.get("visible"), Some("1")); + assert_eq!(map.get("hidden"), None); + } } diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index d18c95afde..113822cd18 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -442,6 +442,46 @@ DROP FLOW filter_numbers_show; Affected Rows: 0 +CREATE FLOW filter_numbers_show +SINK TO out_num_cnt_show +WITH (defer_on_missing_source = true) +AS SELECT number AS n1 FROM numbers_input_show where number > 10; + +Affected Rows: 0 + +SHOW CREATE FLOW filter_numbers_show; + ++---------------------+------------------------------------------------------------------+ +| Flow | Create Flow | ++---------------------+------------------------------------------------------------------+ +| filter_numbers_show | CREATE FLOW IF NOT EXISTS filter_numbers_show | +| | SINK TO public.out_num_cnt_show | +| | WITH (defer_on_missing_source = 'true') | +| | AS SELECT number AS n1 FROM numbers_input_show WHERE number > 10 | ++---------------------+------------------------------------------------------------------+ + +SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++------------------------------------------------------------------+ +| flow_definition | ++------------------------------------------------------------------+ +| CREATE FLOW IF NOT EXISTS filter_numbers_show | +| SINK TO public.out_num_cnt_show | +| WITH (defer_on_missing_source = 'true') | +| AS SELECT number AS n1 FROM numbers_input_show WHERE number > 10 | ++------------------------------------------------------------------+ + +CREATE FLOW invalid_flow_option_show +SINK TO out_num_cnt_show +WITH (access_key_id = [true]) +AS SELECT number AS n1 FROM numbers_input_show where number > 10; + +Error: 1004(InvalidArguments), Invalid SQL, error: unknown flow option 'access_key_id', supported options: defer_on_missing_source + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + drop table out_num_cnt_show; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index fdbc7f9cf1..9856a6041f 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -168,6 +168,22 @@ SELECT number FROM out_num_cnt_show; DROP FLOW filter_numbers_show; +CREATE FLOW filter_numbers_show +SINK TO out_num_cnt_show +WITH (defer_on_missing_source = true) +AS SELECT number AS n1 FROM numbers_input_show where number > 10; + +SHOW CREATE FLOW filter_numbers_show; + +SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +CREATE FLOW invalid_flow_option_show +SINK TO out_num_cnt_show +WITH (access_key_id = [true]) +AS SELECT number AS n1 FROM numbers_input_show where number > 10; + +DROP FLOW filter_numbers_show; + drop table out_num_cnt_show; drop table numbers_input_show;