feat(flow): add eval interval option (#6623)

* feat: add flow eval interval

Signed-off-by: discord9 <discord9@163.com>

* feat: tql flow must have eval interval

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* test: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* wip

Signed-off-by: discord9 <discord9@163.com>

* wip

Signed-off-by: discord9 <discord9@163.com>

* feat: check for now func

Signed-off-by: discord9 <discord9@163.com>

* refactor: use ms instead

Signed-off-by: discord9 <discord9@163.com>

* fix: not panic&proper simplifier

Signed-off-by: discord9 <discord9@163.com>

* test: update to fix

Signed-off-by: discord9 <discord9@163.com>

* feat: not allow month in interval

Signed-off-by: discord9 <discord9@163.com>

* test: update remove months

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* feat: use seconds and add to field instead

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* fix: add check for month

Signed-off-by: discord9 <discord9@163.com>

* chore: fmt

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: rm clone per review

Signed-off-by: discord9 <discord9@163.com>

* chore: update proto

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-27 17:44:32 +08:00
committed by GitHub
parent 5ef4dd1743
commit 8452a9d579
28 changed files with 951 additions and 128 deletions

2
Cargo.lock generated
View File

@@ -5299,7 +5299,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=df2bb74b5990c159dfd5b7a344eecf8f4307af64#df2bb74b5990c159dfd5b7a344eecf8f4307af64"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=66eb089afa6baaa3ddfafabd0a4abbe317d012c3#66eb089afa6baaa3ddfafabd0a4abbe317d012c3"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",

View File

@@ -145,7 +145,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "df2bb74b5990c159dfd5b7a344eecf8f4307af64" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "66eb089afa6baaa3ddfafabd0a4abbe317d012c3" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -242,6 +242,7 @@ mod tests {
flow_name: "my_flow".to_string(),
raw_sql: "sql".to_string(),
expire_after: Some(300),
eval_interval_secs: None,
comment: "comment".to_string(),
options: Default::default(),
created_time: chrono::Utc::now(),

View File

@@ -445,6 +445,10 @@ impl From<&CreateFlowData> for CreateRequest {
create_if_not_exists: true,
or_replace: value.task.or_replace,
expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
eval_interval: value
.task
.eval_interval_secs
.map(|seconds| api::v1::EvalInterval { seconds }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
flow_options: value.task.flow_options.clone(),
@@ -464,6 +468,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
flow_name,
sink_table_name,
expire_after,
eval_interval_secs: eval_interval,
comment,
sql,
flow_options: mut options,
@@ -503,6 +508,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
flow_name,
raw_sql: sql,
expire_after,
eval_interval_secs: eval_interval,
comment,
options,
created_time: create_time,

View File

@@ -45,6 +45,7 @@ pub(crate) fn test_create_flow_task(
or_replace: false,
create_if_not_exists,
expire_after: Some(300),
eval_interval_secs: None,
comment: "".to_string(),
sql: "select 1".to_string(),
flow_options: Default::default(),
@@ -189,6 +190,7 @@ fn create_test_flow_task_for_serialization() -> CreateFlowTask {
or_replace: false,
create_if_not_exists: false,
expire_after: None,
eval_interval_secs: None,
comment: "test comment".to_string(),
sql: "SELECT * FROM source_table".to_string(),
flow_options: HashMap::new(),

View File

@@ -464,6 +464,7 @@ mod tests {
flownode_ids,
raw_sql: "raw".to_string(),
expire_after: Some(300),
eval_interval_secs: None,
comment: "hi".to_string(),
options: Default::default(),
created_time: chrono::Utc::now(),
@@ -638,6 +639,7 @@ mod tests {
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_after: Some(300),
eval_interval_secs: None,
comment: "hi".to_string(),
options: Default::default(),
created_time: chrono::Utc::now(),
@@ -1013,6 +1015,7 @@ mod tests {
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_after: Some(300),
eval_interval_secs: None,
comment: "hi".to_string(),
options: Default::default(),
created_time: chrono::Utc::now(),

View File

@@ -135,6 +135,12 @@ pub struct FlowInfoValue {
/// The expr of expire.
/// Duration in seconds as `i64`.
pub expire_after: Option<i64>,
/// The eval interval.
/// Duration in seconds as `i64`.
/// If `None`, will automatically decide when to evaluate the flow.
/// If `Some`, it will be evaluated every `eval_interval` seconds.
#[serde(default)]
pub eval_interval_secs: Option<i64>,
/// The comment.
pub comment: String,
/// The options.
@@ -191,6 +197,10 @@ impl FlowInfoValue {
self.expire_after
}
/// Returns the eval interval of the flow in seconds, or `None` if no interval was set.
pub fn eval_interval(&self) -> Option<i64> {
self.eval_interval_secs
}
pub fn comment(&self) -> &String {
&self.comment
}

View File

@@ -34,8 +34,8 @@ use api::v1::meta::{
};
use api::v1::{
AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval,
ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
@@ -1125,6 +1125,7 @@ pub struct CreateFlowTask {
pub create_if_not_exists: bool,
/// Duration in seconds. Data older than this duration will not be used.
pub expire_after: Option<i64>,
pub eval_interval_secs: Option<i64>,
pub comment: String,
pub sql: String,
pub flow_options: HashMap<String, String>,
@@ -1142,6 +1143,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
or_replace,
create_if_not_exists,
expire_after,
eval_interval,
comment,
sql,
flow_options,
@@ -1161,6 +1163,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
or_replace,
create_if_not_exists,
expire_after: expire_after.map(|e| e.value),
eval_interval_secs: eval_interval.map(|e| e.seconds),
comment,
sql,
flow_options,
@@ -1178,6 +1181,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
or_replace,
create_if_not_exists,
expire_after,
eval_interval_secs: eval_interval,
comment,
sql,
flow_options,
@@ -1192,6 +1196,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
or_replace,
create_if_not_exists,
expire_after: expire_after.map(|value| ExpireAfter { value }),
eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
comment,
sql,
flow_options,

View File

@@ -63,6 +63,7 @@ prost.workspace = true
query.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true
@@ -81,6 +82,5 @@ common-catalog.workspace = true
pretty_assertions.workspace = true
prost.workspace = true
query.workspace = true
serde_json = "1.0"
session.workspace = true
table.workspace = true

View File

@@ -773,6 +773,7 @@ impl StreamingEngine {
create_if_not_exists,
or_replace,
expire_after,
eval_interval: _,
comment,
sql,
flow_options,

View File

@@ -318,6 +318,7 @@ impl FlowDualEngine {
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
eval_interval: info.eval_interval(),
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
@@ -770,6 +771,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_after,
eval_interval,
comment,
sql,
flow_options,
@@ -789,6 +791,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
create_if_not_exists,
or_replace,
expire_after,
eval_interval: eval_interval.map(|e| e.seconds),
comment: Some(comment),
sql: sql.clone(),
flow_options,

View File

@@ -16,6 +16,7 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef;
@@ -30,6 +31,7 @@ use common_telemetry::{debug, info};
use common_time::TimeToLive;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parsers::utils::is_tql;
use store_api::storage::{RegionId, TableId};
use tokio::sync::{oneshot, RwLock};
@@ -40,8 +42,8 @@ use crate::batching_mode::utils::sql_to_df_plan;
use crate::batching_mode::BatchingModeOptions;
use crate::engine::FlowEngine;
use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
CreateFlowSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, InvalidQuerySnafu,
TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};
@@ -335,6 +337,7 @@ impl BatchingEngine {
create_if_not_exists,
or_replace,
expire_after,
eval_interval,
comment: _,
sql,
flow_options,
@@ -361,6 +364,25 @@ impl BatchingEngine {
}
}
let query_ctx = query_ctx.context({
UnexpectedSnafu {
reason: "Query context is None".to_string(),
}
})?;
let query_ctx = Arc::new(query_ctx);
// a TQL flow must specify an eval interval; reject the request when none is given
if eval_interval.is_none()
&& is_tql(query_ctx.sql_dialect(), &sql)
.map_err(BoxedError::new)
.context(CreateFlowSnafu { sql: &sql })?
{
InvalidQuerySnafu {
reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
}
.fail()?;
}
let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
ensure!(
@@ -374,13 +396,6 @@ impl BatchingEngine {
}
);
let Some(query_ctx) = query_ctx else {
UnexpectedSnafu {
reason: "Query context is None".to_string(),
}
.fail()?
};
let query_ctx = Arc::new(query_ctx);
let mut source_table_names = Vec::with_capacity(2);
for src_id in source_table_ids {
// also check table option to see if ttl!=instant
@@ -442,6 +457,7 @@ impl BatchingEngine {
catalog_manager: self.catalog_manager.clone(),
shutdown_rx: rx,
batch_opts: self.batch_opts.clone(),
flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
};
let task = BatchingTask::try_new(task_args)?;

View File

@@ -75,11 +75,12 @@ pub struct TaskConfig {
pub time_window_expr: Option<TimeWindowExpr>,
/// in seconds
pub expire_after: Option<i64>,
sink_table_name: [String; 3],
pub sink_table_name: [String; 3],
pub source_table_names: HashSet<[String; 3]>,
catalog_manager: CatalogManagerRef,
query_type: QueryType,
batch_opts: Arc<BatchingModeOptions>,
pub catalog_manager: CatalogManagerRef,
pub query_type: QueryType,
pub batch_opts: Arc<BatchingModeOptions>,
pub flow_eval_interval: Option<Duration>,
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
@@ -102,7 +103,7 @@ fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<Quer
}
#[derive(Debug, Clone)]
enum QueryType {
pub enum QueryType {
/// query is a tql query
Tql,
/// query is a sql query
@@ -128,6 +129,7 @@ pub struct TaskArgs<'a> {
pub catalog_manager: CatalogManagerRef,
pub shutdown_rx: oneshot::Receiver<()>,
pub batch_opts: Arc<BatchingModeOptions>,
pub flow_eval_interval: Option<Duration>,
}
pub struct PlanInfo {
@@ -150,6 +152,7 @@ impl BatchingTask {
catalog_manager,
shutdown_rx,
batch_opts,
flow_eval_interval,
}: TaskArgs<'_>,
) -> Result<Self, Error> {
Ok(Self {
@@ -164,6 +167,7 @@ impl BatchingTask {
output_schema: plan.schema().clone(),
query_type: determine_query_type(query, &query_ctx)?,
batch_opts,
flow_eval_interval,
}),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
})
@@ -452,6 +456,13 @@ impl BatchingTask {
) {
let flow_id_str = self.config.flow_id.to_string();
let mut max_window_cnt = None;
let mut interval = self
.config
.flow_eval_interval
.map(|d| tokio::time::interval(d));
if let Some(tick) = &mut interval {
tick.tick().await; // pass the first tick immediately
}
loop {
// first check if shutdown signal is received
// if so, break the loop
@@ -499,24 +510,33 @@ impl BatchingTask {
max_window_cnt = max_window_cnt.map(|cnt| {
(cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
});
let sleep_until = {
let state = self.state.write().unwrap();
let time_window_size = self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size());
// when an eval interval is set, wait for its next tick
if let Some(eval_interval) = &mut interval {
eval_interval.tick().await;
} else {
// if not explicitly set, just automatically calculate next start time
// using time window size and more args
let sleep_until = {
let state = self.state.write().unwrap();
state.get_next_start_query_time(
self.config.flow_id,
&time_window_size,
min_refresh,
Some(self.config.batch_opts.query_timeout),
self.config.batch_opts.experimental_max_filter_num_per_query,
)
let time_window_size = self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size());
state.get_next_start_query_time(
self.config.flow_id,
&time_window_size,
min_refresh,
Some(self.config.batch_opts.query_timeout),
self.config.batch_opts.experimental_max_filter_num_per_query,
)
};
tokio::time::sleep_until(sleep_until).await;
};
tokio::time::sleep_until(sleep_until).await;
}
// no new data, sleep for some time before checking for new data
Ok(None) => {

View File

@@ -70,6 +70,7 @@ pub struct CreateFlowArgs {
pub create_if_not_exists: bool,
pub or_replace: bool,
pub expire_after: Option<i64>,
pub eval_interval: Option<i64>,
pub comment: Option<String>,
pub sql: String,
pub flow_options: HashMap<String, String>,

View File

@@ -796,6 +796,8 @@ pub fn to_create_flow_task_expr(
})
.collect::<Result<Vec<_>>>()?;
let eval_interval = create_flow.eval_interval;
Ok(CreateFlowExpr {
catalog_name: query_ctx.current_catalog().to_string(),
flow_name: sanitize_flow_name(create_flow.flow_name)?,
@@ -804,9 +806,10 @@ pub fn to_create_flow_task_expr(
or_replace: create_flow.or_replace,
create_if_not_exists: create_flow.if_not_exists,
expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
flow_options: HashMap::new(),
flow_options: Default::default(),
})
}
@@ -839,6 +842,18 @@ mod tests {
let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
stmt.is_err(),
"Expected error for invalid TQL EVAL parameters: {:#?}",
stmt
);
let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
@@ -860,7 +875,7 @@ TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
);
assert!(expr.source_table_names.is_empty());
assert_eq!(
r#"TQL EVAL (0, 15, '5s') count_values("status_code", http_requests)"#,
r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
expr.sql
);
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use std::time::{Duration, TryFromFloatSecsError};
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
@@ -344,6 +344,15 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert float seconds to duration, raw: {}", raw))]
TryIntoDuration {
raw: String,
#[snafu(source)]
error: TryFromFloatSecsError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -368,7 +377,8 @@ impl ErrorExt for Error {
| UnsupportedVariable { .. }
| ColumnSchemaNoDefault { .. }
| CteColumnSchemaMismatch { .. }
| ConvertValue { .. } => StatusCode::InvalidArguments,
| ConvertValue { .. }
| TryIntoDuration { .. } => StatusCode::InvalidArguments,
BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,

View File

@@ -33,7 +33,7 @@ use sql::statements::statement::Statement;
use crate::error::{
AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
QueryParseSnafu, Result, UnimplementedSnafu,
QueryParseSnafu, Result, TryIntoDurationSnafu, UnimplementedSnafu,
};
use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
@@ -206,7 +206,8 @@ impl QueryLanguageParser {
// also report rfc3339 error if float parsing fails
.map_err(|_| rfc3339_result.unwrap_err())?;
let duration = Duration::from_secs_f64(secs);
let duration =
Duration::try_from_secs_f64(secs).context(TryIntoDurationSnafu { raw: timestamp })?;
SystemTime::UNIX_EPOCH
.checked_add(duration)
.context(AddSystemTimeOverflowSnafu { duration })

View File

@@ -1019,6 +1019,7 @@ pub fn show_create_flow(
or_replace: false,
if_not_exists: true,
expire_after: flow_val.expire_after(),
eval_interval: flow_val.eval_interval(),
comment,
query,
};

View File

@@ -170,7 +170,7 @@ impl ParserContext<'_> {
Keyword::NoKeyword
if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
{
self.parse_tql()
self.parse_tql(false)
}
Keyword::DECLARE => self.parse_declare_cursor(),

View File

@@ -31,5 +31,5 @@ pub(crate) mod set_var_parser;
pub(crate) mod show_parser;
pub(crate) mod tql_parser;
pub(crate) mod truncate_parser;
pub(crate) mod utils;
pub mod utils;
pub mod with_tql_parser;

View File

@@ -39,6 +39,7 @@ use crate::error::{
SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::parser::{ParserContext, FLOW};
use crate::parsers::tql_parser;
use crate::parsers::utils::{
self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
};
@@ -295,7 +296,16 @@ impl<'a> ParserContext<'a> {
.parser
.consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)])
{
Some(self.parse_interval()?)
Some(self.parse_interval_no_month("EXPIRE AFTER")?)
} else {
None
};
let eval_interval = if self
.parser
.consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
{
Some(self.parse_interval_no_month("EVAL INTERVAL")?)
} else {
None
};
@@ -321,10 +331,39 @@ impl<'a> ParserContext<'a> {
.expect_keyword(Keyword::AS)
.context(SyntaxSnafu)?;
let query = Box::new(self.parse_sql_or_tql(true)?);
Ok(Statement::CreateFlow(CreateFlow {
flow_name,
sink_table_name: output_table_name,
or_replace,
if_not_exists,
expire_after,
eval_interval,
comment,
query,
}))
}
fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
let start_loc = self.parser.peek_token().span.start;
let start_index = location_to_index(self.sql, &start_loc);
let query = self.parse_statement()?;
// only accept sql or tql
let query = match self.parser.peek_token().token {
Token::Word(w) => match w.keyword {
Keyword::SELECT => self.parse_query(),
Keyword::NoKeyword
if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
{
self.parse_tql(require_now_expr)
}
_ => self.unsupported(self.peek_token_as_string()),
},
_ => self.unsupported(self.peek_token_as_string()),
}?;
let end_token = self.parser.peek_token();
let raw_query = if end_token == Token::EOF {
@@ -335,23 +374,19 @@ impl<'a> ParserContext<'a> {
&self.sql[start_index..end_index.min(self.sql.len())]
};
let raw_query = raw_query.trim_end_matches(";");
let query = Box::new(SqlOrTql::try_from_statement(query, raw_query)?);
Ok(Statement::CreateFlow(CreateFlow {
flow_name,
sink_table_name: output_table_name,
or_replace,
if_not_exists,
expire_after,
comment,
query,
}))
let query = SqlOrTql::try_from_statement(query, raw_query)?;
Ok(query)
}
/// Parse the interval expr to duration in seconds.
fn parse_interval(&mut self) -> Result<i64> {
fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
let interval = self.parse_interval_month_day_nano()?.0;
if interval.months != 0 {
return InvalidIntervalSnafu {
reason: format!("Interval with months is not allowed in {context}"),
}
.fail();
}
Ok(
interval.nanoseconds / 1_000_000_000
+ interval.days as i64 * 60 * 60 * 24
@@ -365,7 +400,7 @@ impl<'a> ParserContext<'a> {
fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
let raw_interval_expr = interval_expr.to_string();
let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone())?
let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
.cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
.ok()
.with_context(|| InvalidIntervalSnafu {
@@ -1113,7 +1148,7 @@ mod tests {
use common_catalog::consts::FILE_ENGINE;
use common_error::ext::ErrorExt;
use sqlparser::ast::ColumnOption::NotNull;
use sqlparser::ast::{BinaryOperator, Expr, ObjectName, Value};
use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
use sqlparser::dialect::GenericDialect;
use sqlparser::tokenizer::Tokenizer;
@@ -1450,7 +1485,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
EXPIRE AFTER '1 month 2 days 1h 2 min'
EXPIRE AFTER '2 days 1h 2 min'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
@@ -1461,7 +1496,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
]),
or_replace: false,
if_not_exists: false,
expire_after: Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60),
expire_after: Some(2 * 86400 + 3600 + 2 * 60),
comment: None,
},
),
@@ -1476,6 +1511,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
or_replace: expected.or_replace,
if_not_exists: expected.if_not_exists,
expire_after: expected.expire_after,
eval_interval: None,
comment: expected.comment,
// ignore query parse result
query: create_task.query.clone(),
@@ -1490,35 +1526,176 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
#[test]
fn test_parse_create_flow() {
let sql = r"
use pretty_assertions::assert_eq;
fn parse_create_flow(sql: &str) -> CreateFlow {
let stmts = ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default(),
)
.unwrap();
assert_eq!(1, stmts.len());
match &stmts[0] {
Statement::CreateFlow(c) => c.clone(),
_ => panic!("{:?}", stmts[0]),
}
}
/// Expected fields of a parsed `CreateFlow`, without the query.
struct CreateFlowWoutQuery {
/// Flow name
pub flow_name: ObjectName,
/// Output (sink) table name
pub sink_table_name: ObjectName,
/// Whether to replace existing task
pub or_replace: bool,
/// Create if not exist
pub if_not_exists: bool,
/// `EXPIRE AFTER`
/// Duration in seconds as `i64`
pub expire_after: Option<i64>,
/// Duration for flow evaluation interval
/// Duration in seconds as `i64`
/// If not set, flow will be evaluated based on time window size and other args.
pub eval_interval: Option<i64>,
/// Comment string
pub comment: Option<String>,
}
// create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
let testcases = vec![
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmts =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
let create_task = match &stmts[0] {
Statement::CreateFlow(c) => c,
_ => unreachable!(),
};
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
sink_table_name: ObjectName(vec![
ObjectNamePart::Identifier(Ident::new("schema_1")),
ObjectNamePart::Identifier(Ident::new("table_1")),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
eval_interval: None,
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
sink_table_name: ObjectName(vec![
ObjectNamePart::Identifier(Ident::new("schema_1")),
ObjectNamePart::Identifier(Ident::new("table_1")),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
eval_interval: None,
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '5 minutes'
EVAL INTERVAL '10 seconds'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
sink_table_name: ObjectName(vec![
ObjectNamePart::Identifier(Ident::new("schema_1")),
ObjectNamePart::Identifier(Ident::new("table_1")),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
eval_interval: Some(10),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '5 minutes'
EVAL INTERVAL INTERVAL '10 seconds'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
sink_table_name: ObjectName(vec![
ObjectNamePart::Identifier(Ident::new("schema_1")),
ObjectNamePart::Identifier(Ident::new("table_1")),
]),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
eval_interval: Some(10),
comment: Some("test comment".to_string()),
},
),
(
r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
EXPIRE AFTER '2 days 1h 2 min'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
'`', "task_2",
))]),
sink_table_name: ObjectName(vec![
ObjectNamePart::Identifier(Ident::new("schema_1")),
ObjectNamePart::Identifier(Ident::new("table_1")),
]),
or_replace: false,
if_not_exists: false,
expire_after: Some(2 * 86400 + 3600 + 2 * 60),
eval_interval: None,
comment: None,
},
),
];
let expected = CreateFlow {
flow_name: vec![Ident::new("task_1")].into(),
sink_table_name: vec![Ident::new("schema_1"), Ident::new("table_1")].into(),
or_replace: true,
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
// ignore query parse result
query: create_task.query.clone(),
};
assert_eq!(create_task, &expected);
for (sql, expected) in testcases {
let create_task = parse_create_flow(sql);
// create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
let expected = CreateFlow {
flow_name: expected.flow_name,
sink_table_name: expected.sink_table_name,
or_replace: expected.or_replace,
if_not_exists: expected.if_not_exists,
expire_after: expected.expire_after,
eval_interval: expected.eval_interval,
comment: expected.comment,
// ignore query parse result
query: create_task.query.clone(),
};
assert_eq!(create_task, expected, "input sql is:\n{sql}");
let show_create = create_task.to_string();
let recreated = parse_create_flow(&show_create);
assert_eq!(recreated, expected, "input sql is:\n{show_create}");
}
}
#[test]
fn test_create_flow_no_month() {
let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
@@ -1526,21 +1703,15 @@ EXPIRE AFTER '1 month 2 days 1h 2 min'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmts =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
let create_task = match &stmts[0] {
Statement::CreateFlow(c) => c,
_ => unreachable!(),
};
assert!(!create_task.or_replace);
assert!(!create_task.if_not_exists);
assert_eq!(
create_task.expire_after,
Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60)
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
stmts.is_err()
&& stmts
.unwrap_err()
.to_string()
.contains("Interval with months is not allowed")
);
assert!(create_task.comment.is_none());
assert_eq!(create_task.flow_name.to_string(), "`task_2`");
}
#[test]

View File

@@ -44,7 +44,7 @@ use crate::parsers::error::{
/// - `TQL EXPLAIN [VERBOSE] [FORMAT format] <query>`
/// - `TQL ANALYZE [VERBOSE] [FORMAT format] <query>`
impl ParserContext<'_> {
pub(crate) fn parse_tql(&mut self) -> Result<Statement> {
pub(crate) fn parse_tql(&mut self, require_now_expr: bool) -> Result<Statement> {
let _ = self.parser.next_token();
match self.parser.peek_token().token {
@@ -56,7 +56,7 @@ impl ParserContext<'_> {
if (uppercase == EVAL || uppercase == EVALUATE)
&& w.quote_style.is_none() =>
{
self.parse_tql_params()
self.parse_tql_params(require_now_expr)
.map(|params| Statement::Tql(Tql::Eval(TqlEval::from(params))))
.context(error::TQLSyntaxSnafu)
}
@@ -67,7 +67,7 @@ impl ParserContext<'_> {
let _consume_verbose_token = self.parser.next_token();
}
let format = self.parse_format_option();
self.parse_tql_params()
self.parse_tql_params(require_now_expr)
.map(|mut params| {
params.is_verbose = is_verbose;
params.format = format;
@@ -82,7 +82,7 @@ impl ParserContext<'_> {
let _consume_verbose_token = self.parser.next_token();
}
let format = self.parse_format_option();
self.parse_tql_params()
self.parse_tql_params(require_now_expr)
.map(|mut params| {
params.is_verbose = is_verbose;
params.format = format;
@@ -97,18 +97,35 @@ impl ParserContext<'_> {
}
}
fn parse_tql_params(&mut self) -> std::result::Result<TqlParameters, TQLError> {
/// `require_now_expr` indicates whether the `start` and `end` parameters must each contain a `now()` function call.
fn parse_tql_params(
&mut self,
require_now_expr: bool,
) -> std::result::Result<TqlParameters, TQLError> {
let parser = &mut self.parser;
let (start, end, step, lookback) = match parser.peek_token().token {
Token::LParen => {
let _consume_lparen_token = parser.next_token();
let start = Self::parse_string_or_number_or_word(parser, &[Token::Comma])?.0;
let end = Self::parse_string_or_number_or_word(parser, &[Token::Comma])?.0;
let start = Self::parse_string_or_number_or_word(
parser,
&[Token::Comma],
require_now_expr,
)?
.0;
let end = Self::parse_string_or_number_or_word(
parser,
&[Token::Comma],
require_now_expr,
)?
.0;
let (step, delimiter) =
Self::parse_string_or_number_or_word(parser, &[Token::Comma, Token::RParen])?;
let (step, delimiter) = Self::parse_string_or_number_or_word(
parser,
&[Token::Comma, Token::RParen],
false,
)?;
let lookback = if delimiter == Token::Comma {
Self::parse_string_or_number_or_word(parser, &[Token::RParen])
Self::parse_string_or_number_or_word(parser, &[Token::RParen], false)
.ok()
.map(|t| t.0)
} else {
@@ -168,6 +185,7 @@ impl ParserContext<'_> {
fn parse_string_or_number_or_word(
parser: &mut Parser,
delimiter_tokens: &[Token],
require_now_expr: bool,
) -> std::result::Result<(String, Token), TQLError> {
let mut tokens = vec![];
@@ -185,19 +203,30 @@ impl ParserContext<'_> {
.context(ParserSnafu),
1 => {
let value = match tokens[0].clone() {
Token::Number(n, _) => n,
Token::DoubleQuotedString(s) | Token::SingleQuotedString(s) => s,
Token::Word(_) => Self::parse_tokens_to_ts(tokens)?,
Token::Number(n, _) if !require_now_expr => n,
Token::DoubleQuotedString(s) | Token::SingleQuotedString(s)
if !require_now_expr =>
{
s
}
Token::Word(_) => Self::parse_tokens_to_ts(tokens, require_now_expr)?,
unexpected => {
return Err(ParserError::ParserError(format!(
"Expected number, string or word, but have {unexpected:?}"
)))
.context(ParserSnafu);
if !require_now_expr {
return Err(ParserError::ParserError(format!(
"Expected number, string or word, but have {unexpected:?}"
)))
.context(ParserSnafu);
} else {
return Err(ParserError::ParserError(format!(
"Expected expression containing `now()`, but have {unexpected:?}"
)))
.context(ParserSnafu);
}
}
};
Ok(value)
}
_ => Self::parse_tokens_to_ts(tokens),
_ => Self::parse_tokens_to_ts(tokens, require_now_expr),
};
for token in delimiter_tokens {
if parser.consume_token(token) {
@@ -211,9 +240,12 @@ impl ParserContext<'_> {
}
/// Parse the tokens to seconds and convert to string.
fn parse_tokens_to_ts(tokens: Vec<Token>) -> std::result::Result<String, TQLError> {
fn parse_tokens_to_ts(
tokens: Vec<Token>,
require_now_expr: bool,
) -> std::result::Result<String, TQLError> {
let parser_expr = Self::parse_to_expr(tokens)?;
let lit = utils::parser_expr_to_scalar_value_literal(parser_expr)
let lit = utils::parser_expr_to_scalar_value_literal(parser_expr, require_now_expr)
.map_err(Box::new)
.context(ConvertToLogicalExpressionSnafu)?;
@@ -281,6 +313,65 @@ mod tests {
result.remove(0)
}
/// Checks how `parse_tql` treats `now()`-based and literal `start`/`end` values
/// with `require_now_expr` switched on and off.
#[test]
fn test_require_now_expr() {
    const PROMQL: &str = "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
    let now_sql = format!(
        "TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') {}",
        PROMQL
    );
    let literal_sql = format!("TQL EVAL (0, 15, '1s') {}", PROMQL);

    // Builds a fresh parser for `sql` and parses it as TQL with the given flag.
    let parse = |sql: &str, require_now_expr: bool| {
        ParserContext::new(&GreptimeDbDialect {}, sql)
            .unwrap()
            .parse_tql(require_now_expr)
    };
    // Asserts that `statement` is a `TQL EVAL` with the expected range and the shared query.
    let assert_eval = |statement: Statement, start: &str, end: &str| {
        let Statement::Tql(Tql::Eval(eval)) = statement else {
            unreachable!()
        };
        assert_eq!(eval.start, start);
        assert_eq!(eval.end, end);
        assert_eq!(eval.step, "1s");
        assert_eq!(eval.lookback, None);
        assert_eq!(eval.query, PROMQL);
    };

    // `now()` expressions are accepted when they are required.
    assert_eval(parse(&now_sql, true).unwrap(), "0", "10");

    // Plain literals are rejected when `now()` is required.
    let statement = parse(&literal_sql, true);
    assert!(
        statement.is_err()
            && format!("{:?}", statement)
                .contains("Expected expression containing `now()`, but have "),
        "statement: {:?}",
        statement
    );

    // Without the requirement both forms parse.
    assert_eval(parse(&now_sql, false).unwrap(), "0", "10");
    assert_eval(parse(&literal_sql, false).unwrap(), "0", "15");
}
#[test]
fn test_parse_tql_eval_with_functions() {
let sql = "TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";

View File

@@ -20,10 +20,11 @@ use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::execution::SessionStateBuilder;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor};
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource, WindowUDF};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::TableReference;
use datatypes::arrow::datatypes::DataType;
@@ -33,26 +34,105 @@ use datatypes::schema::{
COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE,
};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use sqlparser::dialect::Dialect;
use crate::error::{
ConvertToLogicalExpressionSnafu, ParseSqlValueSnafu, Result, SimplificationSnafu,
ConvertToLogicalExpressionSnafu, InvalidSqlSnafu, ParseSqlValueSnafu, Result,
SimplificationSnafu,
};
use crate::parser::{ParseOptions, ParserContext};
use crate::statements::statement::Statement;
/// Reports whether `sql`, parsed with `dialect`, is a single TQL statement.
///
/// # Errors
///
/// Returns an error if parsing fails or if `sql` does not contain exactly one
/// statement.
pub fn is_tql(dialect: &dyn Dialect, sql: &str) -> Result<bool> {
    let parsed = ParserContext::create_with_dialect(sql, dialect, ParseOptions::default())?;
    let count = parsed.len();
    ensure!(
        count == 1,
        InvalidSqlSnafu {
            msg: format!("Expect only one statement, found {}", count)
        }
    );
    Ok(matches!(&parsed[0], Statement::Tql(_)))
}
/// Convert a parser expression to a scalar value. This function will try the
/// best to resolve and reduce constants. Exprs like `1 + 1` or `now()` can be
/// handled properly.
pub fn parser_expr_to_scalar_value_literal(expr: sqlparser::ast::Expr) -> Result<ScalarValue> {
///
/// If `require_now_expr` is true, the expression must contain a `now()` call;
/// otherwise an error is returned.
pub fn parser_expr_to_scalar_value_literal(
expr: sqlparser::ast::Expr,
require_now_expr: bool,
) -> Result<ScalarValue> {
// 1. convert parser expr to logical expr
let empty_df_schema = DFSchema::empty();
let logical_expr = SqlToRel::new(&StubContextProvider::default())
.sql_to_expr(expr.into(), &empty_df_schema, &mut Default::default())
.context(ConvertToLogicalExpressionSnafu)?;
/// Visitor that records whether an expression tree contains a call to `now`.
struct FindNow {
    /// Set to `true` once a zero-argument `now` call has been seen.
    found: bool,
}
impl TreeNodeVisitor<'_> for FindNow {
    type Node = Expr;

    /// Stops the traversal at the first `now` call (name compared
    /// case-insensitively); fails if that call carries arguments.
    fn f_down(
        &mut self,
        node: &Self::Node,
    ) -> DfResult<datafusion_common::tree_node::TreeNodeRecursion> {
        use datafusion_common::tree_node::TreeNodeRecursion;

        // Only scalar function calls are of interest; keep walking otherwise.
        let Expr::ScalarFunction(call) = node else {
            return Ok(TreeNodeRecursion::Continue);
        };
        if call.name().to_lowercase() != "now" {
            return Ok(TreeNodeRecursion::Continue);
        }
        if !call.args.is_empty() {
            return Err(datafusion_common::DataFusionError::Plan(
                "now() function should not have arguments".to_string(),
            ));
        }
        self.found = true;
        Ok(TreeNodeRecursion::Stop)
    }
}
// When the caller requires it, reject expressions that never reference `now()`.
if require_now_expr {
    let mut visitor = FindNow { found: false };
    // The visitor returns an error for `now(<args>)`. Propagate it to the caller
    // instead of unwrapping, so malformed user SQL cannot panic here.
    logical_expr
        .visit(&mut visitor)
        .context(ConvertToLogicalExpressionSnafu)?;
    if !visitor.found {
        return ParseSqlValueSnafu {
            msg: format!(
                "expected now() expression, but not found in {}",
                logical_expr
            ),
        }
        .fail();
    }
}
// 2. simplify logical expr
let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(empty_df_schema));
let simplified_expr = ExprSimplifier::new(info)
let info =
SimplifyContext::new(&execution_props).with_schema(Arc::new(empty_df_schema.clone()));
let simplifier = ExprSimplifier::new(info);
// Coerce the logical expression so the simplifier can handle it correctly.
// This is necessary for constant evaluation when operand types may mismatch,
// e.g. `now() - now() + '15s'::interval`, which is
// `TimestampNanosecond - TimestampNanosecond + IntervalMonthDayNano`.
let logical_expr = simplifier
.coerce(logical_expr, &empty_df_schema)
.context(SimplificationSnafu)?;
let simplified_expr = simplifier
.simplify(logical_expr)
.context(SimplificationSnafu)?;
@@ -177,3 +257,72 @@ pub fn convert_month_day_nano_to_duration(
Ok(std::time::Duration::new(adjusted_seconds, nanos_remainder))
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono::DateTime;
    use datafusion::functions::datetime::expr_fn::now;
    use datafusion_expr::lit;
    use datatypes::arrow::datatypes::TimestampNanosecondType;

    use super::*;

    /// Guards our usage of datafusion's `ExprSimplifier`: each expression is
    /// coerced first and then simplified, and must fold to the expected literal.
    #[test]
    fn test_simplifier() {
        // Fixed query start time, so that `now()` folds to a known value.
        let query_start = DateTime::from_timestamp(61, 0).unwrap();

        // Small builders for the literals used in the table below.
        let ts_nanos = |nanos: Option<i64>| {
            lit(ScalarValue::new_timestamp::<TimestampNanosecondType>(
                nanos, None,
            ))
        };
        let interval_dt_1500 = || lit(ScalarValue::new_interval_dt(0, 1500));
        let interval_61_days =
            || lit(ScalarValue::new_interval_mdn(0, 0, 61 * 86400 * 1_000_000_000));

        // (input expression, expected simplified literal)
        let cases = vec![
            (now(), ts_nanos(query_start.timestamp_nanos_opt())),
            (now() - now(), lit(ScalarValue::DurationNanosecond(Some(0)))),
            (now() + interval_dt_1500(), ts_nanos(Some(62500000000))),
            (
                now() - (now() + interval_dt_1500()),
                lit(ScalarValue::DurationNanosecond(Some(-1500000000))),
            ),
            // this one failed if type is not coerced
            (
                now() - now() + interval_dt_1500(),
                lit(ScalarValue::new_interval_mdn(0, 0, 1500000000)),
            ),
            (interval_61_days(), interval_61_days()),
        ];

        let props = ExecutionProps::new().with_query_execution_start_time(query_start);
        let context = SimplifyContext::new(&props).with_schema(Arc::new(DFSchema::empty()));
        let simplifier = ExprSimplifier::new(context);
        let empty_schema = DFSchema::empty();

        for (input, expected) in cases {
            // Capture the display name before `coerce` consumes the expression.
            let input_name = input.schema_name().to_string();
            let coerced = simplifier.coerce(input, &empty_schema).unwrap();
            let simplified = simplifier.simplify(coerced).unwrap();
            assert_eq!(
                simplified, expected,
                "Failed to simplify expression: {input_name}"
            );
        }
    }
}

View File

@@ -383,6 +383,10 @@ pub struct CreateFlow {
/// `EXPIRE AFTER`
/// Duration in seconds as `i64`
pub expire_after: Option<i64>,
/// Duration for flow evaluation interval
/// Duration in seconds as `i64`
/// If not set, flow will be evaluated based on time window size and other args.
pub eval_interval: Option<i64>,
/// Comment string
pub comment: Option<String>,
/// SQL statement
@@ -436,7 +440,10 @@ impl Display for CreateFlow {
writeln!(f, "{}", &self.flow_name)?;
writeln!(f, "SINK TO {}", &self.sink_table_name)?;
if let Some(expire_after) = &self.expire_after {
writeln!(f, "EXPIRE AFTER '{} s' ", expire_after)?;
writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?;
}
if let Some(eval_interval) = &self.eval_interval {
writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?;
}
if let Some(comment) = &self.comment {
writeln!(f, "COMMENT '{}'", comment)?;

View File

@@ -8,7 +8,7 @@ CREATE TABLE http_requests (
Affected Rows: 0
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests);
Affected Rows: 0
@@ -91,9 +91,26 @@ CREATE TABLE http_requests (
Affected Rows: 0
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);
Error: 2000(InvalidSyntax), Invalid TQL syntax: sql parser error: Expected expression containing `now()`, but have Number("0", false)
-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative"))
-- so duplicate test into two
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests);
Error: 3001(EngineExecuteQuery), Failed to convert float seconds to duration, raw: -15
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests);
Error: 2000(InvalidSyntax), Invalid TQL syntax: Failed to evaluate TQL expression: Failed to extract a timestamp value IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 15000000000 }")
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests);
Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;

View File

@@ -6,7 +6,7 @@ CREATE TABLE http_requests (
PRIMARY KEY(host, idc),
);
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests);
SHOW CREATE TABLE cnt_reqs;
@@ -47,9 +47,21 @@ CREATE TABLE http_requests (
PRIMARY KEY(host, idc),
);
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);
-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative"))
-- so duplicate test into two
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests);
SHOW CREATE TABLE cnt_reqs;
INSERT INTO TABLE http_requests VALUES

View File

@@ -0,0 +1,189 @@
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
Affected Rows: 0
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests);
Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" BIGINT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
(now() - '17s'::interval, 'host2', 'idc1', 200),
(now() - '17s'::interval, 'host3', 'idc2', 200),
(now() - '17s'::interval, 'host4', 'idc2', 401),
(now() - '13s'::interval, 'host1', 'idc1', 404),
(now() - '13s'::interval, 'host2', 'idc1', 401),
(now() - '13s'::interval, 'host3', 'idc2', 404),
(now() - '13s'::interval, 'host4', 'idc2', 500),
(now() - '7s'::interval, 'host1', 'idc1', 200),
(now() - '7s'::interval, 'host2', 'idc1', 200),
(now() - '7s'::interval, 'host3', 'idc2', 201),
(now() - '7s'::interval, 'host4', 'idc2', 201),
(now() - '3s'::interval, 'host1', 'idc1', 500),
(now() - '3s'::interval, 'host2', 'idc1', 500),
(now() - '3s'::interval, 'host3', 'idc2', 500),
(now() - '3s'::interval, 'host4', 'idc2', 500);
Affected Rows: 16
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
+-------------------------------+
| ADMIN FLUSH_FLOW('calc_reqs') |
+-------------------------------+
| FLOW_FLUSHED |
+-------------------------------+
-- too much indeterminsticity in the test, so just check that the flow is running
SELECT count(*) > 0 FROM cnt_reqs;
+---------------------+
| count(*) > Int64(0) |
+---------------------+
| true |
+---------------------+
DROP FLOW calc_reqs;
Affected Rows: 0
DROP TABLE http_requests;
Affected Rows: 0
DROP TABLE cnt_reqs;
Affected Rows: 0
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
Affected Rows: 0
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);
Error: 2000(InvalidSyntax), Invalid TQL syntax: sql parser error: Expected expression containing `now()`, but have Number("0", false)
-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative"))
-- so duplicate test into two
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests);
Error: 3001(EngineExecuteQuery), Failed to convert float seconds to duration, raw: -15: cannot convert float seconds to Duration: value is negative
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests);
Error: 2000(InvalidSyntax), Invalid TQL syntax: Failed to evaluate TQL expression: Failed to extract a timestamp value IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 15000000000 }")
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests);
Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" BIGINT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
(0::Timestamp, 'host2', 'idc1', 200),
(0::Timestamp, 'host3', 'idc2', 200),
(0::Timestamp, 'host4', 'idc2', 401),
(5000::Timestamp, 'host1', 'idc1', 404),
(5000::Timestamp, 'host2', 'idc1', 401),
(5000::Timestamp, 'host3', 'idc2', 404),
(5000::Timestamp, 'host4', 'idc2', 500),
(10000::Timestamp, 'host1', 'idc1', 200),
(10000::Timestamp, 'host2', 'idc1', 200),
(10000::Timestamp, 'host3', 'idc2', 201),
(10000::Timestamp, 'host4', 'idc2', 201),
(15000::Timestamp, 'host1', 'idc1', 500),
(15000::Timestamp, 'host2', 'idc1', 500),
(15000::Timestamp, 'host3', 'idc2', 500),
(15000::Timestamp, 'host4', 'idc2', 500);
Affected Rows: 16
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
+-------------------------------+
| ADMIN FLUSH_FLOW('calc_reqs') |
+-------------------------------+
| FLOW_FLUSHED |
+-------------------------------+
SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
+--------------------------+---------------------+-------------+
| count(http_requests.val) | ts | status_code |
+--------------------------+---------------------+-------------+
| 3 | 1970-01-01T00:00:00 | 200 |
| 1 | 1970-01-01T00:00:00 | 401 |
| 1 | 1970-01-01T00:00:05 | 401 |
| 2 | 1970-01-01T00:00:05 | 404 |
| 1 | 1970-01-01T00:00:05 | 500 |
| 2 | 1970-01-01T00:00:10 | 200 |
| 2 | 1970-01-01T00:00:10 | 201 |
| 4 | 1970-01-01T00:00:15 | 500 |
+--------------------------+---------------------+-------------+
DROP FLOW calc_reqs;
Affected Rows: 0
DROP TABLE http_requests;
Affected Rows: 0
DROP TABLE cnt_reqs;
Affected Rows: 0

View File

@@ -0,0 +1,92 @@
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests);
SHOW CREATE TABLE cnt_reqs;
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
(now() - '17s'::interval, 'host2', 'idc1', 200),
(now() - '17s'::interval, 'host3', 'idc2', 200),
(now() - '17s'::interval, 'host4', 'idc2', 401),
(now() - '13s'::interval, 'host1', 'idc1', 404),
(now() - '13s'::interval, 'host2', 'idc1', 401),
(now() - '13s'::interval, 'host3', 'idc2', 404),
(now() - '13s'::interval, 'host4', 'idc2', 500),
(now() - '7s'::interval, 'host1', 'idc1', 200),
(now() - '7s'::interval, 'host2', 'idc1', 200),
(now() - '7s'::interval, 'host3', 'idc2', 201),
(now() - '7s'::interval, 'host4', 'idc2', 201),
(now() - '3s'::interval, 'host1', 'idc1', 500),
(now() - '3s'::interval, 'host2', 'idc1', 500),
(now() - '3s'::interval, 'host3', 'idc2', 500),
(now() - '3s'::interval, 'host4', 'idc2', 500);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
-- too much indeterminsticity in the test, so just check that the flow is running
SELECT count(*) > 0 FROM cnt_reqs;
DROP FLOW calc_reqs;
DROP TABLE http_requests;
DROP TABLE cnt_reqs;
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);
-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative"))
-- so duplicate test into two
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests);
SHOW CREATE TABLE cnt_reqs;
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
(0::Timestamp, 'host2', 'idc1', 200),
(0::Timestamp, 'host3', 'idc2', 200),
(0::Timestamp, 'host4', 'idc2', 401),
(5000::Timestamp, 'host1', 'idc1', 404),
(5000::Timestamp, 'host2', 'idc1', 401),
(5000::Timestamp, 'host3', 'idc2', 404),
(5000::Timestamp, 'host4', 'idc2', 500),
(10000::Timestamp, 'host1', 'idc1', 200),
(10000::Timestamp, 'host2', 'idc1', 200),
(10000::Timestamp, 'host3', 'idc2', 201),
(10000::Timestamp, 'host4', 'idc2', 201),
(15000::Timestamp, 'host1', 'idc1', 500),
(15000::Timestamp, 'host2', 'idc1', 500),
(15000::Timestamp, 'host3', 'idc2', 500),
(15000::Timestamp, 'host4', 'idc2', 500);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
DROP FLOW calc_reqs;
DROP TABLE http_requests;
DROP TABLE cnt_reqs;