diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 03c7afc460..84c96cc7cd 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -35,8 +35,7 @@ use query::QueryEngineRef; use query::query_engine::DefaultSerializer; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt, ensure}; -use sql::parser::{ParseOptions, ParserContext}; -use sql::statements::statement::Statement; +use sql::parsers::utils::is_tql; use store_api::mito_engine_options::MERGE_MODE_KEY; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::adapter::DfTableProviderAdapter; @@ -84,22 +83,14 @@ pub struct TaskConfig { } fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result { - let stmts = - ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default()) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - - ensure!( - stmts.len() == 1, - InvalidQuerySnafu { - reason: format!("Expect only one statement, found {}", stmts.len()) - } - ); - let stmt = &stmts[0]; - match stmt { - Statement::Tql(_) => Ok(QueryType::Tql), - _ => Ok(QueryType::Sql), - } + let is_tql = is_tql(query_ctx.sql_dialect(), query) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + Ok(if is_tql { + QueryType::Tql + } else { + QueryType::Sql + }) } fn is_merge_mode_last_non_null(options: &HashMap) -> bool { diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index 2139ff34b1..b2ec96a76b 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -1158,6 +1158,105 @@ TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http ); } + #[test] + fn test_create_flow_tql_cte_source_tables() { + let sql = r#" +CREATE FLOW calc_cte +SINK TO metric_cte_sink +EVAL INTERVAL '1m' +AS +WITH tql(ts, the_value) AS ( + TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte +) +SELECT * FROM tql; +"#; + + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + + let to_dot_sep = + |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name); + assert_eq!(1, expr.source_table_names.len()); + assert_eq!( + "greptime.public.metric_cte", + to_dot_sep(expr.source_table_names[0].clone()) + ); + } + + #[test] + fn test_create_flow_tql_cte_source_tables_quoted_cte_name() { + let sql = r#" +CREATE FLOW calc_cte +SINK TO metric_cte_sink +EVAL INTERVAL '1m' +AS +WITH "TQL"(ts, the_value) AS ( + TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte +) +SELECT * FROM "TQL"; +"#; + + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + + let to_dot_sep = + |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name); + assert_eq!(1, expr.source_table_names.len()); + assert_eq!( + "greptime.public.metric_cte", + to_dot_sep(expr.source_table_names[0].clone()) + ); + } + + #[test] + fn test_create_flow_tql_cte_source_tables_same_name() { + let sql = r#" +CREATE FLOW calc_cte +SINK TO metric_cte_sink +EVAL INTERVAL '1m' +AS +WITH tql(ts, the_value) AS ( + TQL EVAL (now() - '1m'::interval, now(), '5s') tql +) +SELECT * FROM tql; +"#; + + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + + let to_dot_sep = + |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name); + assert_eq!(1, expr.source_table_names.len()); + assert_eq!( + "greptime.public.tql", + to_dot_sep(expr.source_table_names[0].clone()) + ); + } + #[test] fn test_create_flow_expr() { let sql = r" diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 2114ccd5e1..0a24eb3713 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -68,6 +68,7 @@ use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; use snafu::{OptionExt, ResultExt, ensure}; use sql::parser::{ParseOptions, ParserContext}; +use sql::parsers::utils::is_tql; use sql::statements::OptionMap; #[cfg(feature = "enterprise")] use sql::statements::alter::trigger::AlterTrigger; @@ -716,6 +717,13 @@ impl StatementExecutor { ); let stmt = &stmts[0]; + if is_tql(query_ctx.sql_dialect(), &expr.sql) + .map_err(BoxedError::new) + .context(ExternalSnafu)? + { + return Ok(FlowType::Batching); + } + // support tql parse too let plan = match stmt { // prom ql is only supported in batching mode diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index da3b6159fb..d69a1af61d 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -38,9 +38,9 @@ use table::requests::validate_database_option; use crate::ast::{ColumnDef, Ident, ObjectNamePartExt}; use crate::error::{ - self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu, - InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu, - UnexpectedSnafu, UnsupportedSnafu, + self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu, + InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, + SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu, }; use crate::parser::{FLOW, ParserContext}; use crate::parsers::tql_parser; @@ -343,7 +343,7 @@ impl<'a> ParserContext<'a> { .expect_keyword(Keyword::AS) .context(SyntaxSnafu)?; - let query = Box::new(self.parse_sql_or_tql(true)?); + let query = Box::new(self.parse_flow_sql_or_tql(true)?); Ok(Statement::CreateFlow(CreateFlow { flow_name, @@ -357,14 +357,20 @@ impl<'a> ParserContext<'a> { })) } - fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result { + fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result { let start_loc = self.parser.peek_token().span.start; let start_index = location_to_index(self.sql, &start_loc); + let starts_with_with = matches!( + self.parser.peek_token().token, + Token::Word(w) if w.keyword == Keyword::WITH + ); + // only accept sql or tql let query = match self.parser.peek_token().token { Token::Word(w) => match w.keyword { Keyword::SELECT => self.parse_query(), + Keyword::WITH => self.parse_with_tql_with_now(require_now_expr), Keyword::NoKeyword if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL => { @@ -376,6 +382,23 @@ impl<'a> ParserContext<'a> { _ => self.unsupported(self.peek_token_as_string()), }?; + if starts_with_with { + let Statement::Query(query) = &query else { + return InvalidFlowQuerySnafu { + reason: "Expect a query after WITH".to_string(), + } + .fail(); + }; + + if !utils::is_simple_tql_cte_query(query) { + return InvalidFlowQuerySnafu { + reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW" + .to_string(), + } + .fail(); + } + } + let end_token = self.parser.peek_token(); let raw_query = if end_token == Token::EOF { @@ -386,6 +409,7 @@ impl<'a> ParserContext<'a> { &self.sql[start_index..end_index.min(self.sql.len())] }; let raw_query = raw_query.trim_end_matches(";"); + let query = SqlOrTql::try_from_statement(query, raw_query)?; Ok(query) } @@ -1808,6 +1832,125 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", } } + #[test] + fn test_parse_create_flow_with_tql_cte_query() { + let sql = r#" +CREATE FLOW calc_reqs_cte +SINK TO cnt_reqs_cte +EVAL INTERVAL '1m' +AS +WITH tql(the_timestamp, the_value) AS ( + TQL EVAL (now() - '1m'::interval, now(), '5s') metric +) +SELECT * FROM tql; +"#; + + let stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + let Statement::CreateFlow(create_flow) = &stmts[0] else { + panic!("unexpected stmt: {:?}", stmts[0]); + }; + + let query = create_flow.query.to_string(); + assert!(query.to_uppercase().contains("WITH")); + assert!(query.to_uppercase().contains("TQL EVAL")); + } + + #[test] + fn test_parse_create_flow_with_sql_cte_is_unsupported() { + let sql = r#" +CREATE FLOW f +SINK TO s +AS +WITH cte AS (SELECT 1) SELECT * FROM cte; +"#; + + let err = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap_err(); + let msg = err.to_string(); + assert!(msg.to_uppercase().contains("WITH"), "err: {msg}"); + } + + #[test] + fn test_parse_create_flow_with_tql_cte_requires_now_expr() { + let sql = r#" +CREATE FLOW f +SINK TO s +EVAL INTERVAL '1m' +AS +WITH tql(ts, val) AS ( + TQL EVAL (0, 15, '5s') metric +) +SELECT * FROM tql; +"#; + + let err = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap_err(); + + let msg = format!("{err:?}"); + assert!( + msg.contains("Expected expression containing `now()`"), + "unexpected err: {msg}" + ); + } + + #[test] + fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() { + let sql = r#" +CREATE FLOW f +SINK TO s +AS +WITH tql(ts, val) AS ( + TQL EVAL (now() - '1m'::interval, now(), '5s') metric +) +SELECT ts FROM tql; +"#; + + let err = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap_err(); + assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}"); + } + + #[test] + fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() { + let sql = r#" +CREATE FLOW f +SINK TO s +AS +WITH tql(ts, val) AS ( + TQL EVAL (now() - '1m'::interval, now(), '5s') metric +) +SELECT * FROM tql WHERE ts > 0; +"#; + + let err = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap_err(); + assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}"); + } + + #[test] + fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() { + let sql = r#" +CREATE FLOW f +SINK TO s +AS +WITH s1 AS (SELECT 1), + tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric) +SELECT * FROM tql; +"#; + + let err = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap_err(); + assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}"); + } + #[test] fn test_create_flow_no_month() { let sql = r" diff --git a/src/sql/src/parsers/create_parser/trigger.rs b/src/sql/src/parsers/create_parser/trigger.rs index 1130632afe..8699ceb834 100644 --- a/src/sql/src/parsers/create_parser/trigger.rs +++ b/src/sql/src/parsers/create_parser/trigger.rs @@ -1,7 +1,6 @@ use std::time::Duration; use snafu::{OptionExt, ResultExt, ensure}; -use sqlparser::ast::Query; use sqlparser::keywords::Keyword; use sqlparser::parser::Parser; use sqlparser::tokenizer::Token; @@ -16,6 +15,7 @@ use crate::statements::create::SqlOrTql; use crate::statements::create::trigger::{ AlertManagerWebhook, ChannelType, CreateTrigger, DurationExpr, NotifyChannel, TriggerOn, }; +use crate::statements::query::Query; use crate::statements::statement::Statement; use crate::util::{location_to_index, parse_option_string}; @@ -259,7 +259,8 @@ impl<'a> ParserContext<'a> { ); let raw_sql = &self.sql[start_index..end_index]; - Ok((*sql_query, raw_sql.trim().to_string())) + let query = (*sql_query).try_into()?; + Ok((query, raw_sql.trim().to_string())) } pub(crate) fn parse_trigger_for( diff --git a/src/sql/src/parsers/utils.rs b/src/sql/src/parsers/utils.rs index 9c3637ebd2..0306bd859d 100644 --- a/src/sql/src/parsers/utils.rs +++ b/src/sql/src/parsers/utils.rs @@ -47,11 +47,13 @@ use crate::error::{ Result, SimplificationSnafu, SyntaxSnafu, }; use crate::parser::{ParseOptions, ParserContext}; +use crate::parsers::with_tql_parser::CteContent; use crate::statements::OptionMap; +use crate::statements::query::Query; use crate::statements::statement::Statement; use crate::util::{OptionValue, parse_option_string}; -/// Check if the given SQL query is a TQL statement. +/// Check if the given SQL query is a TQL statement. Simple tql cte query is also considered as TQL statement. pub fn is_tql(dialect: &dyn Dialect, sql: &str) -> Result { let stmts = ParserContext::create_with_dialect(sql, dialect, ParseOptions::default())?; @@ -64,10 +66,123 @@ pub fn is_tql(dialect: &dyn Dialect, sql: &str) -> Result { let stmt = &stmts[0]; match stmt { Statement::Tql(_) => Ok(true), + Statement::Query(query) => Ok(is_simple_tql_cte_query(query)), _ => Ok(false), } } +pub(crate) fn is_simple_tql_cte_query(query: &Query) -> bool { + use crate::parser::ParserContext; + + let Some(hybrid_cte) = &query.hybrid_cte else { + return false; + }; + + if !has_only_hybrid_tql_cte(query) { + return false; + } + + let Some(cte) = hybrid_cte.cte_tables.first() else { + return false; + }; + if hybrid_cte.cte_tables.len() != 1 || !matches!(cte.content, CteContent::Tql(_)) { + return false; + } + + let Some(reference) = extract_simple_select_star_reference(query) else { + return false; + }; + + let reference = ParserContext::canonicalize_identifier(reference).value; + let cte_name = ParserContext::canonicalize_identifier(cte.name.clone()).value; + reference == cte_name +} + +fn has_only_hybrid_tql_cte(query: &Query) -> bool { + query + .inner + .with + .as_ref() + .is_none_or(|with| with.cte_tables.is_empty()) +} + +fn extract_simple_select_star_reference(query: &Query) -> Option { + use sqlparser::ast::{SetExpr, TableFactor}; + + if !is_plain_query_root(&query.inner) { + return None; + } + + let SetExpr::Select(select) = &*query.inner.body else { + return None; + }; + if !is_plain_select(select) || !is_plain_wildcard_projection(select.projection.as_slice()) { + return None; + } + + let [table_with_joins] = select.from.as_slice() else { + return None; + }; + if !table_with_joins.joins.is_empty() { + return None; + } + + let TableFactor::Table { name, .. } = &table_with_joins.relation else { + return None; + }; + if name.0.len() != 1 { + return None; + } + + name.0[0].as_ident().cloned() +} + +fn is_plain_query_root(query: &sqlparser::ast::Query) -> bool { + query.order_by.is_none() + && query.limit_clause.is_none() + && query.fetch.is_none() + && query.locks.is_empty() + && query.for_clause.is_none() + && query.settings.is_none() + && query.format_clause.is_none() + && query.pipe_operators.is_empty() +} + +fn is_plain_select(select: &sqlparser::ast::Select) -> bool { + use sqlparser::ast::GroupByExpr; + + select.distinct.is_none() + && select.top.is_none() + && select.exclude.is_none() + && select.into.is_none() + && select.lateral_views.is_empty() + && select.prewhere.is_none() + && select.selection.is_none() + && matches!(select.group_by, GroupByExpr::Expressions(ref exprs, _) if exprs.is_empty()) + && select.cluster_by.is_empty() + && select.distribute_by.is_empty() + && select.sort_by.is_empty() + && select.having.is_none() + && select.named_window.is_empty() + && select.qualify.is_none() + && select.value_table_mode.is_none() + && select.connect_by.is_empty() +} + +fn is_plain_wildcard_projection(projection: &[sqlparser::ast::SelectItem]) -> bool { + use sqlparser::ast::SelectItem; + + matches!( + projection, + [SelectItem::Wildcard(options)] + if options.opt_ilike.is_none() + && options.opt_exclude.is_none() + && options.opt_except.is_none() + && options.opt_replace.is_none() + && options.opt_rename.is_none() + ) +} + /// Convert a parser expression to a scalar value. This function will try the /// best to resolve and reduce constants. Exprs like `1 + 1` or `now()` can be /// handled properly. @@ -294,6 +409,74 @@ mod tests { use datatypes::arrow::datatypes::TimestampNanosecondType; use super::*; + use crate::dialect::GreptimeDbDialect; + use crate::parser::{ParseOptions, ParserContext}; + use crate::statements::statement::Statement; + + #[test] + fn test_is_tql() { + let dialect = GreptimeDbDialect {}; + + assert!(is_tql(&dialect, "TQL EVAL (0, 10, '1s') cpu_usage_total").unwrap()); + assert!(!is_tql(&dialect, "SELECT 1").unwrap()); + + let tql_cte = r#" +WITH tql_cte(ts, val) AS ( + TQL EVAL (0, 15, '5s') metric +) +SELECT * FROM tql_cte +"#; + assert!(is_tql(&dialect, tql_cte).unwrap()); + + let rename_cols = r#" +WITH tql (the_timestamp, the_value) AS ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql +"#; + assert!(is_tql(&dialect, rename_cols).unwrap()); + let stmts = + ParserContext::create_with_dialect(rename_cols, &dialect, ParseOptions::default()) + .unwrap(); + let Statement::Query(q) = &stmts[0] else { + panic!("Expected Query statement"); + }; + let hybrid = q.hybrid_cte.as_ref().expect("Expected hybrid cte"); + assert_eq!(hybrid.cte_tables.len(), 1); + assert_eq!(hybrid.cte_tables[0].columns.len(), 2); + assert_eq!(hybrid.cte_tables[0].columns[0].to_string(), "the_timestamp"); + assert_eq!(hybrid.cte_tables[0].columns[1].to_string(), "the_value"); + + let sql_cte = r#" +WITH cte AS (SELECT 1) +SELECT * FROM cte +"#; + assert!(!is_tql(&dialect, sql_cte).unwrap()); + + let extra_sql_cte = r#" +WITH sql_cte AS (SELECT 1), tql_cte(ts, val) AS ( + TQL EVAL (0, 15, '5s') metric +) +SELECT * FROM tql_cte +"#; + assert!(!is_tql(&dialect, extra_sql_cte).unwrap()); + + let not_select_star = r#" +WITH tql_cte(ts, val) AS ( + TQL EVAL (0, 15, '5s') metric +) +SELECT ts FROM tql_cte +"#; + assert!(!is_tql(&dialect, not_select_star).unwrap()); + + let with_filter = r#" +WITH tql_cte(ts, val) AS ( + TQL EVAL (0, 15, '5s') metric +) +SELECT * FROM tql_cte WHERE ts > 0 +"#; + assert!(!is_tql(&dialect, with_filter).unwrap()); + } /// Keep this test to make sure we are using datafusion's `ExprSimplifier` correctly. #[test] diff --git a/src/sql/src/parsers/with_tql_parser.rs b/src/sql/src/parsers/with_tql_parser.rs index bd1de322d2..609caaf3b0 100644 --- a/src/sql/src/parsers/with_tql_parser.rs +++ b/src/sql/src/parsers/with_tql_parser.rs @@ -27,7 +27,7 @@ use sqlparser_derive::{Visit, VisitMut}; use crate::dialect::GreptimeDbDialect; use crate::error::{self, Result}; -use crate::parser::{ParseOptions, ParserContext}; +use crate::parser::ParserContext; use crate::parsers::tql_parser; use crate::statements::query::Query; use crate::statements::statement::Statement; @@ -97,6 +97,10 @@ impl fmt::Display for HybridCteWith { impl ParserContext<'_> { /// Parse a WITH clause that may contain TQL CTEs or SQL CTEs. pub(crate) fn parse_with_tql(&mut self) -> Result { + self.parse_with_tql_with_now(false) + } + + pub(crate) fn parse_with_tql_with_now(&mut self, require_now_expr: bool) -> Result { // Consume the WITH token self.parser .expect_keyword(Keyword::WITH) @@ -110,7 +114,7 @@ impl ParserContext<'_> { let mut sql_cte_tables = Vec::new(); loop { - let cte = self.parse_hybrid_cte()?; + let cte = self.parse_hybrid_cte(require_now_expr)?; match cte.content { CteContent::Sql(body) => sql_cte_tables.push(Cte { alias: TableAlias { @@ -161,7 +165,7 @@ impl ParserContext<'_> { } /// Parse a single CTE that can be either SQL or TQL - fn parse_hybrid_cte(&mut self) -> Result { + fn parse_hybrid_cte(&mut self, require_now_expr: bool) -> Result { // Parse CTE name let name = self.parser.parse_identifier().context(error::SyntaxSnafu)?; let name = Self::canonicalize_identifier(name); @@ -182,7 +186,7 @@ impl ParserContext<'_> { .expect_token(&Token::LParen) .context(error::SyntaxSnafu)?; - let content = self.parse_cte_content()?; + let content = self.parse_cte_content(require_now_expr)?; self.parser .expect_token(&Token::RParen) @@ -196,14 +200,14 @@ impl ParserContext<'_> { } /// Determine if CTE contains TQL or SQL and parse accordingly - fn parse_cte_content(&mut self) -> Result { + fn parse_cte_content(&mut self, require_now_expr: bool) -> Result { // Check if the next token is TQL if let Token::Word(w) = &self.parser.peek_token().token && w.keyword == Keyword::NoKeyword && w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL { - let tql = self.parse_tql_content_in_cte()?; + let tql = self.parse_tql_content_in_cte(require_now_expr)?; return Ok(CteContent::Tql(tql)); } @@ -219,7 +223,7 @@ impl ParserContext<'_> { /// can handle it normally. /// /// Only `TQL EVAL` is supported inside CTEs. - fn parse_tql_content_in_cte(&mut self) -> Result { + fn parse_tql_content_in_cte(&mut self, require_now_expr: bool) -> Result { // Consume and get the position of the TQL keyword let tql_token = self.parser.next_token(); if tql_token.token == Token::EOF { @@ -271,21 +275,10 @@ impl ParserContext<'_> { let tql_string = &self.sql[start_index..end_index]; let tql_string = tql_string.trim(); - // Parse the TQL string using the standard TQL parser - let mut stmts = ParserContext::create_with_dialect( - tql_string, - &GreptimeDbDialect {}, - ParseOptions::default(), - )?; + let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, tql_string)?; + let statement = parser_ctx.parse_tql(require_now_expr)?; - if stmts.len() != 1 { - return Err(error::InvalidSqlSnafu { - msg: "Expected a single TQL statement inside CTE".to_string(), - } - .build()); - } - - match stmts.remove(0) { + match statement { Statement::Tql(Tql::Eval(eval)) => Ok(Tql::Eval(eval)), Statement::Tql(_) => Err(error::InvalidSqlSnafu { msg: "Only TQL EVAL is supported in CTEs".to_string(), diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 2bfb05bc4b..817b31518d 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -27,7 +27,7 @@ use datatypes::types::StructType; use itertools::Itertools; use serde::Serialize; use snafu::{OptionExt, ResultExt}; -use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query}; +use sqlparser::ast::{ColumnOptionDef, DataType, Expr}; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue}; @@ -35,6 +35,7 @@ use crate::error::{ InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu, }; +use crate::statements::query::Query as GtQuery; use crate::statements::statement::Statement; use crate::statements::tql::Tql; use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type}; @@ -618,7 +619,7 @@ pub struct CreateFlow { /// Either a sql query or a tql query #[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)] pub enum SqlOrTql { - Sql(Query, String), + Sql(GtQuery, String), Tql(Tql, String), } @@ -637,9 +638,7 @@ impl SqlOrTql { original_query: &str, ) -> std::result::Result { match value { - Statement::Query(query) => { - Ok(Self::Sql((*query).try_into()?, original_query.to_string())) - } + Statement::Query(query) => Ok(Self::Sql(*query, original_query.to_string())), Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())), _ => InvalidFlowQuerySnafu { reason: format!("Expect either sql query or promql query, found {:?}", value), diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index 1f533468e0..bbf5ce3277 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -33,7 +33,10 @@ use sqlparser_derive::{Visit, VisitMut}; use crate::ast::ObjectNamePartExt; use crate::error::{InvalidExprAsOptionValueSnafu, InvalidSqlSnafu, Result}; +use crate::parser::ParserContext; +use crate::parsers::with_tql_parser::CteContent; use crate::statements::create::SqlOrTql; +use crate::statements::query::Query; use crate::statements::tql::Tql; const SCHEMA_MATCHER: &str = "__schema__"; @@ -191,13 +194,57 @@ pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator extract_tables_from_set_expr(&query.body, &mut names), + SqlOrTql::Sql(query, _) => { + extract_tables_from_set_expr(&query.inner.body, &mut names); + extract_tables_from_hybrid_cte_query(query, &mut names); + } SqlOrTql::Tql(tql, _) => extract_tables_from_tql(tql, &mut names), } names.into_iter() } +fn extract_tables_from_hybrid_cte_query(query: &Query, sql_names: &mut HashSet) { + let mut tql_names = HashSet::new(); + let mut cte_names: HashSet = HashSet::new(); + if let Some(hybrid_cte) = &query.hybrid_cte { + for cte in &hybrid_cte.cte_tables { + cte_names.insert(ParserContext::canonicalize_identifier(cte.name.clone()).value); + if let CteContent::Tql(tql) = &cte.content { + extract_tables_from_tql(tql, &mut tql_names); + } + } + } + + if let Some(with) = &query.inner.with { + for cte in &with.cte_tables { + cte_names.insert(ParserContext::canonicalize_identifier(cte.alias.name.clone()).value); + } + } + + remove_cte_names(sql_names, &cte_names); + + sql_names.extend(tql_names); +} + +fn remove_cte_names(names: &mut HashSet, cte_names: &HashSet) { + if cte_names.is_empty() { + return; + } + + names.retain(|name| { + if name.0.len() != 1 { + return true; + } + let Some(ident) = name.0[0].as_ident() else { + return true; + }; + + let canonical = ParserContext::canonicalize_identifier(ident.clone()).value; + !cte_names.contains(&canonical) + }); +} + fn extract_tables_from_tql(tql: &Tql, names: &mut HashSet) { let promql = match tql { Tql::Eval(eval) => &eval.query, diff --git a/tests/cases/distributed/flow-tql/flow_tql_cte.result b/tests/cases/distributed/flow-tql/flow_tql_cte.result new file mode 100644 index 0000000000..8c3347050e --- /dev/null +++ b/tests/cases/distributed/flow-tql/flow_tql_cte.result @@ -0,0 +1,194 @@ +CREATE TABLE metric_cte ( + ts timestamp(3) time index, + val DOUBLE, +); + +Affected Rows: 0 + +INSERT INTO TABLE metric_cte VALUES + (0::Timestamp, 0), + (5000::Timestamp, 1), + (10000::Timestamp, 2), + (15000::Timestamp, 3); + +Affected Rows: 4 + +CREATE FLOW calc_cte SINK TO metric_cte_sink EVAL INTERVAL '1m' AS +WITH tql (ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM tql; + +Affected Rows: 0 + +CREATE TABLE tql ( + ts timestamp(3) time index, + val DOUBLE, +); + +Affected Rows: 0 + +INSERT INTO TABLE tql VALUES + (0::Timestamp, 10), + (5000::Timestamp, 20), + (10000::Timestamp, 30), + (15000::Timestamp, 40); + +Affected Rows: 4 + +-- Fail due to case sensitivity of CTE name +CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS +WITH "TQL"(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM TQL; + +Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW + +CREATE FLOW calc_cte_non_star SINK TO metric_cte_non_star_sink EVAL INTERVAL '1m' AS +WITH tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT ts FROM tql; + +Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW + +CREATE FLOW calc_cte_filter SINK TO metric_cte_filter_sink EVAL INTERVAL '1m' AS +WITH tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM tql WHERE ts > 0::timestamp; + +Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW + +CREATE FLOW calc_cte_mixed SINK TO metric_cte_mixed_sink EVAL INTERVAL '1m' AS +WITH s1 AS (SELECT 1), + tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte + ) +SELECT * FROM tql; + +Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW + +CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS +WITH "TQL"(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM "TQL"; + +Affected Rows: 0 + +SHOW CREATE TABLE metric_cte_sink; + ++-----------------+---------------------------------------------------+ +| Table | Create Table | ++-----------------+---------------------------------------------------+ +| metric_cte_sink | CREATE TABLE IF NOT EXISTS "metric_cte_sink" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "the_value" DOUBLE NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-----------------+---------------------------------------------------+ + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte'; + ++----------------------------+ +| source_table_names | ++----------------------------+ +| greptime.public.metric_cte | ++----------------------------+ + +SHOW CREATE TABLE metric_cte_join_sink; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| metric_cte_join_sink | CREATE TABLE IF NOT EXISTS "metric_cte_join_sink" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "the_value" DOUBLE NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------------------+-----------------------------------------------------+ + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte_case'; + ++----------------------------+ +| source_table_names | ++----------------------------+ +| greptime.public.metric_cte | ++----------------------------+ + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_cte'); + ++------------------------------+ +| ADMIN FLUSH_FLOW('calc_cte') | ++------------------------------+ +| FLOW_FLUSHED | ++------------------------------+ + +SELECT * FROM metric_cte_sink ORDER BY ts; + ++---------------------+-----------+ +| ts | the_value | ++---------------------+-----------+ +| 1970-01-01T00:00:00 | 0.0 | +| 1970-01-01T00:00:05 | 1.0 | +| 1970-01-01T00:00:10 | 2.0 | +| 1970-01-01T00:00:15 | 3.0 | ++---------------------+-----------+ + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_cte_case'); + ++-----------------------------------+ +| ADMIN FLUSH_FLOW('calc_cte_case') | ++-----------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------+ + +SELECT * FROM metric_cte_join_sink ORDER BY ts; + ++---------------------+-----------+ +| ts | the_value | ++---------------------+-----------+ +| 1970-01-01T00:00:00 | 0.0 | +| 1970-01-01T00:00:05 | 1.0 | +| 1970-01-01T00:00:10 | 2.0 | +| 1970-01-01T00:00:15 | 3.0 | ++---------------------+-----------+ + +DROP FLOW calc_cte; + +Affected Rows: 0 + +DROP FLOW calc_cte_case; + +Affected Rows: 0 + +DROP TABLE metric_cte; + +Affected Rows: 0 + +DROP TABLE tql; + +Affected Rows: 0 + +DROP TABLE metric_cte_sink; + +Affected Rows: 0 + +DROP TABLE metric_cte_join_sink; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/flow-tql/flow_tql_cte.sql b/tests/cases/distributed/flow-tql/flow_tql_cte.sql new file mode 120000 index 0000000000..7e772ce751 --- /dev/null +++ b/tests/cases/distributed/flow-tql/flow_tql_cte.sql @@ -0,0 +1 @@ +../../standalone/flow-tql/flow_tql_cte.sql \ No newline at end of file diff --git a/tests/cases/standalone/flow-tql/flow_tql_cte.result b/tests/cases/standalone/flow-tql/flow_tql_cte.result new file mode 100644 index 0000000000..8c3347050e --- /dev/null +++ b/tests/cases/standalone/flow-tql/flow_tql_cte.result @@ -0,0 +1,194 @@ +CREATE TABLE metric_cte ( + ts timestamp(3) time index, + val DOUBLE, +); + +Affected Rows: 0 + +INSERT INTO TABLE metric_cte VALUES + (0::Timestamp, 0), + (5000::Timestamp, 1), + (10000::Timestamp, 2), + (15000::Timestamp, 3); + +Affected Rows: 4 + +CREATE FLOW calc_cte SINK TO metric_cte_sink EVAL INTERVAL '1m' AS +WITH tql (ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM tql; + +Affected Rows: 0 + +CREATE TABLE tql ( + ts timestamp(3) time index, + val DOUBLE, +); + +Affected Rows: 0 + +INSERT INTO TABLE tql VALUES + (0::Timestamp, 10), + (5000::Timestamp, 20), + (10000::Timestamp, 30), + (15000::Timestamp, 40); + +Affected Rows: 4 + +-- Fail due to case sensitivity of CTE name +CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS +WITH "TQL"(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM TQL; + +Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW + +CREATE FLOW calc_cte_non_star SINK TO metric_cte_non_star_sink EVAL INTERVAL '1m' AS +WITH tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT ts FROM tql; + +Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW + +CREATE FLOW calc_cte_filter SINK TO metric_cte_filter_sink EVAL INTERVAL '1m' AS +WITH tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM tql WHERE ts > 0::timestamp; + +Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW + +CREATE FLOW calc_cte_mixed SINK TO metric_cte_mixed_sink EVAL INTERVAL '1m' AS +WITH s1 AS (SELECT 1), + tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte + ) +SELECT * FROM tql; + +Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW + +CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS +WITH "TQL"(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM "TQL"; + +Affected Rows: 0 + +SHOW CREATE TABLE metric_cte_sink; + ++-----------------+---------------------------------------------------+ +| Table | Create Table | ++-----------------+---------------------------------------------------+ +| metric_cte_sink | CREATE TABLE IF NOT EXISTS "metric_cte_sink" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "the_value" DOUBLE NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-----------------+---------------------------------------------------+ + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte'; + ++----------------------------+ +| source_table_names | ++----------------------------+ +| greptime.public.metric_cte | ++----------------------------+ + +SHOW CREATE TABLE metric_cte_join_sink; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| metric_cte_join_sink | CREATE TABLE IF NOT EXISTS "metric_cte_join_sink" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "the_value" DOUBLE NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------------------+-----------------------------------------------------+ + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte_case'; + ++----------------------------+ +| source_table_names | ++----------------------------+ +| greptime.public.metric_cte | ++----------------------------+ + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_cte'); + ++------------------------------+ +| ADMIN FLUSH_FLOW('calc_cte') | ++------------------------------+ +| FLOW_FLUSHED | ++------------------------------+ + +SELECT * FROM metric_cte_sink ORDER BY ts; + ++---------------------+-----------+ +| ts | the_value | ++---------------------+-----------+ +| 1970-01-01T00:00:00 | 0.0 | +| 1970-01-01T00:00:05 | 1.0 | +| 1970-01-01T00:00:10 | 2.0 | +| 1970-01-01T00:00:15 | 3.0 | ++---------------------+-----------+ + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_cte_case'); + ++-----------------------------------+ +| ADMIN FLUSH_FLOW('calc_cte_case') | ++-----------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------+ + +SELECT * FROM metric_cte_join_sink ORDER BY ts; + ++---------------------+-----------+ +| ts | the_value | ++---------------------+-----------+ +| 1970-01-01T00:00:00 | 0.0 | +| 1970-01-01T00:00:05 | 1.0 | +| 1970-01-01T00:00:10 | 2.0 | +| 1970-01-01T00:00:15 | 3.0 | ++---------------------+-----------+ + +DROP FLOW calc_cte; + +Affected Rows: 0 + +DROP FLOW calc_cte_case; + +Affected Rows: 0 + +DROP TABLE metric_cte; + +Affected Rows: 0 + +DROP TABLE tql; + +Affected Rows: 0 + +DROP TABLE metric_cte_sink; + +Affected Rows: 0 + +DROP TABLE metric_cte_join_sink; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow-tql/flow_tql_cte.sql b/tests/cases/standalone/flow-tql/flow_tql_cte.sql new file mode 100644 index 0000000000..3da409b93e --- /dev/null +++ b/tests/cases/standalone/flow-tql/flow_tql_cte.sql @@ -0,0 +1,84 @@ +CREATE TABLE metric_cte ( + ts timestamp(3) time index, + val DOUBLE, +); + +INSERT INTO TABLE metric_cte VALUES + (0::Timestamp, 0), + (5000::Timestamp, 1), + (10000::Timestamp, 2), + (15000::Timestamp, 3); + +CREATE FLOW calc_cte SINK TO metric_cte_sink EVAL INTERVAL '1m' AS +WITH tql (ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM tql; + +CREATE TABLE tql ( + ts timestamp(3) time index, + val DOUBLE, +); + +INSERT INTO TABLE tql VALUES + (0::Timestamp, 10), + (5000::Timestamp, 20), + (10000::Timestamp, 30), + (15000::Timestamp, 40); + +-- Fail due to case sensitivity of CTE name +CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS +WITH "TQL"(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM TQL; + +CREATE FLOW calc_cte_non_star SINK TO metric_cte_non_star_sink EVAL INTERVAL '1m' AS +WITH tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT ts FROM tql; + +CREATE FLOW calc_cte_filter SINK TO metric_cte_filter_sink EVAL INTERVAL '1m' AS +WITH tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM tql WHERE ts > 0::timestamp; + +CREATE FLOW calc_cte_mixed SINK TO metric_cte_mixed_sink EVAL INTERVAL '1m' AS +WITH s1 AS (SELECT 1), + tql(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte + ) +SELECT * FROM tql; + +CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS +WITH "TQL"(ts, the_value) AS ( + TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte +) +SELECT * FROM "TQL"; + +SHOW CREATE TABLE metric_cte_sink; + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte'; + +SHOW CREATE TABLE metric_cte_join_sink; + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte_case'; + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_cte'); + +SELECT * FROM metric_cte_sink ORDER BY ts; + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_cte_case'); + +SELECT * FROM metric_cte_join_sink ORDER BY ts; + +DROP FLOW calc_cte; +DROP FLOW calc_cte_case; +DROP TABLE metric_cte; +DROP TABLE tql; +DROP TABLE metric_cte_sink; +DROP TABLE metric_cte_join_sink;