refactor: enhanced trigger interval (#6740)

* refactor: enhance trigger interval

* update greptime-proto

* fix: build
This commit is contained in:
fys
2025-08-18 12:03:26 +08:00
committed by GitHub
parent f9d2a89a0c
commit 326198162e
17 changed files with 248 additions and 101 deletions

View File

@@ -12,6 +12,7 @@ enterprise = []
[dependencies]
api.workspace = true
arrow-buffer.workspace = true
chrono.workspace = true
common-base.workspace = true
common-catalog.workspace = true

View File

@@ -314,15 +314,6 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "enterprise")]
#[snafu(display("The execution interval cannot be negative"))]
NegativeInterval {
#[snafu(source)]
error: std::num::TryFromIntError,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "enterprise")]
#[snafu(display("Must specify at least one notify channel"))]
MissingNotifyChannel {
@@ -387,9 +378,7 @@ impl ErrorExt for Error {
InvalidTriggerName { .. } => StatusCode::InvalidArguments,
#[cfg(feature = "enterprise")]
InvalidTriggerWebhookOption { .. } | NegativeInterval { .. } => {
StatusCode::InvalidArguments
}
InvalidTriggerWebhookOption { .. } => StatusCode::InvalidArguments,
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),

View File

@@ -55,8 +55,7 @@ impl<'a> ParserContext<'a> {
let trigger_name = self.intern_parse_table_name()?;
let mut new_trigger_name = None;
let mut new_query = None;
let mut new_interval = None;
let mut trigger_on = None;
let mut label_ops = None;
let mut annotation_ops = None;
let mut notify_ops = None;
@@ -78,13 +77,9 @@ impl<'a> ParserContext<'a> {
}
Token::Word(w) if w.value.eq_ignore_ascii_case(ON) => {
self.parser.next_token();
let (query, interval) = self.parse_trigger_on(true)?;
ensure!(
new_query.is_none() && new_interval.is_none(),
DuplicateClauseSnafu { clause: ON }
);
new_query.replace(query);
new_interval.replace(interval);
let new_trigger_on = self.parse_trigger_on(true)?;
ensure!(trigger_on.is_none(), DuplicateClauseSnafu { clause: ON });
trigger_on.replace(new_trigger_on);
}
Token::Word(w) if w.value.eq_ignore_ascii_case(LABELS) => {
self.parser.next_token();
@@ -230,8 +225,7 @@ impl<'a> ParserContext<'a> {
}
if new_trigger_name.is_none()
&& new_query.is_none()
&& new_interval.is_none()
&& trigger_on.is_none()
&& label_ops.is_none()
&& annotation_ops.is_none()
&& notify_ops.is_none()
@@ -241,8 +235,7 @@ impl<'a> ParserContext<'a> {
let operation = AlterTriggerOperation {
rename: new_trigger_name,
new_query,
new_interval,
trigger_on,
label_operations: label_ops,
annotation_operations: annotation_ops,
notify_channel_operations: notify_ops,
@@ -544,10 +537,13 @@ fn apply_notify_change(
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::dialect::GreptimeDbDialect;
use crate::parser::ParserContext;
use crate::parsers::alter_parser::trigger::{apply_label_change, apply_label_replacement};
use crate::statements::alter::trigger::{LabelChange, LabelOperations};
use crate::statements::create::trigger::TriggerOn;
use crate::statements::statement::Statement;
use crate::statements::OptionMap;
@@ -571,9 +567,14 @@ mod tests {
let Statement::AlterTrigger(alter) = stmt else {
panic!("Expected AlterTrigger statement");
};
assert!(alter.operation.new_query.is_some());
assert!(alter.operation.new_interval.is_some());
assert_eq!(alter.operation.new_interval.unwrap(), 300);
let TriggerOn {
query,
interval,
raw_interval_expr,
} = alter.operation.trigger_on.unwrap();
assert_eq!(query.to_string(), "(SELECT * FROM test_table)");
assert_eq!(raw_interval_expr, "'5 minute'::INTERVAL");
assert_eq!(interval, Duration::from_secs(300));
assert!(alter.operation.rename.is_none());
assert!(alter.operation.label_operations.is_none());
assert!(alter.operation.annotation_operations.is_none());

View File

@@ -16,7 +16,9 @@
pub mod trigger;
use std::collections::HashMap;
use std::time::Duration;
use arrow_buffer::IntervalMonthDayNano;
use common_catalog::consts::default_engine;
use datafusion_common::ScalarValue;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
@@ -58,6 +60,8 @@ pub const AFTER: &str = "AFTER";
pub const INVERTED: &str = "INVERTED";
pub const SKIPPING: &str = "SKIPPING";
pub type RawIntervalExpr = String;
/// Parses create [table] statement
impl<'a> ParserContext<'a> {
pub(crate) fn parse_create(&mut self) -> Result<Statement> {
@@ -348,7 +352,59 @@ impl<'a> ParserContext<'a> {
/// Parse the interval expr to duration in seconds.
fn parse_interval(&mut self) -> Result<i64> {
let interval = self.parse_interval_month_day_nano()?.0;
Ok(
interval.nanoseconds / 1_000_000_000
+ interval.days as i64 * 60 * 60 * 24
+ interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
// this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
// which we use in database to parse i.e. ttl interval and many other intervals
)
}
/// Parses an interval expression and converts it to a standard Rust [`Duration`]
/// and a raw interval expression string.
pub fn parse_interval_to_duration(&mut self) -> Result<(Duration, RawIntervalExpr)> {
let (interval, raw_interval_expr) = self.parse_interval_month_day_nano()?;
let months: i64 = interval.months.into();
let days: i64 = interval.days.into();
let months_in_seconds: i64 = months * 60 * 60 * 24 * 3044 / 1000;
let days_in_seconds: i64 = days * 60 * 60 * 24;
let seconds_from_nanos = interval.nanoseconds / 1_000_000_000;
let total_seconds = months_in_seconds + days_in_seconds + seconds_from_nanos;
let mut nanos_remainder = interval.nanoseconds % 1_000_000_000;
let mut adjusted_seconds = total_seconds;
if nanos_remainder < 0 {
nanos_remainder += 1_000_000_000;
adjusted_seconds -= 1;
}
ensure!(
adjusted_seconds >= 0,
InvalidIntervalSnafu {
reason: "must be a positive interval",
}
);
// Cast safety: `adjusted_seconds` is guaranteed to be non-negative before.
let adjusted_seconds = adjusted_seconds as u64;
// Cast safety: `nanos_remainder` is smaller than 1_000_000_000 which
// is checked above.
let nanos_remainder = nanos_remainder as u32;
Ok((
Duration::new(adjusted_seconds, nanos_remainder),
raw_interval_expr,
))
}
/// Parse interval expr to [`IntervalMonthDayNano`].
fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
let raw_interval_expr = interval_expr.to_string();
let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone())?
.cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
.ok()
@@ -356,13 +412,7 @@ impl<'a> ParserContext<'a> {
reason: format!("cannot cast {} to interval type", interval_expr),
})?;
if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
Ok(
interval.nanoseconds / 1_000_000_000
+ interval.days as i64 * 60 * 60 * 24
+ interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
// this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
// which we use in database to parse i.e. ttl interval and many other intervals
)
Ok((interval, raw_interval_expr))
} else {
unreachable!()
}

View File

@@ -1,7 +1,6 @@
use std::collections::HashMap;
use snafu::{ensure, OptionExt, ResultExt};
use sqlparser::ast::Query;
use sqlparser::keywords::Keyword;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Token;
@@ -10,7 +9,7 @@ use crate::error;
use crate::error::Result;
use crate::parser::ParserContext;
use crate::statements::create::trigger::{
AlertManagerWebhook, ChannelType, CreateTrigger, NotifyChannel,
AlertManagerWebhook, ChannelType, CreateTrigger, NotifyChannel, TriggerOn,
};
use crate::statements::statement::Statement;
use crate::statements::OptionMap;
@@ -52,8 +51,7 @@ impl<'a> ParserContext<'a> {
let if_not_exists = self.parse_if_not_exist()?;
let trigger_name = self.intern_parse_table_name()?;
let mut may_query = None;
let mut may_interval = None;
let mut may_trigger_on = None;
let mut may_labels = None;
let mut may_annotations = None;
let mut notify_channels = vec![];
@@ -63,9 +61,8 @@ impl<'a> ParserContext<'a> {
match next_token.token {
Token::Word(w) if w.value.eq_ignore_ascii_case(ON) => {
self.parser.next_token();
let (query, interval) = self.parse_trigger_on(true)?;
may_query.replace(query);
may_interval.replace(interval);
let trigger_on = self.parse_trigger_on(true)?;
may_trigger_on.replace(trigger_on);
}
Token::Word(w) if w.value.eq_ignore_ascii_case(LABELS) => {
self.parser.next_token();
@@ -92,8 +89,7 @@ impl<'a> ParserContext<'a> {
}
}
let query = may_query.context(error::MissingClauseSnafu { name: ON })?;
let interval = may_interval.context(error::MissingClauseSnafu { name: ON })?;
let trigger_on = may_trigger_on.context(error::MissingClauseSnafu { name: ON })?;
let labels = may_labels.unwrap_or_default();
let annotations = may_annotations.unwrap_or_default();
@@ -105,8 +101,7 @@ impl<'a> ParserContext<'a> {
let create_trigger = CreateTrigger {
trigger_name,
if_not_exists,
query,
interval,
trigger_on,
labels,
annotations,
channels: notify_channels,
@@ -125,10 +120,7 @@ impl<'a> ParserContext<'a> {
///
/// - `is_first_keyword_matched`: indicates whether the first keyword `ON`
/// has been matched.
pub(crate) fn parse_trigger_on(
&mut self,
is_first_keyword_matched: bool,
) -> Result<(Box<Query>, u64)> {
pub(crate) fn parse_trigger_on(&mut self, is_first_keyword_matched: bool) -> Result<TriggerOn> {
if !is_first_keyword_matched {
if let Token::Word(w) = self.parser.peek_token().token
&& w.value.eq_ignore_ascii_case(ON)
@@ -149,12 +141,13 @@ impl<'a> ParserContext<'a> {
return self.expected("`EVERY` keyword", self.parser.peek_token());
}
let interval = self
.parse_interval()?
.try_into()
.context(error::NegativeIntervalSnafu)?;
let (interval, raw_interval_expr) = self.parse_interval_to_duration()?;
Ok((query, interval))
Ok(TriggerOn {
query,
interval,
raw_interval_expr,
})
}
/// The SQL format as follows:
@@ -380,6 +373,8 @@ impl<'a> ParserContext<'a> {
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::dialect::GreptimeDbDialect;
use crate::statements::create::trigger::ChannelType;
@@ -452,10 +447,20 @@ IF NOT EXISTS cpu_monitor
assert!(create_trigger.if_not_exists);
assert_eq!(create_trigger.trigger_name.to_string(), "cpu_monitor");
assert_eq!(
create_trigger.query.to_string(),
create_trigger.trigger_on.query.to_string(),
"(SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)"
);
assert_eq!(create_trigger.interval, 300);
let TriggerOn {
query,
interval,
raw_interval_expr,
} = &create_trigger.trigger_on;
assert_eq!(
query.to_string(),
"(SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)"
);
assert_eq!(*interval, Duration::from_secs(300));
assert_eq!(raw_interval_expr.to_string(), "'5 minute'::INTERVAL");
assert_eq!(create_trigger.labels.len(), 1);
assert_eq!(
create_trigger.labels.get("label_name").unwrap(),
@@ -487,9 +492,14 @@ IF NOT EXISTS cpu_monitor
// Normal.
let sql = "ON (SELECT * FROM cpu_usage) EVERY '5 minute'::INTERVAL";
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
let (query, interval) = ctx.parse_trigger_on(false).unwrap();
let TriggerOn {
query,
interval,
raw_interval_expr: raw_interval,
} = ctx.parse_trigger_on(false).unwrap();
assert_eq!(query.to_string(), "(SELECT * FROM cpu_usage)");
assert_eq!(interval, 300);
assert_eq!(interval, Duration::from_secs(300));
assert_eq!(raw_interval, "'5 minute'::INTERVAL");
// Invalid, since missing `ON` keyword.
let sql = "SELECT * FROM cpu_usage EVERY '5 minute'::INTERVAL";

View File

@@ -1,10 +1,10 @@
use std::fmt::{Display, Formatter};
use serde::Serialize;
use sqlparser::ast::{ObjectName, Query};
use sqlparser::ast::ObjectName;
use sqlparser_derive::{Visit, VisitMut};
use crate::statements::create::trigger::NotifyChannel;
use crate::statements::create::trigger::{NotifyChannel, TriggerOn};
use crate::statements::OptionMap;
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
@@ -16,9 +16,7 @@ pub struct AlterTrigger {
#[derive(Debug, Default, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub struct AlterTriggerOperation {
pub rename: Option<String>,
pub new_query: Option<Box<Query>>,
/// The new interval of exec query. Unit is second.
pub new_interval: Option<u64>,
pub trigger_on: Option<TriggerOn>,
pub label_operations: Option<LabelOperations>,
pub annotation_operations: Option<AnnotationOperations>,
pub notify_channel_operations: Option<NotifyChannelOperations>,
@@ -35,12 +33,9 @@ impl Display for AlterTrigger {
write!(f, "RENAME TO {}", new_name)?;
}
if let Some((new_query, new_interval)) =
operation.new_query.as_ref().zip(operation.new_interval)
{
if let Some(trigger_on) = &operation.trigger_on {
writeln!(f)?;
write!(f, "ON {}", new_query)?;
write!(f, " EVERY {} SECONDS", new_interval)?;
write!(f, "{}", trigger_on)?;
}
if let Some(label_ops) = &operation.label_operations {
@@ -319,7 +314,7 @@ ADD NOTIFY
let formatted = format!("{}", trigger);
let expected = r#"ALTER TRIGGER my_trigger
RENAME TO new_trigger
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 2) EVERY 300 SECONDS
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 2) EVERY '5 minute'::INTERVAL
ADD LABELS (k1 = 'v1', k2 = 'v2')
DROP LABELS (k3, k4)
SET ANNOTATIONS (a1 = 'v1', a2 = 'v2')

View File

@@ -1,8 +1,10 @@
use std::fmt::{Display, Formatter};
use std::ops::ControlFlow;
use std::time::Duration;
use itertools::Itertools;
use serde::Serialize;
use sqlparser::ast::Query;
use sqlparser::ast::{Query, Visit, VisitMut, Visitor, VisitorMut};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{Ident, ObjectName};
@@ -13,10 +15,7 @@ use crate::statements::OptionMap;
pub struct CreateTrigger {
pub trigger_name: ObjectName,
pub if_not_exists: bool,
/// SQL statement executed periodically.
pub query: Box<Query>,
/// The interval of exec query. Unit is second.
pub interval: u64,
pub trigger_on: TriggerOn,
pub labels: OptionMap,
pub annotations: OptionMap,
pub channels: Vec<NotifyChannel>,
@@ -29,8 +28,7 @@ impl Display for CreateTrigger {
write!(f, "IF NOT EXISTS ")?;
}
writeln!(f, "{}", self.trigger_name)?;
write!(f, "ON {} ", self.query)?;
writeln!(f, "EVERY {} SECONDS", self.interval)?;
writeln!(f, "{}", self.trigger_on)?;
if !self.labels.is_empty() {
let labels = self.labels.kv_pairs();
@@ -73,6 +71,33 @@ impl Display for NotifyChannel {
}
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize)]
pub struct TriggerOn {
pub query: Box<Query>,
pub interval: Duration,
pub raw_interval_expr: String,
}
impl Display for TriggerOn {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ON {} EVERY {}", self.query, self.raw_interval_expr)
}
}
impl Visit for TriggerOn {
fn visit<V: Visitor>(&self, visitor: &mut V) -> ControlFlow<V::Break> {
Visit::visit(&self.query, visitor)?;
ControlFlow::Continue(())
}
}
impl VisitMut for TriggerOn {
fn visit<V: VisitorMut>(&mut self, visitor: &mut V) -> ControlFlow<V::Break> {
VisitMut::visit(&mut self.query, visitor)?;
ControlFlow::Continue(())
}
}
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum ChannelType {
/// Alert manager webhook options.
@@ -94,7 +119,7 @@ mod tests {
#[test]
fn test_display_create_trigger() {
let sql = r#"CREATE TRIGGER IF NOT EXISTS cpu_monitor
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 2) EVERY '5 minute'::INTERVAL
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 2) EVERY '1day 5 minute'::INTERVAL
LABELS (label_name=label_val)
ANNOTATIONS (annotation_name=annotation_val)
NOTIFY
@@ -110,7 +135,7 @@ NOTIFY
};
let formatted = format!("{}", trigger);
let expected = r#"CREATE TRIGGER IF NOT EXISTS cpu_monitor
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 2) EVERY 300 SECONDS
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 2) EVERY '1day 5 minute'::INTERVAL
LABELS (label_name = 'label_val')
ANNOTATIONS (annotation_name = 'annotation_val')
NOTIFY(