feat: flow tql cte (#7702)

* feat: flow tql cte

Signed-off-by: discord9 <discord9@163.com>

* fix: creating flow TQL CTE source tables lose cte part in query

Signed-off-by: discord9 <discord9@163.com>

* test: update sqlness result

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* fix: properly canonicalize ident

Signed-off-by: discord9 <discord9@163.com>

* feat: even stricter check

Signed-off-by: discord9 <discord9@163.com>

* chore: sqlness update

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-06 11:36:42 +08:00
committed by GitHub
parent 4c30b9efaf
commit d1151b665b
13 changed files with 990 additions and 53 deletions

View File

@@ -35,8 +35,7 @@ use query::QueryEngineRef;
use query::query_engine::DefaultSerializer;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use sql::parsers::utils::is_tql;
use store_api::mito_engine_options::MERGE_MODE_KEY;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
@@ -84,22 +83,14 @@ pub struct TaskConfig {
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
ensure!(
stmts.len() == 1,
InvalidQuerySnafu {
reason: format!("Expect only one statement, found {}", stmts.len())
}
);
let stmt = &stmts[0];
match stmt {
Statement::Tql(_) => Ok(QueryType::Tql),
_ => Ok(QueryType::Sql),
}
let is_tql = is_tql(query_ctx.sql_dialect(), query)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(if is_tql {
QueryType::Tql
} else {
QueryType::Sql
})
}
fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {

View File

@@ -1158,6 +1158,105 @@ TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http
);
}
#[test]
fn test_create_flow_tql_cte_source_tables() {
let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
)
SELECT * FROM tql;
"#;
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!(1, expr.source_table_names.len());
assert_eq!(
"greptime.public.metric_cte",
to_dot_sep(expr.source_table_names[0].clone())
);
}
#[test]
fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH "TQL"(ts, the_value) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
)
SELECT * FROM "TQL";
"#;
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!(1, expr.source_table_names.len());
assert_eq!(
"greptime.public.metric_cte",
to_dot_sep(expr.source_table_names[0].clone())
);
}
#[test]
fn test_create_flow_tql_cte_source_tables_same_name() {
let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') tql
)
SELECT * FROM tql;
"#;
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!(1, expr.source_table_names.len());
assert_eq!(
"greptime.public.tql",
to_dot_sep(expr.source_table_names[0].clone())
);
}
#[test]
fn test_create_flow_expr() {
let sql = r"

View File

@@ -68,6 +68,7 @@ use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::parsers::utils::is_tql;
use sql::statements::OptionMap;
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
@@ -716,6 +717,13 @@ impl StatementExecutor {
);
let stmt = &stmts[0];
if is_tql(query_ctx.sql_dialect(), &expr.sql)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
return Ok(FlowType::Batching);
}
// support tql parse too
let plan = match stmt {
// prom ql is only supported in batching mode

View File

@@ -38,9 +38,9 @@ use table::requests::validate_database_option;
use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
use crate::error::{
self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu,
UnexpectedSnafu, UnsupportedSnafu,
self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::parser::{FLOW, ParserContext};
use crate::parsers::tql_parser;
@@ -343,7 +343,7 @@ impl<'a> ParserContext<'a> {
.expect_keyword(Keyword::AS)
.context(SyntaxSnafu)?;
let query = Box::new(self.parse_sql_or_tql(true)?);
let query = Box::new(self.parse_flow_sql_or_tql(true)?);
Ok(Statement::CreateFlow(CreateFlow {
flow_name,
@@ -357,14 +357,20 @@ impl<'a> ParserContext<'a> {
}))
}
fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
let start_loc = self.parser.peek_token().span.start;
let start_index = location_to_index(self.sql, &start_loc);
let starts_with_with = matches!(
self.parser.peek_token().token,
Token::Word(w) if w.keyword == Keyword::WITH
);
// only accept sql or tql
let query = match self.parser.peek_token().token {
Token::Word(w) => match w.keyword {
Keyword::SELECT => self.parse_query(),
Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
Keyword::NoKeyword
if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
{
@@ -376,6 +382,23 @@ impl<'a> ParserContext<'a> {
_ => self.unsupported(self.peek_token_as_string()),
}?;
if starts_with_with {
let Statement::Query(query) = &query else {
return InvalidFlowQuerySnafu {
reason: "Expect a query after WITH".to_string(),
}
.fail();
};
if !utils::is_simple_tql_cte_query(query) {
return InvalidFlowQuerySnafu {
reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
.to_string(),
}
.fail();
}
}
let end_token = self.parser.peek_token();
let raw_query = if end_token == Token::EOF {
@@ -386,6 +409,7 @@ impl<'a> ParserContext<'a> {
&self.sql[start_index..end_index.min(self.sql.len())]
};
let raw_query = raw_query.trim_end_matches(";");
let query = SqlOrTql::try_from_statement(query, raw_query)?;
Ok(query)
}
@@ -1808,6 +1832,125 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
}
}
#[test]
fn test_parse_create_flow_with_tql_cte_query() {
let sql = r#"
CREATE FLOW calc_reqs_cte
SINK TO cnt_reqs_cte
EVAL INTERVAL '1m'
AS
WITH tql(the_timestamp, the_value) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric
)
SELECT * FROM tql;
"#;
let stmts =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
let Statement::CreateFlow(create_flow) = &stmts[0] else {
panic!("unexpected stmt: {:?}", stmts[0]);
};
let query = create_flow.query.to_string();
assert!(query.to_uppercase().contains("WITH"));
assert!(query.to_uppercase().contains("TQL EVAL"));
}
#[test]
fn test_parse_create_flow_with_sql_cte_is_unsupported() {
let sql = r#"
CREATE FLOW f
SINK TO s
AS
WITH cte AS (SELECT 1) SELECT * FROM cte;
"#;
let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
let msg = err.to_string();
assert!(msg.to_uppercase().contains("WITH"), "err: {msg}");
}
#[test]
fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
let sql = r#"
CREATE FLOW f
SINK TO s
EVAL INTERVAL '1m'
AS
WITH tql(ts, val) AS (
TQL EVAL (0, 15, '5s') metric
)
SELECT * FROM tql;
"#;
let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains("Expected expression containing `now()`"),
"unexpected err: {msg}"
);
}
#[test]
fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
let sql = r#"
CREATE FLOW f
SINK TO s
AS
WITH tql(ts, val) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric
)
SELECT ts FROM tql;
"#;
let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
}
#[test]
fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
let sql = r#"
CREATE FLOW f
SINK TO s
AS
WITH tql(ts, val) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric
)
SELECT * FROM tql WHERE ts > 0;
"#;
let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
}
#[test]
fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
let sql = r#"
CREATE FLOW f
SINK TO s
AS
WITH s1 AS (SELECT 1),
tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
SELECT * FROM tql;
"#;
let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
}
#[test]
fn test_create_flow_no_month() {
let sql = r"

View File

@@ -1,7 +1,6 @@
use std::time::Duration;
use snafu::{OptionExt, ResultExt, ensure};
use sqlparser::ast::Query;
use sqlparser::keywords::Keyword;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Token;
@@ -16,6 +15,7 @@ use crate::statements::create::SqlOrTql;
use crate::statements::create::trigger::{
AlertManagerWebhook, ChannelType, CreateTrigger, DurationExpr, NotifyChannel, TriggerOn,
};
use crate::statements::query::Query;
use crate::statements::statement::Statement;
use crate::util::{location_to_index, parse_option_string};
@@ -259,7 +259,8 @@ impl<'a> ParserContext<'a> {
);
let raw_sql = &self.sql[start_index..end_index];
Ok((*sql_query, raw_sql.trim().to_string()))
let query = (*sql_query).try_into()?;
Ok((query, raw_sql.trim().to_string()))
}
pub(crate) fn parse_trigger_for(

View File

@@ -47,11 +47,13 @@ use crate::error::{
Result, SimplificationSnafu, SyntaxSnafu,
};
use crate::parser::{ParseOptions, ParserContext};
use crate::parsers::with_tql_parser::CteContent;
use crate::statements::OptionMap;
use crate::statements::query::Query;
use crate::statements::statement::Statement;
use crate::util::{OptionValue, parse_option_string};
/// Check if the given SQL query is a TQL statement.
/// Check if the given SQL query is a TQL statement. Simple tql cte query is also considered as TQL statement.
pub fn is_tql(dialect: &dyn Dialect, sql: &str) -> Result<bool> {
let stmts = ParserContext::create_with_dialect(sql, dialect, ParseOptions::default())?;
@@ -64,10 +66,123 @@ pub fn is_tql(dialect: &dyn Dialect, sql: &str) -> Result<bool> {
let stmt = &stmts[0];
match stmt {
Statement::Tql(_) => Ok(true),
Statement::Query(query) => Ok(is_simple_tql_cte_query(query)),
_ => Ok(false),
}
}
pub(crate) fn is_simple_tql_cte_query(query: &Query) -> bool {
use crate::parser::ParserContext;
let Some(hybrid_cte) = &query.hybrid_cte else {
return false;
};
if !has_only_hybrid_tql_cte(query) {
return false;
}
let Some(cte) = hybrid_cte.cte_tables.first() else {
return false;
};
if hybrid_cte.cte_tables.len() != 1 || !matches!(cte.content, CteContent::Tql(_)) {
return false;
}
let Some(reference) = extract_simple_select_star_reference(query) else {
return false;
};
let reference = ParserContext::canonicalize_identifier(reference).value;
let cte_name = ParserContext::canonicalize_identifier(cte.name.clone()).value;
reference == cte_name
}
fn has_only_hybrid_tql_cte(query: &Query) -> bool {
query
.inner
.with
.as_ref()
.is_none_or(|with| with.cte_tables.is_empty())
}
fn extract_simple_select_star_reference(query: &Query) -> Option<sqlparser::ast::Ident> {
use sqlparser::ast::{SetExpr, TableFactor};
if !is_plain_query_root(&query.inner) {
return None;
}
let SetExpr::Select(select) = &*query.inner.body else {
return None;
};
if !is_plain_select(select) || !is_plain_wildcard_projection(select.projection.as_slice()) {
return None;
}
let [table_with_joins] = select.from.as_slice() else {
return None;
};
if !table_with_joins.joins.is_empty() {
return None;
}
let TableFactor::Table { name, .. } = &table_with_joins.relation else {
return None;
};
if name.0.len() != 1 {
return None;
}
name.0[0].as_ident().cloned()
}
fn is_plain_query_root(query: &sqlparser::ast::Query) -> bool {
query.order_by.is_none()
&& query.limit_clause.is_none()
&& query.fetch.is_none()
&& query.locks.is_empty()
&& query.for_clause.is_none()
&& query.settings.is_none()
&& query.format_clause.is_none()
&& query.pipe_operators.is_empty()
}
fn is_plain_select(select: &sqlparser::ast::Select) -> bool {
use sqlparser::ast::GroupByExpr;
select.distinct.is_none()
&& select.top.is_none()
&& select.exclude.is_none()
&& select.into.is_none()
&& select.lateral_views.is_empty()
&& select.prewhere.is_none()
&& select.selection.is_none()
&& matches!(select.group_by, GroupByExpr::Expressions(ref exprs, _) if exprs.is_empty())
&& select.cluster_by.is_empty()
&& select.distribute_by.is_empty()
&& select.sort_by.is_empty()
&& select.having.is_none()
&& select.named_window.is_empty()
&& select.qualify.is_none()
&& select.value_table_mode.is_none()
&& select.connect_by.is_empty()
}
fn is_plain_wildcard_projection(projection: &[sqlparser::ast::SelectItem]) -> bool {
use sqlparser::ast::SelectItem;
matches!(
projection,
[SelectItem::Wildcard(options)]
if options.opt_ilike.is_none()
&& options.opt_exclude.is_none()
&& options.opt_except.is_none()
&& options.opt_replace.is_none()
&& options.opt_rename.is_none()
)
}
/// Convert a parser expression to a scalar value. This function will try the
/// best to resolve and reduce constants. Exprs like `1 + 1` or `now()` can be
/// handled properly.
@@ -294,6 +409,74 @@ mod tests {
use datatypes::arrow::datatypes::TimestampNanosecondType;
use super::*;
use crate::dialect::GreptimeDbDialect;
use crate::parser::{ParseOptions, ParserContext};
use crate::statements::statement::Statement;
#[test]
fn test_is_tql() {
let dialect = GreptimeDbDialect {};
assert!(is_tql(&dialect, "TQL EVAL (0, 10, '1s') cpu_usage_total").unwrap());
assert!(!is_tql(&dialect, "SELECT 1").unwrap());
let tql_cte = r#"
WITH tql_cte(ts, val) AS (
TQL EVAL (0, 15, '5s') metric
)
SELECT * FROM tql_cte
"#;
assert!(is_tql(&dialect, tql_cte).unwrap());
let rename_cols = r#"
WITH tql (the_timestamp, the_value) AS (
TQL EVAL (0, 40, '10s') metric
)
SELECT * FROM tql
"#;
assert!(is_tql(&dialect, rename_cols).unwrap());
let stmts =
ParserContext::create_with_dialect(rename_cols, &dialect, ParseOptions::default())
.unwrap();
let Statement::Query(q) = &stmts[0] else {
panic!("Expected Query statement");
};
let hybrid = q.hybrid_cte.as_ref().expect("Expected hybrid cte");
assert_eq!(hybrid.cte_tables.len(), 1);
assert_eq!(hybrid.cte_tables[0].columns.len(), 2);
assert_eq!(hybrid.cte_tables[0].columns[0].to_string(), "the_timestamp");
assert_eq!(hybrid.cte_tables[0].columns[1].to_string(), "the_value");
let sql_cte = r#"
WITH cte AS (SELECT 1)
SELECT * FROM cte
"#;
assert!(!is_tql(&dialect, sql_cte).unwrap());
let extra_sql_cte = r#"
WITH sql_cte AS (SELECT 1), tql_cte(ts, val) AS (
TQL EVAL (0, 15, '5s') metric
)
SELECT * FROM tql_cte
"#;
assert!(!is_tql(&dialect, extra_sql_cte).unwrap());
let not_select_star = r#"
WITH tql_cte(ts, val) AS (
TQL EVAL (0, 15, '5s') metric
)
SELECT ts FROM tql_cte
"#;
assert!(!is_tql(&dialect, not_select_star).unwrap());
let with_filter = r#"
WITH tql_cte(ts, val) AS (
TQL EVAL (0, 15, '5s') metric
)
SELECT * FROM tql_cte WHERE ts > 0
"#;
assert!(!is_tql(&dialect, with_filter).unwrap());
}
/// Keep this test to make sure we are using datafusion's `ExprSimplifier` correctly.
#[test]

View File

@@ -27,7 +27,7 @@ use sqlparser_derive::{Visit, VisitMut};
use crate::dialect::GreptimeDbDialect;
use crate::error::{self, Result};
use crate::parser::{ParseOptions, ParserContext};
use crate::parser::ParserContext;
use crate::parsers::tql_parser;
use crate::statements::query::Query;
use crate::statements::statement::Statement;
@@ -97,6 +97,10 @@ impl fmt::Display for HybridCteWith {
impl ParserContext<'_> {
/// Parse a WITH clause that may contain TQL CTEs or SQL CTEs.
pub(crate) fn parse_with_tql(&mut self) -> Result<Statement> {
self.parse_with_tql_with_now(false)
}
pub(crate) fn parse_with_tql_with_now(&mut self, require_now_expr: bool) -> Result<Statement> {
// Consume the WITH token
self.parser
.expect_keyword(Keyword::WITH)
@@ -110,7 +114,7 @@ impl ParserContext<'_> {
let mut sql_cte_tables = Vec::new();
loop {
let cte = self.parse_hybrid_cte()?;
let cte = self.parse_hybrid_cte(require_now_expr)?;
match cte.content {
CteContent::Sql(body) => sql_cte_tables.push(Cte {
alias: TableAlias {
@@ -161,7 +165,7 @@ impl ParserContext<'_> {
}
/// Parse a single CTE that can be either SQL or TQL
fn parse_hybrid_cte(&mut self) -> Result<HybridCte> {
fn parse_hybrid_cte(&mut self, require_now_expr: bool) -> Result<HybridCte> {
// Parse CTE name
let name = self.parser.parse_identifier().context(error::SyntaxSnafu)?;
let name = Self::canonicalize_identifier(name);
@@ -182,7 +186,7 @@ impl ParserContext<'_> {
.expect_token(&Token::LParen)
.context(error::SyntaxSnafu)?;
let content = self.parse_cte_content()?;
let content = self.parse_cte_content(require_now_expr)?;
self.parser
.expect_token(&Token::RParen)
@@ -196,14 +200,14 @@ impl ParserContext<'_> {
}
/// Determine if CTE contains TQL or SQL and parse accordingly
fn parse_cte_content(&mut self) -> Result<CteContent> {
fn parse_cte_content(&mut self, require_now_expr: bool) -> Result<CteContent> {
// Check if the next token is TQL
if let Token::Word(w) = &self.parser.peek_token().token
&& w.keyword == Keyword::NoKeyword
&& w.quote_style.is_none()
&& w.value.to_uppercase() == tql_parser::TQL
{
let tql = self.parse_tql_content_in_cte()?;
let tql = self.parse_tql_content_in_cte(require_now_expr)?;
return Ok(CteContent::Tql(tql));
}
@@ -219,7 +223,7 @@ impl ParserContext<'_> {
/// can handle it normally.
///
/// Only `TQL EVAL` is supported inside CTEs.
fn parse_tql_content_in_cte(&mut self) -> Result<Tql> {
fn parse_tql_content_in_cte(&mut self, require_now_expr: bool) -> Result<Tql> {
// Consume and get the position of the TQL keyword
let tql_token = self.parser.next_token();
if tql_token.token == Token::EOF {
@@ -271,21 +275,10 @@ impl ParserContext<'_> {
let tql_string = &self.sql[start_index..end_index];
let tql_string = tql_string.trim();
// Parse the TQL string using the standard TQL parser
let mut stmts = ParserContext::create_with_dialect(
tql_string,
&GreptimeDbDialect {},
ParseOptions::default(),
)?;
let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, tql_string)?;
let statement = parser_ctx.parse_tql(require_now_expr)?;
if stmts.len() != 1 {
return Err(error::InvalidSqlSnafu {
msg: "Expected a single TQL statement inside CTE".to_string(),
}
.build());
}
match stmts.remove(0) {
match statement {
Statement::Tql(Tql::Eval(eval)) => Ok(Tql::Eval(eval)),
Statement::Tql(_) => Err(error::InvalidSqlSnafu {
msg: "Only TQL EVAL is supported in CTEs".to_string(),

View File

@@ -27,7 +27,7 @@ use datatypes::types::StructType;
use itertools::Itertools;
use serde::Serialize;
use snafu::{OptionExt, ResultExt};
use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
use sqlparser::ast::{ColumnOptionDef, DataType, Expr};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
@@ -35,6 +35,7 @@ use crate::error::{
InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result,
SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
};
use crate::statements::query::Query as GtQuery;
use crate::statements::statement::Statement;
use crate::statements::tql::Tql;
use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
@@ -618,7 +619,7 @@ pub struct CreateFlow {
/// Either a sql query or a tql query
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum SqlOrTql {
Sql(Query, String),
Sql(GtQuery, String),
Tql(Tql, String),
}
@@ -637,9 +638,7 @@ impl SqlOrTql {
original_query: &str,
) -> std::result::Result<Self, crate::error::Error> {
match value {
Statement::Query(query) => {
Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
}
Statement::Query(query) => Ok(Self::Sql(*query, original_query.to_string())),
Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
_ => InvalidFlowQuerySnafu {
reason: format!("Expect either sql query or promql query, found {:?}", value),

View File

@@ -33,7 +33,10 @@ use sqlparser_derive::{Visit, VisitMut};
use crate::ast::ObjectNamePartExt;
use crate::error::{InvalidExprAsOptionValueSnafu, InvalidSqlSnafu, Result};
use crate::parser::ParserContext;
use crate::parsers::with_tql_parser::CteContent;
use crate::statements::create::SqlOrTql;
use crate::statements::query::Query;
use crate::statements::tql::Tql;
const SCHEMA_MATCHER: &str = "__schema__";
@@ -191,13 +194,57 @@ pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator<Item = Objec
let mut names = HashSet::new();
match query {
SqlOrTql::Sql(query, _) => extract_tables_from_set_expr(&query.body, &mut names),
SqlOrTql::Sql(query, _) => {
extract_tables_from_set_expr(&query.inner.body, &mut names);
extract_tables_from_hybrid_cte_query(query, &mut names);
}
SqlOrTql::Tql(tql, _) => extract_tables_from_tql(tql, &mut names),
}
names.into_iter()
}
fn extract_tables_from_hybrid_cte_query(query: &Query, sql_names: &mut HashSet<ObjectName>) {
let mut tql_names = HashSet::new();
let mut cte_names: HashSet<String> = HashSet::new();
if let Some(hybrid_cte) = &query.hybrid_cte {
for cte in &hybrid_cte.cte_tables {
cte_names.insert(ParserContext::canonicalize_identifier(cte.name.clone()).value);
if let CteContent::Tql(tql) = &cte.content {
extract_tables_from_tql(tql, &mut tql_names);
}
}
}
if let Some(with) = &query.inner.with {
for cte in &with.cte_tables {
cte_names.insert(ParserContext::canonicalize_identifier(cte.alias.name.clone()).value);
}
}
remove_cte_names(sql_names, &cte_names);
sql_names.extend(tql_names);
}
fn remove_cte_names(names: &mut HashSet<ObjectName>, cte_names: &HashSet<String>) {
if cte_names.is_empty() {
return;
}
names.retain(|name| {
if name.0.len() != 1 {
return true;
}
let Some(ident) = name.0[0].as_ident() else {
return true;
};
let canonical = ParserContext::canonicalize_identifier(ident.clone()).value;
!cte_names.contains(&canonical)
});
}
fn extract_tables_from_tql(tql: &Tql, names: &mut HashSet<ObjectName>) {
let promql = match tql {
Tql::Eval(eval) => &eval.query,

View File

@@ -0,0 +1,194 @@
CREATE TABLE metric_cte (
ts timestamp(3) time index,
val DOUBLE,
);
Affected Rows: 0
INSERT INTO TABLE metric_cte VALUES
(0::Timestamp, 0),
(5000::Timestamp, 1),
(10000::Timestamp, 2),
(15000::Timestamp, 3);
Affected Rows: 4
CREATE FLOW calc_cte SINK TO metric_cte_sink EVAL INTERVAL '1m' AS
WITH tql (ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql;
Affected Rows: 0
CREATE TABLE tql (
ts timestamp(3) time index,
val DOUBLE,
);
Affected Rows: 0
INSERT INTO TABLE tql VALUES
(0::Timestamp, 10),
(5000::Timestamp, 20),
(10000::Timestamp, 30),
(15000::Timestamp, 40);
Affected Rows: 4
-- Fail due to case sensitivity of CTE name
CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS
WITH "TQL"(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM TQL;
Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW
CREATE FLOW calc_cte_non_star SINK TO metric_cte_non_star_sink EVAL INTERVAL '1m' AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT ts FROM tql;
Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW
CREATE FLOW calc_cte_filter SINK TO metric_cte_filter_sink EVAL INTERVAL '1m' AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql WHERE ts > 0::timestamp;
Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW
CREATE FLOW calc_cte_mixed SINK TO metric_cte_mixed_sink EVAL INTERVAL '1m' AS
WITH s1 AS (SELECT 1),
tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql;
Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW
CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS
WITH "TQL"(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM "TQL";
Affected Rows: 0
SHOW CREATE TABLE metric_cte_sink;
+-----------------+---------------------------------------------------+
| Table | Create Table |
+-----------------+---------------------------------------------------+
| metric_cte_sink | CREATE TABLE IF NOT EXISTS "metric_cte_sink" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "the_value" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------------+---------------------------------------------------+
SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte';
+----------------------------+
| source_table_names |
+----------------------------+
| greptime.public.metric_cte |
+----------------------------+
SHOW CREATE TABLE metric_cte_join_sink;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| metric_cte_join_sink | CREATE TABLE IF NOT EXISTS "metric_cte_join_sink" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "the_value" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------------------+-----------------------------------------------------+
SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte_case';
+----------------------------+
| source_table_names |
+----------------------------+
| greptime.public.metric_cte |
+----------------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_cte');
+------------------------------+
| ADMIN FLUSH_FLOW('calc_cte') |
+------------------------------+
| FLOW_FLUSHED |
+------------------------------+
SELECT * FROM metric_cte_sink ORDER BY ts;
+---------------------+-----------+
| ts | the_value |
+---------------------+-----------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:05 | 1.0 |
| 1970-01-01T00:00:10 | 2.0 |
| 1970-01-01T00:00:15 | 3.0 |
+---------------------+-----------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_cte_case');
+-----------------------------------+
| ADMIN FLUSH_FLOW('calc_cte_case') |
+-----------------------------------+
| FLOW_FLUSHED |
+-----------------------------------+
SELECT * FROM metric_cte_join_sink ORDER BY ts;
+---------------------+-----------+
| ts | the_value |
+---------------------+-----------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:05 | 1.0 |
| 1970-01-01T00:00:10 | 2.0 |
| 1970-01-01T00:00:15 | 3.0 |
+---------------------+-----------+
DROP FLOW calc_cte;
Affected Rows: 0
DROP FLOW calc_cte_case;
Affected Rows: 0
DROP TABLE metric_cte;
Affected Rows: 0
DROP TABLE tql;
Affected Rows: 0
DROP TABLE metric_cte_sink;
Affected Rows: 0
DROP TABLE metric_cte_join_sink;
Affected Rows: 0

View File

@@ -0,0 +1 @@
../../standalone/flow-tql/flow_tql_cte.sql

View File

@@ -0,0 +1,194 @@
CREATE TABLE metric_cte (
ts timestamp(3) time index,
val DOUBLE,
);
Affected Rows: 0
INSERT INTO TABLE metric_cte VALUES
(0::Timestamp, 0),
(5000::Timestamp, 1),
(10000::Timestamp, 2),
(15000::Timestamp, 3);
Affected Rows: 4
CREATE FLOW calc_cte SINK TO metric_cte_sink EVAL INTERVAL '1m' AS
WITH tql (ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql;
Affected Rows: 0
CREATE TABLE tql (
ts timestamp(3) time index,
val DOUBLE,
);
Affected Rows: 0
INSERT INTO TABLE tql VALUES
(0::Timestamp, 10),
(5000::Timestamp, 20),
(10000::Timestamp, 30),
(15000::Timestamp, 40);
Affected Rows: 4
-- Fail due to case sensitivity of CTE name
CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS
WITH "TQL"(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM TQL;
Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW
CREATE FLOW calc_cte_non_star SINK TO metric_cte_non_star_sink EVAL INTERVAL '1m' AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT ts FROM tql;
Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW
CREATE FLOW calc_cte_filter SINK TO metric_cte_filter_sink EVAL INTERVAL '1m' AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql WHERE ts > 0::timestamp;
Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW
CREATE FLOW calc_cte_mixed SINK TO metric_cte_mixed_sink EVAL INTERVAL '1m' AS
WITH s1 AS (SELECT 1),
tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql;
Error: 1004(InvalidArguments), Invalid flow query: WITH is only supported for the simplest TQL CTE in CREATE FLOW
CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS
WITH "TQL"(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM "TQL";
Affected Rows: 0
SHOW CREATE TABLE metric_cte_sink;
+-----------------+---------------------------------------------------+
| Table | Create Table |
+-----------------+---------------------------------------------------+
| metric_cte_sink | CREATE TABLE IF NOT EXISTS "metric_cte_sink" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "the_value" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------------+---------------------------------------------------+
SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte';
+----------------------------+
| source_table_names |
+----------------------------+
| greptime.public.metric_cte |
+----------------------------+
SHOW CREATE TABLE metric_cte_join_sink;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| metric_cte_join_sink | CREATE TABLE IF NOT EXISTS "metric_cte_join_sink" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "the_value" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------------------+-----------------------------------------------------+
SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte_case';
+----------------------------+
| source_table_names |
+----------------------------+
| greptime.public.metric_cte |
+----------------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_cte');
+------------------------------+
| ADMIN FLUSH_FLOW('calc_cte') |
+------------------------------+
| FLOW_FLUSHED |
+------------------------------+
SELECT * FROM metric_cte_sink ORDER BY ts;
+---------------------+-----------+
| ts | the_value |
+---------------------+-----------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:05 | 1.0 |
| 1970-01-01T00:00:10 | 2.0 |
| 1970-01-01T00:00:15 | 3.0 |
+---------------------+-----------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_cte_case');
+-----------------------------------+
| ADMIN FLUSH_FLOW('calc_cte_case') |
+-----------------------------------+
| FLOW_FLUSHED |
+-----------------------------------+
SELECT * FROM metric_cte_join_sink ORDER BY ts;
+---------------------+-----------+
| ts | the_value |
+---------------------+-----------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:05 | 1.0 |
| 1970-01-01T00:00:10 | 2.0 |
| 1970-01-01T00:00:15 | 3.0 |
+---------------------+-----------+
DROP FLOW calc_cte;
Affected Rows: 0
DROP FLOW calc_cte_case;
Affected Rows: 0
DROP TABLE metric_cte;
Affected Rows: 0
DROP TABLE tql;
Affected Rows: 0
DROP TABLE metric_cte_sink;
Affected Rows: 0
DROP TABLE metric_cte_join_sink;
Affected Rows: 0

View File

@@ -0,0 +1,84 @@
CREATE TABLE metric_cte (
ts timestamp(3) time index,
val DOUBLE,
);
INSERT INTO TABLE metric_cte VALUES
(0::Timestamp, 0),
(5000::Timestamp, 1),
(10000::Timestamp, 2),
(15000::Timestamp, 3);
CREATE FLOW calc_cte SINK TO metric_cte_sink EVAL INTERVAL '1m' AS
WITH tql (ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql;
CREATE TABLE tql (
ts timestamp(3) time index,
val DOUBLE,
);
INSERT INTO TABLE tql VALUES
(0::Timestamp, 10),
(5000::Timestamp, 20),
(10000::Timestamp, 30),
(15000::Timestamp, 40);
-- Fail due to case sensitivity of CTE name
CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS
WITH "TQL"(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM TQL;
CREATE FLOW calc_cte_non_star SINK TO metric_cte_non_star_sink EVAL INTERVAL '1m' AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT ts FROM tql;
CREATE FLOW calc_cte_filter SINK TO metric_cte_filter_sink EVAL INTERVAL '1m' AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql WHERE ts > 0::timestamp;
CREATE FLOW calc_cte_mixed SINK TO metric_cte_mixed_sink EVAL INTERVAL '1m' AS
WITH s1 AS (SELECT 1),
tql(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM tql;
CREATE FLOW calc_cte_case SINK TO metric_cte_join_sink EVAL INTERVAL '1m' AS
WITH "TQL"(ts, the_value) AS (
TQL EVAL (now() - now(), now() - (now() - '15s'::interval), '5s') metric_cte
)
SELECT * FROM "TQL";
SHOW CREATE TABLE metric_cte_sink;
SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte';
SHOW CREATE TABLE metric_cte_join_sink;
SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_cte_case';
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_cte');
SELECT * FROM metric_cte_sink ORDER BY ts;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_cte_case');
SELECT * FROM metric_cte_join_sink ORDER BY ts;
DROP FLOW calc_cte;
DROP FLOW calc_cte_case;
DROP TABLE metric_cte;
DROP TABLE tql;
DROP TABLE metric_cte_sink;
DROP TABLE metric_cte_join_sink;