From ba034c5a9ee2e5f9c706392a04e8abb90ff67ed8 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Sat, 11 Oct 2025 14:27:51 +0800 Subject: [PATCH] feat: explain custom statement (#7058) * feat: explain tql cte Signed-off-by: discord9 * chore: unused Signed-off-by: discord9 * fix: analyze format Signed-off-by: discord9 * Update src/sql/src/statements/statement.rs Co-authored-by: Yingwen Signed-off-by: discord9 * test: sqlness Signed-off-by: discord9 * pcr Signed-off-by: discord9 --------- Signed-off-by: discord9 Co-authored-by: Yingwen --- src/query/src/planner.rs | 73 ++- src/sql/src/parsers/explain_parser.rs | 83 +-- src/sql/src/statements/explain.rs | 46 +- src/sql/src/statements/statement.rs | 5 +- .../standalone/common/tql/tql-cte.result | 615 +++++++++++++++++- tests/cases/standalone/common/tql/tql-cte.sql | 177 +++++ 6 files changed, 935 insertions(+), 64 deletions(-) diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 3952c3ab4c..abf6b99bd1 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -20,10 +20,14 @@ use async_trait::async_trait; use catalog::table_source::DfTableSourceProvider; use common_error::ext::BoxedError; use common_telemetry::tracing; -use datafusion::common::DFSchema; +use datafusion::common::{DFSchema, plan_err}; use datafusion::execution::context::SessionState; use datafusion::sql::planner::PlannerContext; -use datafusion_expr::{Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, col}; +use datafusion_common::ToDFSchema; +use datafusion_expr::{ + Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType, + ToStringifiedPlan, col, +}; use datafusion_sql::planner::{ParserOptions, SqlToRel}; use log_query::LogQuery; use promql_parser::parser::EvalStmt; @@ -31,6 +35,7 @@ use session::context::QueryContextRef; use snafu::{ResultExt, ensure}; use sql::CteContent; use sql::ast::Expr as SqlExpr; +use sql::statements::explain::ExplainStatement; use sql::statements::query::Query; use sql::statements::statement::Statement; use sql::statements::tql::Tql; @@ -75,12 +80,69 @@ impl DfLogicalPlanner { } } + /// Basically the same with `explain_to_plan` in DataFusion, but adapted to Greptime's + /// `plan_sql` to support Greptime Statements. + async fn explain_to_plan( + &self, + explain: &ExplainStatement, + query_ctx: QueryContextRef, + ) -> Result { + let plan = self.plan_sql(&explain.statement, query_ctx).await?; + if matches!(plan, LogicalPlan::Explain(_)) { + return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu); + } + + let verbose = explain.verbose; + let analyze = explain.analyze; + let format = explain.format.map(|f| f.to_string()); + + let plan = Arc::new(plan); + let schema = LogicalPlan::explain_schema(); + let schema = ToDFSchema::to_dfschema_ref(schema)?; + + if verbose && format.is_some() { + return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu); + } + + if analyze { + // notice format is already set in query context, so can be ignore here + Ok(LogicalPlan::Analyze(Analyze { + verbose, + input: plan, + schema, + })) + } else { + let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)]; + + // default to configuration value + let options = self.session_state.config().options(); + let format = format.as_ref().unwrap_or(&options.explain.format); + + let format: ExplainFormat = format.parse()?; + + Ok(LogicalPlan::Explain(Explain { + verbose, + explain_format: format, + plan, + stringified_plans, + schema, + logical_optimization_succeeded: false, + })) + } + } + #[tracing::instrument(skip_all)] + #[async_recursion::async_recursion] async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result { let mut planner_context = PlannerContext::new(); let mut stmt = Cow::Borrowed(stmt); let mut is_tql_cte = false; + // handle explain before normal processing so we can explain Greptime Statements + if let Statement::Explain(explain) = stmt.as_ref() { + return self.explain_to_plan(explain, query_ctx).await; + } + // Check for hybrid CTEs before normal processing if self.has_hybrid_ctes(stmt.as_ref()) { let stmt_owned = stmt.into_owned(); @@ -101,10 +163,13 @@ impl DfLogicalPlanner { // TODO(LFC): Remove this when Datafusion supports **both** the syntax and implementation of "explain with format". if let datafusion::sql::parser::Statement::Statement( - box datafusion::sql::sqlparser::ast::Statement::Explain { format, .. }, + box datafusion::sql::sqlparser::ast::Statement::Explain { .. }, ) = &mut df_stmt { - format.take(); + UnimplementedSnafu { + operation: "EXPLAIN with FORMAT using raw datafusion planner", + } + .fail()?; } let table_provider = DfTableSourceProvider::new( diff --git a/src/sql/src/parsers/explain_parser.rs b/src/sql/src/parsers/explain_parser.rs index e3fd05cad6..c595efc86d 100644 --- a/src/sql/src/parsers/explain_parser.rs +++ b/src/sql/src/parsers/explain_parser.rs @@ -13,37 +13,47 @@ // limitations under the License. use snafu::ResultExt; -use sqlparser::ast::DescribeAlias; +use sqlparser::keywords::Keyword; use crate::error::{self, Result}; use crate::parser::ParserContext; -use crate::statements::explain::Explain; +use crate::statements::explain::ExplainStatement; use crate::statements::statement::Statement; /// EXPLAIN statement parser implementation impl ParserContext<'_> { + /// explain that support use our own parser to parse the inner statement pub(crate) fn parse_explain(&mut self) -> Result { - let explain_statement = self - .parser - .parse_explain(DescribeAlias::Explain) - .with_context(|_| error::UnexpectedSnafu { - expected: "a query statement", - actual: self.peek_token_as_string(), - })?; + let analyze = self.parser.parse_keyword(Keyword::ANALYZE); + let verbose = self.parser.parse_keyword(Keyword::VERBOSE); + let format = + if self.parser.parse_keyword(Keyword::FORMAT) { + Some(self.parser.parse_analyze_format().with_context(|_| { + error::UnexpectedSnafu { + expected: "analyze format", + actual: self.peek_token_as_string(), + } + })?) + } else { + None + }; - Ok(Statement::Explain(Box::new(Explain::try_from( - explain_statement, - )?))) + let statement = self.parse_statement()?; + + let explain = ExplainStatement { + analyze, + verbose, + format, + statement: Box::new(statement), + }; + Ok(Statement::Explain(Box::new(explain))) } } #[cfg(test)] mod tests { use sqlparser::ast::helpers::attached_token::AttachedToken; - use sqlparser::ast::{ - GroupByExpr, Query as SpQuery, SelectFlavor, Statement as SpStatement, - WildcardAdditionalOptions, - }; + use sqlparser::ast::{GroupByExpr, Query as SpQuery, SelectFlavor, WildcardAdditionalOptions}; use super::*; use crate::dialect::GreptimeDbDialect; @@ -97,31 +107,30 @@ mod tests { flavor: SelectFlavor::Standard, }; - let sp_statement = SpStatement::Query(Box::new(SpQuery { - with: None, - body: Box::new(sqlparser::ast::SetExpr::Select(Box::new(select))), - order_by: None, - limit: None, - limit_by: vec![], - offset: None, - fetch: None, - locks: vec![], - for_clause: None, - settings: None, - format_clause: None, - })); + let sp_query = Box::new( + SpQuery { + with: None, + body: Box::new(sqlparser::ast::SetExpr::Select(Box::new(select))), + order_by: None, + limit: None, + limit_by: vec![], + offset: None, + fetch: None, + locks: vec![], + for_clause: None, + settings: None, + format_clause: None, + } + .try_into() + .unwrap(), + ); - let explain = Explain::try_from(SpStatement::Explain { - describe_alias: DescribeAlias::Explain, + let explain = ExplainStatement { analyze: false, verbose: false, - statement: Box::new(sp_statement), format: None, - query_plan: false, - options: None, - estimate: false, - }) - .unwrap(); + statement: Box::new(Statement::Query(sp_query)), + }; assert_eq!(stmts[0], Statement::Explain(Box::new(explain))) } diff --git a/src/sql/src/statements/explain.rs b/src/sql/src/statements/explain.rs index c893321fdb..b311f55fe0 100644 --- a/src/sql/src/statements/explain.rs +++ b/src/sql/src/statements/explain.rs @@ -15,36 +15,44 @@ use std::fmt::{Display, Formatter}; use serde::Serialize; -use sqlparser::ast::{AnalyzeFormat, Statement as SpStatement}; +use sqlparser::ast::AnalyzeFormat; use sqlparser_derive::{Visit, VisitMut}; -use crate::error::Error; +use crate::statements::statement::Statement; /// Explain statement. #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] -pub struct Explain { - pub inner: SpStatement, +pub struct ExplainStatement { + /// `EXPLAIN ANALYZE ..` + pub analyze: bool, + /// `EXPLAIN .. VERBOSE ..` + pub verbose: bool, + /// `EXPLAIN .. FORMAT ` + pub format: Option, + /// The statement to analyze. Note this is a Greptime [`Statement`] (not a + /// [`sqlparser::ast::Statement`] so that we can use + /// Greptime specific statements + pub statement: Box, } -impl Explain { +impl ExplainStatement { pub fn format(&self) -> Option { - match self.inner { - SpStatement::Explain { format, .. } => format, - _ => None, - } + self.format } } -impl TryFrom for Explain { - type Error = Error; - - fn try_from(value: SpStatement) -> Result { - Ok(Explain { inner: value }) - } -} - -impl Display for Explain { +impl Display for ExplainStatement { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.inner) + write!(f, "EXPLAIN")?; + if self.analyze { + write!(f, " ANALYZE")?; + } + if self.verbose { + write!(f, " VERBOSE")?; + } + if let Some(format) = &self.format { + write!(f, " FORMAT {}", format)?; + } + write!(f, " {}", self.statement) } } diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 3f582a9f30..d0096baa71 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -30,7 +30,7 @@ use crate::statements::cursor::{CloseCursor, DeclareCursor, FetchCursor}; use crate::statements::delete::Delete; use crate::statements::describe::DescribeTable; use crate::statements::drop::{DropDatabase, DropFlow, DropTable, DropView}; -use crate::statements::explain::Explain; +use crate::statements::explain::ExplainStatement; use crate::statements::insert::Insert; use crate::statements::kill::Kill; use crate::statements::query::Query; @@ -126,7 +126,7 @@ pub enum Statement { // DESCRIBE TABLE DescribeTable(DescribeTable), // EXPLAIN QUERY - Explain(Box), + Explain(Box), // COPY Copy(Copy), // Telemetry Query Language @@ -301,7 +301,6 @@ impl TryFrom<&Statement> for DfStatement { fn try_from(s: &Statement) -> Result { let s = match s { Statement::Query(query) => SpStatement::Query(Box::new(query.inner.clone())), - Statement::Explain(explain) => explain.inner.clone(), Statement::Insert(insert) => insert.inner.clone(), Statement::Delete(delete) => delete.inner.clone(), _ => { diff --git a/tests/cases/standalone/common/tql/tql-cte.result b/tests/cases/standalone/common/tql/tql-cte.result index 30abd5e6f1..76e4b511db 100644 --- a/tests/cases/standalone/common/tql/tql-cte.result +++ b/tests/cases/standalone/common/tql/tql-cte.result @@ -45,6 +45,29 @@ SELECT * FROM tql; | 1970-01-01T00:00:40 | 3.0 | +---------------------+-----+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql as ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql; + ++---------------+----------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: tql.ts, tql.val | +| | SubqueryAlias: tql | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with column aliases WITH tql (the_timestamp, the_value) as ( TQL EVAL (0, 40, '10s') metric @@ -61,12 +84,52 @@ SELECT * FROM tql; | 1970-01-01T00:00:40 | 3.0 | +---------------------+-----------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql (the_timestamp, the_value) as ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql; + ++---------------+------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: tql.the_timestamp, tql.the_value | +| | SubqueryAlias: tql | +| | Projection: metric.ts AS the_timestamp, metric.val AS the_value | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------+ + -- Explain TQL CTE +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED EXPLAIN WITH tql AS ( TQL EVAL (0, 40, '10s') metric ) SELECT * FROM tql; -Error: 1001(Unsupported), SQL statement is not supported, keyword: tql ++---------------+----------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: tql.ts, tql.val | +| | SubqueryAlias: tql | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------+ -- Hybrid CTEs (TQL + SQL) WITH @@ -80,6 +143,34 @@ SELECT count(*) FROM filtered; | 2 | +----------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') metric), + filtered AS (SELECT * FROM tql_data WHERE val > 5) +SELECT count(*) FROM filtered; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: count(Int64(1)) AS count(*) | +| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] | +| | SubqueryAlias: filtered | +| | Projection: tql_data.ts, tql_data.val | +| | Filter: tql_data.val > CAST(Int64(5) AS Float64) | +| | SubqueryAlias: tql_data | +| | Projection: metric.ts AS ts, metric.val AS val | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with complex PromQL expressions WITH tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])), @@ -92,6 +183,38 @@ SELECT sum(val) FROM filtered; | 1.05 | +-------------------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])), + filtered (ts, val) AS (SELECT * FROM tql_data WHERE val > 0) +SELECT sum(val) FROM filtered; + ++---------------+----------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: sum(filtered.val) | +| | Aggregate: groupBy=[[]], aggr=[[sum(filtered.val)]] | +| | SubqueryAlias: filtered | +| | Projection: tql_data.ts AS ts, tql_data.val AS val | +| | Projection: tql_data.ts, tql_data.val | +| | Filter: tql_data.val > CAST(Int64(0) AS Float64) | +| | SubqueryAlias: tql_data | +| | Projection: metric.ts AS ts, prom_rate(ts_range,val,ts,Int64(20000)) AS val | +| | Filter: prom_rate(ts_range,val,ts,Int64(20000)) IS NOT NULL | +| | Projection: metric.ts, prom_rate(ts_range, val, metric.ts, Int64(20000)) AS prom_rate(ts_range,val,ts,Int64(20000)) | +| | PromRangeManipulate: req range=[0..40000], interval=[10000], eval range=[20000], time index=[ts], values=["val"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-320000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with aggregation functions WITH tql_agg(ts, summary) AS ( TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"}) @@ -104,6 +227,32 @@ SELECT round(avg(summary)) as avg_sum FROM tql_agg; | 1.0 | +---------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_agg(ts, summary) AS ( + TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"}) +) +SELECT round(avg(summary)) as avg_sum FROM tql_agg; + ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: round(avg(tql_agg.summary)) AS avg_sum | +| | Aggregate: groupBy=[[]], aggr=[[avg(tql_agg.summary)]] | +| | SubqueryAlias: tql_agg | +| | Projection: labels.ts AS ts, sum(labels.cpu) AS summary | +| | Aggregate: groupBy=[[labels.ts]], aggr=[[sum(labels.cpu)]] | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.host ~ Utf8("^(?:host.*)$") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with label selectors WITH host_metrics AS ( TQL EVAL (0, 40, '10s') labels{host="host1"} @@ -116,6 +265,30 @@ SELECT count(*) as host1_points FROM host_metrics; | 5 | +--------------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH host_metrics AS ( + TQL EVAL (0, 40, '10s') labels{host="host1"} +) +SELECT count(*) as host1_points FROM host_metrics; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: count(Int64(1)) AS count(*) AS host1_points | +| | Aggregate: groupBy=[[]], aggr=[[count(host_metrics.ts) AS count(Int64(1))]] | +| | SubqueryAlias: host_metrics | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.host = Utf8("host1") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with column reference WITH host_metrics AS ( TQL EVAL (0, 40, '10s') labels{host="host1"} @@ -132,6 +305,29 @@ SELECT host_metrics.ts, host_metrics.host FROM host_metrics; | 1970-01-01T00:00:40 | host1 | +---------------------+-------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH host_metrics AS ( + TQL EVAL (0, 40, '10s') labels{host="host1"} +) +SELECT host_metrics.ts, host_metrics.host FROM host_metrics; + ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: host_metrics.ts, host_metrics.host | +| | SubqueryAlias: host_metrics | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.host = Utf8("host1") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+ + -- Multiple TQL CTEs referencing different tables WITH metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric), @@ -152,6 +348,63 @@ LIMIT 3; | 8.0 | 0.7 | +------------+-----------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric), + label_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"}) +SELECT + m.val as metric_val, + l.cpu as label_val +FROM metric_data m, label_data l +WHERE m.ts = l.ts +ORDER BY m.ts +LIMIT 3; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: metric_val, label_val | +| | Sort: m.ts ASC NULLS LAST, fetch=3 | +| | Projection: m.val AS metric_val, l.cpu AS label_val, m.ts | +| | Inner Join: m.ts = l.ts | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: m | +| | SubqueryAlias: metric_data | +| | Projection: metric.ts AS ts, metric.val AS val | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| | Projection: l.ts, l.cpu | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: l | +| | SubqueryAlias: label_data | +| | Projection: labels.ts AS ts, labels.host AS host, labels.cpu AS cpu | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.host = Utf8("host2") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| physical_plan | ProjectionExec: expr=[metric_val@0 as metric_val, label_val@1 as label_val] | +| | SortPreservingMergeExec: [ts@2 ASC NULLS LAST], fetch=3 | +| | SortExec: TopK(fetch=3), expr=[ts@2 ASC NULLS LAST], preserve_REDACTED +| | ProjectionExec: expr=[val@1 as metric_val, cpu@2 as label_val, ts@0 as ts] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ts@0, ts@0)], projection=[ts@0, val@1, cpu@3] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: REDACTED +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: REDACTED +| | ProjectionExec: expr=[ts@0 as ts, cpu@2 as cpu] | +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with mathematical operations WITH computed(ts, val) AS ( TQL EVAL (0, 40, '10s') metric * 2 + 1 @@ -164,6 +417,33 @@ SELECT min(val) as min_computed, max(val) as max_computed FROM computed; | 1.0 | 17.0 | +--------------+--------------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH computed(ts, val) AS ( + TQL EVAL (0, 40, '10s') metric * 2 + 1 +) +SELECT min(val) as min_computed, max(val) as max_computed FROM computed; + ++---------------+------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: min(computed.val) AS min_computed, max(computed.val) AS max_computed | +| | Aggregate: groupBy=[[]], aggr=[[min(computed.val), max(computed.val)]] | +| | SubqueryAlias: computed | +| | Projection: metric.ts AS ts, val * Float64(2) + Float64(1) AS val | +| | Projection: metric.ts, val * Float64(2) + Float64(1) AS val * Float64(2) + Float64(1) | +| | Projection: metric.ts, metric.val * Float64(2) AS val * Float64(2) | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with window functions in SQL part WITH tql_base(ts, val) AS ( TQL EVAL (0, 40, '10s') metric @@ -185,6 +465,41 @@ ORDER BY ts; | 1970-01-01T00:00:40 | 3.0 | 2.0 | +---------------------+-----+------------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_base(ts, val) AS ( + TQL EVAL (0, 40, '10s') metric +) +SELECT + ts, + val, + LAG(val, 1) OVER (ORDER BY ts) as prev_value +FROM tql_base +ORDER BY ts; + ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Sort: tql_base.ts ASC NULLS LAST | +| | Projection: tql_base.ts, tql_base.val, lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS prev_value | +| | WindowAggr: windowExpr=[[lag(tql_base.val, Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: tql_base | +| | Projection: metric.ts AS ts, metric.val AS val | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | ProjectionExec: expr=[ts@0 as ts, val@1 as val, lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as prev_value] | +| | BoundedWindowAggExec: wdw=[lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] | +| | SortPreservingMergeExec: [ts@0 ASC NULLS LAST] | +| | SortExec: expr=[ts@0 ASC NULLS LAST], preserve_REDACTED +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with HAVING clause WITH tql_grouped(ts, host, cpu) AS ( TQL EVAL (0, 40, '10s') labels @@ -202,6 +517,37 @@ HAVING count(*) > 1; | 1970-01-01T00:00:00 | 10 | +---------------------+-------------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_grouped(ts, host, cpu) AS ( + TQL EVAL (0, 40, '10s') labels +) +SELECT + DATE_TRUNC('minute', ts) as minute, + count(*) as point_count +FROM tql_grouped +GROUP BY minute +HAVING count(*) > 1; + ++---------------+----------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: date_trunc(Utf8("minute"),tql_grouped.ts) AS minute, count(Int64(1)) AS count(*) AS point_count | +| | Filter: count(Int64(1)) > Int64(1) | +| | Aggregate: groupBy=[[date_trunc(Utf8("minute"), tql_grouped.ts)]], aggr=[[count(tql_grouped.ts) AS count(Int64(1))]] | +| | SubqueryAlias: tql_grouped | +| | Projection: labels.ts AS ts, labels.host AS host, labels.cpu AS cpu | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with UNION -- SQLNESS SORT_RESULT 3 1 WITH @@ -226,6 +572,45 @@ SELECT 'host2' as source, ts, cpu FROM host2_data; | host2 | 1970-01-01T00:00:40 | 0.5 | +--------+---------------------+-----+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + host1_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host1"}), + host2_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"}) +SELECT 'host1' as source, ts, cpu FROM host1_data +UNION ALL +SELECT 'host2' as source, ts, cpu FROM host2_data; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Union | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: Utf8("host1") AS source, host1_data.ts, host1_data.cpu | +| | SubqueryAlias: host1_data | +| | Projection: labels.ts AS ts, labels.host AS host, labels.cpu AS cpu | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.host = Utf8("host1") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: Utf8("host2") AS source, host2_data.ts, host2_data.cpu | +| | SubqueryAlias: host2_data | +| | Projection: labels.ts AS ts, labels.host AS host, labels.cpu AS cpu | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.host = Utf8("host2") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| physical_plan | InterleaveExec | +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ + -- Nested CTEs with TQL WITH base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric), @@ -245,6 +630,46 @@ SELECT count(*) as high_values FROM final; | 3 | +-------------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric), + processed(ts, percent) AS ( + SELECT ts, val * 100 as percent + FROM base_tql + WHERE val > 0 + ), + final(ts, percent) AS ( + SELECT * FROM processed WHERE percent > 200 + ) +SELECT count(*) as high_values FROM final; + ++---------------+------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: count(Int64(1)) AS count(*) AS high_values | +| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] | +| | SubqueryAlias: final | +| | Projection: processed.ts AS ts, processed.percent AS percent | +| | Projection: processed.ts, processed.percent | +| | Filter: processed.percent > CAST(Int64(200) AS Float64) | +| | SubqueryAlias: processed | +| | Projection: base_tql.ts AS ts, percent AS percent | +| | Projection: base_tql.ts, base_tql.val * CAST(Int64(100) AS Float64) AS percent | +| | Filter: base_tql.val > CAST(Int64(0) AS Float64) | +| | SubqueryAlias: base_tql | +| | Projection: metric.ts AS ts, metric.val AS val | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with time-based functions WITH time_shifted AS ( TQL EVAL (0, 40, '10s') metric offset 50s @@ -261,6 +686,30 @@ SELECT * FROM time_shifted; | 1970-01-01T00:00:40 | 3.0 | +---------------------+-----+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH time_shifted AS ( + TQL EVAL (0, 40, '10s') metric offset 50s +) +SELECT * FROM time_shifted; + ++---------------+------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: time_shifted.ts, time_shifted.val | +| | SubqueryAlias: time_shifted | +| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] | +| | PromSeriesNormalize: offset=[50000], time index=[ts], filter NaN: [false] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-250000, None) AND metric.ts <= TimestampMillisecond(390000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with JOIN between TQL and regular table -- SQLNESS SORT_RESULT 3 1 WITH tql_summary(ts, host, cpu) AS ( @@ -286,6 +735,67 @@ LIMIT 5; | 1970-01-01T00:00:20 | host1 | host1 | +---------------------+-----------+-------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_summary(ts, host, cpu) AS ( + TQL EVAL (0, 40, '10s') avg_over_time(labels[30s]) +) +SELECT + t.ts, + t.cpu as avg_value, + l.host +FROM tql_summary t +JOIN labels l ON DATE_TRUNC('second', t.ts) = DATE_TRUNC('second', l.ts) +WHERE l.host = 'host1' +ORDER BY t.ts, l.host, avg_value +LIMIT 5; + ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Sort: t.ts ASC NULLS LAST, l.host ASC NULLS LAST, avg_value ASC NULLS LAST, fetch=5 | +| | Projection: t.ts, t.cpu AS avg_value, l.host | +| | Inner Join: date_trunc(Utf8("second"), t.ts) = date_trunc(Utf8("second"), l.ts) | +| | Projection: t.ts, t.cpu | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: t | +| | SubqueryAlias: tql_summary | +| | Projection: labels.ts AS ts, prom_avg_over_time(ts_range,cpu) AS host, labels.host AS cpu | +| | Filter: prom_avg_over_time(ts_range,cpu) IS NOT NULL | +| | Projection: labels.ts, prom_avg_over_time(ts_range, cpu) AS prom_avg_over_time(ts_range,cpu), labels.host | +| | PromRangeManipulate: req range=[0..40000], interval=[10000], eval range=[30000], time index=[ts], values=["cpu"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.ts >= TimestampMillisecond(-330000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| | Filter: l.host = Utf8("host1") | +| | Projection: l.ts, l.host | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: l | +| | TableScan: labels | +| | ]] | +| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST, host@2 ASC NULLS LAST, avg_value@1 ASC NULLS LAST], fetch=5 | +| | SortExec: TopK(fetch=5), expr=[ts@0 ASC NULLS LAST, host@2 ASC NULLS LAST, avg_value@1 ASC NULLS LAST], preserve_REDACTED +| | ProjectionExec: expr=[ts@0 as ts, cpu@1 as avg_value, host@2 as host] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(date_trunc(Utf8("second"),t.ts)@2, date_trunc(Utf8("second"),l.ts)@2)], projection=[ts@0, cpu@1, host@4] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: REDACTED +| | ProjectionExec: expr=[ts@0 as ts, cpu@2 as cpu, date_trunc(second, ts@0) as date_trunc(Utf8("second"),t.ts)] | +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: REDACTED +| | ProjectionExec: expr=[ts@0 as ts, host@1 as host, date_trunc(second, ts@0) as date_trunc(Utf8("second"),l.ts)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: host@1 = host1 | +| | ProjectionExec: expr=[ts@0 as ts, host@1 as host] | +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + -- TQL CTE with JOIN and value aliasing -- SQLNESS SORT_RESULT 3 1 WITH tql_summary AS ( @@ -311,6 +821,67 @@ LIMIT 5; | 1970-01-01T00:00:20 | 0.5666666666666668 | host1 | +---------------------+--------------------+-------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_summary AS ( + TQL EVAL (0, 40, '10s') avg_over_time(labels[30s]) AS cpu +) +SELECT + t.ts, + t.cpu as avg_value, + l.host +FROM tql_summary t +JOIN labels l ON DATE_TRUNC('second', t.ts) = DATE_TRUNC('second', l.ts) +WHERE l.host = 'host1' +ORDER BY t.ts, l.host, avg_value +LIMIT 5; + ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Sort: t.ts ASC NULLS LAST, l.host ASC NULLS LAST, avg_value ASC NULLS LAST, fetch=5 | +| | Projection: t.ts, t.cpu AS avg_value, l.host | +| | Inner Join: date_trunc(Utf8("second"), t.ts) = date_trunc(Utf8("second"), l.ts) | +| | Projection: t.cpu, t.ts | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: t | +| | SubqueryAlias: tql_summary | +| | Projection: prom_avg_over_time(ts_range,cpu) AS cpu, labels.host, labels.ts | +| | Filter: prom_avg_over_time(ts_range,cpu) IS NOT NULL | +| | Projection: labels.ts, prom_avg_over_time(ts_range, cpu) AS prom_avg_over_time(ts_range,cpu), labels.host | +| | PromRangeManipulate: req range=[0..40000], interval=[10000], eval range=[30000], time index=[ts], values=["cpu"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["host"] | +| | Filter: labels.ts >= TimestampMillisecond(-330000, None) AND labels.ts <= TimestampMillisecond(340000, None) | +| | TableScan: labels | +| | ]] | +| | Filter: l.host = Utf8("host1") | +| | Projection: l.ts, l.host | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: l | +| | TableScan: labels | +| | ]] | +| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST, host@2 ASC NULLS LAST, avg_value@1 ASC NULLS LAST], fetch=5 | +| | SortExec: TopK(fetch=5), expr=[ts@0 ASC NULLS LAST, host@2 ASC NULLS LAST, avg_value@1 ASC NULLS LAST], preserve_REDACTED +| | ProjectionExec: expr=[ts@1 as ts, cpu@0 as avg_value, host@2 as host] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(date_trunc(Utf8("second"),t.ts)@2, date_trunc(Utf8("second"),l.ts)@2)], projection=[cpu@0, ts@1, host@4] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: REDACTED +| | ProjectionExec: expr=[cpu@0 as cpu, ts@2 as ts, date_trunc(second, ts@2) as date_trunc(Utf8("second"),t.ts)] | +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: REDACTED +| | ProjectionExec: expr=[ts@0 as ts, host@1 as host, date_trunc(second, ts@0) as date_trunc(Utf8("second"),l.ts)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: host@1 = host1 | +| | ProjectionExec: expr=[ts@0 as ts, host@1 as host] | +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + -- Error case - TQL ANALYZE should fail WITH tql_analyze AS ( TQL ANALYZE (0, 40, '10s') metric @@ -319,6 +890,15 @@ SELECT * FROM tql_analyze limit 1; Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_analyze AS ( + TQL ANALYZE (0, 40, '10s') metric +) +SELECT * FROM tql_analyze limit 1; + +Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs + -- Error case - TQL EXPLAIN should fail WITH tql_explain AS ( TQL EXPLAIN (0, 40, '10s') metric @@ -327,6 +907,15 @@ SELECT * FROM tql_explain limit 1; Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_explain AS ( + TQL EXPLAIN (0, 40, '10s') metric +) +SELECT * FROM tql_explain limit 1; + +Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs + -- TQL CTE with lookback parameter WITH tql_lookback AS ( TQL EVAL (0, 40, '10s', '15s') metric @@ -339,6 +928,30 @@ SELECT count(*) FROM tql_lookback; | 5 | +----------+ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_lookback AS ( + TQL EVAL (0, 40, '10s', '15s') metric +) +SELECT count(*) FROM tql_lookback; + ++---------------+----------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: count(Int64(1)) AS count(*) | +| | Aggregate: groupBy=[[]], aggr=[[count(tql_lookback.ts) AS count(Int64(1))]] | +| | SubqueryAlias: tql_lookback | +| | PromInstantManipulate: range=[0..40000], lookback=[15000], interval=[10000], time index=[ts] | +| | PromSeriesDivide: tags=[] | +| | Filter: metric.ts >= TimestampMillisecond(-15000, None) AND metric.ts <= TimestampMillisecond(55000, None) | +| | TableScan: metric | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------+ + drop table metric; Affected Rows: 0 diff --git a/tests/cases/standalone/common/tql/tql-cte.sql b/tests/cases/standalone/common/tql/tql-cte.sql index 6b9c197ef3..6d928d8451 100644 --- a/tests/cases/standalone/common/tql/tql-cte.sql +++ b/tests/cases/standalone/common/tql/tql-cte.sql @@ -26,13 +26,29 @@ WITH tql as ( ) SELECT * FROM tql; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql as ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql; + -- TQL CTE with column aliases WITH tql (the_timestamp, the_value) as ( TQL EVAL (0, 40, '10s') metric ) SELECT * FROM tql; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql (the_timestamp, the_value) as ( + TQL EVAL (0, 40, '10s') metric +) +SELECT * FROM tql; + -- Explain TQL CTE +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED EXPLAIN WITH tql AS ( TQL EVAL (0, 40, '10s') metric ) SELECT * FROM tql; @@ -43,30 +59,66 @@ WITH filtered AS (SELECT * FROM tql_data WHERE val > 5) SELECT count(*) FROM filtered; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') metric), + filtered AS (SELECT * FROM tql_data WHERE val > 5) +SELECT count(*) FROM filtered; + -- TQL CTE with complex PromQL expressions WITH tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])), filtered (ts, val) AS (SELECT * FROM tql_data WHERE val > 0) SELECT sum(val) FROM filtered; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])), + filtered (ts, val) AS (SELECT * FROM tql_data WHERE val > 0) +SELECT sum(val) FROM filtered; + -- TQL CTE with aggregation functions WITH tql_agg(ts, summary) AS ( TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"}) ) SELECT round(avg(summary)) as avg_sum FROM tql_agg; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_agg(ts, summary) AS ( + TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"}) +) +SELECT round(avg(summary)) as avg_sum FROM tql_agg; + -- TQL CTE with label selectors WITH host_metrics AS ( TQL EVAL (0, 40, '10s') labels{host="host1"} ) SELECT count(*) as host1_points FROM host_metrics; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH host_metrics AS ( + TQL EVAL (0, 40, '10s') labels{host="host1"} +) +SELECT count(*) as host1_points FROM host_metrics; + -- TQL CTE with column reference WITH host_metrics AS ( TQL EVAL (0, 40, '10s') labels{host="host1"} ) SELECT host_metrics.ts, host_metrics.host FROM host_metrics; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH host_metrics AS ( + TQL EVAL (0, 40, '10s') labels{host="host1"} +) +SELECT host_metrics.ts, host_metrics.host FROM host_metrics; + + -- Multiple TQL CTEs referencing different tables WITH metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric), @@ -79,12 +131,32 @@ WHERE m.ts = l.ts ORDER BY m.ts LIMIT 3; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric), + label_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"}) +SELECT + m.val as metric_val, + l.cpu as label_val +FROM metric_data m, label_data l +WHERE m.ts = l.ts +ORDER BY m.ts +LIMIT 3; + -- TQL CTE with mathematical operations WITH computed(ts, val) AS ( TQL EVAL (0, 40, '10s') metric * 2 + 1 ) SELECT min(val) as min_computed, max(val) as max_computed FROM computed; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH computed(ts, val) AS ( + TQL EVAL (0, 40, '10s') metric * 2 + 1 +) +SELECT min(val) as min_computed, max(val) as max_computed FROM computed; + -- TQL CTE with window functions in SQL part WITH tql_base(ts, val) AS ( TQL EVAL (0, 40, '10s') metric @@ -96,6 +168,18 @@ SELECT FROM tql_base ORDER BY ts; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_base(ts, val) AS ( + TQL EVAL (0, 40, '10s') metric +) +SELECT + ts, + val, + LAG(val, 1) OVER (ORDER BY ts) as prev_value +FROM tql_base +ORDER BY ts; + -- TQL CTE with HAVING clause WITH tql_grouped(ts, host, cpu) AS ( TQL EVAL (0, 40, '10s') labels @@ -107,6 +191,18 @@ FROM tql_grouped GROUP BY minute HAVING count(*) > 1; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_grouped(ts, host, cpu) AS ( + TQL EVAL (0, 40, '10s') labels +) +SELECT + DATE_TRUNC('minute', ts) as minute, + count(*) as point_count +FROM tql_grouped +GROUP BY minute +HAVING count(*) > 1; + -- TQL CTE with UNION -- SQLNESS SORT_RESULT 3 1 WITH @@ -116,6 +212,15 @@ SELECT 'host1' as source, ts, cpu FROM host1_data UNION ALL SELECT 'host2' as source, ts, cpu FROM host2_data; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + host1_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host1"}), + host2_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"}) +SELECT 'host1' as source, ts, cpu FROM host1_data +UNION ALL +SELECT 'host2' as source, ts, cpu FROM host2_data; + -- Nested CTEs with TQL WITH base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric), @@ -129,12 +234,33 @@ WITH ) SELECT count(*) as high_values FROM final; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH + base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric), + processed(ts, percent) AS ( + SELECT ts, val * 100 as percent + FROM base_tql + WHERE val > 0 + ), + final(ts, percent) AS ( + SELECT * FROM processed WHERE percent > 200 + ) +SELECT count(*) as high_values FROM final; + -- TQL CTE with time-based functions WITH time_shifted AS ( TQL EVAL (0, 40, '10s') metric offset 50s ) SELECT * FROM time_shifted; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH time_shifted AS ( + TQL EVAL (0, 40, '10s') metric offset 50s +) +SELECT * FROM time_shifted; + -- TQL CTE with JOIN between TQL and regular table -- SQLNESS SORT_RESULT 3 1 WITH tql_summary(ts, host, cpu) AS ( @@ -150,6 +276,21 @@ WHERE l.host = 'host1' ORDER BY t.ts, l.host, avg_value LIMIT 5; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_summary(ts, host, cpu) AS ( + TQL EVAL (0, 40, '10s') avg_over_time(labels[30s]) +) +SELECT + t.ts, + t.cpu as avg_value, + l.host +FROM tql_summary t +JOIN labels l ON DATE_TRUNC('second', t.ts) = DATE_TRUNC('second', l.ts) +WHERE l.host = 'host1' +ORDER BY t.ts, l.host, avg_value +LIMIT 5; + -- TQL CTE with JOIN and value aliasing -- SQLNESS SORT_RESULT 3 1 WITH tql_summary AS ( @@ -165,23 +306,59 @@ WHERE l.host = 'host1' ORDER BY t.ts, l.host, avg_value LIMIT 5; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_summary AS ( + TQL EVAL (0, 40, '10s') avg_over_time(labels[30s]) AS cpu +) +SELECT + t.ts, + t.cpu as avg_value, + l.host +FROM tql_summary t +JOIN labels l ON DATE_TRUNC('second', t.ts) = DATE_TRUNC('second', l.ts) +WHERE l.host = 'host1' +ORDER BY t.ts, l.host, avg_value +LIMIT 5; + -- Error case - TQL ANALYZE should fail WITH tql_analyze AS ( TQL ANALYZE (0, 40, '10s') metric ) SELECT * FROM tql_analyze limit 1; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_analyze AS ( + TQL ANALYZE (0, 40, '10s') metric +) +SELECT * FROM tql_analyze limit 1; + -- Error case - TQL EXPLAIN should fail WITH tql_explain AS ( TQL EXPLAIN (0, 40, '10s') metric ) SELECT * FROM tql_explain limit 1; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_explain AS ( + TQL EXPLAIN (0, 40, '10s') metric +) +SELECT * FROM tql_explain limit 1; + -- TQL CTE with lookback parameter WITH tql_lookback AS ( TQL EVAL (0, 40, '10s', '15s') metric ) SELECT count(*) FROM tql_lookback; +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN WITH tql_lookback AS ( + TQL EVAL (0, 40, '10s', '15s') metric +) +SELECT count(*) FROM tql_lookback; + drop table metric; drop table labels;