mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat(trigger): support "for" and "keep_firing_for" (#7087)
* feat: support for and keep_firing_for optiosn in create trigger * upgrade greptime-proto
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5331,7 +5331,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=710f1fa95cf68c1cf4e8b6d757bb418c4fddf4ed#710f1fa95cf68c1cf4e8b6d757bb418c4fddf4ed"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=69a6089933daa573c96808ec4bbc48f447ec6e8c#69a6089933daa573c96808ec4bbc48f447ec6e8c"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.5",
|
||||
|
||||
@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "710f1fa95cf68c1cf4e8b6d757bb418c4fddf4ed" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69a6089933daa573c96808ec4bbc48f447ec6e8c" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -12,8 +12,7 @@ use api::v1::{
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::error::{self, Result, TooLargeDurationSnafu};
|
||||
use crate::rpc::ddl::DdlTask;
|
||||
|
||||
// Create trigger
|
||||
@@ -27,7 +26,11 @@ pub struct CreateTriggerTask {
|
||||
pub labels: HashMap<String, String>,
|
||||
pub annotations: HashMap<String, String>,
|
||||
pub interval: Duration,
|
||||
pub raw_interval_expr: String,
|
||||
pub raw_interval_expr: Option<String>,
|
||||
pub r#for: Option<Duration>,
|
||||
pub for_raw_expr: Option<String>,
|
||||
pub keep_firing_for: Option<Duration>,
|
||||
pub keep_firing_for_raw_expr: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
@@ -62,10 +65,20 @@ impl TryFrom<CreateTriggerTask> for PbCreateTriggerTask {
|
||||
.map(PbNotifyChannel::from)
|
||||
.collect();
|
||||
|
||||
let interval = task
|
||||
.interval
|
||||
.try_into()
|
||||
.context(error::TooLargeDurationSnafu)?;
|
||||
let interval = task.interval.try_into().context(TooLargeDurationSnafu)?;
|
||||
let raw_interval_expr = task.raw_interval_expr.unwrap_or_default();
|
||||
|
||||
let r#for = task
|
||||
.r#for
|
||||
.map(|d| d.try_into().context(TooLargeDurationSnafu))
|
||||
.transpose()?;
|
||||
let for_raw_expr = task.for_raw_expr.unwrap_or_default();
|
||||
|
||||
let keep_firing_for = task
|
||||
.keep_firing_for
|
||||
.map(|d| d.try_into().context(TooLargeDurationSnafu))
|
||||
.transpose()?;
|
||||
let keep_firing_for_raw_expr = task.keep_firing_for_raw_expr.unwrap_or_default();
|
||||
|
||||
let expr = PbCreateTriggerExpr {
|
||||
catalog_name: task.catalog_name,
|
||||
@@ -76,7 +89,11 @@ impl TryFrom<CreateTriggerTask> for PbCreateTriggerTask {
|
||||
labels: task.labels,
|
||||
annotations: task.annotations,
|
||||
interval: Some(interval),
|
||||
raw_interval_expr: task.raw_interval_expr,
|
||||
raw_interval_expr,
|
||||
r#for,
|
||||
for_raw_expr,
|
||||
keep_firing_for,
|
||||
keep_firing_for_raw_expr,
|
||||
};
|
||||
|
||||
Ok(PbCreateTriggerTask {
|
||||
@@ -102,6 +119,26 @@ impl TryFrom<PbCreateTriggerTask> for CreateTriggerTask {
|
||||
let interval = expr.interval.context(error::MissingIntervalSnafu)?;
|
||||
let interval = interval.try_into().context(error::NegativeDurationSnafu)?;
|
||||
|
||||
let r#for = expr
|
||||
.r#for
|
||||
.map(Duration::try_from)
|
||||
.transpose()
|
||||
.context(error::NegativeDurationSnafu)?;
|
||||
|
||||
let keep_firing_for = expr
|
||||
.keep_firing_for
|
||||
.map(Duration::try_from)
|
||||
.transpose()
|
||||
.context(error::NegativeDurationSnafu)?;
|
||||
|
||||
let raw_interval_expr =
|
||||
(!expr.raw_interval_expr.is_empty()).then_some(expr.raw_interval_expr);
|
||||
|
||||
let for_raw_expr = (!expr.for_raw_expr.is_empty()).then_some(expr.for_raw_expr);
|
||||
|
||||
let keep_firing_for_raw_expr =
|
||||
(!expr.keep_firing_for_raw_expr.is_empty()).then_some(expr.keep_firing_for_raw_expr);
|
||||
|
||||
let task = CreateTriggerTask {
|
||||
catalog_name: expr.catalog_name,
|
||||
trigger_name: expr.trigger_name,
|
||||
@@ -111,7 +148,11 @@ impl TryFrom<PbCreateTriggerTask> for CreateTriggerTask {
|
||||
labels: expr.labels,
|
||||
annotations: expr.annotations,
|
||||
interval,
|
||||
raw_interval_expr: expr.raw_interval_expr,
|
||||
raw_interval_expr,
|
||||
r#for,
|
||||
for_raw_expr,
|
||||
keep_firing_for,
|
||||
keep_firing_for_raw_expr,
|
||||
};
|
||||
Ok(task)
|
||||
}
|
||||
@@ -271,7 +312,11 @@ mod tests {
|
||||
.into_iter()
|
||||
.collect(),
|
||||
interval: Duration::from_secs(60),
|
||||
raw_interval_expr: "'1 minute'::INTERVAL".to_string(),
|
||||
raw_interval_expr: Some("'1 minute'::INTERVAL".to_string()),
|
||||
r#for: Duration::from_secs(300).into(),
|
||||
for_raw_expr: Some("'5 minute'::INTERVAL".to_string()),
|
||||
keep_firing_for: Duration::from_secs(600).into(),
|
||||
keep_firing_for_raw_expr: Some("'10 minute'::INTERVAL".to_string()),
|
||||
};
|
||||
|
||||
let pb_task: PbCreateTriggerTask = original.clone().try_into().unwrap();
|
||||
@@ -306,6 +351,14 @@ mod tests {
|
||||
assert_eq!(original.labels, round_tripped.labels);
|
||||
assert_eq!(original.annotations, round_tripped.annotations);
|
||||
assert_eq!(original.interval, round_tripped.interval);
|
||||
assert_eq!(original.raw_interval_expr, round_tripped.raw_interval_expr);
|
||||
assert_eq!(original.r#for, round_tripped.r#for);
|
||||
assert_eq!(original.for_raw_expr, round_tripped.for_raw_expr);
|
||||
assert_eq!(original.keep_firing_for, round_tripped.keep_firing_for);
|
||||
assert_eq!(
|
||||
original.keep_firing_for_raw_expr,
|
||||
round_tripped.keep_firing_for_raw_expr
|
||||
);
|
||||
|
||||
// Invalid, since create_trigger is None and it's required.
|
||||
let invalid_task = PbCreateTriggerTask {
|
||||
|
||||
@@ -6,10 +6,9 @@ use api::v1::{
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ResultExt, ensure};
|
||||
use sql::ast::{ObjectName, ObjectNamePartExt};
|
||||
use sql::statements::create::trigger::{ChannelType, CreateTrigger, TriggerOn};
|
||||
use sql::statements::create::trigger::{ChannelType, CreateTrigger, DurationExpr, TriggerOn};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::error::{Result, TooLargeDurationSnafu};
|
||||
|
||||
pub fn to_create_trigger_task_expr(
|
||||
create_trigger: CreateTrigger,
|
||||
@@ -19,17 +18,13 @@ pub fn to_create_trigger_task_expr(
|
||||
trigger_name,
|
||||
if_not_exists,
|
||||
trigger_on,
|
||||
r#for,
|
||||
keep_firing_for,
|
||||
labels,
|
||||
annotations,
|
||||
channels,
|
||||
} = create_trigger;
|
||||
|
||||
let TriggerOn {
|
||||
query,
|
||||
interval,
|
||||
raw_interval_expr,
|
||||
} = trigger_on;
|
||||
|
||||
let catalog_name = query_ctx.current_catalog().to_string();
|
||||
let trigger_name = sanitize_trigger_name(trigger_name)?;
|
||||
|
||||
@@ -49,11 +44,35 @@ pub fn to_create_trigger_task_expr(
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let TriggerOn {
|
||||
query,
|
||||
query_interval,
|
||||
} = trigger_on;
|
||||
|
||||
let DurationExpr {
|
||||
duration,
|
||||
raw_expr: raw_interval_expr,
|
||||
} = query_interval;
|
||||
|
||||
let (r#for, for_raw_expr) = if let Some(f) = r#for {
|
||||
let duration = f.duration.try_into().context(TooLargeDurationSnafu)?;
|
||||
(Some(duration), f.raw_expr)
|
||||
} else {
|
||||
(None, String::new())
|
||||
};
|
||||
|
||||
let (keep_firing_for, keep_firing_for_raw_expr) = if let Some(k) = keep_firing_for {
|
||||
let duration = k.duration.try_into().context(TooLargeDurationSnafu)?;
|
||||
(Some(duration), k.raw_expr)
|
||||
} else {
|
||||
(None, String::new())
|
||||
};
|
||||
|
||||
let sql = query.to_string();
|
||||
let labels = labels.into_map();
|
||||
let annotations = annotations.into_map();
|
||||
|
||||
let interval = interval.try_into().context(error::TooLargeDurationSnafu)?;
|
||||
let interval = duration.try_into().context(TooLargeDurationSnafu)?;
|
||||
|
||||
Ok(PbCreateTriggerExpr {
|
||||
catalog_name,
|
||||
@@ -65,6 +84,10 @@ pub fn to_create_trigger_task_expr(
|
||||
annotations,
|
||||
interval: Some(interval),
|
||||
raw_interval_expr,
|
||||
r#for,
|
||||
for_raw_expr,
|
||||
keep_firing_for,
|
||||
keep_firing_for_raw_expr,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -544,7 +544,7 @@ mod tests {
|
||||
use crate::parsers::alter_parser::trigger::{apply_label_change, apply_label_replacement};
|
||||
use crate::statements::OptionMap;
|
||||
use crate::statements::alter::trigger::{LabelChange, LabelOperations};
|
||||
use crate::statements::create::trigger::TriggerOn;
|
||||
use crate::statements::create::trigger::{DurationExpr, TriggerOn};
|
||||
use crate::statements::statement::Statement;
|
||||
|
||||
#[test]
|
||||
@@ -569,12 +569,12 @@ mod tests {
|
||||
};
|
||||
let TriggerOn {
|
||||
query,
|
||||
interval,
|
||||
raw_interval_expr,
|
||||
query_interval,
|
||||
} = alter.operation.trigger_on.unwrap();
|
||||
let DurationExpr { duration, raw_expr } = query_interval;
|
||||
assert_eq!(query.to_string(), "(SELECT * FROM test_table)");
|
||||
assert_eq!(raw_interval_expr, "'5 minute'::INTERVAL");
|
||||
assert_eq!(interval, Duration::from_secs(300));
|
||||
assert_eq!(raw_expr, "'5 minute'::INTERVAL");
|
||||
assert_eq!(duration, Duration::from_secs(300));
|
||||
assert!(alter.operation.rename.is_none());
|
||||
assert!(alter.operation.label_operations.is_none());
|
||||
assert!(alter.operation.annotation_operations.is_none());
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::parser::ParserContext;
|
||||
use crate::parsers::utils::convert_month_day_nano_to_duration;
|
||||
use crate::statements::OptionMap;
|
||||
use crate::statements::create::trigger::{
|
||||
AlertManagerWebhook, ChannelType, CreateTrigger, NotifyChannel, TriggerOn,
|
||||
AlertManagerWebhook, ChannelType, CreateTrigger, DurationExpr, NotifyChannel, TriggerOn,
|
||||
};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::util::parse_option_string;
|
||||
@@ -25,6 +25,8 @@ pub const ANNOTATIONS: &str = "ANNOTATIONS";
|
||||
pub const NOTIFY: &str = "NOTIFY";
|
||||
pub const WEBHOOK: &str = "WEBHOOK";
|
||||
pub const URL: &str = "URL";
|
||||
pub const FOR: &str = "FOR";
|
||||
pub const KEEP_FIRING_FOR: &str = "KEEP_FIRING_FOR";
|
||||
|
||||
const TIMEOUT: &str = "timeout";
|
||||
|
||||
@@ -38,13 +40,15 @@ impl<'a> ParserContext<'a> {
|
||||
/// ```sql
|
||||
/// -- CREATE TRIGGER
|
||||
/// [IF NOT EXISTS] <trigger_name>
|
||||
/// ON (<query_expression>)
|
||||
/// EVERY <interval_expression>
|
||||
/// [LABELS (<label_name>=<label_val>, ...)]
|
||||
/// [ANNOTATIONS (<annotation_name>=<annotation_val>, ...)]
|
||||
/// NOTIFY(
|
||||
/// WEBHOOK <notify_name1> URL '<url1>' [WITH (<parameter1>=<value1>, ...)],
|
||||
/// WEBHOOK <notify_name2> URL '<url2>' [WITH (<parameter2>=<value2>, ...)]
|
||||
/// ON (<query_expression>)
|
||||
/// EVERY <interval_expression>
|
||||
/// [FOR <interval_expression>]
|
||||
/// [KEEP_FIRING_FOR <interval_expression>]
|
||||
/// [LABELS (<label_name>=<label_val>, ...)]
|
||||
/// [ANNOTATIONS (<annotation_name>=<annotation_val>, ...)]
|
||||
/// NOTIFY(
|
||||
/// WEBHOOK <notify_name1> URL '<url1>' [WITH (<parameter1>=<value1>, ...)],
|
||||
/// WEBHOOK <notify_name2> URL '<url2>' [WITH (<parameter2>=<value2>, ...)]
|
||||
/// )
|
||||
/// ```
|
||||
///
|
||||
@@ -57,6 +61,8 @@ impl<'a> ParserContext<'a> {
|
||||
let mut may_labels = None;
|
||||
let mut may_annotations = None;
|
||||
let mut notify_channels = vec![];
|
||||
let mut r#for = None;
|
||||
let mut keep_firing_for = None;
|
||||
|
||||
loop {
|
||||
let next_token = self.parser.peek_token();
|
||||
@@ -81,10 +87,18 @@ impl<'a> ParserContext<'a> {
|
||||
let channels = self.parse_trigger_notify(true)?;
|
||||
notify_channels.extend(channels);
|
||||
}
|
||||
Token::Word(w) if w.value.eq_ignore_ascii_case(FOR) => {
|
||||
self.parser.next_token();
|
||||
r#for.replace(self.parse_trigger_for(true)?);
|
||||
}
|
||||
Token::Word(w) if w.value.eq_ignore_ascii_case(KEEP_FIRING_FOR) => {
|
||||
self.parser.next_token();
|
||||
keep_firing_for.replace(self.parse_trigger_keep_firing_for(true)?);
|
||||
}
|
||||
Token::EOF => break,
|
||||
_ => {
|
||||
return self.expected(
|
||||
"`ON` or `LABELS` or `ANNOTATIONS` or `NOTIFY` keyword",
|
||||
"`ON` or `LABELS` or `ANNOTATIONS` or `NOTIFY` keyword or `FOR` or `KEEP_FIRING_FOR`",
|
||||
next_token,
|
||||
);
|
||||
}
|
||||
@@ -104,6 +118,8 @@ impl<'a> ParserContext<'a> {
|
||||
trigger_name,
|
||||
if_not_exists,
|
||||
trigger_on,
|
||||
r#for,
|
||||
keep_firing_for,
|
||||
labels,
|
||||
annotations,
|
||||
channels: notify_channels,
|
||||
@@ -149,7 +165,7 @@ impl<'a> ParserContext<'a> {
|
||||
return self.expected("`EVERY` keyword", self.parser.peek_token());
|
||||
}
|
||||
|
||||
let (month_day_nano, raw_interval_expr) = self.parse_interval_month_day_nano()?;
|
||||
let (month_day_nano, raw_expr) = self.parse_interval_month_day_nano()?;
|
||||
|
||||
// Trigger Interval (month_day_nano): the months field is prohibited,
|
||||
// as the length of a month is ambiguous.
|
||||
@@ -169,13 +185,90 @@ impl<'a> ParserContext<'a> {
|
||||
interval
|
||||
};
|
||||
|
||||
let query_interval = DurationExpr {
|
||||
duration: interval,
|
||||
raw_expr,
|
||||
};
|
||||
|
||||
Ok(TriggerOn {
|
||||
query,
|
||||
interval,
|
||||
raw_interval_expr,
|
||||
query_interval,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn parse_trigger_for(
|
||||
&mut self,
|
||||
is_first_keyword_matched: bool,
|
||||
) -> Result<DurationExpr> {
|
||||
if !is_first_keyword_matched {
|
||||
if let Token::Word(w) = self.parser.peek_token().token
|
||||
&& w.value.eq_ignore_ascii_case(FOR)
|
||||
{
|
||||
self.parser.next_token();
|
||||
} else {
|
||||
return self.expected("`FOR` keyword", self.parser.peek_token());
|
||||
}
|
||||
}
|
||||
|
||||
let (month_day_nano, raw_expr) = self.parse_interval_month_day_nano()?;
|
||||
|
||||
// Trigger Interval (month_day_nano): the months field is prohibited,
|
||||
// as the length of a month is ambiguous.
|
||||
ensure!(
|
||||
month_day_nano.months == 0,
|
||||
error::InvalidIntervalSnafu {
|
||||
reason: "year and month is not supported in trigger FOR duration".to_string()
|
||||
}
|
||||
);
|
||||
|
||||
let duration = convert_month_day_nano_to_duration(month_day_nano)?;
|
||||
|
||||
let duration = if duration < Duration::from_secs(1) {
|
||||
Duration::from_secs(1)
|
||||
} else {
|
||||
duration
|
||||
};
|
||||
|
||||
Ok(DurationExpr { duration, raw_expr })
|
||||
}
|
||||
|
||||
pub(crate) fn parse_trigger_keep_firing_for(
|
||||
&mut self,
|
||||
is_first_keyword_matched: bool,
|
||||
) -> Result<DurationExpr> {
|
||||
if !is_first_keyword_matched {
|
||||
if let Token::Word(w) = self.parser.peek_token().token
|
||||
&& w.value.eq_ignore_ascii_case(KEEP_FIRING_FOR)
|
||||
{
|
||||
self.parser.next_token();
|
||||
} else {
|
||||
return self.expected("`KEEP_FIRING_FOR` keyword", self.parser.peek_token());
|
||||
}
|
||||
}
|
||||
|
||||
let (month_day_nano, raw_expr) = self.parse_interval_month_day_nano()?;
|
||||
|
||||
// Trigger Interval (month_day_nano): the months field is prohibited,
|
||||
// as the length of a month is ambiguous.
|
||||
ensure!(
|
||||
month_day_nano.months == 0,
|
||||
error::InvalidIntervalSnafu {
|
||||
reason: "year and month is not supported in trigger KEEP_FIRING_FOR duration"
|
||||
.to_string()
|
||||
}
|
||||
);
|
||||
|
||||
let duration = convert_month_day_nano_to_duration(month_day_nano)?;
|
||||
|
||||
let duration = if duration < Duration::from_secs(1) {
|
||||
Duration::from_secs(1)
|
||||
} else {
|
||||
duration
|
||||
};
|
||||
|
||||
Ok(DurationExpr { duration, raw_expr })
|
||||
}
|
||||
|
||||
/// The SQL format as follows:
|
||||
///
|
||||
/// ```sql
|
||||
@@ -436,6 +529,8 @@ IF NOT EXISTS cpu_monitor
|
||||
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)
|
||||
EVERY '5 minute'::INTERVAL
|
||||
LABELS (label_name=label_val)
|
||||
FOR '1ms'::INTERVAL
|
||||
KEEP_FIRING_FOR '10 minute'::INTERVAL
|
||||
ANNOTATIONS (annotation_name=annotation_val)
|
||||
NOTIFY(
|
||||
WEBHOOK alert_manager_1 URL 'http://127.0.0.1:9093' WITH (timeout='1m'),
|
||||
@@ -452,6 +547,8 @@ IF NOT EXISTS cpu_monitor
|
||||
)
|
||||
LABELS (label_name=label_val)
|
||||
ANNOTATIONS (annotation_name=annotation_val)
|
||||
KEEP_FIRING_FOR '10 minute'::INTERVAL
|
||||
FOR '1ms'::INTERVAL
|
||||
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)
|
||||
EVERY '5 minute'::INTERVAL
|
||||
"#
|
||||
@@ -476,15 +573,14 @@ IF NOT EXISTS cpu_monitor
|
||||
);
|
||||
let TriggerOn {
|
||||
query,
|
||||
interval,
|
||||
raw_interval_expr,
|
||||
query_interval,
|
||||
} = &create_trigger.trigger_on;
|
||||
assert_eq!(
|
||||
query.to_string(),
|
||||
"(SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 1)"
|
||||
);
|
||||
assert_eq!(*interval, Duration::from_secs(300));
|
||||
assert_eq!(raw_interval_expr.clone(), "'5 minute'::INTERVAL");
|
||||
assert_eq!(query_interval.duration, Duration::from_secs(300));
|
||||
assert_eq!(query_interval.raw_expr.clone(), "'5 minute'::INTERVAL");
|
||||
assert_eq!(create_trigger.labels.len(), 1);
|
||||
assert_eq!(
|
||||
create_trigger.labels.get("label_name").unwrap(),
|
||||
@@ -509,6 +605,13 @@ IF NOT EXISTS cpu_monitor
|
||||
assert_eq!(webhook2.url.to_string(), "'http://127.0.0.1:9094'");
|
||||
assert_eq!(webhook2.options.len(), 1);
|
||||
assert_eq!(webhook2.options.get("timeout").unwrap(), "2m");
|
||||
|
||||
let r#for = create_trigger.r#for.as_ref().unwrap();
|
||||
assert_eq!(r#for.duration, Duration::from_secs(1));
|
||||
assert_eq!(r#for.raw_expr, "'1ms'::INTERVAL");
|
||||
let keep_firing_for = create_trigger.keep_firing_for.as_ref().unwrap();
|
||||
assert_eq!(keep_firing_for.duration, Duration::from_secs(600));
|
||||
assert_eq!(keep_firing_for.raw_expr, "'10 minute'::INTERVAL");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -518,12 +621,11 @@ IF NOT EXISTS cpu_monitor
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let TriggerOn {
|
||||
query,
|
||||
interval,
|
||||
raw_interval_expr: raw_interval,
|
||||
query_interval: interval,
|
||||
} = ctx.parse_trigger_on(false).unwrap();
|
||||
assert_eq!(query.to_string(), "(SELECT * FROM cpu_usage)");
|
||||
assert_eq!(interval, Duration::from_secs(300));
|
||||
assert_eq!(raw_interval, "'5 minute'::INTERVAL");
|
||||
assert_eq!(interval.duration, Duration::from_secs(300));
|
||||
assert_eq!(interval.raw_expr, "'5 minute'::INTERVAL");
|
||||
|
||||
// Invalid, since missing `ON` keyword.
|
||||
let sql = "SELECT * FROM cpu_usage EVERY '5 minute'::INTERVAL";
|
||||
@@ -559,7 +661,7 @@ IF NOT EXISTS cpu_monitor
|
||||
let sql = "ON (SELECT * FROM cpu_usage) EVERY '1ms'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let trigger_on = ctx.parse_trigger_on(false).unwrap();
|
||||
assert_eq!(trigger_on.interval, Duration::from_secs(1));
|
||||
assert_eq!(trigger_on.query_interval.duration, Duration::from_secs(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -739,4 +841,66 @@ IF NOT EXISTS cpu_monitor
|
||||
assert!(validate_webhook_option(TIMEOUT));
|
||||
assert!(!validate_webhook_option("invalid_option"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_trigger_for() {
|
||||
// Normal.
|
||||
let sql = "FOR '10 minute'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let expr = ctx.parse_trigger_for(false).unwrap();
|
||||
assert_eq!(expr.duration, Duration::from_secs(600));
|
||||
assert_eq!(expr.raw_expr, "'10 minute'::INTERVAL");
|
||||
|
||||
// Invalid, missing FOR keyword.
|
||||
let sql = "'10 minute'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
assert!(ctx.parse_trigger_for(false).is_err());
|
||||
|
||||
// Invalid, year not allowed.
|
||||
let sql = "FOR '1 year'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
assert!(ctx.parse_trigger_for(false).is_err());
|
||||
|
||||
// Invalid, month not allowed.
|
||||
let sql = "FOR '1 month'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
assert!(ctx.parse_trigger_for(false).is_err());
|
||||
|
||||
// Valid, interval less than 1 second is clamped.
|
||||
let sql = "FOR '1ms'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let expr = ctx.parse_trigger_for(false).unwrap();
|
||||
assert_eq!(expr.duration, Duration::from_secs(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_trigger_keep_firing_for() {
|
||||
// Normal.
|
||||
let sql = "KEEP_FIRING_FOR '10 minute'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let expr = ctx.parse_trigger_keep_firing_for(false).unwrap();
|
||||
assert_eq!(expr.duration, Duration::from_secs(600));
|
||||
assert_eq!(expr.raw_expr, "'10 minute'::INTERVAL");
|
||||
|
||||
// Invalid, missing KEEP_FIRING_FOR keyword.
|
||||
let sql = "'10 minute'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
assert!(ctx.parse_trigger_keep_firing_for(false).is_err());
|
||||
|
||||
// Invalid, year not allowed.
|
||||
let sql = "KEEP_FIRING_FOR '1 year'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
assert!(ctx.parse_trigger_keep_firing_for(false).is_err());
|
||||
|
||||
// Invalid, month not allowed.
|
||||
let sql = "KEEP_FIRING_FOR '1 month'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
assert!(ctx.parse_trigger_keep_firing_for(false).is_err());
|
||||
|
||||
// Valid, interval less than 1 second is clamped.
|
||||
let sql = "KEEP_FIRING_FOR '1ms'::INTERVAL";
|
||||
let mut ctx = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
let expr = ctx.parse_trigger_keep_firing_for(false).unwrap();
|
||||
assert_eq!(expr.duration, Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ pub struct CreateTrigger {
|
||||
pub trigger_name: ObjectName,
|
||||
pub if_not_exists: bool,
|
||||
pub trigger_on: TriggerOn,
|
||||
pub r#for: Option<DurationExpr>,
|
||||
pub keep_firing_for: Option<DurationExpr>,
|
||||
pub labels: OptionMap,
|
||||
pub annotations: OptionMap,
|
||||
pub channels: Vec<NotifyChannel>,
|
||||
@@ -30,6 +32,14 @@ impl Display for CreateTrigger {
|
||||
writeln!(f, "{}", self.trigger_name)?;
|
||||
writeln!(f, " {}", self.trigger_on)?;
|
||||
|
||||
if let Some(r#for) = &self.r#for {
|
||||
writeln!(f, " FOR {}", r#for)?;
|
||||
}
|
||||
|
||||
if let Some(keep_firing_for) = &self.keep_firing_for {
|
||||
writeln!(f, " KEEP_FIRING_FOR {}", keep_firing_for)?;
|
||||
}
|
||||
|
||||
if !self.labels.is_empty() {
|
||||
let labels = self.labels.kv_pairs();
|
||||
writeln!(f, " LABELS ({})", format_list_comma!(labels))?;
|
||||
@@ -73,30 +83,48 @@ impl Display for NotifyChannel {
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a duration expression with its parsed `Duration` and the original
|
||||
/// raw string. And the struct implements `Visit` and `VisitMut` traits as no-op.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize)]
|
||||
pub struct DurationExpr {
|
||||
pub duration: Duration,
|
||||
pub raw_expr: String,
|
||||
}
|
||||
|
||||
impl Display for DurationExpr {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
if self.raw_expr.is_empty() {
|
||||
// Fallback to display duration if raw_expr is empty.
|
||||
// Display in seconds since we limit the min-duration to 1 second
|
||||
// in SQL parser.
|
||||
write!(f, "{} seconds", self.duration.as_secs())
|
||||
} else {
|
||||
write!(f, "{}", self.raw_expr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for DurationExpr {
|
||||
fn visit<V: Visitor>(&self, _visitor: &mut V) -> ControlFlow<V::Break> {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
}
|
||||
|
||||
impl VisitMut for DurationExpr {
|
||||
fn visit<V: VisitorMut>(&mut self, _visitor: &mut V) -> ControlFlow<V::Break> {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
|
||||
pub struct TriggerOn {
|
||||
pub query: Box<Query>,
|
||||
pub interval: Duration,
|
||||
pub raw_interval_expr: String,
|
||||
pub query_interval: DurationExpr,
|
||||
}
|
||||
|
||||
impl Display for TriggerOn {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "ON {} EVERY {}", self.query, self.raw_interval_expr)
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for TriggerOn {
|
||||
fn visit<V: Visitor>(&self, visitor: &mut V) -> ControlFlow<V::Break> {
|
||||
Visit::visit(&self.query, visitor)?;
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
}
|
||||
|
||||
impl VisitMut for TriggerOn {
|
||||
fn visit<V: VisitorMut>(&mut self, visitor: &mut V) -> ControlFlow<V::Break> {
|
||||
VisitMut::visit(&mut self.query, visitor)?;
|
||||
ControlFlow::Continue(())
|
||||
write!(f, "ON {} EVERY {}", self.query, self.query_interval)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,6 +142,8 @@ pub struct AlertManagerWebhook {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::dialect::GreptimeDbDialect;
|
||||
use crate::parser::{ParseOptions, ParserContext};
|
||||
use crate::statements::statement::Statement;
|
||||
@@ -122,6 +152,8 @@ mod tests {
|
||||
fn test_display_create_trigger() {
|
||||
let sql = r#"CREATE TRIGGER IF NOT EXISTS cpu_monitor
|
||||
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 2) EVERY '1day 5 minute'::INTERVAL
|
||||
FOR '5 minute'::INTERVAL
|
||||
KEEP_FIRING_FOR '10 minute'::INTERVAL
|
||||
LABELS (label_name=label_val)
|
||||
ANNOTATIONS (annotation_name=annotation_val)
|
||||
NOTIFY
|
||||
@@ -141,6 +173,8 @@ WEBHOOK alert_manager2 URL 'http://127.0.0.1:9093' WITH (timeout='1m')
|
||||
let formatted = format!("{}", trigger);
|
||||
let expected = r#"CREATE TRIGGER IF NOT EXISTS cpu_monitor
|
||||
ON (SELECT host AS host_label, cpu, memory FROM machine_monitor WHERE cpu > 2) EVERY '1day 5 minute'::INTERVAL
|
||||
FOR '5 minute'::INTERVAL
|
||||
KEEP_FIRING_FOR '10 minute'::INTERVAL
|
||||
LABELS (label_name = 'label_val')
|
||||
ANNOTATIONS (annotation_name = 'annotation_val')
|
||||
NOTIFY(
|
||||
@@ -149,4 +183,19 @@ WEBHOOK alert_manager2 URL 'http://127.0.0.1:9093' WITH (timeout='1m')
|
||||
)"#;
|
||||
assert_eq!(expected, formatted);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duration_expr_display() {
|
||||
let duration_expr = super::DurationExpr {
|
||||
duration: Duration::from_secs(300),
|
||||
raw_expr: "'5 minute'::INTERVAL".to_string(),
|
||||
};
|
||||
assert_eq!(duration_expr.to_string(), "'5 minute'::INTERVAL");
|
||||
|
||||
let duration_expr_no_raw = super::DurationExpr {
|
||||
duration: Duration::from_secs(600),
|
||||
raw_expr: "".to_string(),
|
||||
};
|
||||
assert_eq!(duration_expr_no_raw.to_string(), "600 seconds");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user