fix: show create flow's expire after (#6641)

* fix: show create flow's expire after

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-04 13:03:14 +08:00
committed by Zhenchi
parent 0d603bfc96
commit 9ccc8da231
8 changed files with 162 additions and 6 deletions

1
Cargo.lock generated
View File

@@ -11963,6 +11963,7 @@ dependencies = [
"itertools 0.14.0",
"jsonb",
"lazy_static",
"pretty_assertions",
"regex",
"serde",
"serde_json",

View File

@@ -173,6 +173,7 @@ parking_lot = "0.12"
parquet = { version = "54.2", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.6", features = ["ser"] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
@@ -194,8 +195,7 @@ rstest = "0.25"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms.
rustls = { version = "0.23.25", default-features = false }
rustls = { version = "0.23.25", default-features = false } # It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms.
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"

View File

@@ -66,6 +66,6 @@ wkt = { version = "0.11", optional = true }
[dev-dependencies]
approx = "0.5"
futures.workspace = true
pretty_assertions = "1.4.0"
pretty_assertions.workspace = true
serde = { version = "1.0", features = ["derive"] }
tokio.workspace = true

View File

@@ -78,7 +78,7 @@ tonic.workspace = true
[dev-dependencies]
catalog.workspace = true
common-catalog.workspace = true
pretty_assertions = "1.4.0"
pretty_assertions.workspace = true
prost.workspace = true
query.workspace = true
serde_json = "1.0"

View File

@@ -86,7 +86,7 @@ nalgebra.workspace = true
num = "0.4"
num-traits = "0.2"
paste.workspace = true
pretty_assertions = "1.4.0"
pretty_assertions.workspace = true
rand.workspace = true
session = { workspace = true, features = ["testing"] }
statrs = "0.16"

View File

@@ -45,3 +45,4 @@ uuid.workspace = true
[dev-dependencies]
common-datasource.workspace = true
pretty_assertions.workspace = true

View File

@@ -1328,6 +1328,160 @@ mod tests {
}
}
#[test]
fn test_parse_create_flow_more_testcases() {
use pretty_assertions::assert_eq;
fn parse_create_flow(sql: &str) -> CreateFlow {
let stmts = ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default(),
)
.unwrap();
assert_eq!(1, stmts.len());
match &stmts[0] {
Statement::CreateFlow(c) => c.clone(),
_ => unreachable!(),
}
}
struct CreateFlowWoutQuery {
/// Flow name
pub flow_name: ObjectName,
/// Output (sink) table name
pub sink_table_name: ObjectName,
/// Whether to replace existing task
pub or_replace: bool,
/// Create if not exist
pub if_not_exists: bool,
/// `EXPIRE AFTER`
/// Duration in second as `i64`
pub expire_after: Option<i64>,
/// Comment string
pub comment: Option<String>,
}
let testcases = vec![
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::new("task_1")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::new("task_1")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::new("task_1")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::new("task_1")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
EXPIRE AFTER '1 month 2 days 1h 2 min'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![Ident::with_quote('`', "task_2")]),
sink_table_name: ObjectName(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: false,
if_not_exists: false,
expire_after: Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60),
comment: None,
},
),
];
for (sql, expected) in testcases {
let create_task = parse_create_flow(sql);
let expected = CreateFlow {
flow_name: expected.flow_name,
sink_table_name: expected.sink_table_name,
or_replace: expected.or_replace,
if_not_exists: expected.if_not_exists,
expire_after: expected.expire_after,
comment: expected.comment,
// ignore query parse result
query: create_task.query.clone(),
};
assert_eq!(create_task, expected, "input sql is:\n{sql}");
let show_create = create_task.to_string();
let recreated = parse_create_flow(&show_create);
assert_eq!(recreated, expected, "input sql is:\n{show_create}");
}
}
#[test]
fn test_parse_create_flow() {
let sql = r"

View File

@@ -436,7 +436,7 @@ impl Display for CreateFlow {
writeln!(f, "{}", &self.flow_name)?;
writeln!(f, "SINK TO {}", &self.sink_table_name)?;
if let Some(expire_after) = &self.expire_after {
writeln!(f, "EXPIRE AFTER {} ", expire_after)?;
writeln!(f, "EXPIRE AFTER '{} s' ", expire_after)?;
}
if let Some(comment) = &self.comment {
writeln!(f, "COMMENT '{}'", comment)?;