feat: explain custom statement (#7058)

* feat: explain tql cte

Signed-off-by: discord9 <discord9@163.com>

* chore: unused

Signed-off-by: discord9 <discord9@163.com>

* fix: analyze format

Signed-off-by: discord9 <discord9@163.com>

* Update src/sql/src/statements/statement.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* test: sqlness

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
discord9
2025-10-11 14:27:51 +08:00
committed by GitHub
parent e46ce7c6da
commit ba034c5a9e
6 changed files with 935 additions and 64 deletions

View File

@@ -20,10 +20,14 @@ use async_trait::async_trait;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use datafusion::common::DFSchema;
use datafusion::common::{DFSchema, plan_err};
use datafusion::execution::context::SessionState;
use datafusion::sql::planner::PlannerContext;
use datafusion_expr::{Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, col};
use datafusion_common::ToDFSchema;
use datafusion_expr::{
Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType,
ToStringifiedPlan, col,
};
use datafusion_sql::planner::{ParserOptions, SqlToRel};
use log_query::LogQuery;
use promql_parser::parser::EvalStmt;
@@ -31,6 +35,7 @@ use session::context::QueryContextRef;
use snafu::{ResultExt, ensure};
use sql::CteContent;
use sql::ast::Expr as SqlExpr;
use sql::statements::explain::ExplainStatement;
use sql::statements::query::Query;
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
@@ -75,12 +80,69 @@ impl DfLogicalPlanner {
}
}
/// Basically the same with `explain_to_plan` in DataFusion, but adapted to Greptime's
/// `plan_sql` to support Greptime Statements.
async fn explain_to_plan(
&self,
explain: &ExplainStatement,
query_ctx: QueryContextRef,
) -> Result<LogicalPlan> {
let plan = self.plan_sql(&explain.statement, query_ctx).await?;
if matches!(plan, LogicalPlan::Explain(_)) {
return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu);
}
let verbose = explain.verbose;
let analyze = explain.analyze;
let format = explain.format.map(|f| f.to_string());
let plan = Arc::new(plan);
let schema = LogicalPlan::explain_schema();
let schema = ToDFSchema::to_dfschema_ref(schema)?;
if verbose && format.is_some() {
return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu);
}
if analyze {
// notice format is already set in query context, so can be ignore here
Ok(LogicalPlan::Analyze(Analyze {
verbose,
input: plan,
schema,
}))
} else {
let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
// default to configuration value
let options = self.session_state.config().options();
let format = format.as_ref().unwrap_or(&options.explain.format);
let format: ExplainFormat = format.parse()?;
Ok(LogicalPlan::Explain(Explain {
verbose,
explain_format: format,
plan,
stringified_plans,
schema,
logical_optimization_succeeded: false,
}))
}
}
#[tracing::instrument(skip_all)]
#[async_recursion::async_recursion]
async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let mut planner_context = PlannerContext::new();
let mut stmt = Cow::Borrowed(stmt);
let mut is_tql_cte = false;
// handle explain before normal processing so we can explain Greptime Statements
if let Statement::Explain(explain) = stmt.as_ref() {
return self.explain_to_plan(explain, query_ctx).await;
}
// Check for hybrid CTEs before normal processing
if self.has_hybrid_ctes(stmt.as_ref()) {
let stmt_owned = stmt.into_owned();
@@ -101,10 +163,13 @@ impl DfLogicalPlanner {
// TODO(LFC): Remove this when Datafusion supports **both** the syntax and implementation of "explain with format".
if let datafusion::sql::parser::Statement::Statement(
box datafusion::sql::sqlparser::ast::Statement::Explain { format, .. },
box datafusion::sql::sqlparser::ast::Statement::Explain { .. },
) = &mut df_stmt
{
format.take();
UnimplementedSnafu {
operation: "EXPLAIN with FORMAT using raw datafusion planner",
}
.fail()?;
}
let table_provider = DfTableSourceProvider::new(

View File

@@ -13,37 +13,47 @@
// limitations under the License.
use snafu::ResultExt;
use sqlparser::ast::DescribeAlias;
use sqlparser::keywords::Keyword;
use crate::error::{self, Result};
use crate::parser::ParserContext;
use crate::statements::explain::Explain;
use crate::statements::explain::ExplainStatement;
use crate::statements::statement::Statement;
/// EXPLAIN statement parser implementation
impl ParserContext<'_> {
/// explain that support use our own parser to parse the inner statement
pub(crate) fn parse_explain(&mut self) -> Result<Statement> {
let explain_statement = self
.parser
.parse_explain(DescribeAlias::Explain)
.with_context(|_| error::UnexpectedSnafu {
expected: "a query statement",
actual: self.peek_token_as_string(),
})?;
let analyze = self.parser.parse_keyword(Keyword::ANALYZE);
let verbose = self.parser.parse_keyword(Keyword::VERBOSE);
let format =
if self.parser.parse_keyword(Keyword::FORMAT) {
Some(self.parser.parse_analyze_format().with_context(|_| {
error::UnexpectedSnafu {
expected: "analyze format",
actual: self.peek_token_as_string(),
}
})?)
} else {
None
};
Ok(Statement::Explain(Box::new(Explain::try_from(
explain_statement,
)?)))
let statement = self.parse_statement()?;
let explain = ExplainStatement {
analyze,
verbose,
format,
statement: Box::new(statement),
};
Ok(Statement::Explain(Box::new(explain)))
}
}
#[cfg(test)]
mod tests {
use sqlparser::ast::helpers::attached_token::AttachedToken;
use sqlparser::ast::{
GroupByExpr, Query as SpQuery, SelectFlavor, Statement as SpStatement,
WildcardAdditionalOptions,
};
use sqlparser::ast::{GroupByExpr, Query as SpQuery, SelectFlavor, WildcardAdditionalOptions};
use super::*;
use crate::dialect::GreptimeDbDialect;
@@ -97,31 +107,30 @@ mod tests {
flavor: SelectFlavor::Standard,
};
let sp_statement = SpStatement::Query(Box::new(SpQuery {
with: None,
body: Box::new(sqlparser::ast::SetExpr::Select(Box::new(select))),
order_by: None,
limit: None,
limit_by: vec![],
offset: None,
fetch: None,
locks: vec![],
for_clause: None,
settings: None,
format_clause: None,
}));
let sp_query = Box::new(
SpQuery {
with: None,
body: Box::new(sqlparser::ast::SetExpr::Select(Box::new(select))),
order_by: None,
limit: None,
limit_by: vec![],
offset: None,
fetch: None,
locks: vec![],
for_clause: None,
settings: None,
format_clause: None,
}
.try_into()
.unwrap(),
);
let explain = Explain::try_from(SpStatement::Explain {
describe_alias: DescribeAlias::Explain,
let explain = ExplainStatement {
analyze: false,
verbose: false,
statement: Box::new(sp_statement),
format: None,
query_plan: false,
options: None,
estimate: false,
})
.unwrap();
statement: Box::new(Statement::Query(sp_query)),
};
assert_eq!(stmts[0], Statement::Explain(Box::new(explain)))
}

View File

@@ -15,36 +15,44 @@
use std::fmt::{Display, Formatter};
use serde::Serialize;
use sqlparser::ast::{AnalyzeFormat, Statement as SpStatement};
use sqlparser::ast::AnalyzeFormat;
use sqlparser_derive::{Visit, VisitMut};
use crate::error::Error;
use crate::statements::statement::Statement;
/// Explain statement.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub struct Explain {
pub inner: SpStatement,
pub struct ExplainStatement {
/// `EXPLAIN ANALYZE ..`
pub analyze: bool,
/// `EXPLAIN .. VERBOSE ..`
pub verbose: bool,
/// `EXPLAIN .. FORMAT `
pub format: Option<AnalyzeFormat>,
/// The statement to analyze. Note this is a Greptime [`Statement`] (not a
/// [`sqlparser::ast::Statement`] so that we can use
/// Greptime specific statements
pub statement: Box<Statement>,
}
impl Explain {
impl ExplainStatement {
pub fn format(&self) -> Option<AnalyzeFormat> {
match self.inner {
SpStatement::Explain { format, .. } => format,
_ => None,
}
self.format
}
}
impl TryFrom<SpStatement> for Explain {
type Error = Error;
fn try_from(value: SpStatement) -> Result<Self, Self::Error> {
Ok(Explain { inner: value })
}
}
impl Display for Explain {
impl Display for ExplainStatement {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.inner)
write!(f, "EXPLAIN")?;
if self.analyze {
write!(f, " ANALYZE")?;
}
if self.verbose {
write!(f, " VERBOSE")?;
}
if let Some(format) = &self.format {
write!(f, " FORMAT {}", format)?;
}
write!(f, " {}", self.statement)
}
}

View File

@@ -30,7 +30,7 @@ use crate::statements::cursor::{CloseCursor, DeclareCursor, FetchCursor};
use crate::statements::delete::Delete;
use crate::statements::describe::DescribeTable;
use crate::statements::drop::{DropDatabase, DropFlow, DropTable, DropView};
use crate::statements::explain::Explain;
use crate::statements::explain::ExplainStatement;
use crate::statements::insert::Insert;
use crate::statements::kill::Kill;
use crate::statements::query::Query;
@@ -126,7 +126,7 @@ pub enum Statement {
// DESCRIBE TABLE
DescribeTable(DescribeTable),
// EXPLAIN QUERY
Explain(Box<Explain>),
Explain(Box<ExplainStatement>),
// COPY
Copy(Copy),
// Telemetry Query Language
@@ -301,7 +301,6 @@ impl TryFrom<&Statement> for DfStatement {
fn try_from(s: &Statement) -> Result<Self, Self::Error> {
let s = match s {
Statement::Query(query) => SpStatement::Query(Box::new(query.inner.clone())),
Statement::Explain(explain) => explain.inner.clone(),
Statement::Insert(insert) => insert.inner.clone(),
Statement::Delete(delete) => delete.inner.clone(),
_ => {

View File

@@ -45,6 +45,29 @@ SELECT * FROM tql;
| 1970-01-01T00:00:40 | 3.0 |
+---------------------+-----+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql as (
TQL EVAL (0, 40, '10s') metric
)
SELECT * FROM tql;
+---------------+----------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: tql.ts, tql.val |
| | SubqueryAlias: tql |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with column aliases
WITH tql (the_timestamp, the_value) as (
TQL EVAL (0, 40, '10s') metric
@@ -61,12 +84,52 @@ SELECT * FROM tql;
| 1970-01-01T00:00:40 | 3.0 |
+---------------------+-----------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql (the_timestamp, the_value) as (
TQL EVAL (0, 40, '10s') metric
)
SELECT * FROM tql;
+---------------+------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: tql.the_timestamp, tql.the_value |
| | SubqueryAlias: tql |
| | Projection: metric.ts AS the_timestamp, metric.val AS the_value |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------+
-- Explain TQL CTE
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql AS (
TQL EVAL (0, 40, '10s') metric
) SELECT * FROM tql;
Error: 1001(Unsupported), SQL statement is not supported, keyword: tql
+---------------+----------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: tql.ts, tql.val |
| | SubqueryAlias: tql |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------+
-- Hybrid CTEs (TQL + SQL)
WITH
@@ -80,6 +143,34 @@ SELECT count(*) FROM filtered;
| 2 |
+----------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') metric),
filtered AS (SELECT * FROM tql_data WHERE val > 5)
SELECT count(*) FROM filtered;
+---------------+--------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) |
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | SubqueryAlias: filtered |
| | Projection: tql_data.ts, tql_data.val |
| | Filter: tql_data.val > CAST(Int64(5) AS Float64) |
| | SubqueryAlias: tql_data |
| | Projection: metric.ts AS ts, metric.val AS val |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with complex PromQL expressions
WITH
tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])),
@@ -92,6 +183,38 @@ SELECT sum(val) FROM filtered;
| 1.05 |
+-------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])),
filtered (ts, val) AS (SELECT * FROM tql_data WHERE val > 0)
SELECT sum(val) FROM filtered;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: sum(filtered.val) |
| | Aggregate: groupBy=[[]], aggr=[[sum(filtered.val)]] |
| | SubqueryAlias: filtered |
| | Projection: tql_data.ts AS ts, tql_data.val AS val |
| | Projection: tql_data.ts, tql_data.val |
| | Filter: tql_data.val > CAST(Int64(0) AS Float64) |
| | SubqueryAlias: tql_data |
| | Projection: metric.ts AS ts, prom_rate(ts_range,val,ts,Int64(20000)) AS val |
| | Filter: prom_rate(ts_range,val,ts,Int64(20000)) IS NOT NULL |
| | Projection: metric.ts, prom_rate(ts_range, val, metric.ts, Int64(20000)) AS prom_rate(ts_range,val,ts,Int64(20000)) |
| | PromRangeManipulate: req range=[0..40000], interval=[10000], eval range=[20000], time index=[ts], values=["val"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-320000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with aggregation functions
WITH tql_agg(ts, summary) AS (
TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"})
@@ -104,6 +227,32 @@ SELECT round(avg(summary)) as avg_sum FROM tql_agg;
| 1.0 |
+---------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_agg(ts, summary) AS (
TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"})
)
SELECT round(avg(summary)) as avg_sum FROM tql_agg;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: round(avg(tql_agg.summary)) AS avg_sum |
| | Aggregate: groupBy=[[]], aggr=[[avg(tql_agg.summary)]] |
| | SubqueryAlias: tql_agg |
| | Projection: labels.ts AS ts, sum(labels.cpu) AS summary |
| | Aggregate: groupBy=[[labels.ts]], aggr=[[sum(labels.cpu)]] |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.host ~ Utf8("^(?:host.*)$") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with label selectors
WITH host_metrics AS (
TQL EVAL (0, 40, '10s') labels{host="host1"}
@@ -116,6 +265,30 @@ SELECT count(*) as host1_points FROM host_metrics;
| 5 |
+--------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH host_metrics AS (
TQL EVAL (0, 40, '10s') labels{host="host1"}
)
SELECT count(*) as host1_points FROM host_metrics;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) AS host1_points |
| | Aggregate: groupBy=[[]], aggr=[[count(host_metrics.ts) AS count(Int64(1))]] |
| | SubqueryAlias: host_metrics |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.host = Utf8("host1") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with column reference
WITH host_metrics AS (
TQL EVAL (0, 40, '10s') labels{host="host1"}
@@ -132,6 +305,29 @@ SELECT host_metrics.ts, host_metrics.host FROM host_metrics;
| 1970-01-01T00:00:40 | host1 |
+---------------------+-------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH host_metrics AS (
TQL EVAL (0, 40, '10s') labels{host="host1"}
)
SELECT host_metrics.ts, host_metrics.host FROM host_metrics;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: host_metrics.ts, host_metrics.host |
| | SubqueryAlias: host_metrics |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.host = Utf8("host1") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
-- Multiple TQL CTEs referencing different tables
WITH
metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric),
@@ -152,6 +348,63 @@ LIMIT 3;
| 8.0 | 0.7 |
+------------+-----------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric),
label_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"})
SELECT
m.val as metric_val,
l.cpu as label_val
FROM metric_data m, label_data l
WHERE m.ts = l.ts
ORDER BY m.ts
LIMIT 3;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: metric_val, label_val |
| | Sort: m.ts ASC NULLS LAST, fetch=3 |
| | Projection: m.val AS metric_val, l.cpu AS label_val, m.ts |
| | Inner Join: m.ts = l.ts |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: m |
| | SubqueryAlias: metric_data |
| | Projection: metric.ts AS ts, metric.val AS val |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| | Projection: l.ts, l.cpu |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: l |
| | SubqueryAlias: label_data |
| | Projection: labels.ts AS ts, labels.host AS host, labels.cpu AS cpu |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.host = Utf8("host2") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| physical_plan | ProjectionExec: expr=[metric_val@0 as metric_val, label_val@1 as label_val] |
| | SortPreservingMergeExec: [ts@2 ASC NULLS LAST], fetch=3 |
| | SortExec: TopK(fetch=3), expr=[ts@2 ASC NULLS LAST], preserve_REDACTED
| | ProjectionExec: expr=[val@1 as metric_val, cpu@2 as label_val, ts@0 as ts] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ts@0, ts@0)], projection=[ts@0, val@1, cpu@3] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | ProjectionExec: expr=[ts@0 as ts, cpu@2 as cpu] |
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with mathematical operations
WITH computed(ts, val) AS (
TQL EVAL (0, 40, '10s') metric * 2 + 1
@@ -164,6 +417,33 @@ SELECT min(val) as min_computed, max(val) as max_computed FROM computed;
| 1.0 | 17.0 |
+--------------+--------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH computed(ts, val) AS (
TQL EVAL (0, 40, '10s') metric * 2 + 1
)
SELECT min(val) as min_computed, max(val) as max_computed FROM computed;
+---------------+------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: min(computed.val) AS min_computed, max(computed.val) AS max_computed |
| | Aggregate: groupBy=[[]], aggr=[[min(computed.val), max(computed.val)]] |
| | SubqueryAlias: computed |
| | Projection: metric.ts AS ts, val * Float64(2) + Float64(1) AS val |
| | Projection: metric.ts, val * Float64(2) + Float64(1) AS val * Float64(2) + Float64(1) |
| | Projection: metric.ts, metric.val * Float64(2) AS val * Float64(2) |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with window functions in SQL part
WITH tql_base(ts, val) AS (
TQL EVAL (0, 40, '10s') metric
@@ -185,6 +465,41 @@ ORDER BY ts;
| 1970-01-01T00:00:40 | 3.0 | 2.0 |
+---------------------+-----+------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_base(ts, val) AS (
TQL EVAL (0, 40, '10s') metric
)
SELECT
ts,
val,
LAG(val, 1) OVER (ORDER BY ts) as prev_value
FROM tql_base
ORDER BY ts;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: tql_base.ts ASC NULLS LAST |
| | Projection: tql_base.ts, tql_base.val, lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS prev_value |
| | WindowAggr: windowExpr=[[lag(tql_base.val, Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: tql_base |
| | Projection: metric.ts AS ts, metric.val AS val |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | ProjectionExec: expr=[ts@0 as ts, val@1 as val, lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as prev_value] |
| | BoundedWindowAggExec: wdw=[lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] |
| | SortPreservingMergeExec: [ts@0 ASC NULLS LAST] |
| | SortExec: expr=[ts@0 ASC NULLS LAST], preserve_REDACTED
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with HAVING clause
WITH tql_grouped(ts, host, cpu) AS (
TQL EVAL (0, 40, '10s') labels
@@ -202,6 +517,37 @@ HAVING count(*) > 1;
| 1970-01-01T00:00:00 | 10 |
+---------------------+-------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_grouped(ts, host, cpu) AS (
TQL EVAL (0, 40, '10s') labels
)
SELECT
DATE_TRUNC('minute', ts) as minute,
count(*) as point_count
FROM tql_grouped
GROUP BY minute
HAVING count(*) > 1;
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: date_trunc(Utf8("minute"),tql_grouped.ts) AS minute, count(Int64(1)) AS count(*) AS point_count |
| | Filter: count(Int64(1)) > Int64(1) |
| | Aggregate: groupBy=[[date_trunc(Utf8("minute"), tql_grouped.ts)]], aggr=[[count(tql_grouped.ts) AS count(Int64(1))]] |
| | SubqueryAlias: tql_grouped |
| | Projection: labels.ts AS ts, labels.host AS host, labels.cpu AS cpu |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with UNION
-- SQLNESS SORT_RESULT 3 1
WITH
@@ -226,6 +572,45 @@ SELECT 'host2' as source, ts, cpu FROM host2_data;
| host2 | 1970-01-01T00:00:40 | 0.5 |
+--------+---------------------+-----+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
host1_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host1"}),
host2_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"})
SELECT 'host1' as source, ts, cpu FROM host1_data
UNION ALL
SELECT 'host2' as source, ts, cpu FROM host2_data;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Union |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: Utf8("host1") AS source, host1_data.ts, host1_data.cpu |
| | SubqueryAlias: host1_data |
| | Projection: labels.ts AS ts, labels.host AS host, labels.cpu AS cpu |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.host = Utf8("host1") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: Utf8("host2") AS source, host2_data.ts, host2_data.cpu |
| | SubqueryAlias: host2_data |
| | Projection: labels.ts AS ts, labels.host AS host, labels.cpu AS cpu |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.host = Utf8("host2") AND labels.ts >= TimestampMillisecond(-300000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| physical_plan | InterleaveExec |
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
-- Nested CTEs with TQL
WITH
base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric),
@@ -245,6 +630,46 @@ SELECT count(*) as high_values FROM final;
| 3 |
+-------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric),
processed(ts, percent) AS (
SELECT ts, val * 100 as percent
FROM base_tql
WHERE val > 0
),
final(ts, percent) AS (
SELECT * FROM processed WHERE percent > 200
)
SELECT count(*) as high_values FROM final;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) AS high_values |
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | SubqueryAlias: final |
| | Projection: processed.ts AS ts, processed.percent AS percent |
| | Projection: processed.ts, processed.percent |
| | Filter: processed.percent > CAST(Int64(200) AS Float64) |
| | SubqueryAlias: processed |
| | Projection: base_tql.ts AS ts, percent AS percent |
| | Projection: base_tql.ts, base_tql.val * CAST(Int64(100) AS Float64) AS percent |
| | Filter: base_tql.val > CAST(Int64(0) AS Float64) |
| | SubqueryAlias: base_tql |
| | Projection: metric.ts AS ts, metric.val AS val |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with time-based functions
WITH time_shifted AS (
TQL EVAL (0, 40, '10s') metric offset 50s
@@ -261,6 +686,30 @@ SELECT * FROM time_shifted;
| 1970-01-01T00:00:40 | 3.0 |
+---------------------+-----+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH time_shifted AS (
TQL EVAL (0, 40, '10s') metric offset 50s
)
SELECT * FROM time_shifted;
+---------------+------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: time_shifted.ts, time_shifted.val |
| | SubqueryAlias: time_shifted |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesNormalize: offset=[50000], time index=[ts], filter NaN: [false] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-250000, None) AND metric.ts <= TimestampMillisecond(390000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with JOIN between TQL and regular table
-- SQLNESS SORT_RESULT 3 1
WITH tql_summary(ts, host, cpu) AS (
@@ -286,6 +735,67 @@ LIMIT 5;
| 1970-01-01T00:00:20 | host1 | host1 |
+---------------------+-----------+-------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_summary(ts, host, cpu) AS (
TQL EVAL (0, 40, '10s') avg_over_time(labels[30s])
)
SELECT
t.ts,
t.cpu as avg_value,
l.host
FROM tql_summary t
JOIN labels l ON DATE_TRUNC('second', t.ts) = DATE_TRUNC('second', l.ts)
WHERE l.host = 'host1'
ORDER BY t.ts, l.host, avg_value
LIMIT 5;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: t.ts ASC NULLS LAST, l.host ASC NULLS LAST, avg_value ASC NULLS LAST, fetch=5 |
| | Projection: t.ts, t.cpu AS avg_value, l.host |
| | Inner Join: date_trunc(Utf8("second"), t.ts) = date_trunc(Utf8("second"), l.ts) |
| | Projection: t.ts, t.cpu |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: t |
| | SubqueryAlias: tql_summary |
| | Projection: labels.ts AS ts, prom_avg_over_time(ts_range,cpu) AS host, labels.host AS cpu |
| | Filter: prom_avg_over_time(ts_range,cpu) IS NOT NULL |
| | Projection: labels.ts, prom_avg_over_time(ts_range, cpu) AS prom_avg_over_time(ts_range,cpu), labels.host |
| | PromRangeManipulate: req range=[0..40000], interval=[10000], eval range=[30000], time index=[ts], values=["cpu"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.ts >= TimestampMillisecond(-330000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| | Filter: l.host = Utf8("host1") |
| | Projection: l.ts, l.host |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: l |
| | TableScan: labels |
| | ]] |
| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST, host@2 ASC NULLS LAST, avg_value@1 ASC NULLS LAST], fetch=5 |
| | SortExec: TopK(fetch=5), expr=[ts@0 ASC NULLS LAST, host@2 ASC NULLS LAST, avg_value@1 ASC NULLS LAST], preserve_REDACTED
| | ProjectionExec: expr=[ts@0 as ts, cpu@1 as avg_value, host@2 as host] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(date_trunc(Utf8("second"),t.ts)@2, date_trunc(Utf8("second"),l.ts)@2)], projection=[ts@0, cpu@1, host@4] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | ProjectionExec: expr=[ts@0 as ts, cpu@2 as cpu, date_trunc(second, ts@0) as date_trunc(Utf8("second"),t.ts)] |
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | ProjectionExec: expr=[ts@0 as ts, host@1 as host, date_trunc(second, ts@0) as date_trunc(Utf8("second"),l.ts)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: host@1 = host1 |
| | ProjectionExec: expr=[ts@0 as ts, host@1 as host] |
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with JOIN and value aliasing
-- SQLNESS SORT_RESULT 3 1
WITH tql_summary AS (
@@ -311,6 +821,67 @@ LIMIT 5;
| 1970-01-01T00:00:20 | 0.5666666666666668 | host1 |
+---------------------+--------------------+-------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_summary AS (
TQL EVAL (0, 40, '10s') avg_over_time(labels[30s]) AS cpu
)
SELECT
t.ts,
t.cpu as avg_value,
l.host
FROM tql_summary t
JOIN labels l ON DATE_TRUNC('second', t.ts) = DATE_TRUNC('second', l.ts)
WHERE l.host = 'host1'
ORDER BY t.ts, l.host, avg_value
LIMIT 5;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: t.ts ASC NULLS LAST, l.host ASC NULLS LAST, avg_value ASC NULLS LAST, fetch=5 |
| | Projection: t.ts, t.cpu AS avg_value, l.host |
| | Inner Join: date_trunc(Utf8("second"), t.ts) = date_trunc(Utf8("second"), l.ts) |
| | Projection: t.cpu, t.ts |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: t |
| | SubqueryAlias: tql_summary |
| | Projection: prom_avg_over_time(ts_range,cpu) AS cpu, labels.host, labels.ts |
| | Filter: prom_avg_over_time(ts_range,cpu) IS NOT NULL |
| | Projection: labels.ts, prom_avg_over_time(ts_range, cpu) AS prom_avg_over_time(ts_range,cpu), labels.host |
| | PromRangeManipulate: req range=[0..40000], interval=[10000], eval range=[30000], time index=[ts], values=["cpu"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["host"] |
| | Filter: labels.ts >= TimestampMillisecond(-330000, None) AND labels.ts <= TimestampMillisecond(340000, None) |
| | TableScan: labels |
| | ]] |
| | Filter: l.host = Utf8("host1") |
| | Projection: l.ts, l.host |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: l |
| | TableScan: labels |
| | ]] |
| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST, host@2 ASC NULLS LAST, avg_value@1 ASC NULLS LAST], fetch=5 |
| | SortExec: TopK(fetch=5), expr=[ts@0 ASC NULLS LAST, host@2 ASC NULLS LAST, avg_value@1 ASC NULLS LAST], preserve_REDACTED
| | ProjectionExec: expr=[ts@1 as ts, cpu@0 as avg_value, host@2 as host] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(date_trunc(Utf8("second"),t.ts)@2, date_trunc(Utf8("second"),l.ts)@2)], projection=[cpu@0, ts@1, host@4] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | ProjectionExec: expr=[cpu@0 as cpu, ts@2 as ts, date_trunc(second, ts@2) as date_trunc(Utf8("second"),t.ts)] |
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | ProjectionExec: expr=[ts@0 as ts, host@1 as host, date_trunc(second, ts@0) as date_trunc(Utf8("second"),l.ts)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: host@1 = host1 |
| | ProjectionExec: expr=[ts@0 as ts, host@1 as host] |
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- Error case - TQL ANALYZE should fail
WITH tql_analyze AS (
TQL ANALYZE (0, 40, '10s') metric
@@ -319,6 +890,15 @@ SELECT * FROM tql_analyze limit 1;
Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_analyze AS (
TQL ANALYZE (0, 40, '10s') metric
)
SELECT * FROM tql_analyze limit 1;
Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs
-- Error case - TQL EXPLAIN should fail
WITH tql_explain AS (
TQL EXPLAIN (0, 40, '10s') metric
@@ -327,6 +907,15 @@ SELECT * FROM tql_explain limit 1;
Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_explain AS (
TQL EXPLAIN (0, 40, '10s') metric
)
SELECT * FROM tql_explain limit 1;
Error: 2000(InvalidSyntax), Invalid SQL, error: Only TQL EVAL is supported in CTEs
-- TQL CTE with lookback parameter
WITH tql_lookback AS (
TQL EVAL (0, 40, '10s', '15s') metric
@@ -339,6 +928,30 @@ SELECT count(*) FROM tql_lookback;
| 5 |
+----------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_lookback AS (
TQL EVAL (0, 40, '10s', '15s') metric
)
SELECT count(*) FROM tql_lookback;
+---------------+----------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) |
| | Aggregate: groupBy=[[]], aggr=[[count(tql_lookback.ts) AS count(Int64(1))]] |
| | SubqueryAlias: tql_lookback |
| | PromInstantManipulate: range=[0..40000], lookback=[15000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-15000, None) AND metric.ts <= TimestampMillisecond(55000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------+
drop table metric;
Affected Rows: 0

View File

@@ -26,13 +26,29 @@ WITH tql as (
)
SELECT * FROM tql;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql as (
TQL EVAL (0, 40, '10s') metric
)
SELECT * FROM tql;
-- TQL CTE with column aliases
WITH tql (the_timestamp, the_value) as (
TQL EVAL (0, 40, '10s') metric
)
SELECT * FROM tql;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql (the_timestamp, the_value) as (
TQL EVAL (0, 40, '10s') metric
)
SELECT * FROM tql;
-- Explain TQL CTE
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql AS (
TQL EVAL (0, 40, '10s') metric
) SELECT * FROM tql;
@@ -43,30 +59,66 @@ WITH
filtered AS (SELECT * FROM tql_data WHERE val > 5)
SELECT count(*) FROM filtered;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') metric),
filtered AS (SELECT * FROM tql_data WHERE val > 5)
SELECT count(*) FROM filtered;
-- TQL CTE with complex PromQL expressions
WITH
tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])),
filtered (ts, val) AS (SELECT * FROM tql_data WHERE val > 0)
SELECT sum(val) FROM filtered;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
tql_data (ts, val) AS (TQL EVAL (0, 40, '10s') rate(metric[20s])),
filtered (ts, val) AS (SELECT * FROM tql_data WHERE val > 0)
SELECT sum(val) FROM filtered;
-- TQL CTE with aggregation functions
WITH tql_agg(ts, summary) AS (
TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"})
)
SELECT round(avg(summary)) as avg_sum FROM tql_agg;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_agg(ts, summary) AS (
TQL EVAL (0, 40, '10s') sum(labels{host=~"host.*"})
)
SELECT round(avg(summary)) as avg_sum FROM tql_agg;
-- TQL CTE with label selectors
WITH host_metrics AS (
TQL EVAL (0, 40, '10s') labels{host="host1"}
)
SELECT count(*) as host1_points FROM host_metrics;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH host_metrics AS (
TQL EVAL (0, 40, '10s') labels{host="host1"}
)
SELECT count(*) as host1_points FROM host_metrics;
-- TQL CTE with column reference
WITH host_metrics AS (
TQL EVAL (0, 40, '10s') labels{host="host1"}
)
SELECT host_metrics.ts, host_metrics.host FROM host_metrics;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH host_metrics AS (
TQL EVAL (0, 40, '10s') labels{host="host1"}
)
SELECT host_metrics.ts, host_metrics.host FROM host_metrics;
-- Multiple TQL CTEs referencing different tables
WITH
metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric),
@@ -79,12 +131,32 @@ WHERE m.ts = l.ts
ORDER BY m.ts
LIMIT 3;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
metric_data(ts, val) AS (TQL EVAL (0, 40, '10s') metric),
label_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"})
SELECT
m.val as metric_val,
l.cpu as label_val
FROM metric_data m, label_data l
WHERE m.ts = l.ts
ORDER BY m.ts
LIMIT 3;
-- TQL CTE with mathematical operations
WITH computed(ts, val) AS (
TQL EVAL (0, 40, '10s') metric * 2 + 1
)
SELECT min(val) as min_computed, max(val) as max_computed FROM computed;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH computed(ts, val) AS (
TQL EVAL (0, 40, '10s') metric * 2 + 1
)
SELECT min(val) as min_computed, max(val) as max_computed FROM computed;
-- TQL CTE with window functions in SQL part
WITH tql_base(ts, val) AS (
TQL EVAL (0, 40, '10s') metric
@@ -96,6 +168,18 @@ SELECT
FROM tql_base
ORDER BY ts;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_base(ts, val) AS (
TQL EVAL (0, 40, '10s') metric
)
SELECT
ts,
val,
LAG(val, 1) OVER (ORDER BY ts) as prev_value
FROM tql_base
ORDER BY ts;
-- TQL CTE with HAVING clause
WITH tql_grouped(ts, host, cpu) AS (
TQL EVAL (0, 40, '10s') labels
@@ -107,6 +191,18 @@ FROM tql_grouped
GROUP BY minute
HAVING count(*) > 1;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_grouped(ts, host, cpu) AS (
TQL EVAL (0, 40, '10s') labels
)
SELECT
DATE_TRUNC('minute', ts) as minute,
count(*) as point_count
FROM tql_grouped
GROUP BY minute
HAVING count(*) > 1;
-- TQL CTE with UNION
-- SQLNESS SORT_RESULT 3 1
WITH
@@ -116,6 +212,15 @@ SELECT 'host1' as source, ts, cpu FROM host1_data
UNION ALL
SELECT 'host2' as source, ts, cpu FROM host2_data;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
host1_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host1"}),
host2_data(ts, host, cpu) AS (TQL EVAL (0, 40, '10s') labels{host="host2"})
SELECT 'host1' as source, ts, cpu FROM host1_data
UNION ALL
SELECT 'host2' as source, ts, cpu FROM host2_data;
-- Nested CTEs with TQL
WITH
base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric),
@@ -129,12 +234,33 @@ WITH
)
SELECT count(*) as high_values FROM final;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH
base_tql(ts, val) AS (TQL EVAL (0, 40, '10s') metric),
processed(ts, percent) AS (
SELECT ts, val * 100 as percent
FROM base_tql
WHERE val > 0
),
final(ts, percent) AS (
SELECT * FROM processed WHERE percent > 200
)
SELECT count(*) as high_values FROM final;
-- TQL CTE with time-based functions
WITH time_shifted AS (
TQL EVAL (0, 40, '10s') metric offset 50s
)
SELECT * FROM time_shifted;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH time_shifted AS (
TQL EVAL (0, 40, '10s') metric offset 50s
)
SELECT * FROM time_shifted;
-- TQL CTE with JOIN between TQL and regular table
-- SQLNESS SORT_RESULT 3 1
WITH tql_summary(ts, host, cpu) AS (
@@ -150,6 +276,21 @@ WHERE l.host = 'host1'
ORDER BY t.ts, l.host, avg_value
LIMIT 5;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_summary(ts, host, cpu) AS (
TQL EVAL (0, 40, '10s') avg_over_time(labels[30s])
)
SELECT
t.ts,
t.cpu as avg_value,
l.host
FROM tql_summary t
JOIN labels l ON DATE_TRUNC('second', t.ts) = DATE_TRUNC('second', l.ts)
WHERE l.host = 'host1'
ORDER BY t.ts, l.host, avg_value
LIMIT 5;
-- TQL CTE with JOIN and value aliasing
-- SQLNESS SORT_RESULT 3 1
WITH tql_summary AS (
@@ -165,23 +306,59 @@ WHERE l.host = 'host1'
ORDER BY t.ts, l.host, avg_value
LIMIT 5;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_summary AS (
TQL EVAL (0, 40, '10s') avg_over_time(labels[30s]) AS cpu
)
SELECT
t.ts,
t.cpu as avg_value,
l.host
FROM tql_summary t
JOIN labels l ON DATE_TRUNC('second', t.ts) = DATE_TRUNC('second', l.ts)
WHERE l.host = 'host1'
ORDER BY t.ts, l.host, avg_value
LIMIT 5;
-- Error case - TQL ANALYZE should fail
WITH tql_analyze AS (
TQL ANALYZE (0, 40, '10s') metric
)
SELECT * FROM tql_analyze limit 1;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_analyze AS (
TQL ANALYZE (0, 40, '10s') metric
)
SELECT * FROM tql_analyze limit 1;
-- Error case - TQL EXPLAIN should fail
WITH tql_explain AS (
TQL EXPLAIN (0, 40, '10s') metric
)
SELECT * FROM tql_explain limit 1;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_explain AS (
TQL EXPLAIN (0, 40, '10s') metric
)
SELECT * FROM tql_explain limit 1;
-- TQL CTE with lookback parameter
WITH tql_lookback AS (
TQL EVAL (0, 40, '10s', '15s') metric
)
SELECT count(*) FROM tql_lookback;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN WITH tql_lookback AS (
TQL EVAL (0, 40, '10s', '15s') metric
)
SELECT count(*) FROM tql_lookback;
drop table metric;
drop table labels;