feat: support TQL CTE in planner (#6645)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-08-05 21:07:38 -07:00
committed by Zhenchi
parent b25a6527ed
commit d3f15e72bf
6 changed files with 556 additions and 10 deletions

View File

@@ -323,6 +323,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Column schema mismatch in CTE {}, original: {:?}, expected: {:?}",
cte_name,
original,
expected,
))]
CteColumnSchemaMismatch {
cte_name: String,
original: Vec<String>,
expected: Vec<String>,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -345,7 +359,8 @@ impl ErrorExt for Error {
| AddSystemTimeOverflow { .. }
| ColumnSchemaIncompatible { .. }
| UnsupportedVariable { .. }
| ColumnSchemaNoDefault { .. } => StatusCode::InvalidArguments,
| ColumnSchemaNoDefault { .. }
| CteColumnSchemaMismatch { .. } => StatusCode::InvalidArguments,
BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;
use async_trait::async_trait;
@@ -22,18 +23,24 @@ use common_telemetry::tracing;
use datafusion::common::DFSchema;
use datafusion::execution::context::SessionState;
use datafusion::sql::planner::PlannerContext;
use datafusion_expr::{Expr as DfExpr, LogicalPlan};
use datafusion_expr::{col, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::planner::{ParserOptions, SqlToRel};
use log_query::LogQuery;
use promql_parser::parser::EvalStmt;
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use sql::ast::Expr as SqlExpr;
use sql::statements::query::Query;
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sql::CteContent;
use crate::error::{PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
use crate::error::{
CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
UnimplementedSnafu,
};
use crate::log_query::planner::LogQueryPlanner;
use crate::parser::QueryStatement;
use crate::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING};
use crate::promql::planner::PromPlanner;
use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
use crate::range_select::plan_rewrite::RangePlanRewriter;
@@ -70,7 +77,27 @@ impl DfLogicalPlanner {
#[tracing::instrument(skip_all)]
async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let df_stmt = stmt.try_into().context(SqlSnafu)?;
let mut planner_context = PlannerContext::new();
let mut stmt = Cow::Borrowed(stmt);
let mut is_tql_cte = false;
// Check for hybrid CTEs before normal processing
if self.has_hybrid_ctes(stmt.as_ref()) {
let stmt_owned = stmt.into_owned();
let mut query = match stmt_owned {
Statement::Query(query) => query.as_ref().clone(),
_ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
};
self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
.await?;
// remove the processed TQL CTEs from the query
query.hybrid_cte = None;
stmt = Cow::Owned(Statement::Query(Box::new(query)));
is_tql_cte = true;
}
let df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
let table_provider = DfTableSourceProvider::new(
self.engine_state.catalog_manager().clone(),
@@ -106,9 +133,22 @@ impl DfLogicalPlanner {
let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
let result = sql_to_rel
.statement_to_plan(df_stmt)
.context(PlanSqlSnafu)?;
// this IF is to handle different version of ASTs
let result = if is_tql_cte {
let Statement::Query(query) = stmt.into_owned() else {
unreachable!("is_tql_cte should only be true for Query statements");
};
let sqlparser_stmt =
datafusion::sql::sqlparser::ast::Statement::Query(Box::new(query.inner.into()));
sql_to_rel
.sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
.context(PlanSqlSnafu)?
} else {
sql_to_rel
.statement_to_plan(df_stmt)
.context(PlanSqlSnafu)?
};
common_telemetry::debug!("Logical planner, statement to plan result: {result}");
let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
.rewrite(result)
@@ -182,6 +222,108 @@ impl DfLogicalPlanner {
fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
Ok(self.engine_state.optimize_logical_plan(plan)?)
}
/// Check if a statement contains hybrid CTEs (mix of SQL and TQL)
fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
if let Statement::Query(query) = stmt {
query
.hybrid_cte
.as_ref()
.map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
.unwrap_or(false)
} else {
false
}
}
/// Plan a query with hybrid CTEs using DataFusion's native PlannerContext
async fn plan_query_with_hybrid_ctes(
&self,
query: &Query,
query_ctx: QueryContextRef,
planner_context: &mut PlannerContext,
) -> Result<()> {
let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
for cte in &hybrid_cte.cte_tables {
match &cte.content {
CteContent::Tql(tql) => {
// Plan TQL and register in PlannerContext
let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
if !cte.columns.is_empty() {
let schema = logical_plan.schema();
let schema_fields = schema.fields().to_vec();
ensure!(
schema_fields.len() == cte.columns.len(),
CteColumnSchemaMismatchSnafu {
cte_name: cte.name.value.clone(),
original: schema_fields
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>(),
expected: cte
.columns
.iter()
.map(|column| column.to_string())
.collect::<Vec<_>>(),
}
);
let aliases = cte
.columns
.iter()
.zip(schema_fields.iter())
.map(|(column, field)| col(field.name()).alias(column.to_string()));
logical_plan = LogicalPlanBuilder::from(logical_plan)
.project(aliases)
.context(PlanSqlSnafu)?
.build()
.context(PlanSqlSnafu)?;
}
planner_context.insert_cte(&cte.name.value, logical_plan);
}
CteContent::Sql(_) => {
// SQL CTEs should have been moved to the main query's WITH clause
// during parsing, so we shouldn't encounter them here
unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
}
}
}
Ok(())
}
/// Convert TQL to LogicalPlan directly
async fn tql_to_logical_plan(
&self,
tql: &Tql,
query_ctx: QueryContextRef,
) -> Result<LogicalPlan> {
match tql {
Tql::Eval(eval) => {
// Convert TqlEval to PromQuery then to QueryStatement::Promql
let prom_query = PromQuery {
query: eval.query.clone(),
start: eval.start.clone(),
end: eval.end.clone(),
step: eval.step.clone(),
lookback: eval
.lookback
.clone()
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
};
let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
self.plan(&stmt, query_ctx).await
}
Tql::Explain(_) => UnimplementedSnafu {
operation: "TQL EXPLAIN in CTEs",
}
.fail(),
Tql::Analyze(_) => UnimplementedSnafu {
operation: "TQL ANALYZE in CTEs",
}
.fail(),
}
}
}
#[async_trait]

View File

@@ -28,3 +28,4 @@ pub mod util;
pub use parsers::create_parser::{ENGINE, MAXVALUE};
pub use parsers::tql_parser::TQL;
pub use parsers::with_tql_parser::{CteContent, HybridCteWith};

View File

@@ -32,4 +32,4 @@ pub(crate) mod show_parser;
pub(crate) mod tql_parser;
pub(crate) mod truncate_parser;
pub(crate) mod utils;
pub(crate) mod with_tql_parser;
pub mod with_tql_parser;