From d3f15e72bf2fb412fc86155cecc8b34c14e5b5f4 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 5 Aug 2025 21:07:38 -0700 Subject: [PATCH] feat: support TQL CTE in planner (#6645) Signed-off-by: Ruihang Xia --- src/query/src/error.rs | 17 +- src/query/src/planner.rs | 158 ++++++++++- src/sql/src/lib.rs | 1 + src/sql/src/parsers.rs | 2 +- .../standalone/common/tql/tql-cte.result | 247 ++++++++++++++++++ tests/cases/standalone/common/tql/tql-cte.sql | 141 ++++++++++ 6 files changed, 556 insertions(+), 10 deletions(-) create mode 100644 tests/cases/standalone/common/tql/tql-cte.result create mode 100644 tests/cases/standalone/common/tql/tql-cte.sql diff --git a/src/query/src/error.rs b/src/query/src/error.rs index c2a2e960b0..1efc4bf470 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -323,6 +323,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Column schema mismatch in CTE {}, original: {:?}, expected: {:?}", + cte_name, + original, + expected, + ))] + CteColumnSchemaMismatch { + cte_name: String, + original: Vec<String>, + expected: Vec<String>, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -345,7 +359,8 @@ impl ErrorExt for Error { | AddSystemTimeOverflow { .. } | ColumnSchemaIncompatible { .. } | UnsupportedVariable { .. } - | ColumnSchemaNoDefault { .. } => StatusCode::InvalidArguments, + | ColumnSchemaNoDefault { .. } + | CteColumnSchemaMismatch { .. } => StatusCode::InvalidArguments, BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index b3f46c060b..69a3ed2968 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::any::Any; +use std::borrow::Cow; use std::sync::Arc; use async_trait::async_trait; @@ -22,18 +23,24 @@ use common_telemetry::tracing; use datafusion::common::DFSchema; use datafusion::execution::context::SessionState; use datafusion::sql::planner::PlannerContext; -use datafusion_expr::{Expr as DfExpr, LogicalPlan}; +use datafusion_expr::{col, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder}; use datafusion_sql::planner::{ParserOptions, SqlToRel}; use log_query::LogQuery; use promql_parser::parser::EvalStmt; use session::context::QueryContextRef; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use sql::ast::Expr as SqlExpr; +use sql::statements::query::Query; use sql::statements::statement::Statement; +use sql::statements::tql::Tql; +use sql::CteContent; -use crate::error::{PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu}; +use crate::error::{ + CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu, + UnimplementedSnafu, +}; use crate::log_query::planner::LogQueryPlanner; -use crate::parser::QueryStatement; +use crate::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING}; use crate::promql::planner::PromPlanner; use crate::query_engine::{DefaultPlanDecoder, QueryEngineState}; use crate::range_select::plan_rewrite::RangePlanRewriter; @@ -70,7 +77,27 @@ impl DfLogicalPlanner { #[tracing::instrument(skip_all)] async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> { - let df_stmt = stmt.try_into().context(SqlSnafu)?; + let mut planner_context = PlannerContext::new(); + let mut stmt = Cow::Borrowed(stmt); + let mut is_tql_cte = false; + + // Check for hybrid CTEs before normal processing + if self.has_hybrid_ctes(stmt.as_ref()) { + let stmt_owned = stmt.into_owned(); + let mut query = match stmt_owned { + Statement::Query(query) => query.as_ref().clone(), + _ => unreachable!("has_hybrid_ctes should only return true for Query statements"), + }; + 
self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context) + .await?; + + // remove the processed TQL CTEs from the query + query.hybrid_cte = None; + stmt = Cow::Owned(Statement::Query(Box::new(query))); + is_tql_cte = true; + } + + let df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?; let table_provider = DfTableSourceProvider::new( self.engine_state.catalog_manager().clone(), @@ -106,9 +133,22 @@ impl DfLogicalPlanner { let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options); - let result = sql_to_rel - .statement_to_plan(df_stmt) - .context(PlanSqlSnafu)?; + // this IF is to handle different version of ASTs + let result = if is_tql_cte { + let Statement::Query(query) = stmt.into_owned() else { + unreachable!("is_tql_cte should only be true for Query statements"); + }; + let sqlparser_stmt = + datafusion::sql::sqlparser::ast::Statement::Query(Box::new(query.inner.into())); + sql_to_rel + .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context) + .context(PlanSqlSnafu)? + } else { + sql_to_rel + .statement_to_plan(df_stmt) + .context(PlanSqlSnafu)? + }; + common_telemetry::debug!("Logical planner, statement to plan result: {result}"); let plan = RangePlanRewriter::new(table_provider, query_ctx.clone()) .rewrite(result) @@ -182,6 +222,108 @@ impl DfLogicalPlanner { fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> { Ok(self.engine_state.optimize_logical_plan(plan)?) 
} + + /// Check if a statement contains hybrid CTEs (mix of SQL and TQL) + fn has_hybrid_ctes(&self, stmt: &Statement) -> bool { + if let Statement::Query(query) = stmt { + query + .hybrid_cte + .as_ref() + .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty()) + .unwrap_or(false) + } else { + false + } + } + + /// Plan a query with hybrid CTEs using DataFusion's native PlannerContext + async fn plan_query_with_hybrid_ctes( + &self, + query: &Query, + query_ctx: QueryContextRef, + planner_context: &mut PlannerContext, + ) -> Result<()> { + let hybrid_cte = query.hybrid_cte.as_ref().unwrap(); + + for cte in &hybrid_cte.cte_tables { + match &cte.content { + CteContent::Tql(tql) => { + // Plan TQL and register in PlannerContext + let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?; + if !cte.columns.is_empty() { + let schema = logical_plan.schema(); + let schema_fields = schema.fields().to_vec(); + ensure!( + schema_fields.len() == cte.columns.len(), + CteColumnSchemaMismatchSnafu { + cte_name: cte.name.value.clone(), + original: schema_fields + .iter() + .map(|field| field.name().to_string()) + .collect::<Vec<_>>(), + expected: cte + .columns + .iter() + .map(|column| column.to_string()) + .collect::<Vec<_>>(), + } + ); + let aliases = cte + .columns + .iter() + .zip(schema_fields.iter()) + .map(|(column, field)| col(field.name()).alias(column.to_string())); + logical_plan = LogicalPlanBuilder::from(logical_plan) + .project(aliases) + .context(PlanSqlSnafu)? 
+ .build() + .context(PlanSqlSnafu)?; + } + planner_context.insert_cte(&cte.name.value, logical_plan); + } + CteContent::Sql(_) => { + // SQL CTEs should have been moved to the main query's WITH clause + // during parsing, so we shouldn't encounter them here + unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables"); + } + } + } + + Ok(()) + } + + /// Convert TQL to LogicalPlan directly + async fn tql_to_logical_plan( + &self, + tql: &Tql, + query_ctx: QueryContextRef, + ) -> Result<LogicalPlan> { + match tql { + Tql::Eval(eval) => { + // Convert TqlEval to PromQuery then to QueryStatement::Promql + let prom_query = PromQuery { + query: eval.query.clone(), + start: eval.start.clone(), + end: eval.end.clone(), + step: eval.step.clone(), + lookback: eval + .lookback + .clone() + .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()), + }; + let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?; + self.plan(&stmt, query_ctx).await + } + Tql::Explain(_) => UnimplementedSnafu { + operation: "TQL EXPLAIN in CTEs", + } + .fail(), + Tql::Analyze(_) => UnimplementedSnafu { + operation: "TQL ANALYZE in CTEs", + } + .fail(), + } + } } #[async_trait] diff --git a/src/sql/src/lib.rs b/src/sql/src/lib.rs index 90501ece82..bfa56c5e42 100644 --- a/src/sql/src/lib.rs +++ b/src/sql/src/lib.rs @@ -28,3 +28,4 @@ pub mod util; pub use parsers::create_parser::{ENGINE, MAXVALUE}; pub use parsers::tql_parser::TQL; +pub use parsers::with_tql_parser::{CteContent, HybridCteWith}; diff --git a/src/sql/src/parsers.rs b/src/sql/src/parsers.rs index 2c1e080e77..1d9aeb7dec 100644 --- a/src/sql/src/parsers.rs +++ b/src/sql/src/parsers.rs @@ -32,4 +32,4 @@ pub(crate) mod show_parser; pub(crate) mod tql_parser; pub(crate) mod truncate_parser; pub(crate) mod utils; -pub(crate) mod with_tql_parser; +pub mod with_tql_parser; diff --git a/tests/cases/standalone/common/tql/tql-cte.result b/tests/cases/standalone/common/tql/tql-cte.result new file mode 100644 index 0000000000..59943b92f8 --- 
/dev/null +++ b/tests/cases/standalone/common/tql/tql-cte.result @@ -0,0 +1,247 @@ +create table metric (ts timestamp(3) time index, val double); + +Affected Rows: 0 + +create table labels (ts timestamp(3) time index, host string primary key, cpu double); + +Affected Rows: 0 + +insert into metric values + (0,0), + (10000,8), + (20000,8), + (30000,2), + (40000,3); + +Affected Rows: 5 + +insert into labels values + ('1970-01-01 00:00:00', 'host1', 0.1), + ('1970-01-01 00:00:10', 'host1', 0.8), + ('1970-01-01 00:00:20', 'host1', 0.8), + ('1970-01-01 00:00:30', 'host1', 0.2), + ('1970-01-01 00:00:40', 'host1', 0.3), + ('1970-01-01 00:00:00', 'host2', 0.2), + ('1970-01-01 00:00:10', 'host2', 0.9), + ('1970-01-01 00:00:20', 'host2', 0.7), + ('1970-01-01 00:00:30', 'host2', 0.4), + ('1970-01-01 00:00:40', 'host2', 0.5); + +Affected Rows: 10 + +-- Basic TQL CTE without column aliases +WITH tql as ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql; + ++---------------------+-----+ +| ts | val | ++---------------------+-----+ +| 1970-01-01T00:00:00 | 0.0 | +| 1970-01-01T00:00:10 | 8.0 | +| 1970-01-01T00:00:20 | 8.0 | +| 1970-01-01T00:00:30 | 2.0 | +| 1970-01-01T00:00:40 | 3.0 | ++---------------------+-----+ + +-- TQL CTE with column aliases +WITH tql (the_timestamp, the_value) as ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql; + ++---------------------+-----------+ +| the_timestamp | the_value | ++---------------------+-----------+ +| 1970-01-01T00:00:00 | 0.0 | +| 1970-01-01T00:00:10 | 8.0 | +| 1970-01-01T00:00:20 | 8.0 | +| 1970-01-01T00:00:30 | 2.0 | +| 1970-01-01T00:00:40 | 3.0 | ++---------------------+-----------+ + +-- Hybrid CTEs (TQL + SQL) +WITH + tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') metric), + filtered AS (SELECT * FROM tql_data WHERE val > 5) +SELECT count(*) FROM filtered; + ++----------+ +| count(*) | ++----------+ +| 2 | ++----------+ + +-- TODO(ruihang): The following tests are not supported yet, need to fix parser first. 
+-- TQL CTE with complex PromQL expressions +-- WITH +-- tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])), +-- filtered (ts, val) AS (SELECT * FROM tql_data WHERE val > 0) +-- SELECT sum(val) FROM filtered; +-- TQL CTE with aggregation functions +-- WITH tql_agg AS ( +-- TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"}) +-- ) +-- SELECT avg(val) as avg_sum FROM tql_agg; +-- TQL CTE with label selectors +WITH host_metrics AS ( + TQL EVAL (0, 40, '10s') labels{host="host1"} +) +SELECT count(*) as host1_points FROM host_metrics; + ++--------------+ +| host1_points | ++--------------+ +| 5 | ++--------------+ + +-- Multiple TQL CTEs referencing different tables +WITH + metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric), + label_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"}) +SELECT + m.val as metric_val, + l.cpu as label_val +FROM metric_data m, label_data l +WHERE m.ts = l.ts +ORDER BY m.ts +LIMIT 3; + ++------------+-----------+ +| metric_val | label_val | ++------------+-----------+ +| 0.0 | 0.2 | +| 8.0 | 0.9 | +| 8.0 | 0.7 | ++------------+-----------+ + +-- TQL CTE with mathematical operations +WITH computed(ts, val) AS ( + TQL EVAL (0, 40, '10s') metric * 2 + 1 +) +SELECT min(val) as min_computed, max(val) as max_computed FROM computed; + ++--------------+--------------+ +| min_computed | max_computed | ++--------------+--------------+ +| 1.0 | 17.0 | ++--------------+--------------+ + +-- TQL CTE with window functions in SQL part +WITH tql_base(ts, val) AS ( + TQL EVAL (0, 40, '10s') metric +) +SELECT + ts, + val, + LAG(val, 1) OVER (ORDER BY ts) as prev_value +FROM tql_base +ORDER BY ts; + ++---------------------+-----+------------+ +| ts | val | prev_value | ++---------------------+-----+------------+ +| 1970-01-01T00:00:00 | 0.0 | | +| 1970-01-01T00:00:10 | 8.0 | 0.0 | +| 1970-01-01T00:00:20 | 8.0 | 8.0 | +| 1970-01-01T00:00:30 | 2.0 | 8.0 | +| 1970-01-01T00:00:40 | 3.0 | 2.0 | 
++---------------------+-----+------------+ + +-- TQL CTE with HAVING clause +WITH tql_grouped(ts, host, cpu) AS ( + TQL EVAL (0, 40, '10s') labels +) +SELECT + DATE_TRUNC('minute', ts) as minute, + count(*) as point_count +FROM tql_grouped +GROUP BY minute +HAVING count(*) > 1; + ++---------------------+-------------+ +| minute | point_count | ++---------------------+-------------+ +| 1970-01-01T00:00:00 | 10 | ++---------------------+-------------+ + +-- TQL CTE with UNION +-- SQLNESS SORT_RESULT 3 1 +WITH + host1_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host1"}), + host2_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"}) +SELECT 'host1' as source, ts, cpu FROM host1_data +UNION ALL +SELECT 'host2' as source, ts, cpu FROM host2_data; + ++--------+---------------------+-----+ +| source | ts | cpu | ++--------+---------------------+-----+ +| host1 | 1970-01-01T00:00:00 | 0.1 | +| host1 | 1970-01-01T00:00:10 | 0.8 | +| host1 | 1970-01-01T00:00:20 | 0.8 | +| host1 | 1970-01-01T00:00:30 | 0.2 | +| host1 | 1970-01-01T00:00:40 | 0.3 | +| host2 | 1970-01-01T00:00:00 | 0.2 | +| host2 | 1970-01-01T00:00:10 | 0.9 | +| host2 | 1970-01-01T00:00:20 | 0.7 | +| host2 | 1970-01-01T00:00:30 | 0.4 | +| host2 | 1970-01-01T00:00:40 | 0.5 | ++--------+---------------------+-----+ + +-- Nested CTEs with TQL +WITH + base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric), + processed(ts, percent) AS ( + SELECT ts, val * 100 as percent + FROM base_tql + WHERE val > 0 + ), + final(ts, percent) AS ( + SELECT * FROM processed WHERE percent > 200 + ) +SELECT count(*) as high_values FROM final; + ++-------------+ +| high_values | ++-------------+ +| 3 | ++-------------+ + +-- Error case - TQL ANALYZE should fail +WITH tql_analyze AS ( + TQL ANALYZE (0, 40, '10s') metric +) +SELECT * FROM tql_analyze limit 1; + +Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs + +-- Error case - TQL EXPLAIN should fail +WITH tql_explain AS 
( + TQL EXPLAIN (0, 40, '10s') metric +) +SELECT * FROM tql_explain limit 1; + +Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs + +-- TQL CTE with lookback parameter +WITH tql_lookback AS ( + TQL EVAL (0, 40, '10s', 15s) metric +) +SELECT count(*) FROM tql_lookback; + ++----------+ +| count(*) | ++----------+ +| 5 | ++----------+ + +drop table metric; + +Affected Rows: 0 + +drop table labels; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/tql/tql-cte.sql b/tests/cases/standalone/common/tql/tql-cte.sql new file mode 100644 index 0000000000..de5cc6cead --- /dev/null +++ b/tests/cases/standalone/common/tql/tql-cte.sql @@ -0,0 +1,141 @@ +create table metric (ts timestamp(3) time index, val double); +create table labels (ts timestamp(3) time index, host string primary key, cpu double); + +insert into metric values + (0,0), + (10000,8), + (20000,8), + (30000,2), + (40000,3); + +insert into labels values + ('1970-01-01 00:00:00', 'host1', 0.1), + ('1970-01-01 00:00:10', 'host1', 0.8), + ('1970-01-01 00:00:20', 'host1', 0.8), + ('1970-01-01 00:00:30', 'host1', 0.2), + ('1970-01-01 00:00:40', 'host1', 0.3), + ('1970-01-01 00:00:00', 'host2', 0.2), + ('1970-01-01 00:00:10', 'host2', 0.9), + ('1970-01-01 00:00:20', 'host2', 0.7), + ('1970-01-01 00:00:30', 'host2', 0.4), + ('1970-01-01 00:00:40', 'host2', 0.5); + +-- Basic TQL CTE without column aliases +WITH tql as ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql; + +-- TQL CTE with column aliases +WITH tql (the_timestamp, the_value) as ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql; + +-- Hybrid CTEs (TQL + SQL) +WITH + tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') metric), + filtered AS (SELECT * FROM tql_data WHERE val > 5) +SELECT count(*) FROM filtered; + +-- TODO(ruihang): The following tests are not supported yet, need to fix parser first. 
+-- TQL CTE with complex PromQL expressions +-- WITH +-- tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])), +-- filtered (ts, val) AS (SELECT * FROM tql_data WHERE val > 0) +-- SELECT sum(val) FROM filtered; + +-- TQL CTE with aggregation functions +-- WITH tql_agg AS ( +-- TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"}) +-- ) +-- SELECT avg(val) as avg_sum FROM tql_agg; + +-- TQL CTE with label selectors +WITH host_metrics AS ( + TQL EVAL (0, 40, '10s') labels{host="host1"} +) +SELECT count(*) as host1_points FROM host_metrics; + +-- Multiple TQL CTEs referencing different tables +WITH + metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric), + label_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"}) +SELECT + m.val as metric_val, + l.cpu as label_val +FROM metric_data m, label_data l +WHERE m.ts = l.ts +ORDER BY m.ts +LIMIT 3; + +-- TQL CTE with mathematical operations +WITH computed(ts, val) AS ( + TQL EVAL (0, 40, '10s') metric * 2 + 1 +) +SELECT min(val) as min_computed, max(val) as max_computed FROM computed; + +-- TQL CTE with window functions in SQL part +WITH tql_base(ts, val) AS ( + TQL EVAL (0, 40, '10s') metric +) +SELECT + ts, + val, + LAG(val, 1) OVER (ORDER BY ts) as prev_value +FROM tql_base +ORDER BY ts; + +-- TQL CTE with HAVING clause +WITH tql_grouped(ts, host, cpu) AS ( + TQL EVAL (0, 40, '10s') labels +) +SELECT + DATE_TRUNC('minute', ts) as minute, + count(*) as point_count +FROM tql_grouped +GROUP BY minute +HAVING count(*) > 1; + +-- TQL CTE with UNION +-- SQLNESS SORT_RESULT 3 1 +WITH + host1_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host1"}), + host2_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"}) +SELECT 'host1' as source, ts, cpu FROM host1_data +UNION ALL +SELECT 'host2' as source, ts, cpu FROM host2_data; + +-- Nested CTEs with TQL +WITH + base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric), + processed(ts, percent) AS ( + SELECT ts, val * 100 as 
percent + FROM base_tql + WHERE val > 0 + ), + final(ts, percent) AS ( + SELECT * FROM processed WHERE percent > 200 + ) +SELECT count(*) as high_values FROM final; + +-- Error case - TQL ANALYZE should fail +WITH tql_analyze AS ( + TQL ANALYZE (0, 40, '10s') metric +) +SELECT * FROM tql_analyze limit 1; + +-- Error case - TQL EXPLAIN should fail +WITH tql_explain AS ( + TQL EXPLAIN (0, 40, '10s') metric +) +SELECT * FROM tql_explain limit 1; + +-- TQL CTE with lookback parameter +WITH tql_lookback AS ( + TQL EVAL (0, 40, '10s', 15s) metric +) +SELECT count(*) FROM tql_lookback; + +drop table metric; +drop table labels;