From 379420abfcaf8ef60d542dcc530ea110f53ddd26 Mon Sep 17 00:00:00 2001 From: fys <40801205+fengys1996@users.noreply.github.com> Date: Thu, 22 Jan 2026 20:07:21 +0800 Subject: [PATCH] feat: support TQL parsing in CREATE TRIGGER (#7599) * feat: support tql in trigger create parse * support cte in parse_sql_or_tql * add parse_parenthesized_sql_or_tql method * allow dead code for parse_parenthesized_tql * revert some * fix: code review If the end index is greater than sql.len, it should return an error, not silently default to sql.len. * add more tests * Improve error readability * fix: cr --- src/sql/src/parsers/create_parser/trigger.rs | 110 ++++++++++++++- src/sql/src/parsers/tql_parser.rs | 140 ++++++++++++++++++- src/sql/src/parsers/with_tql_parser.rs | 8 +- src/sql/src/statements/create/trigger.rs | 38 ++++- 4 files changed, 282 insertions(+), 14 deletions(-) diff --git a/src/sql/src/parsers/create_parser/trigger.rs b/src/sql/src/parsers/create_parser/trigger.rs index d93cb26f33..1130632afe 100644 --- a/src/sql/src/parsers/create_parser/trigger.rs +++ b/src/sql/src/parsers/create_parser/trigger.rs @@ -1,6 +1,7 @@ use std::time::Duration; use snafu::{OptionExt, ResultExt, ensure}; +use sqlparser::ast::Query; use sqlparser::keywords::Keyword; use sqlparser::parser::Parser; use sqlparser::tokenizer::Token; @@ -8,13 +9,15 @@ use sqlparser::tokenizer::Token; use crate::error; use crate::error::Result; use crate::parser::ParserContext; +use crate::parsers::tql_parser; use crate::parsers::utils::convert_month_day_nano_to_duration; use crate::statements::OptionMap; +use crate::statements::create::SqlOrTql; use crate::statements::create::trigger::{ AlertManagerWebhook, ChannelType, CreateTrigger, DurationExpr, NotifyChannel, TriggerOn, }; use crate::statements::statement::Statement; -use crate::util::parse_option_string; +use crate::util::{location_to_index, parse_option_string}; /// Some keywords about trigger. pub const ON: &str = "ON"; @@ -155,7 +158,13 @@ impl<'a> ParserContext<'a> { } } - let query = self.parser.parse_query().context(error::SyntaxSnafu)?; + if let Token::LParen = self.parser.peek_token().token { + self.parser.next_token(); + } else { + return self.expected("`(` after `ON`", self.parser.peek_token()); + } + + let query = self.parse_parenthesized_sql_or_tql(true)?; if let Token::Word(w) = self.parser.peek_token().token && w.value.eq_ignore_ascii_case(EVERY) @@ -196,6 +205,63 @@ impl<'a> ParserContext<'a> { }) } + fn parse_parenthesized_sql_or_tql(&mut self, is_lparen_consumed: bool) -> Result { + if !is_lparen_consumed { + self.parser + .expect_token(&Token::LParen) + .context(error::SyntaxSnafu)?; + } + + if let Token::Word(w) = self.parser.peek_token().token + && w.keyword == Keyword::NoKeyword + && w.quote_style.is_none() + && w.value.to_uppercase() == tql_parser::TQL + { + let (tql, raw_query) = self.parse_parenthesized_tql(true, true, true)?; + Ok(SqlOrTql::Tql(tql, raw_query)) + } else { + let (query, raw_query) = self.parse_parenthesized_sql(true)?; + Ok(SqlOrTql::Sql(query, raw_query)) + } + } + + fn parse_parenthesized_sql(&mut self, is_lparen_consumed: bool) -> Result<(Query, String)> { + if !is_lparen_consumed { + self.parser + .expect_token(&Token::LParen) + .context(error::SyntaxSnafu)?; + } + + let sql_len = self.sql.len(); + let start_loc = self.parser.peek_token().span.start; + let start_index = location_to_index(self.sql, &start_loc); + + ensure!( + start_index <= sql_len, + error::InvalidSqlSnafu { + msg: format!("Invalid location (index {} > len {})", start_index, sql_len), + } + ); + + let sql_query = self.parser.parse_query().context(error::SyntaxSnafu)?; + + let end_token = self.parser.peek_token(); + self.parser + .expect_token(&Token::RParen) + .context(error::SyntaxSnafu)?; + + let end_index = location_to_index(self.sql, &end_token.span.start); + ensure!( + end_index <= sql_len, + error::InvalidSqlSnafu { + msg: format!("Invalid location (index {} > len {})", end_index, sql_len), + } + ); + let raw_sql = &self.sql[start_index..end_index]; + + Ok((*sql_query, raw_sql.trim().to_string())) + } + pub(crate) fn parse_trigger_for( &mut self, is_first_keyword_matched: bool, @@ -588,7 +654,7 @@ IF NOT EXISTS cpu_monitor assert_eq!(create_trigger.trigger_name.to_string(), "cpu_monitor"); assert_eq!( create_trigger.trigger_on.query.to_string(), - "(SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)" + "SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1" ); let TriggerOn { query, @@ -596,7 +662,7 @@ IF NOT EXISTS cpu_monitor } = &create_trigger.trigger_on; assert_eq!( query.to_string(), - "(SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)" + "SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1" ); assert_eq!(query_interval.duration, Duration::from_secs(300)); assert_eq!(query_interval.raw_expr.clone(), "'5 minute'::INTERVAL"); @@ -642,10 +708,20 @@ IF NOT EXISTS cpu_monitor query, query_interval: interval, } = ctx.parse_trigger_on(false).unwrap(); - assert_eq!(query.to_string(), "(SELECT * FROM cpu_usage)"); + assert_eq!(query.to_string(), "SELECT * FROM cpu_usage"); assert_eq!(interval.duration, Duration::from_secs(300)); assert_eq!(interval.raw_expr, "'5 minute'::INTERVAL"); + // Invalid, since missing `(` after `ON`. + let sql = "ON SELECT * FROM cpu_usage EVERY '5 minute'::INTERVAL"; + let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + assert!(ctx.parse_trigger_on(false).is_err()); + + // Invalid, since missing `)` after query expression. + let sql = "ON (SELECT * FROM cpu_usage EVERY '5 minute'::INTERVAL"; + let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + assert!(ctx.parse_trigger_on(false).is_err()); + // Invalid, since missing `ON` keyword. let sql = "SELECT * FROM cpu_usage EVERY '5 minute'::INTERVAL"; let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); @@ -683,6 +759,30 @@ IF NOT EXISTS cpu_monitor assert_eq!(trigger_on.query_interval.duration, Duration::from_secs(1)); } + #[test] + fn test_parse_parenthesized_sql_or_tql_sql_branch() { + let sql = "(SELECT 1)"; + let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let parsed = ctx.parse_parenthesized_sql_or_tql(false).unwrap(); + let expected = "SELECT 1"; + match parsed { + SqlOrTql::Sql(_, raw) => assert_eq!(raw, expected), + _ => panic!("Expected SQL branch"), + } + } + + #[test] + fn test_parse_parenthesized_sql_or_tql_tql_branch() { + let sql = "(TQL EVAL (now() - now(), now(), '1s') cpu_usage)"; + let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let parsed = ctx.parse_parenthesized_sql_or_tql(false).unwrap(); + let expected = "TQL EVAL (now() - now(), now(), '1s') cpu_usage"; + match parsed { + SqlOrTql::Tql(_, raw) => assert_eq!(raw, expected), + _ => panic!("Expected TQL branch"), + } + } + #[test] fn test_parse_trigger_labels() { // Normal. diff --git a/src/sql/src/parsers/tql_parser.rs b/src/sql/src/parsers/tql_parser.rs index bb5e858e4a..a918bbcbe0 100644 --- a/src/sql/src/parsers/tql_parser.rs +++ b/src/sql/src/parsers/tql_parser.rs @@ -96,6 +96,100 @@ impl ParserContext<'_> { } } + /// Parse a parenthesized TQL statement and return [`Tql`] and raw text. + /// + /// # Parameters + /// + /// - `is_lparen_consumed`: whether the leading `(` has already been consumed by the caller. + /// - `require_now_expr`: whether time params must contain `now()` (same as [`Self::parse_tql`]). + /// - `only_eval`: if `true`, rejects non-`TQL EVAL/EVALUATE` statements. + /// + /// # Examples + /// - (TQL EVAL (0, 10, '1s') cpu_usage) + #[cfg(feature = "enterprise")] + pub(crate) fn parse_parenthesized_tql( + &mut self, + is_lparen_consumed: bool, + require_now_expr: bool, + only_eval: bool, + ) -> Result<(Tql, String)> { + use crate::error::InvalidSqlSnafu; + + if !is_lparen_consumed { + self.parser + .expect_token(&Token::LParen) + .context(error::SyntaxSnafu)?; + } + + let tql_token = self.parser.peek_token(); + let start_location = tql_token.span.start; + let mut paren_depth = 0usize; + let end_location; + + loop { + let token_with_span = self.parser.peek_token(); + + if token_with_span.token == Token::EOF { + return Err(InvalidSqlSnafu { + msg: "Unexpected end of input while parsing TQL".to_string(), + } + .build()); + } + + if token_with_span.token == Token::RParen && paren_depth == 0 { + end_location = token_with_span.span.start; + self.parser.next_token(); + break; + } + + let consumed = self.parser.next_token(); + match consumed.token { + Token::LParen => paren_depth += 1, + Token::RParen => paren_depth = paren_depth.saturating_sub(1), + _ => {} + } + } + + let sql_len = self.sql.len(); + let start_index = location_to_index(self.sql, &start_location); + snafu::ensure!( + start_index <= sql_len, + error::InvalidSqlSnafu { + msg: format!("Invalid location (index {} > len {})", start_index, sql_len), + } + ); + + let end_index = location_to_index(self.sql, &end_location); + snafu::ensure!( + end_index <= sql_len, + error::InvalidSqlSnafu { + msg: format!("Invalid location (index {} > len {})", end_index, sql_len), + } + ); + + let tql_sql = &self.sql[start_index..end_index]; + let tql_sql = tql_sql.trim(); + let raw_query = tql_sql.trim_end_matches(';'); + + let mut parser_ctx = ParserContext::new(&crate::dialect::GreptimeDbDialect {}, tql_sql)?; + let statement = parser_ctx.parse_tql(require_now_expr)?; + + match statement { + Statement::Tql(tql) => match (only_eval, tql) { + (true, Tql::Eval(eval)) => Ok((Tql::Eval(eval), raw_query.to_string())), + (true, _) => Err(InvalidSqlSnafu { + msg: "Only TQL EVAL is supported in this context".to_string(), + } + .build()), + (false, tql) => Ok((tql, raw_query.to_string())), + }, + _ => Err(InvalidSqlSnafu { + msg: "Expected TQL statement", + } + .build()), + } + } + /// `require_now_expr` indicates whether the start&end must contain a `now()` function. fn parse_tql_params( &mut self, @@ -312,7 +406,7 @@ mod tests { use super::*; use crate::dialect::GreptimeDbDialect; - use crate::parser::ParseOptions; + use crate::parser::{ParseOptions, ParserContext}; fn parse_into_statement(sql: &str) -> Statement { let mut result = @@ -1281,4 +1375,48 @@ mod tests { result.output_msg() ); } + + #[cfg(feature = "enterprise")] + #[test] + fn test_parse_parenthesized_tql_only_eval_allowed() { + let sql = "(TQL EXPLAIN http_requests_total)"; + let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let result = ctx.parse_parenthesized_tql(false, false, true); + assert!(result.is_err()); + } + + #[cfg(feature = "enterprise")] + #[test] + fn test_parse_parenthesized_tql_allow_non_eval_when_flag_false() { + let sql = "(TQL EXPLAIN http_requests_total)"; + let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + + let (tql, raw) = ctx + .parse_parenthesized_tql(false, false, false) + .expect("should allow non-eval when flag is false"); + + match tql { + Tql::Explain(_) => {} + _ => panic!("Expected TQL Explain variant"), + } + assert!(raw.contains("TQL EXPLAIN")); + } + + #[cfg(feature = "enterprise")] + #[test] + fn test_parse_parenthesized_tql_without_lparen() { + let sql = "TQL EVAL http_requests_total)"; + let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let result = ctx.parse_parenthesized_tql(false, false, true); + assert!(result.is_err()); + } + + #[cfg(feature = "enterprise")] + #[test] + fn test_parse_parenthesized_tql_without_rparen() { + let sql = "(TQL EVAL http_requests_total"; + let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let result = ctx.parse_parenthesized_tql(false, false, true); + assert!(result.is_err()); + } } diff --git a/src/sql/src/parsers/with_tql_parser.rs b/src/sql/src/parsers/with_tql_parser.rs index e52e3103a4..ca44d10de7 100644 --- a/src/sql/src/parsers/with_tql_parser.rs +++ b/src/sql/src/parsers/with_tql_parser.rs @@ -311,9 +311,9 @@ mod tests { // Test that parentheses within the TQL query don't interfere with CTE parsing let sql = r#" WITH tql_cte AS ( - TQL EVAL (0, 100, '5s') + TQL EVAL (0, 100, '5s') sum(rate(http_requests_total[1m])) + (max(cpu_usage) * (1 + 0.5)) - ) + ) SELECT * FROM tql_cte "#; @@ -348,10 +348,10 @@ mod tests { #[test] fn test_parse_hybrid_cte_sql_and_tql() { let sql = r#" - WITH + WITH sql_cte(ts, value, label) AS (SELECT timestamp, val, name FROM metrics), tql_cte(time, metric_value) AS (TQL EVAL (0, 100, '5s') cpu_usage) - SELECT s.ts, s.value, t.metric_value + SELECT s.ts, s.value, t.metric_value FROM sql_cte s JOIN tql_cte t ON s.ts = t.time "#; diff --git a/src/sql/src/statements/create/trigger.rs b/src/sql/src/statements/create/trigger.rs index a6b76c2211..146913c0cf 100644 --- a/src/sql/src/statements/create/trigger.rs +++ b/src/sql/src/statements/create/trigger.rs @@ -4,12 +4,12 @@ use std::time::Duration; use itertools::Itertools; use serde::Serialize; -use sqlparser::ast::{Query, Visit, VisitMut, Visitor, VisitorMut}; +use sqlparser::ast::{Visit, VisitMut, Visitor, VisitorMut}; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::{Ident, ObjectName}; use crate::statements::OptionMap; -use crate::statements::create::{COMMA_SEP, INDENT}; +use crate::statements::create::{COMMA_SEP, INDENT, SqlOrTql}; #[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)] pub struct CreateTrigger { @@ -118,13 +118,13 @@ impl VisitMut for DurationExpr { #[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)] pub struct TriggerOn { - pub query: Box, + pub query: SqlOrTql, pub query_interval: DurationExpr, } impl Display for TriggerOn { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "ON {} EVERY {}", self.query, self.query_interval) + write!(f, "ON ({}) EVERY {}", self.query, self.query_interval) } } @@ -146,6 +146,7 @@ mod tests { use crate::dialect::GreptimeDbDialect; use crate::parser::{ParseOptions, ParserContext}; + use crate::statements::create::SqlOrTql; use crate::statements::statement::Statement; #[test] @@ -198,4 +199,33 @@ WEBHOOK alert_manager2 URL 'http://127.0.0.1:9093' WITH (timeout='1m') }; assert_eq!(duration_expr_no_raw.to_string(), "600 seconds"); } + + #[test] + fn test_parse_trigger_with_tql_query() { + let sql = r#"CREATE TRIGGER cpu_monitor +ON (TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') cpu_usage_total) +EVERY '5 minute'::INTERVAL +NOTIFY( + WEBHOOK alert_manager URL 'http://127.0.0.1:9093' WITH (timeout='1m') +) +"#; + + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, result.len()); + + let Statement::CreateTrigger(trigger) = &result[0] else { + panic!("Expected CreateTrigger statement"); + }; + + match &trigger.trigger_on.query { + SqlOrTql::Tql(_, raw) => { + assert!(raw.contains("TQL EVAL")); + assert!(raw.ends_with("cpu_usage_total")); + assert!(!raw.trim_end().ends_with(')')); + } + _ => panic!("Expected TQL query in trigger"), + } + } }