mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 12:00:40 +00:00
feat: support TQL parsing in CREATE TRIGGER (#7599)
* feat: support tql in trigger create parse * support cte in parse_sql_or_tql * add parse_parenthesized_sql_or_tql method * allow dead code for parse_parenthesized_tql * revert some * fix: code review If the end index is greater than sql.len, it should return an error, not silently default to sql.len. * add more tests * Improve error readability * fix: cr
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use sqlparser::ast::Query;
|
||||
use sqlparser::keywords::Keyword;
|
||||
use sqlparser::parser::Parser;
|
||||
use sqlparser::tokenizer::Token;
|
||||
@@ -8,13 +9,15 @@ use sqlparser::tokenizer::Token;
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::parser::ParserContext;
|
||||
use crate::parsers::tql_parser;
|
||||
use crate::parsers::utils::convert_month_day_nano_to_duration;
|
||||
use crate::statements::OptionMap;
|
||||
use crate::statements::create::SqlOrTql;
|
||||
use crate::statements::create::trigger::{
|
||||
AlertManagerWebhook, ChannelType, CreateTrigger, DurationExpr, NotifyChannel, TriggerOn,
|
||||
};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::util::parse_option_string;
|
||||
use crate::util::{location_to_index, parse_option_string};
|
||||
|
||||
/// Some keywords about trigger.
|
||||
pub const ON: &str = "ON";
|
||||
@@ -155,7 +158,13 @@ impl<'a> ParserContext<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
let query = self.parser.parse_query().context(error::SyntaxSnafu)?;
|
||||
if let Token::LParen = self.parser.peek_token().token {
|
||||
self.parser.next_token();
|
||||
} else {
|
||||
return self.expected("`(` after `ON`", self.parser.peek_token());
|
||||
}
|
||||
|
||||
let query = self.parse_parenthesized_sql_or_tql(true)?;
|
||||
|
||||
if let Token::Word(w) = self.parser.peek_token().token
|
||||
&& w.value.eq_ignore_ascii_case(EVERY)
|
||||
@@ -196,6 +205,63 @@ impl<'a> ParserContext<'a> {
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_parenthesized_sql_or_tql(&mut self, is_lparen_consumed: bool) -> Result<SqlOrTql> {
|
||||
if !is_lparen_consumed {
|
||||
self.parser
|
||||
.expect_token(&Token::LParen)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
}
|
||||
|
||||
if let Token::Word(w) = self.parser.peek_token().token
|
||||
&& w.keyword == Keyword::NoKeyword
|
||||
&& w.quote_style.is_none()
|
||||
&& w.value.to_uppercase() == tql_parser::TQL
|
||||
{
|
||||
let (tql, raw_query) = self.parse_parenthesized_tql(true, true, true)?;
|
||||
Ok(SqlOrTql::Tql(tql, raw_query))
|
||||
} else {
|
||||
let (query, raw_query) = self.parse_parenthesized_sql(true)?;
|
||||
Ok(SqlOrTql::Sql(query, raw_query))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_parenthesized_sql(&mut self, is_lparen_consumed: bool) -> Result<(Query, String)> {
|
||||
if !is_lparen_consumed {
|
||||
self.parser
|
||||
.expect_token(&Token::LParen)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
}
|
||||
|
||||
let sql_len = self.sql.len();
|
||||
let start_loc = self.parser.peek_token().span.start;
|
||||
let start_index = location_to_index(self.sql, &start_loc);
|
||||
|
||||
ensure!(
|
||||
start_index <= sql_len,
|
||||
error::InvalidSqlSnafu {
|
||||
msg: format!("Invalid location (index {} > len {})", start_index, sql_len),
|
||||
}
|
||||
);
|
||||
|
||||
let sql_query = self.parser.parse_query().context(error::SyntaxSnafu)?;
|
||||
|
||||
let end_token = self.parser.peek_token();
|
||||
self.parser
|
||||
.expect_token(&Token::RParen)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
|
||||
let end_index = location_to_index(self.sql, &end_token.span.start);
|
||||
ensure!(
|
||||
end_index <= sql_len,
|
||||
error::InvalidSqlSnafu {
|
||||
msg: format!("Invalid location (index {} > len {})", end_index, sql_len),
|
||||
}
|
||||
);
|
||||
let raw_sql = &self.sql[start_index..end_index];
|
||||
|
||||
Ok((*sql_query, raw_sql.trim().to_string()))
|
||||
}
|
||||
|
||||
pub(crate) fn parse_trigger_for(
|
||||
&mut self,
|
||||
is_first_keyword_matched: bool,
|
||||
@@ -588,7 +654,7 @@ IF NOT EXISTS cpu_monitor
|
||||
assert_eq!(create_trigger.trigger_name.to_string(), "cpu_monitor");
|
||||
assert_eq!(
|
||||
create_trigger.trigger_on.query.to_string(),
|
||||
"(SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)"
|
||||
"SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1"
|
||||
);
|
||||
let TriggerOn {
|
||||
query,
|
||||
@@ -596,7 +662,7 @@ IF NOT EXISTS cpu_monitor
|
||||
} = &create_trigger.trigger_on;
|
||||
assert_eq!(
|
||||
query.to_string(),
|
||||
"(SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)"
|
||||
"SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1"
|
||||
);
|
||||
assert_eq!(query_interval.duration, Duration::from_secs(300));
|
||||
assert_eq!(query_interval.raw_expr.clone(), "'5 minute'::INTERVAL");
|
||||
@@ -642,10 +708,20 @@ IF NOT EXISTS cpu_monitor
|
||||
query,
|
||||
query_interval: interval,
|
||||
} = ctx.parse_trigger_on(false).unwrap();
|
||||
assert_eq!(query.to_string(), "(SELECT * FROM cpu_usage)");
|
||||
assert_eq!(query.to_string(), "SELECT * FROM cpu_usage");
|
||||
assert_eq!(interval.duration, Duration::from_secs(300));
|
||||
assert_eq!(interval.raw_expr, "'5 minute'::INTERVAL");
|
||||
|
||||
// Invalid, since missing `(` after `ON`.
|
||||
let sql = "ON SELECT * FROM cpu_usage EVERY '5 minute'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
assert!(ctx.parse_trigger_on(false).is_err());
|
||||
|
||||
// Invalid, since missing `)` after query expression.
|
||||
let sql = "ON (SELECT * FROM cpu_usage EVERY '5 minute'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
assert!(ctx.parse_trigger_on(false).is_err());
|
||||
|
||||
// Invalid, since missing `ON` keyword.
|
||||
let sql = "SELECT * FROM cpu_usage EVERY '5 minute'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
@@ -683,6 +759,30 @@ IF NOT EXISTS cpu_monitor
|
||||
assert_eq!(trigger_on.query_interval.duration, Duration::from_secs(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_parenthesized_sql_or_tql_sql_branch() {
|
||||
let sql = "(SELECT 1)";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let parsed = ctx.parse_parenthesized_sql_or_tql(false).unwrap();
|
||||
let expected = "SELECT 1";
|
||||
match parsed {
|
||||
SqlOrTql::Sql(_, raw) => assert_eq!(raw, expected),
|
||||
_ => panic!("Expected SQL branch"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_parenthesized_sql_or_tql_tql_branch() {
|
||||
let sql = "(TQL EVAL (now() - now(), now(), '1s') cpu_usage)";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let parsed = ctx.parse_parenthesized_sql_or_tql(false).unwrap();
|
||||
let expected = "TQL EVAL (now() - now(), now(), '1s') cpu_usage";
|
||||
match parsed {
|
||||
SqlOrTql::Tql(_, raw) => assert_eq!(raw, expected),
|
||||
_ => panic!("Expected TQL branch"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_trigger_labels() {
|
||||
// Normal.
|
||||
|
||||
@@ -96,6 +96,100 @@ impl ParserContext<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a parenthesized TQL statement and return [`Tql`] and raw text.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// - `is_lparen_consumed`: whether the leading `(` has already been consumed by the caller.
|
||||
/// - `require_now_expr`: whether time params must contain `now()` (same as [`Self::parse_tql`]).
|
||||
/// - `only_eval`: if `true`, rejects non-`TQL EVAL/EVALUATE` statements.
|
||||
///
|
||||
/// # Examples
|
||||
/// - (TQL EVAL (0, 10, '1s') cpu_usage)
|
||||
#[cfg(feature = "enterprise")]
|
||||
pub(crate) fn parse_parenthesized_tql(
|
||||
&mut self,
|
||||
is_lparen_consumed: bool,
|
||||
require_now_expr: bool,
|
||||
only_eval: bool,
|
||||
) -> Result<(Tql, String)> {
|
||||
use crate::error::InvalidSqlSnafu;
|
||||
|
||||
if !is_lparen_consumed {
|
||||
self.parser
|
||||
.expect_token(&Token::LParen)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
}
|
||||
|
||||
let tql_token = self.parser.peek_token();
|
||||
let start_location = tql_token.span.start;
|
||||
let mut paren_depth = 0usize;
|
||||
let end_location;
|
||||
|
||||
loop {
|
||||
let token_with_span = self.parser.peek_token();
|
||||
|
||||
if token_with_span.token == Token::EOF {
|
||||
return Err(InvalidSqlSnafu {
|
||||
msg: "Unexpected end of input while parsing TQL".to_string(),
|
||||
}
|
||||
.build());
|
||||
}
|
||||
|
||||
if token_with_span.token == Token::RParen && paren_depth == 0 {
|
||||
end_location = token_with_span.span.start;
|
||||
self.parser.next_token();
|
||||
break;
|
||||
}
|
||||
|
||||
let consumed = self.parser.next_token();
|
||||
match consumed.token {
|
||||
Token::LParen => paren_depth += 1,
|
||||
Token::RParen => paren_depth = paren_depth.saturating_sub(1),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let sql_len = self.sql.len();
|
||||
let start_index = location_to_index(self.sql, &start_location);
|
||||
snafu::ensure!(
|
||||
start_index <= sql_len,
|
||||
error::InvalidSqlSnafu {
|
||||
msg: format!("Invalid location (index {} > len {})", start_index, sql_len),
|
||||
}
|
||||
);
|
||||
|
||||
let end_index = location_to_index(self.sql, &end_location);
|
||||
snafu::ensure!(
|
||||
end_index <= sql_len,
|
||||
error::InvalidSqlSnafu {
|
||||
msg: format!("Invalid location (index {} > len {})", end_index, sql_len),
|
||||
}
|
||||
);
|
||||
|
||||
let tql_sql = &self.sql[start_index..end_index];
|
||||
let tql_sql = tql_sql.trim();
|
||||
let raw_query = tql_sql.trim_end_matches(';');
|
||||
|
||||
let mut parser_ctx = ParserContext::new(&crate::dialect::GreptimeDbDialect {}, tql_sql)?;
|
||||
let statement = parser_ctx.parse_tql(require_now_expr)?;
|
||||
|
||||
match statement {
|
||||
Statement::Tql(tql) => match (only_eval, tql) {
|
||||
(true, Tql::Eval(eval)) => Ok((Tql::Eval(eval), raw_query.to_string())),
|
||||
(true, _) => Err(InvalidSqlSnafu {
|
||||
msg: "Only TQL EVAL is supported in this context".to_string(),
|
||||
}
|
||||
.build()),
|
||||
(false, tql) => Ok((tql, raw_query.to_string())),
|
||||
},
|
||||
_ => Err(InvalidSqlSnafu {
|
||||
msg: "Expected TQL statement",
|
||||
}
|
||||
.build()),
|
||||
}
|
||||
}
|
||||
|
||||
/// `require_now_expr` indicates whether the start&end must contain a `now()` function.
|
||||
fn parse_tql_params(
|
||||
&mut self,
|
||||
@@ -312,7 +406,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::dialect::GreptimeDbDialect;
|
||||
use crate::parser::ParseOptions;
|
||||
use crate::parser::{ParseOptions, ParserContext};
|
||||
|
||||
fn parse_into_statement(sql: &str) -> Statement {
|
||||
let mut result =
|
||||
@@ -1281,4 +1375,48 @@ mod tests {
|
||||
result.output_msg()
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
#[test]
|
||||
fn test_parse_parenthesized_tql_only_eval_allowed() {
|
||||
let sql = "(TQL EXPLAIN http_requests_total)";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let result = ctx.parse_parenthesized_tql(false, false, true);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
#[test]
|
||||
fn test_parse_parenthesized_tql_allow_non_eval_when_flag_false() {
|
||||
let sql = "(TQL EXPLAIN http_requests_total)";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
|
||||
let (tql, raw) = ctx
|
||||
.parse_parenthesized_tql(false, false, false)
|
||||
.expect("should allow non-eval when flag is false");
|
||||
|
||||
match tql {
|
||||
Tql::Explain(_) => {}
|
||||
_ => panic!("Expected TQL Explain variant"),
|
||||
}
|
||||
assert!(raw.contains("TQL EXPLAIN"));
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
#[test]
|
||||
fn test_parse_parenthesized_tql_without_lparen() {
|
||||
let sql = "TQL EVAL http_requests_total)";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let result = ctx.parse_parenthesized_tql(false, false, true);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
#[test]
|
||||
fn test_parse_parenthesized_tql_without_rparen() {
|
||||
let sql = "(TQL EVAL http_requests_total";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let result = ctx.parse_parenthesized_tql(false, false, true);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,9 +311,9 @@ mod tests {
|
||||
// Test that parentheses within the TQL query don't interfere with CTE parsing
|
||||
let sql = r#"
|
||||
WITH tql_cte AS (
|
||||
TQL EVAL (0, 100, '5s')
|
||||
TQL EVAL (0, 100, '5s')
|
||||
sum(rate(http_requests_total[1m])) + (max(cpu_usage) * (1 + 0.5))
|
||||
)
|
||||
)
|
||||
SELECT * FROM tql_cte
|
||||
"#;
|
||||
|
||||
@@ -348,10 +348,10 @@ mod tests {
|
||||
#[test]
|
||||
fn test_parse_hybrid_cte_sql_and_tql() {
|
||||
let sql = r#"
|
||||
WITH
|
||||
WITH
|
||||
sql_cte(ts, value, label) AS (SELECT timestamp, val, name FROM metrics),
|
||||
tql_cte(time, metric_value) AS (TQL EVAL (0, 100, '5s') cpu_usage)
|
||||
SELECT s.ts, s.value, t.metric_value
|
||||
SELECT s.ts, s.value, t.metric_value
|
||||
FROM sql_cte s JOIN tql_cte t ON s.ts = t.time
|
||||
"#;
|
||||
|
||||
|
||||
@@ -4,12 +4,12 @@ use std::time::Duration;
|
||||
|
||||
use itertools::Itertools;
|
||||
use serde::Serialize;
|
||||
use sqlparser::ast::{Query, Visit, VisitMut, Visitor, VisitorMut};
|
||||
use sqlparser::ast::{Visit, VisitMut, Visitor, VisitorMut};
|
||||
use sqlparser_derive::{Visit, VisitMut};
|
||||
|
||||
use crate::ast::{Ident, ObjectName};
|
||||
use crate::statements::OptionMap;
|
||||
use crate::statements::create::{COMMA_SEP, INDENT};
|
||||
use crate::statements::create::{COMMA_SEP, INDENT, SqlOrTql};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
|
||||
pub struct CreateTrigger {
|
||||
@@ -118,13 +118,13 @@ impl VisitMut for DurationExpr {
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
|
||||
pub struct TriggerOn {
|
||||
pub query: Box<Query>,
|
||||
pub query: SqlOrTql,
|
||||
pub query_interval: DurationExpr,
|
||||
}
|
||||
|
||||
impl Display for TriggerOn {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "ON {} EVERY {}", self.query, self.query_interval)
|
||||
write!(f, "ON ({}) EVERY {}", self.query, self.query_interval)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,6 +146,7 @@ mod tests {
|
||||
|
||||
use crate::dialect::GreptimeDbDialect;
|
||||
use crate::parser::{ParseOptions, ParserContext};
|
||||
use crate::statements::create::SqlOrTql;
|
||||
use crate::statements::statement::Statement;
|
||||
|
||||
#[test]
|
||||
@@ -198,4 +199,33 @@ WEBHOOK alert_manager2 URL 'http://127.0.0.1:9093' WITH (timeout='1m')
|
||||
};
|
||||
assert_eq!(duration_expr_no_raw.to_string(), "600 seconds");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_trigger_with_tql_query() {
|
||||
let sql = r#"CREATE TRIGGER cpu_monitor
|
||||
ON (TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') cpu_usage_total)
|
||||
EVERY '5 minute'::INTERVAL
|
||||
NOTIFY(
|
||||
WEBHOOK alert_manager URL 'http://127.0.0.1:9093' WITH (timeout='1m')
|
||||
)
|
||||
"#;
|
||||
|
||||
let result =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap();
|
||||
assert_eq!(1, result.len());
|
||||
|
||||
let Statement::CreateTrigger(trigger) = &result[0] else {
|
||||
panic!("Expected CreateTrigger statement");
|
||||
};
|
||||
|
||||
match &trigger.trigger_on.query {
|
||||
SqlOrTql::Tql(_, raw) => {
|
||||
assert!(raw.contains("TQL EVAL"));
|
||||
assert!(raw.ends_with("cpu_usage_total"));
|
||||
assert!(!raw.trim_end().ends_with(')'));
|
||||
}
|
||||
_ => panic!("Expected TQL query in trigger"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user