fix: show create flow's expire after (#6641)

* fix: show create flow's expire after

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-04 13:03:14 +08:00
committed by GitHub
parent bed0c1e55f
commit a3e55565dc
8 changed files with 162 additions and 6 deletions

View File

@@ -45,3 +45,4 @@ uuid.workspace = true
[dev-dependencies]
common-datasource.workspace = true
pretty_assertions.workspace = true

View File

@@ -1328,6 +1328,160 @@ mod tests {
}
}
#[test]
fn test_parse_create_flow_more_testcases() {
use pretty_assertions::assert_eq;
fn parse_create_flow(sql: &str) -> CreateFlow {
let stmts = ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default(),
)
.unwrap();
assert_eq!(1, stmts.len());
match &stmts[0] {
Statement::CreateFlow(c) => c.clone(),
_ => unreachable!(),
}
}
struct CreateFlowWoutQuery {
/// Flow name
pub flow_name: ObjectName,
/// Output (sink) table name
pub sink_table_name: ObjectName,
/// Whether to replace existing task
pub or_replace: bool,
/// Create if not exist
pub if_not_exists: bool,
/// `EXPIRE AFTER`
/// Duration in second as `i64`
pub expire_after: Option<i64>,
/// Comment string
pub comment: Option<String>,
}
let testcases = vec![
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::new("task_1")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::new("task_1")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::new("task_1")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::new("task_1")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
EXPIRE AFTER '1 month 2 days 1h 2 min'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::with_quote('`', "task_2")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: false,
if_not_exists: false,
expire_after: Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60),
comment: None,
},
),
];
for (sql, expected) in testcases {
let create_task = parse_create_flow(sql);
let expected = CreateFlow {
flow_name: expected.flow_name,
sink_table_name: expected.sink_table_name,
or_replace: expected.or_replace,
if_not_exists: expected.if_not_exists,
expire_after: expected.expire_after,
comment: expected.comment,
// ignore query parse result
query: create_task.query.clone(),
};
assert_eq!(create_task, expected, "input sql is:\n{sql}");
let show_create = create_task.to_string();
let recreated = parse_create_flow(&show_create);
assert_eq!(recreated, expected, "input sql is:\n{show_create}");
}
}
#[test]
fn test_parse_create_flow() {
let sql = r"

View File

@@ -436,7 +436,7 @@ impl Display for CreateFlow {
writeln!(f, "{}", &self.flow_name)?;
writeln!(f, "SINK TO {}", &self.sink_table_name)?;
if let Some(expire_after) = &self.expire_after {
writeln!(f, "EXPIRE AFTER {} ", expire_after)?;
writeln!(f, "EXPIRE AFTER '{} s' ", expire_after)?;
}
if let Some(comment) = &self.comment {
writeln!(f, "COMMENT '{}'", comment)?;