diff --git a/Cargo.lock b/Cargo.lock index b00f44b9f7..8e4e463fb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11963,6 +11963,7 @@ dependencies = [ "itertools 0.14.0", "jsonb", "lazy_static", + "pretty_assertions", "regex", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 4606d4a712..16c7c268f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,6 +173,7 @@ parking_lot = "0.12" parquet = { version = "54.2", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" pin-project = "1.0" +pretty_assertions = "1.4.0" prometheus = { version = "0.13.3", features = ["process"] } promql-parser = { version = "0.6", features = ["ser"] } prost = { version = "0.13", features = ["no-recursion-limit"] } @@ -194,8 +195,7 @@ rstest = "0.25" rstest_reuse = "0.7" rust_decimal = "1.33" rustc-hash = "2.0" -# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms. -rustls = { version = "0.23.25", default-features = false } +rustls = { version = "0.23.25", default-features = false } # It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms. serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["float_roundtrip"] } serde_with = "3" diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 710a02ad8f..a40bc6c9b3 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -66,6 +66,6 @@ wkt = { version = "0.11", optional = true } [dev-dependencies] approx = "0.5" futures.workspace = true -pretty_assertions = "1.4.0" +pretty_assertions.workspace = true serde = { version = "1.0", features = ["derive"] } tokio.workspace = true diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 1aae903453..5ec3d3da2b 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -78,7 +78,7 @@ tonic.workspace = true [dev-dependencies] catalog.workspace = true common-catalog.workspace = true -pretty_assertions = "1.4.0" +pretty_assertions.workspace = true prost.workspace = true query.workspace = true serde_json = "1.0" diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 9a396f3f8a..adc9b812e4 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -86,7 +86,7 @@ nalgebra.workspace = true num = "0.4" num-traits = "0.2" paste.workspace = true -pretty_assertions = "1.4.0" +pretty_assertions.workspace = true rand.workspace = true session = { workspace = true, features = ["testing"] } statrs = "0.16" diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index b50fbab8db..9fbae28893 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -45,3 +45,4 @@ uuid.workspace = true [dev-dependencies] common-datasource.workspace = true +pretty_assertions.workspace = true diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 30446ee73f..442ea4714f 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -1328,6 +1328,160 @@ mod tests { } } + #[test] + fn test_parse_create_flow_more_testcases() { + use pretty_assertions::assert_eq; + fn parse_create_flow(sql: &str) -> CreateFlow { + let stmts = ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default(), + ) + .unwrap(); + assert_eq!(1, stmts.len()); + match &stmts[0] { + Statement::CreateFlow(c) => c.clone(), + _ => unreachable!(), + } + } + struct CreateFlowWoutQuery { + /// Flow name + pub flow_name: ObjectName, + /// Output (sink) table name + pub sink_table_name: ObjectName, + /// Whether to replace existing task + pub or_replace: bool, + /// Create if not exist + pub if_not_exists: bool, + /// `EXPIRE AFTER` + /// Duration in second as `i64` + pub expire_after: Option, + /// Comment string + pub comment: Option, + } + let testcases = vec![ + ( + r" +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE AFTER INTERVAL '5 minutes' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![Ident::new("task_1")]), + sink_table_name: ObjectName(vec![ + Ident::new("schema_1"), + Ident::new("table_1"), + ]), + or_replace: true, + if_not_exists: true, + expire_after: Some(300), + comment: Some("test comment".to_string()), + }, + ), + ( + r" +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE AFTER INTERVAL '300 s' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![Ident::new("task_1")]), + sink_table_name: ObjectName(vec![ + Ident::new("schema_1"), + Ident::new("table_1"), + ]), + or_replace: true, + if_not_exists: true, + expire_after: Some(300), + comment: Some("test comment".to_string()), + }, + ), + ( + r" +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE AFTER '5 minutes' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![Ident::new("task_1")]), + sink_table_name: ObjectName(vec![ + Ident::new("schema_1"), + Ident::new("table_1"), + ]), + or_replace: true, + if_not_exists: true, + expire_after: Some(300), + comment: Some("test comment".to_string()), + }, + ), + ( + r" +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE AFTER '300 s' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![Ident::new("task_1")]), + sink_table_name: ObjectName(vec![ + Ident::new("schema_1"), + Ident::new("table_1"), + ]), + or_replace: true, + if_not_exists: true, + expire_after: Some(300), + comment: Some("test comment".to_string()), + }, + ), + ( + r" +CREATE FLOW `task_2` +SINK TO schema_1.table_1 +EXPIRE AFTER '1 month 2 days 1h 2 min' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![Ident::with_quote('`', "task_2")]), + sink_table_name: ObjectName(vec![ + Ident::new("schema_1"), + Ident::new("table_1"), + ]), + or_replace: false, + if_not_exists: false, + expire_after: Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60), + comment: None, + }, + ), + ]; + + for (sql, expected) in testcases { + let create_task = parse_create_flow(sql); + + let expected = CreateFlow { + flow_name: expected.flow_name, + sink_table_name: expected.sink_table_name, + or_replace: expected.or_replace, + if_not_exists: expected.if_not_exists, + expire_after: expected.expire_after, + comment: expected.comment, + // ignore query parse result + query: create_task.query.clone(), + }; + + assert_eq!(create_task, expected, "input sql is:\n{sql}"); + let show_create = create_task.to_string(); + let recreated = parse_create_flow(&show_create); + assert_eq!(recreated, expected, "input sql is:\n{show_create}"); + } + } + #[test] fn test_parse_create_flow() { let sql = r" diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 2550554306..81bb317c6a 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -436,7 +436,7 @@ impl Display for CreateFlow { writeln!(f, "{}", &self.flow_name)?; writeln!(f, "SINK TO {}", &self.sink_table_name)?; if let Some(expire_after) = &self.expire_after { - writeln!(f, "EXPIRE AFTER {} ", expire_after)?; + writeln!(f, "EXPIRE AFTER '{} s' ", expire_after)?; } if let Some(comment) = &self.comment { writeln!(f, "COMMENT '{}'", comment)?;