diff --git a/Cargo.lock b/Cargo.lock index 37b8bb7736..c707a69ed3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5299,7 +5299,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=df2bb74b5990c159dfd5b7a344eecf8f4307af64#df2bb74b5990c159dfd5b7a344eecf8f4307af64" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=66eb089afa6baaa3ddfafabd0a4abbe317d012c3#66eb089afa6baaa3ddfafabd0a4abbe317d012c3" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 47748c4e40..5b79aef59b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -145,7 +145,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "df2bb74b5990c159dfd5b7a344eecf8f4307af64" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "66eb089afa6baaa3ddfafabd0a4abbe317d012c3" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index f1401df5fe..6123b34d3d 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -242,6 +242,7 @@ mod tests { flow_name: "my_flow".to_string(), raw_sql: "sql".to_string(), expire_after: Some(300), + eval_interval_secs: None, comment: "comment".to_string(), options: Default::default(), created_time: chrono::Utc::now(), diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index ff3f713b6f..5f5424faf0 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -445,6 +445,10 @@ impl From<&CreateFlowData> for CreateRequest { create_if_not_exists: true, or_replace: value.task.or_replace, expire_after: value.task.expire_after.map(|value| ExpireAfter { value }), + eval_interval: value + .task + .eval_interval_secs + .map(|seconds| api::v1::EvalInterval { seconds }), comment: value.task.comment.clone(), sql: value.task.sql.clone(), flow_options: value.task.flow_options.clone(), @@ -464,6 +468,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa flow_name, sink_table_name, expire_after, + eval_interval_secs: eval_interval, comment, sql, flow_options: mut options, @@ -503,6 +508,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa flow_name, raw_sql: sql, expire_after, + eval_interval_secs: eval_interval, comment, options, created_time: create_time, diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index dcbdd85279..f9968bebc9 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -45,6 +45,7 @@ pub(crate) fn test_create_flow_task( or_replace: false, create_if_not_exists, expire_after: Some(300), + eval_interval_secs: None, comment: "".to_string(), sql: "select 1".to_string(), flow_options: Default::default(), @@ -189,6 +190,7 @@ fn create_test_flow_task_for_serialization() -> CreateFlowTask { or_replace: false, create_if_not_exists: false, expire_after: None, + eval_interval_secs: None, comment: "test comment".to_string(), sql: "SELECT * FROM source_table".to_string(), flow_options: HashMap::new(), diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index e4dcffa698..0320022751 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -464,6 +464,7 @@ mod tests { flownode_ids, raw_sql: "raw".to_string(), expire_after: Some(300), + eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), created_time: chrono::Utc::now(), @@ -638,6 +639,7 @@ mod tests { flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_after: Some(300), + eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), created_time: chrono::Utc::now(), @@ -1013,6 +1015,7 @@ mod tests { flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_after: Some(300), + eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), created_time: chrono::Utc::now(), diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index d93ec92c32..9ec60830f4 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -135,6 +135,12 @@ pub struct FlowInfoValue { /// The expr of expire. /// Duration in seconds as `i64`. pub expire_after: Option, + /// The eval interval. + /// Duration in seconds as `i64`. + /// If `None`, will automatically decide when to evaluate the flow. + /// If `Some`, it will be evaluated every `eval_interval` seconds. + #[serde(default)] + pub eval_interval_secs: Option, /// The comment. pub comment: String, /// The options. @@ -191,6 +197,10 @@ impl FlowInfoValue { self.expire_after } + pub fn eval_interval(&self) -> Option { + self.eval_interval_secs + } + pub fn comment(&self) -> &String { &self.comment } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 5273a7aa32..ef0d532914 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -34,8 +34,8 @@ use api::v1::meta::{ }; use api::v1::{ AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, - CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter, - Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr, + CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, + ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr, }; use base64::engine::general_purpose; use base64::Engine as _; @@ -1125,6 +1125,7 @@ pub struct CreateFlowTask { pub create_if_not_exists: bool, /// Duration in seconds. Data older than this duration will not be used. pub expire_after: Option, + pub eval_interval_secs: Option, pub comment: String, pub sql: String, pub flow_options: HashMap, @@ -1142,6 +1143,7 @@ impl TryFrom for CreateFlowTask { or_replace, create_if_not_exists, expire_after, + eval_interval, comment, sql, flow_options, @@ -1161,6 +1163,7 @@ impl TryFrom for CreateFlowTask { or_replace, create_if_not_exists, expire_after: expire_after.map(|e| e.value), + eval_interval_secs: eval_interval.map(|e| e.seconds), comment, sql, flow_options, @@ -1178,6 +1181,7 @@ impl From for PbCreateFlowTask { or_replace, create_if_not_exists, expire_after, + eval_interval_secs: eval_interval, comment, sql, flow_options, @@ -1192,6 +1196,7 @@ impl From for PbCreateFlowTask { or_replace, create_if_not_exists, expire_after: expire_after.map(|value| ExpireAfter { value }), + eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }), comment, sql, flow_options, diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 5ec3d3da2b..9e0204d917 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -63,6 +63,7 @@ prost.workspace = true query.workspace = true rand.workspace = true serde.workspace = true +serde_json.workspace = true servers.workspace = true session.workspace = true smallvec.workspace = true @@ -81,6 +82,5 @@ common-catalog.workspace = true pretty_assertions.workspace = true prost.workspace = true query.workspace = true -serde_json = "1.0" session.workspace = true table.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index ff26387986..3e92fc08e5 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -773,6 +773,7 @@ impl StreamingEngine { create_if_not_exists, or_replace, expire_after, + eval_interval: _, comment, sql, flow_options, diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index dfb54a382a..1be268221f 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -318,6 +318,7 @@ impl FlowDualEngine { create_if_not_exists: true, or_replace: true, expire_after: info.expire_after(), + eval_interval: info.eval_interval(), comment: Some(info.comment().clone()), sql: info.raw_sql().clone(), flow_options: info.options().clone(), @@ -770,6 +771,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { sink_table_name: Some(sink_table_name), create_if_not_exists, expire_after, + eval_interval, comment, sql, flow_options, @@ -789,6 +791,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { create_if_not_exists, or_replace, expire_after, + eval_interval: eval_interval.map(|e| e.seconds), comment: Some(comment), sql: sql.clone(), flow_options, diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index f8d2398520..7dc6b4c1bc 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use api::v1::flow::{DirtyWindowRequests, FlowResponse}; use catalog::CatalogManagerRef; @@ -30,6 +31,7 @@ use common_telemetry::{debug, info}; use common_time::TimeToLive; use query::QueryEngineRef; use snafu::{ensure, OptionExt, ResultExt}; +use sql::parsers::utils::is_tql; use store_api::storage::{RegionId, TableId}; use tokio::sync::{oneshot, RwLock}; @@ -40,8 +42,8 @@ use crate::batching_mode::utils::sql_to_df_plan; use crate::batching_mode::BatchingModeOptions; use crate::engine::FlowEngine; use crate::error::{ - ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, - UnexpectedSnafu, UnsupportedSnafu, + CreateFlowSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, InvalidQuerySnafu, + TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, }; use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW; use crate::{CreateFlowArgs, Error, FlowId, TableName}; @@ -335,6 +337,7 @@ impl BatchingEngine { create_if_not_exists, or_replace, expire_after, + eval_interval, comment: _, sql, flow_options, @@ -361,6 +364,25 @@ impl BatchingEngine { } } + let query_ctx = query_ctx.context({ + UnexpectedSnafu { + reason: "Query context is None".to_string(), + } + })?; + let query_ctx = Arc::new(query_ctx); + + // optionally set a eval interval for the flow + if eval_interval.is_none() + && is_tql(query_ctx.sql_dialect(), &sql) + .map_err(BoxedError::new) + .context(CreateFlowSnafu { sql: &sql })? + { + InvalidQuerySnafu { + reason: "TQL query requires EVAL INTERVAL to be set".to_string(), + } + .fail()?; + } + let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY); ensure!( @@ -374,13 +396,6 @@ impl BatchingEngine { } ); - let Some(query_ctx) = query_ctx else { - UnexpectedSnafu { - reason: "Query context is None".to_string(), - } - .fail()? - }; - let query_ctx = Arc::new(query_ctx); let mut source_table_names = Vec::with_capacity(2); for src_id in source_table_ids { // also check table option to see if ttl!=instant @@ -442,6 +457,7 @@ impl BatchingEngine { catalog_manager: self.catalog_manager.clone(), shutdown_rx: rx, batch_opts: self.batch_opts.clone(), + flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)), }; let task = BatchingTask::try_new(task_args)?; diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index ff97a8a410..4ed42a45fa 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -75,11 +75,12 @@ pub struct TaskConfig { pub time_window_expr: Option, /// in seconds pub expire_after: Option, - sink_table_name: [String; 3], + pub sink_table_name: [String; 3], pub source_table_names: HashSet<[String; 3]>, - catalog_manager: CatalogManagerRef, - query_type: QueryType, - batch_opts: Arc, + pub catalog_manager: CatalogManagerRef, + pub query_type: QueryType, + pub batch_opts: Arc, + pub flow_eval_interval: Option, } fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result { @@ -102,7 +103,7 @@ fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result { pub catalog_manager: CatalogManagerRef, pub shutdown_rx: oneshot::Receiver<()>, pub batch_opts: Arc, + pub flow_eval_interval: Option, } pub struct PlanInfo { @@ -150,6 +152,7 @@ impl BatchingTask { catalog_manager, shutdown_rx, batch_opts, + flow_eval_interval, }: TaskArgs<'_>, ) -> Result { Ok(Self { @@ -164,6 +167,7 @@ impl BatchingTask { output_schema: plan.schema().clone(), query_type: determine_query_type(query, &query_ctx)?, batch_opts, + flow_eval_interval, }), state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))), }) @@ -452,6 +456,13 @@ impl BatchingTask { ) { let flow_id_str = self.config.flow_id.to_string(); let mut max_window_cnt = None; + let mut interval = self + .config + .flow_eval_interval + .map(|d| tokio::time::interval(d)); + if let Some(tick) = &mut interval { + tick.tick().await; // pass the first tick immediately + } loop { // first check if shutdown signal is received // if so, break the loop @@ -499,24 +510,33 @@ impl BatchingTask { max_window_cnt = max_window_cnt.map(|cnt| { (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query) }); - let sleep_until = { - let state = self.state.write().unwrap(); - let time_window_size = self - .config - .time_window_expr - .as_ref() - .and_then(|t| *t.time_window_size()); + // here use proper ticking if set eval interval + if let Some(eval_interval) = &mut interval { + eval_interval.tick().await; + } else { + // if not explicitly set, just automatically calculate next start time + // using time window size and more args + let sleep_until = { + let state = self.state.write().unwrap(); - state.get_next_start_query_time( - self.config.flow_id, - &time_window_size, - min_refresh, - Some(self.config.batch_opts.query_timeout), - self.config.batch_opts.experimental_max_filter_num_per_query, - ) + let time_window_size = self + .config + .time_window_expr + .as_ref() + .and_then(|t| *t.time_window_size()); + + state.get_next_start_query_time( + self.config.flow_id, + &time_window_size, + min_refresh, + Some(self.config.batch_opts.query_timeout), + self.config.batch_opts.experimental_max_filter_num_per_query, + ) + }; + + tokio::time::sleep_until(sleep_until).await; }; - tokio::time::sleep_until(sleep_until).await; } // no new data, sleep for some time before checking for new data Ok(None) => { diff --git a/src/flow/src/engine.rs b/src/flow/src/engine.rs index 1687819666..00566ceeaa 100644 --- a/src/flow/src/engine.rs +++ b/src/flow/src/engine.rs @@ -70,6 +70,7 @@ pub struct CreateFlowArgs { pub create_if_not_exists: bool, pub or_replace: bool, pub expire_after: Option, + pub eval_interval: Option, pub comment: Option, pub sql: String, pub flow_options: HashMap, diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index c854f5a9ce..dc06ddf052 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -796,6 +796,8 @@ pub fn to_create_flow_task_expr( }) .collect::>>()?; + let eval_interval = create_flow.eval_interval; + Ok(CreateFlowExpr { catalog_name: query_ctx.current_catalog().to_string(), flow_name: sanitize_flow_name(create_flow.flow_name)?, @@ -804,9 +806,10 @@ pub fn to_create_flow_task_expr( or_replace: create_flow.or_replace, create_if_not_exists: create_flow.if_not_exists, expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }), + eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }), comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - flow_options: HashMap::new(), + flow_options: Default::default(), }) } @@ -839,6 +842,18 @@ mod tests { let sql = r#" CREATE FLOW calc_reqs SINK TO cnt_reqs AS TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + + assert!( + stmt.is_err(), + "Expected error for invalid TQL EVAL parameters: {:#?}", + stmt + ); + + let sql = r#" +CREATE FLOW calc_reqs SINK TO cnt_reqs AS +TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#; let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) .unwrap() @@ -860,7 +875,7 @@ TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#; ); assert!(expr.source_table_names.is_empty()); assert_eq!( - r#"TQL EVAL (0, 15, '5s') count_values("status_code", http_requests)"#, + r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#, expr.sql ); } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index d90151207e..8cf64dbffc 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::any::Any; -use std::time::Duration; +use std::time::{Duration, TryFromFloatSecsError}; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; @@ -344,6 +344,15 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to convert float seconds to duration, raw: {}", raw))] + TryIntoDuration { + raw: String, + #[snafu(source)] + error: TryFromFloatSecsError, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -368,7 +377,8 @@ impl ErrorExt for Error { | UnsupportedVariable { .. } | ColumnSchemaNoDefault { .. } | CteColumnSchemaMismatch { .. } - | ConvertValue { .. } => StatusCode::InvalidArguments, + | ConvertValue { .. } + | TryIntoDuration { .. } => StatusCode::InvalidArguments, BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 82d5975e50..9e89ada3a9 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -33,7 +33,7 @@ use sql::statements::statement::Statement; use crate::error::{ AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, - QueryParseSnafu, Result, UnimplementedSnafu, + QueryParseSnafu, Result, TryIntoDurationSnafu, UnimplementedSnafu, }; use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED}; @@ -206,7 +206,8 @@ impl QueryLanguageParser { // also report rfc3339 error if float parsing fails .map_err(|_| rfc3339_result.unwrap_err())?; - let duration = Duration::from_secs_f64(secs); + let duration = + Duration::try_from_secs_f64(secs).context(TryIntoDurationSnafu { raw: timestamp })?; SystemTime::UNIX_EPOCH .checked_add(duration) .context(AddSystemTimeOverflowSnafu { duration }) diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index a96184d463..ec241162bd 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -1019,6 +1019,7 @@ pub fn show_create_flow( or_replace: false, if_not_exists: true, expire_after: flow_val.expire_after(), + eval_interval: flow_val.eval_interval(), comment, query, }; diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index b9afb9b4f5..6e2a880348 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -170,7 +170,7 @@ impl ParserContext<'_> { Keyword::NoKeyword if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL => { - self.parse_tql() + self.parse_tql(false) } Keyword::DECLARE => self.parse_declare_cursor(), diff --git a/src/sql/src/parsers.rs b/src/sql/src/parsers.rs index 1d9aeb7dec..e3c41c49b2 100644 --- a/src/sql/src/parsers.rs +++ b/src/sql/src/parsers.rs @@ -31,5 +31,5 @@ pub(crate) mod set_var_parser; pub(crate) mod show_parser; pub(crate) mod tql_parser; pub(crate) mod truncate_parser; -pub(crate) mod utils; +pub mod utils; pub mod with_tql_parser; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index b06ff1a0c7..b20b778cd1 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -39,6 +39,7 @@ use crate::error::{ SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu, }; use crate::parser::{ParserContext, FLOW}; +use crate::parsers::tql_parser; use crate::parsers::utils::{ self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option, }; @@ -295,7 +296,16 @@ impl<'a> ParserContext<'a> { .parser .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)]) { - Some(self.parse_interval()?) + Some(self.parse_interval_no_month("EXPIRE AFTER")?) + } else { + None + }; + + let eval_interval = if self + .parser + .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")]) + { + Some(self.parse_interval_no_month("EVAL INTERVAL")?) } else { None }; @@ -321,10 +331,39 @@ impl<'a> ParserContext<'a> { .expect_keyword(Keyword::AS) .context(SyntaxSnafu)?; + let query = Box::new(self.parse_sql_or_tql(true)?); + + Ok(Statement::CreateFlow(CreateFlow { + flow_name, + sink_table_name: output_table_name, + or_replace, + if_not_exists, + expire_after, + eval_interval, + comment, + query, + })) + } + + fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result { let start_loc = self.parser.peek_token().span.start; let start_index = location_to_index(self.sql, &start_loc); - let query = self.parse_statement()?; + // only accept sql or tql + let query = match self.parser.peek_token().token { + Token::Word(w) => match w.keyword { + Keyword::SELECT => self.parse_query(), + Keyword::NoKeyword + if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL => + { + self.parse_tql(require_now_expr) + } + + _ => self.unsupported(self.peek_token_as_string()), + }, + _ => self.unsupported(self.peek_token_as_string()), + }?; + let end_token = self.parser.peek_token(); let raw_query = if end_token == Token::EOF { @@ -335,23 +374,19 @@ impl<'a> ParserContext<'a> { &self.sql[start_index..end_index.min(self.sql.len())] }; let raw_query = raw_query.trim_end_matches(";"); - - let query = Box::new(SqlOrTql::try_from_statement(query, raw_query)?); - - Ok(Statement::CreateFlow(CreateFlow { - flow_name, - sink_table_name: output_table_name, - or_replace, - if_not_exists, - expire_after, - comment, - query, - })) + let query = SqlOrTql::try_from_statement(query, raw_query)?; + Ok(query) } /// Parse the interval expr to duration in seconds. - fn parse_interval(&mut self) -> Result { + fn parse_interval_no_month(&mut self, context: &str) -> Result { let interval = self.parse_interval_month_day_nano()?.0; + if interval.months != 0 { + return InvalidIntervalSnafu { + reason: format!("Interval with months is not allowed in {context}"), + } + .fail(); + } Ok( interval.nanoseconds / 1_000_000_000 + interval.days as i64 * 60 * 60 * 24 @@ -365,7 +400,7 @@ impl<'a> ParserContext<'a> { fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> { let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?; let raw_interval_expr = interval_expr.to_string(); - let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone())? + let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)? .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano)) .ok() .with_context(|| InvalidIntervalSnafu { @@ -1113,7 +1148,7 @@ mod tests { use common_catalog::consts::FILE_ENGINE; use common_error::ext::ErrorExt; use sqlparser::ast::ColumnOption::NotNull; - use sqlparser::ast::{BinaryOperator, Expr, ObjectName, Value}; + use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value}; use sqlparser::dialect::GenericDialect; use sqlparser::tokenizer::Tokenizer; @@ -1450,7 +1485,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", r" CREATE FLOW `task_2` SINK TO schema_1.table_1 -EXPIRE AFTER '1 month 2 days 1h 2 min' +EXPIRE AFTER '2 days 1h 2 min' AS SELECT max(c1), min(c2) FROM schema_2.table_2;", CreateFlowWoutQuery { @@ -1461,7 +1496,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", ]), or_replace: false, if_not_exists: false, - expire_after: Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60), + expire_after: Some(2 * 86400 + 3600 + 2 * 60), comment: None, }, ), @@ -1476,6 +1511,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", or_replace: expected.or_replace, if_not_exists: expected.if_not_exists, expire_after: expected.expire_after, + eval_interval: None, comment: expected.comment, // ignore query parse result query: create_task.query.clone(), @@ -1490,35 +1526,176 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;", #[test] fn test_parse_create_flow() { - let sql = r" + use pretty_assertions::assert_eq; + fn parse_create_flow(sql: &str) -> CreateFlow { + let stmts = ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default(), + ) + .unwrap(); + assert_eq!(1, stmts.len()); + match &stmts[0] { + Statement::CreateFlow(c) => c.clone(), + _ => panic!("{:?}", stmts[0]), + } + } + struct CreateFlowWoutQuery { + /// Flow name + pub flow_name: ObjectName, + /// Output (sink) table name + pub sink_table_name: ObjectName, + /// Whether to replace existing task + pub or_replace: bool, + /// Create if not exist + pub if_not_exists: bool, + /// `EXPIRE AFTER` + /// Duration in second as `i64` + pub expire_after: Option, + /// Duration for flow evaluation interval + /// Duration in seconds as `i64` + /// If not set, flow will be evaluated based on time window size and other args. + pub eval_interval: Option, + /// Comment string + pub comment: Option, + } + + // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT` + let testcases = vec![ + ( + r" CREATE OR REPLACE FLOW IF NOT EXISTS task_1 SINK TO schema_1.table_1 EXPIRE AFTER INTERVAL '5 minutes' COMMENT 'test comment' AS -SELECT max(c1), min(c2) FROM schema_2.table_2;"; - let stmts = - ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) - .unwrap(); - assert_eq!(1, stmts.len()); - let create_task = match &stmts[0] { - Statement::CreateFlow(c) => c, - _ => unreachable!(), - }; +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]), + sink_table_name: ObjectName(vec![ + ObjectNamePart::Identifier(Ident::new("schema_1")), + ObjectNamePart::Identifier(Ident::new("table_1")), + ]), + or_replace: true, + if_not_exists: true, + expire_after: Some(300), + eval_interval: None, + comment: Some("test comment".to_string()), + }, + ), + ( + r" +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE AFTER INTERVAL '300 s' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]), + sink_table_name: ObjectName(vec![ + ObjectNamePart::Identifier(Ident::new("schema_1")), + ObjectNamePart::Identifier(Ident::new("table_1")), + ]), + or_replace: true, + if_not_exists: true, + expire_after: Some(300), + eval_interval: None, + comment: Some("test comment".to_string()), + }, + ), + ( + r" +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE AFTER '5 minutes' +EVAL INTERVAL '10 seconds' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]), + sink_table_name: ObjectName(vec![ + ObjectNamePart::Identifier(Ident::new("schema_1")), + ObjectNamePart::Identifier(Ident::new("table_1")), + ]), + or_replace: true, + if_not_exists: true, + expire_after: Some(300), + eval_interval: Some(10), + comment: Some("test comment".to_string()), + }, + ), + ( + r" +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE AFTER '5 minutes' +EVAL INTERVAL INTERVAL '10 seconds' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]), + sink_table_name: ObjectName(vec![ + ObjectNamePart::Identifier(Ident::new("schema_1")), + ObjectNamePart::Identifier(Ident::new("table_1")), + ]), + or_replace: true, + if_not_exists: true, + expire_after: Some(300), + eval_interval: Some(10), + comment: Some("test comment".to_string()), + }, + ), + ( + r" +CREATE FLOW `task_2` +SINK TO schema_1.table_1 +EXPIRE AFTER '2 days 1h 2 min' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;", + CreateFlowWoutQuery { + flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote( + '`', "task_2", + ))]), + sink_table_name: ObjectName(vec![ + ObjectNamePart::Identifier(Ident::new("schema_1")), + ObjectNamePart::Identifier(Ident::new("table_1")), + ]), + or_replace: false, + if_not_exists: false, + expire_after: Some(2 * 86400 + 3600 + 2 * 60), + eval_interval: None, + comment: None, + }, + ), + ]; - let expected = CreateFlow { - flow_name: vec![Ident::new("task_1")].into(), - sink_table_name: vec![Ident::new("schema_1"), Ident::new("table_1")].into(), - or_replace: true, - if_not_exists: true, - expire_after: Some(300), - comment: Some("test comment".to_string()), - // ignore query parse result - query: create_task.query.clone(), - }; - assert_eq!(create_task, &expected); + for (sql, expected) in testcases { + let create_task = parse_create_flow(sql); - // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT` + let expected = CreateFlow { + flow_name: expected.flow_name, + sink_table_name: expected.sink_table_name, + or_replace: expected.or_replace, + if_not_exists: expected.if_not_exists, + expire_after: expected.expire_after, + eval_interval: expected.eval_interval, + comment: expected.comment, + // ignore query parse result + query: create_task.query.clone(), + }; + + assert_eq!(create_task, expected, "input sql is:\n{sql}"); + let show_create = create_task.to_string(); + let recreated = parse_create_flow(&show_create); + assert_eq!(recreated, expected, "input sql is:\n{show_create}"); + } + } + + #[test] + fn test_create_flow_no_month() { let sql = r" CREATE FLOW `task_2` SINK TO schema_1.table_1 @@ -1526,21 +1703,15 @@ EXPIRE AFTER '1 month 2 days 1h 2 min' AS SELECT max(c1), min(c2) FROM schema_2.table_2;"; let stmts = - ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) - .unwrap(); - assert_eq!(1, stmts.len()); - let create_task = match &stmts[0] { - Statement::CreateFlow(c) => c, - _ => unreachable!(), - }; - assert!(!create_task.or_replace); - assert!(!create_task.if_not_exists); - assert_eq!( - create_task.expire_after, - Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60) + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + + assert!( + stmts.is_err() + && stmts + .unwrap_err() + .to_string() + .contains("Interval with months is not allowed") ); - assert!(create_task.comment.is_none()); - assert_eq!(create_task.flow_name.to_string(), "`task_2`"); } #[test] diff --git a/src/sql/src/parsers/tql_parser.rs b/src/sql/src/parsers/tql_parser.rs index 71e80d905b..20aabcdb2d 100644 --- a/src/sql/src/parsers/tql_parser.rs +++ b/src/sql/src/parsers/tql_parser.rs @@ -44,7 +44,7 @@ use crate::parsers::error::{ /// - `TQL EXPLAIN [VERBOSE] [FORMAT format] ` /// - `TQL ANALYZE [VERBOSE] [FORMAT format] ` impl ParserContext<'_> { - pub(crate) fn parse_tql(&mut self) -> Result { + pub(crate) fn parse_tql(&mut self, require_now_expr: bool) -> Result { let _ = self.parser.next_token(); match self.parser.peek_token().token { @@ -56,7 +56,7 @@ impl ParserContext<'_> { if (uppercase == EVAL || uppercase == EVALUATE) && w.quote_style.is_none() => { - self.parse_tql_params() + self.parse_tql_params(require_now_expr) .map(|params| Statement::Tql(Tql::Eval(TqlEval::from(params)))) .context(error::TQLSyntaxSnafu) } @@ -67,7 +67,7 @@ impl ParserContext<'_> { let _consume_verbose_token = self.parser.next_token(); } let format = self.parse_format_option(); - self.parse_tql_params() + self.parse_tql_params(require_now_expr) .map(|mut params| { params.is_verbose = is_verbose; params.format = format; @@ -82,7 +82,7 @@ impl ParserContext<'_> { let _consume_verbose_token = self.parser.next_token(); } let format = self.parse_format_option(); - self.parse_tql_params() + self.parse_tql_params(require_now_expr) .map(|mut params| { params.is_verbose = is_verbose; params.format = format; @@ -97,18 +97,35 @@ impl ParserContext<'_> { } } - fn parse_tql_params(&mut self) -> std::result::Result { + /// `require_now_expr` indicates whether the start&end must contain a `now()` function. + fn parse_tql_params( + &mut self, + require_now_expr: bool, + ) -> std::result::Result { let parser = &mut self.parser; let (start, end, step, lookback) = match parser.peek_token().token { Token::LParen => { let _consume_lparen_token = parser.next_token(); - let start = Self::parse_string_or_number_or_word(parser, &[Token::Comma])?.0; - let end = Self::parse_string_or_number_or_word(parser, &[Token::Comma])?.0; + let start = Self::parse_string_or_number_or_word( + parser, + &[Token::Comma], + require_now_expr, + )? + .0; + let end = Self::parse_string_or_number_or_word( + parser, + &[Token::Comma], + require_now_expr, + )? + .0; - let (step, delimiter) = - Self::parse_string_or_number_or_word(parser, &[Token::Comma, Token::RParen])?; + let (step, delimiter) = Self::parse_string_or_number_or_word( + parser, + &[Token::Comma, Token::RParen], + false, + )?; let lookback = if delimiter == Token::Comma { - Self::parse_string_or_number_or_word(parser, &[Token::RParen]) + Self::parse_string_or_number_or_word(parser, &[Token::RParen], false) .ok() .map(|t| t.0) } else { @@ -168,6 +185,7 @@ impl ParserContext<'_> { fn parse_string_or_number_or_word( parser: &mut Parser, delimiter_tokens: &[Token], + require_now_expr: bool, ) -> std::result::Result<(String, Token), TQLError> { let mut tokens = vec![]; @@ -185,19 +203,30 @@ impl ParserContext<'_> { .context(ParserSnafu), 1 => { let value = match tokens[0].clone() { - Token::Number(n, _) => n, - Token::DoubleQuotedString(s) | Token::SingleQuotedString(s) => s, - Token::Word(_) => Self::parse_tokens_to_ts(tokens)?, + Token::Number(n, _) if !require_now_expr => n, + Token::DoubleQuotedString(s) | Token::SingleQuotedString(s) + if !require_now_expr => + { + s + } + Token::Word(_) => Self::parse_tokens_to_ts(tokens, require_now_expr)?, unexpected => { - return Err(ParserError::ParserError(format!( - "Expected number, string or word, but have {unexpected:?}" - ))) - .context(ParserSnafu); + if !require_now_expr { + return Err(ParserError::ParserError(format!( + "Expected number, string or word, but have {unexpected:?}" + ))) + .context(ParserSnafu); + } else { + return Err(ParserError::ParserError(format!( + "Expected expression containing `now()`, but have {unexpected:?}" + ))) + .context(ParserSnafu); + } } }; Ok(value) } - _ => Self::parse_tokens_to_ts(tokens), + _ => Self::parse_tokens_to_ts(tokens, require_now_expr), }; for token in delimiter_tokens { if parser.consume_token(token) { @@ -211,9 +240,12 @@ impl ParserContext<'_> { } /// Parse the tokens to seconds and convert to string. - fn parse_tokens_to_ts(tokens: Vec) -> std::result::Result { + fn parse_tokens_to_ts( + tokens: Vec, + require_now_expr: bool, + ) -> std::result::Result { let parser_expr = Self::parse_to_expr(tokens)?; - let lit = utils::parser_expr_to_scalar_value_literal(parser_expr) + let lit = utils::parser_expr_to_scalar_value_literal(parser_expr, require_now_expr) .map_err(Box::new) .context(ConvertToLogicalExpressionSnafu)?; @@ -281,6 +313,65 @@ mod tests { result.remove(0) } + #[test] + fn test_require_now_expr() { + let sql = "TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut parser = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let statement = parser.parse_tql(true).unwrap(); + match statement { + Statement::Tql(Tql::Eval(eval)) => { + assert_eq!(eval.start, "0"); + assert_eq!(eval.end, "10"); + assert_eq!(eval.step, "1s"); + assert_eq!(eval.lookback, None); + assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + } + _ => unreachable!(), + }; + + let sql = "TQL EVAL (0, 15, '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut parser = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let statement = parser.parse_tql(true); + assert!( + statement.is_err() + && format!("{:?}", statement) + .contains("Expected expression containing `now()`, but have "), + "statement: {:?}", + statement + ); + + let sql = "TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut parser = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let statement = parser.parse_tql(false).unwrap(); + match statement { + Statement::Tql(Tql::Eval(eval)) => { + assert_eq!(eval.start, "0"); + assert_eq!(eval.end, "10"); + assert_eq!(eval.step, "1s"); + assert_eq!(eval.lookback, None); + assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + } + _ => unreachable!(), + }; + + let sql = "TQL EVAL (0, 15, '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + let mut parser = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap(); + let statement = parser.parse_tql(false).unwrap(); + match statement { + Statement::Tql(Tql::Eval(eval)) => { + assert_eq!(eval.start, "0"); + assert_eq!(eval.end, "15"); + assert_eq!(eval.step, "1s"); + assert_eq!(eval.lookback, None); + assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + } + _ => unreachable!(), + }; + } + #[test] fn test_parse_tql_eval_with_functions() { let sql = "TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; diff --git a/src/sql/src/parsers/utils.rs b/src/sql/src/parsers/utils.rs index 949261a416..9933fe3167 100644 --- a/src/sql/src/parsers/utils.rs +++ b/src/sql/src/parsers/utils.rs @@ -20,10 +20,11 @@ use datafusion::error::Result as DfResult; use datafusion::execution::context::SessionState; use datafusion::execution::SessionStateBuilder; use datafusion::optimizer::simplify_expressions::ExprSimplifier; +use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor}; use datafusion_common::{DFSchema, ScalarValue}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::simplify::SimplifyContext; -use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF}; +use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource, WindowUDF}; use datafusion_sql::planner::{ContextProvider, SqlToRel}; use datafusion_sql::TableReference; use datatypes::arrow::datatypes::DataType; @@ -33,26 +34,105 @@ use datatypes::schema::{ COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, }; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; +use sqlparser::dialect::Dialect; use crate::error::{ - ConvertToLogicalExpressionSnafu, ParseSqlValueSnafu, Result, SimplificationSnafu, + ConvertToLogicalExpressionSnafu, InvalidSqlSnafu, ParseSqlValueSnafu, Result, + SimplificationSnafu, }; +use crate::parser::{ParseOptions, ParserContext}; +use crate::statements::statement::Statement; + +/// Check if the given SQL query is a TQL statement. +pub fn is_tql(dialect: &dyn Dialect, sql: &str) -> Result { + let stmts = ParserContext::create_with_dialect(sql, dialect, ParseOptions::default())?; + + ensure!( + stmts.len() == 1, + InvalidSqlSnafu { + msg: format!("Expect only one statement, found {}", stmts.len()) + } + ); + let stmt = &stmts[0]; + match stmt { + Statement::Tql(_) => Ok(true), + _ => Ok(false), + } +} /// Convert a parser expression to a scalar value. This function will try the /// best to resolve and reduce constants. Exprs like `1 + 1` or `now()` can be /// handled properly. -pub fn parser_expr_to_scalar_value_literal(expr: sqlparser::ast::Expr) -> Result { +/// +/// if `require_now_expr` is true, it will ensure that the expression contains a `now()` function. +/// If the expression does not contain `now()`, it will return an error. +/// +pub fn parser_expr_to_scalar_value_literal( + expr: sqlparser::ast::Expr, + require_now_expr: bool, +) -> Result { // 1. convert parser expr to logical expr let empty_df_schema = DFSchema::empty(); let logical_expr = SqlToRel::new(&StubContextProvider::default()) .sql_to_expr(expr.into(), &empty_df_schema, &mut Default::default()) .context(ConvertToLogicalExpressionSnafu)?; + struct FindNow { + found: bool, + } + + impl TreeNodeVisitor<'_> for FindNow { + type Node = Expr; + fn f_down( + &mut self, + node: &Self::Node, + ) -> DfResult { + if let Expr::ScalarFunction(func) = node { + if func.name().to_lowercase() == "now" { + if !func.args.is_empty() { + return Err(datafusion_common::DataFusionError::Plan( + "now() function should not have arguments".to_string(), + )); + } + self.found = true; + return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop); + } + } + Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue) + } + } + + if require_now_expr { + let have_now = { + let mut visitor = FindNow { found: false }; + logical_expr.visit(&mut visitor).unwrap(); + visitor.found + }; + if !have_now { + return ParseSqlValueSnafu { + msg: format!( + "expected now() expression, but not found in {}", + logical_expr + ), + } + .fail(); + } + } + // 2. simplify logical expr let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now()); - let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(empty_df_schema)); - let simplified_expr = ExprSimplifier::new(info) + let info = + SimplifyContext::new(&execution_props).with_schema(Arc::new(empty_df_schema.clone())); + + let simplifier = ExprSimplifier::new(info); + + // Coerce the logical expression so simplifier can handle it correctly. This is necessary for const eval with possible type mismatch. i.e.: `now() - now() + '15s'::interval` which is `TimestampNanosecond - TimestampNanosecond + IntervalMonthDayNano`. + let logical_expr = simplifier + .coerce(logical_expr, &empty_df_schema) + .context(SimplificationSnafu)?; + + let simplified_expr = simplifier .simplify(logical_expr) .context(SimplificationSnafu)?; @@ -177,3 +257,72 @@ pub fn convert_month_day_nano_to_duration( Ok(std::time::Duration::new(adjusted_seconds, nanos_remainder)) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use chrono::DateTime; + use datafusion::functions::datetime::expr_fn::now; + use datafusion_expr::lit; + use datatypes::arrow::datatypes::TimestampNanosecondType; + + use super::*; + + /// Keep this test to make sure we are using datafusion's `ExprSimplifier` correctly. + #[test] + fn test_simplifier() { + let now_time = DateTime::from_timestamp(61, 0).unwrap(); + let lit_now = lit(ScalarValue::new_timestamp::( + now_time.timestamp_nanos_opt(), + None, + )); + let testcases = vec![ + (now(), lit_now), + (now() - now(), lit(ScalarValue::DurationNanosecond(Some(0)))), + ( + now() + lit(ScalarValue::new_interval_dt(0, 1500)), + lit(ScalarValue::new_timestamp::( + Some(62500000000), + None, + )), + ), + ( + now() - (now() + lit(ScalarValue::new_interval_dt(0, 1500))), + lit(ScalarValue::DurationNanosecond(Some(-1500000000))), + ), + // this one failed if type is not coerced + ( + now() - now() + lit(ScalarValue::new_interval_dt(0, 1500)), + lit(ScalarValue::new_interval_mdn(0, 0, 1500000000)), + ), + ( + lit(ScalarValue::new_interval_mdn( + 0, + 0, + 61 * 86400 * 1_000_000_000, + )), + lit(ScalarValue::new_interval_mdn( + 0, + 0, + 61 * 86400 * 1_000_000_000, + )), + ), + ]; + + let execution_props = ExecutionProps::new().with_query_execution_start_time(now_time); + let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty())); + + let simplifier = ExprSimplifier::new(info); + for (expr, expected) in testcases { + let expr_name = expr.schema_name().to_string(); + let expr = simplifier.coerce(expr, &DFSchema::empty()).unwrap(); + + let simplified_expr = simplifier.simplify(expr).unwrap(); + assert_eq!( + simplified_expr, expected, + "Failed to simplify expression: {expr_name}" + ); + } + } +} diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 81bb317c6a..12cd5843d7 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -383,6 +383,10 @@ pub struct CreateFlow { /// `EXPIRE AFTER` /// Duration in second as `i64` pub expire_after: Option, + /// Duration for flow evaluation interval + /// Duration in seconds as `i64` + /// If not set, flow will be evaluated based on time window size and other args. + pub eval_interval: Option, /// Comment string pub comment: Option, /// SQL statement @@ -436,7 +440,10 @@ impl Display for CreateFlow { writeln!(f, "{}", &self.flow_name)?; writeln!(f, "SINK TO {}", &self.sink_table_name)?; if let Some(expire_after) = &self.expire_after { - writeln!(f, "EXPIRE AFTER '{} s' ", expire_after)?; + writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?; + } + if let Some(eval_interval) = &self.eval_interval { + writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?; } if let Some(comment) = &self.comment { writeln!(f, "COMMENT '{}'", comment)?; diff --git a/tests/cases/standalone/common/flow/flow_tql.result b/tests/cases/distributed/flow-tql/flow_tql.result similarity index 82% rename from tests/cases/standalone/common/flow/flow_tql.result rename to tests/cases/distributed/flow-tql/flow_tql.result index f650da2f91..5edbe5a547 100644 --- a/tests/cases/standalone/common/flow/flow_tql.result +++ b/tests/cases/distributed/flow-tql/flow_tql.result @@ -8,7 +8,7 @@ CREATE TABLE http_requests ( Affected Rows: 0 -CREATE FLOW calc_reqs SINK TO cnt_reqs AS +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests); Affected Rows: 0 @@ -91,9 +91,26 @@ CREATE TABLE http_requests ( Affected Rows: 0 -CREATE FLOW calc_reqs SINK TO cnt_reqs AS +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS TQL EVAL (0, 15, '5s') count_values("status_code", http_requests); +Error: 2000(InvalidSyntax), Invalid TQL syntax: sql parser error: Expected expression containing `now()`, but have Number("0", false) + +-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative")) +-- so duplicate test into two +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests); + +Error: 3001(EngineExecuteQuery), Failed to convert float seconds to duration, raw: -15 + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests); + +Error: 2000(InvalidSyntax), Invalid TQL syntax: Failed to evaluate TQL expression: Failed to extract a timestamp value IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 15000000000 }") + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests); + Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; diff --git a/tests/cases/standalone/common/flow/flow_tql.sql b/tests/cases/distributed/flow-tql/flow_tql.sql similarity index 76% rename from tests/cases/standalone/common/flow/flow_tql.sql rename to tests/cases/distributed/flow-tql/flow_tql.sql index daa6f9848f..a160e97315 100644 --- a/tests/cases/standalone/common/flow/flow_tql.sql +++ b/tests/cases/distributed/flow-tql/flow_tql.sql @@ -6,7 +6,7 @@ CREATE TABLE http_requests ( PRIMARY KEY(host, idc), ); -CREATE FLOW calc_reqs SINK TO cnt_reqs AS +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests); SHOW CREATE TABLE cnt_reqs; @@ -47,9 +47,21 @@ CREATE TABLE http_requests ( PRIMARY KEY(host, idc), ); -CREATE FLOW calc_reqs SINK TO cnt_reqs AS +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS TQL EVAL (0, 15, '5s') count_values("status_code", http_requests); +-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative")) +-- so duplicate test into two +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests); + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests); + + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests); + SHOW CREATE TABLE cnt_reqs; INSERT INTO TABLE http_requests VALUES diff --git a/tests/cases/standalone/flow-tql/flow_tql.result b/tests/cases/standalone/flow-tql/flow_tql.result new file mode 100644 index 0000000000..6d20bbfcca --- /dev/null +++ b/tests/cases/standalone/flow-tql/flow_tql.result @@ -0,0 +1,189 @@ +CREATE TABLE http_requests ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +Affected Rows: 0 + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests); + +Affected Rows: 0 + +SHOW CREATE TABLE cnt_reqs; + ++----------+-------------------------------------------+ +| Table | Create Table | ++----------+-------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" BIGINT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-------------------------------------------+ + +INSERT INTO TABLE http_requests VALUES + (now() - '17s'::interval, 'host1', 'idc1', 200), + (now() - '17s'::interval, 'host2', 'idc1', 200), + (now() - '17s'::interval, 'host3', 'idc2', 200), + (now() - '17s'::interval, 'host4', 'idc2', 401), + (now() - '13s'::interval, 'host1', 'idc1', 404), + (now() - '13s'::interval, 'host2', 'idc1', 401), + (now() - '13s'::interval, 'host3', 'idc2', 404), + (now() - '13s'::interval, 'host4', 'idc2', 500), + (now() - '7s'::interval, 'host1', 'idc1', 200), + (now() - '7s'::interval, 'host2', 'idc1', 200), + (now() - '7s'::interval, 'host3', 'idc2', 201), + (now() - '7s'::interval, 'host4', 'idc2', 201), + (now() - '3s'::interval, 'host1', 'idc1', 500), + (now() - '3s'::interval, 'host2', 'idc1', 500), + (now() - '3s'::interval, 'host3', 'idc2', 500), + (now() - '3s'::interval, 'host4', 'idc2', 500); + +Affected Rows: 16 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_reqs'); + ++-------------------------------+ +| ADMIN FLUSH_FLOW('calc_reqs') | ++-------------------------------+ +| FLOW_FLUSHED | ++-------------------------------+ + +-- too much indeterminsticity in the test, so just check that the flow is running +SELECT count(*) > 0 FROM cnt_reqs; + ++---------------------+ +| count(*) > Int64(0) | ++---------------------+ +| true | ++---------------------+ + +DROP FLOW calc_reqs; + +Affected Rows: 0 + +DROP TABLE http_requests; + +Affected Rows: 0 + +DROP TABLE cnt_reqs; + +Affected Rows: 0 + +CREATE TABLE http_requests ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +Affected Rows: 0 + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (0, 15, '5s') count_values("status_code", http_requests); + +Error: 2000(InvalidSyntax), Invalid TQL syntax: sql parser error: Expected expression containing `now()`, but have Number("0", false) + +-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative")) +-- so duplicate test into two +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests); + +Error: 3001(EngineExecuteQuery), Failed to convert float seconds to duration, raw: -15: cannot convert float seconds to Duration: value is negative + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests); + +Error: 2000(InvalidSyntax), Invalid TQL syntax: Failed to evaluate TQL expression: Failed to extract a timestamp value IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 15000000000 }") + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests); + +Affected Rows: 0 + +SHOW CREATE TABLE cnt_reqs; + ++----------+-------------------------------------------+ +| Table | Create Table | ++----------+-------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" BIGINT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-------------------------------------------+ + +INSERT INTO TABLE http_requests VALUES + (0::Timestamp, 'host1', 'idc1', 200), + (0::Timestamp, 'host2', 'idc1', 200), + (0::Timestamp, 'host3', 'idc2', 200), + (0::Timestamp, 'host4', 'idc2', 401), + (5000::Timestamp, 'host1', 'idc1', 404), + (5000::Timestamp, 'host2', 'idc1', 401), + (5000::Timestamp, 'host3', 'idc2', 404), + (5000::Timestamp, 'host4', 'idc2', 500), + (10000::Timestamp, 'host1', 'idc1', 200), + (10000::Timestamp, 'host2', 'idc1', 200), + (10000::Timestamp, 'host3', 'idc2', 201), + (10000::Timestamp, 'host4', 'idc2', 201), + (15000::Timestamp, 'host1', 'idc1', 500), + (15000::Timestamp, 'host2', 'idc1', 500), + (15000::Timestamp, 'host3', 'idc2', 500), + (15000::Timestamp, 'host4', 'idc2', 500); + +Affected Rows: 16 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_reqs'); + ++-------------------------------+ +| ADMIN FLUSH_FLOW('calc_reqs') | ++-------------------------------+ +| FLOW_FLUSHED | ++-------------------------------+ + +SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code; + ++--------------------------+---------------------+-------------+ +| count(http_requests.val) | ts | status_code | ++--------------------------+---------------------+-------------+ +| 3 | 1970-01-01T00:00:00 | 200 | +| 1 | 1970-01-01T00:00:00 | 401 | +| 1 | 1970-01-01T00:00:05 | 401 | +| 2 | 1970-01-01T00:00:05 | 404 | +| 1 | 1970-01-01T00:00:05 | 500 | +| 2 | 1970-01-01T00:00:10 | 200 | +| 2 | 1970-01-01T00:00:10 | 201 | +| 4 | 1970-01-01T00:00:15 | 500 | ++--------------------------+---------------------+-------------+ + +DROP FLOW calc_reqs; + +Affected Rows: 0 + +DROP TABLE http_requests; + +Affected Rows: 0 + +DROP TABLE cnt_reqs; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow-tql/flow_tql.sql b/tests/cases/standalone/flow-tql/flow_tql.sql new file mode 100644 index 0000000000..a160e97315 --- /dev/null +++ b/tests/cases/standalone/flow-tql/flow_tql.sql @@ -0,0 +1,92 @@ +CREATE TABLE http_requests ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests); + +SHOW CREATE TABLE cnt_reqs; + +INSERT INTO TABLE http_requests VALUES + (now() - '17s'::interval, 'host1', 'idc1', 200), + (now() - '17s'::interval, 'host2', 'idc1', 200), + (now() - '17s'::interval, 'host3', 'idc2', 200), + (now() - '17s'::interval, 'host4', 'idc2', 401), + (now() - '13s'::interval, 'host1', 'idc1', 404), + (now() - '13s'::interval, 'host2', 'idc1', 401), + (now() - '13s'::interval, 'host3', 'idc2', 404), + (now() - '13s'::interval, 'host4', 'idc2', 500), + (now() - '7s'::interval, 'host1', 'idc1', 200), + (now() - '7s'::interval, 'host2', 'idc1', 200), + (now() - '7s'::interval, 'host3', 'idc2', 201), + (now() - '7s'::interval, 'host4', 'idc2', 201), + (now() - '3s'::interval, 'host1', 'idc1', 500), + (now() - '3s'::interval, 'host2', 'idc1', 500), + (now() - '3s'::interval, 'host3', 'idc2', 500), + (now() - '3s'::interval, 'host4', 'idc2', 500); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_reqs'); + +-- too much indeterminsticity in the test, so just check that the flow is running +SELECT count(*) > 0 FROM cnt_reqs; + +DROP FLOW calc_reqs; +DROP TABLE http_requests; +DROP TABLE cnt_reqs; + +CREATE TABLE http_requests ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (0, 15, '5s') count_values("status_code", http_requests); + +-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative")) +-- so duplicate test into two +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests); + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests); + + +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests); + +SHOW CREATE TABLE cnt_reqs; + +INSERT INTO TABLE http_requests VALUES + (0::Timestamp, 'host1', 'idc1', 200), + (0::Timestamp, 'host2', 'idc1', 200), + (0::Timestamp, 'host3', 'idc2', 200), + (0::Timestamp, 'host4', 'idc2', 401), + (5000::Timestamp, 'host1', 'idc1', 404), + (5000::Timestamp, 'host2', 'idc1', 401), + (5000::Timestamp, 'host3', 'idc2', 404), + (5000::Timestamp, 'host4', 'idc2', 500), + (10000::Timestamp, 'host1', 'idc1', 200), + (10000::Timestamp, 'host2', 'idc1', 200), + (10000::Timestamp, 'host3', 'idc2', 201), + (10000::Timestamp, 'host4', 'idc2', 201), + (15000::Timestamp, 'host1', 'idc1', 500), + (15000::Timestamp, 'host2', 'idc1', 500), + (15000::Timestamp, 'host3', 'idc2', 500), + (15000::Timestamp, 'host4', 'idc2', 500); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_reqs'); + +SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code; + +DROP FLOW calc_reqs; +DROP TABLE http_requests; +DROP TABLE cnt_reqs;