diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index d38e0390b1..f0eda492b4 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -171,9 +171,7 @@ impl Instance { ) -> Result { let query = PromQuery { query: promql.to_string(), - start: "0".to_string(), - end: "0".to_string(), - step: "5m".to_string(), + ..PromQuery::default() }; let mut stmt = QueryLanguageParser::parse_promql(&query).context(ExecuteSqlSnafu)?; match &mut stmt { diff --git a/src/frontend/src/statement/tql.rs b/src/frontend/src/statement/tql.rs index 0f4a4f50a9..9b7f72c821 100644 --- a/src/frontend/src/statement/tql.rs +++ b/src/frontend/src/statement/tql.rs @@ -12,20 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use common_query::Output; -use query::parser::{PromQuery, QueryLanguageParser}; +use query::parser::{PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, EXPLAIN_NODE_NAME}; use session::context::QueryContextRef; use snafu::ResultExt; use sql::statements::tql::Tql; -use crate::error::{ - ExecLogicalPlanSnafu, NotSupportedSnafu, ParseQuerySnafu, PlanStatementSnafu, Result, -}; +use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Result}; use crate::statement::StatementExecutor; impl StatementExecutor { pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { - let plan = match tql { + let stmt = match tql { Tql::Eval(eval) => { let promql = PromQuery { start: eval.start, @@ -33,20 +33,39 @@ impl StatementExecutor { step: eval.step, query: eval.query, }; - let stmt = QueryLanguageParser::parse_promql(&promql).context(ParseQuerySnafu)?; - self.query_engine - .planner() - .plan(stmt, query_ctx.clone()) - .await - .context(PlanStatementSnafu)? + QueryLanguageParser::parse_promql(&promql).context(ParseQuerySnafu)? 
} - Tql::Explain(_) => { - return NotSupportedSnafu { - feat: "TQL EXPLAIN", - } - .fail() + Tql::Explain(explain) => { + let promql = PromQuery { + query: explain.query, + ..PromQuery::default() + }; + let params = HashMap::from([("name".to_string(), EXPLAIN_NODE_NAME.to_string())]); + QueryLanguageParser::parse_promql(&promql) + .context(ParseQuerySnafu)? + .post_process(params) + .context(ParseQuerySnafu)? + } + Tql::Analyze(tql_analyze) => { + let promql = PromQuery { + start: tql_analyze.start, + end: tql_analyze.end, + step: tql_analyze.step, + query: tql_analyze.query, + }; + let params = HashMap::from([("name".to_string(), ANALYZE_NODE_NAME.to_string())]); + QueryLanguageParser::parse_promql(&promql) + .context(ParseQuerySnafu)? + .post_process(params) + .context(ParseQuerySnafu)? } }; + let plan = self + .query_engine + .planner() + .plan(stmt, query_ctx.clone()) + .await + .context(PlanStatementSnafu)?; self.query_engine .execute(plan, query_ctx) .await diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 69a7930978..13743cd2ea 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -393,10 +393,30 @@ impl PromPlanner { .build() .context(DataFusionPlanningSnafu)? } - PromExpr::Extension(_) => UnsupportedExprSnafu { - name: "Prom Extension", + PromExpr::Extension(promql_parser::parser::ast::Extension { expr }) => { + let children = expr.children(); + let plan = self.prom_expr_to_plan(children[0].clone()).await?; + // Wrapper for the explanation/analyze of the existing plan + // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain + // if `analyze` is true, runs the actual plan and produces + // information about metrics during run. 
+ // if `verbose` is true, prints out additional details when VERBOSE keyword is specified + match expr.name() { + "ANALYZE" => LogicalPlanBuilder::from(plan) + .explain(false, true) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?, + "EXPLAIN" => LogicalPlanBuilder::from(plan) + .explain(false, false) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?, + _ => LogicalPlanBuilder::empty(true) + .build() + .context(DataFusionPlanningSnafu)?, + } } - .fail()?, }; Ok(res) } @@ -559,7 +579,7 @@ impl PromPlanner { Ok(logical_plan) } - /// Convert [AggModifier] to [Column] exprs for aggregation. + /// Convert [LabelModifier] to [Column] exprs for aggregation. /// Timestamp column and tag columns will be included. /// /// # Side effect diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 393728d9a9..3661dd7b5d 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -26,6 +26,12 @@ pub enum Error { #[snafu(display("Unsupported expr type: {}", name))] UnsupportedExpr { name: String, location: Location }, + #[snafu(display("Operation {} not implemented yet", operation))] + Unimplemented { + operation: String, + location: Location, + }, + #[snafu(display("General catalog error: {}", source))] Catalog { #[snafu(backtrace)] @@ -183,6 +189,7 @@ impl ErrorExt for Error { match self { QueryParse { .. } | MultipleStatements { .. } => StatusCode::InvalidSyntax, UnsupportedExpr { .. } + | Unimplemented { .. } | CatalogNotFound { .. } | SchemaNotFound { .. } | TableNotFound { .. } diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 799dd6997b..49971771e8 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use chrono::DateTime; @@ -19,7 +22,9 @@ use common_error::ext::PlainError; use common_error::prelude::BoxedError; use common_error::status_code::StatusCode; use common_telemetry::timer; -use promql_parser::parser::EvalStmt; +use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr}; +use promql_parser::parser::Expr::Extension; +use promql_parser::parser::{EvalStmt, Expr, ValueType}; use snafu::ResultExt; use sql::dialect::GenericDialect; use sql::parser::ParserContext; @@ -27,10 +32,13 @@ use sql::statements::statement::Statement; use crate::error::{ MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result, + UnimplementedSnafu, }; use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED}; const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m +pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN"; +pub const ANALYZE_NODE_NAME: &str = "ANALYZE"; #[derive(Debug, Clone)] pub enum QueryStatement { @@ -38,6 +46,43 @@ pub enum QueryStatement { Promql(EvalStmt), } +impl QueryStatement { + pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> { + match self { + QueryStatement::Sql(_) => UnimplementedSnafu { + operation: "sql post process", + } + .fail(), + QueryStatement::Promql(eval_stmt) => { + let node_name = params.get("name").map(String::as_str).unwrap_or(""); + let Some(extension_node) = Self::create_extension_node(node_name, &eval_stmt.expr) + else { + return UnimplementedSnafu { operation: node_name }.fail(); + }; + Ok(QueryStatement::Promql(EvalStmt { + expr: Extension(extension_node), + start: eval_stmt.start, + end: eval_stmt.end, + interval: eval_stmt.interval, + lookback_delta: eval_stmt.lookback_delta, + })) + } + } + } + + fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> { + match node_name { + ANALYZE_NODE_NAME => Some(NodeExtension { + expr: Arc::new(AnalyzeExpr { expr: expr.clone() }), + }), + EXPLAIN_NODE_NAME => 
Some(NodeExtension { + expr: Arc::new(ExplainExpr { expr: expr.clone() }), + }), + _ => None, + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct PromQuery { pub query: String, @@ -46,6 +91,17 @@ pub struct PromQuery { pub step: String, } +impl Default for PromQuery { + fn default() -> Self { + PromQuery { + query: String::new(), + start: String::from("0"), + end: String::from("0"), + step: String::from("5m"), + } + } +} + pub struct QueryLanguageParser {} impl QueryLanguageParser { @@ -66,7 +122,6 @@ impl QueryLanguageParser { } } - // TODO(ruihang): implement this method when parser is ready. pub fn parse_promql(query: &PromQuery) -> Result { let _timer = timer!(METRIC_PARSE_PROMQL_ELAPSED); @@ -142,6 +197,51 @@ fn max_system_timestamp() -> SystemTime { .unwrap() } +macro_rules! define_node_ast_extension { + ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => { + /// The implementation of the `$name_expr` extension AST node + #[derive(Debug, Clone)] + pub struct $name_expr { + pub expr: $expr_type, + } + + impl ExtensionExpr for $name_expr { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + $extension_name + } + + fn value_type(&self) -> ValueType { + self.expr.value_type() + } + + fn children(&self) -> &[Expr] { + std::slice::from_ref(&self.expr) + } + } + + #[allow(rustdoc::broken_intra_doc_links)] + #[derive(Debug, Clone)] + pub struct $name { + pub expr: Arc<$name_expr>, + } + + impl $name { + pub fn new(expr: $expr_type) -> Self { + Self { + expr: Arc::new($name_expr { expr }), + } + } + } + }; +} + +define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME); +define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME); + #[cfg(test)] mod test { use super::*; diff --git a/src/sql/src/parsers/tql_parser.rs b/src/sql/src/parsers/tql_parser.rs index 8b82a9082e..9ef6d97ea4 100644 --- a/src/sql/src/parsers/tql_parser.rs +++ b/src/sql/src/parsers/tql_parser.rs @@ -20,7 +20,7 @@ 
use sqlparser::tokenizer::Token; use crate::error::{self, Result}; use crate::parser::ParserContext; use crate::statements::statement::Statement; -use crate::statements::tql::{Tql, TqlEval, TqlExplain}; +use crate::statements::tql::{Tql, TqlAnalyze, TqlEval, TqlExplain}; pub const TQL: &str = "TQL"; const EVAL: &str = "EVAL"; @@ -31,6 +31,7 @@ use sqlparser::parser::Parser; /// TQL extension parser, including: /// - TQL EVAL /// - TQL EXPLAIN +/// - TQL ANALYZE impl<'a> ParserContext<'a> { pub(crate) fn parse_tql(&mut self) -> Result { self.parser.next_token(); @@ -53,6 +54,11 @@ impl<'a> ParserContext<'a> { self.parse_tql_explain() } + Keyword::ANALYZE => { + self.parser.next_token(); + self.parse_tql_analyze() + .context(error::SyntaxSnafu { sql: self.sql }) + } _ => self.unsupported(self.peek_token_as_string()), } } @@ -122,10 +128,39 @@ impl<'a> ParserContext<'a> { } fn parse_tql_explain(&mut self) -> Result { - let query = Self::parse_tql_query(&mut self.parser, self.sql, EXPLAIN) + let parser = &mut self.parser; + let delimiter = match parser.expect_token(&Token::LParen) { + Ok(_) => ")", + Err(_) => EXPLAIN, + }; + let start = Self::parse_string_or_number(parser, Token::Comma).unwrap_or("0".to_string()); + let end = Self::parse_string_or_number(parser, Token::Comma).unwrap_or("0".to_string()); + let step = Self::parse_string_or_number(parser, Token::RParen).unwrap_or("5m".to_string()); + let query = Self::parse_tql_query(parser, self.sql, delimiter) .context(error::SyntaxSnafu { sql: self.sql })?; - Ok(Statement::Tql(Tql::Explain(TqlExplain { query }))) + Ok(Statement::Tql(Tql::Explain(TqlExplain { + query, + start, + end, + step, + }))) + } + + // TODO code reuse from `parse_tql_eval` + fn parse_tql_analyze(&mut self) -> std::result::Result { + let parser = &mut self.parser; + parser.expect_token(&Token::LParen)?; + let start = Self::parse_string_or_number(parser, Token::Comma)?; + let end = Self::parse_string_or_number(parser, Token::Comma)?; + let step = 
Self::parse_string_or_number(parser, Token::RParen)?; + let query = Self::parse_tql_query(parser, self.sql, ")")?; + Ok(Statement::Tql(Tql::Analyze(TqlAnalyze { + start, + end, + step, + query, + }))) } } @@ -135,7 +170,7 @@ mod tests { use super::*; #[test] - fn test_parse_tql() { + fn test_parse_tql_eval() { let sql = "TQL EVAL (1676887657, 1676887659, '1m') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); @@ -191,7 +226,10 @@ mod tests { } _ => unreachable!(), } + } + #[test] + fn test_parse_tql_explain() { let sql = "TQL EXPLAIN http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); @@ -201,6 +239,42 @@ mod tests { match statement { Statement::Tql(Tql::Explain(explain)) => { assert_eq!(explain.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + assert_eq!(explain.start, "0"); + assert_eq!(explain.end, "0"); + assert_eq!(explain.step, "5m"); + } + _ => unreachable!(), + } + + let sql = "TQL EXPLAIN (20,100,10) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + match statement { + Statement::Tql(Tql::Explain(explain)) => { + assert_eq!(explain.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + assert_eq!(explain.start, "20"); + assert_eq!(explain.end, "100"); + assert_eq!(explain.step, "10"); + } + _ => unreachable!(), + } + } + + #[test] + fn test_parse_tql_analyze() { + let sql = "TQL ANALYZE (1676887657.1, 1676887659.5, 30.3) 
http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(1, result.len()); + let statement = result.remove(0); + match statement { + Statement::Tql(Tql::Analyze(analyze)) => { + assert_eq!(analyze.start, "1676887657.1"); + assert_eq!(analyze.end, "1676887659.5"); + assert_eq!(analyze.step, "30.3"); + assert_eq!(analyze.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); } _ => unreachable!(), } diff --git a/src/sql/src/statements/tql.rs b/src/sql/src/statements/tql.rs index 6967c46fb7..d9b7187bf7 100644 --- a/src/sql/src/statements/tql.rs +++ b/src/sql/src/statements/tql.rs @@ -15,6 +15,7 @@ pub enum Tql { Eval(TqlEval), Explain(TqlExplain), + Analyze(TqlAnalyze), } #[derive(Debug, Clone, PartialEq, Eq)] @@ -25,7 +26,20 @@ pub struct TqlEval { pub query: String, } +/// TQL EXPLAIN (like SQL EXPLAIN): doesn't execute the query but tells how the query would be executed. #[derive(Debug, Clone, PartialEq, Eq)] pub struct TqlExplain { + pub start: String, + pub end: String, + pub step: String, + pub query: String, +} + +/// TQL ANALYZE (like SQL ANALYZE): executes the plan and tells the detailed per-step execution time. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TqlAnalyze { + pub start: String, + pub end: String, + pub step: String, pub query: String, } diff --git a/tests-integration/src/tests/promql_test.rs b/tests-integration/src/tests/promql_test.rs index 9e8fbfb1ad..61b1f177f0 100644 --- a/tests-integration/src/tests/promql_test.rs +++ b/tests-integration/src/tests/promql_test.rs @@ -42,9 +42,7 @@ async fn create_insert_query_assert( let query = PromQuery { query: promql.to_string(), - start: "0".to_string(), - end: "0".to_string(), - step: "5m".to_string(), + ..PromQuery::default() }; let QueryStatement::Promql(mut eval_stmt) = QueryLanguageParser::parse_promql(&query).unwrap() else { unreachable!() }; eval_stmt.start = start; diff --git a/tests/cases/standalone/common/tql/analyze.result b/tests/cases/standalone/common/tql/analyze.result new file mode 100644 index 0000000000..cfe9239883 --- /dev/null +++ b/tests/cases/standalone/common/tql/analyze.result @@ -0,0 +1,31 @@ +CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY); + +Affected Rows: 0 + +INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a"); + +Affected Rows: 3 + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +TQL ANALYZE (0, 10, '5s') test; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| Plan with Metrics | CoalescePartitionsExec, REDACTED +|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED +|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED +|_|_PromSeriesDivideExec: tags=["k"], REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_ExecutionPlan(PlaceHolder), REDACTED +|_|_| ++-+-+ + +DROP TABLE test; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/tql/analyze.sql b/tests/cases/standalone/common/tql/analyze.sql new file mode 
100644 index 0000000000..677e9069bc --- /dev/null +++ b/tests/cases/standalone/common/tql/analyze.sql @@ -0,0 +1,13 @@ +CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY); + +-- insert two points at 1ms and one point at 2ms +INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a"); + +-- analyze at 0s, 5s and 10s. No point at 0s. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +TQL ANALYZE (0, 10, '5s') test; + +DROP TABLE test; diff --git a/tests/cases/standalone/common/tql/explain.result b/tests/cases/standalone/common/tql/explain.result new file mode 100644 index 0000000000..ee22ed598d --- /dev/null +++ b/tests/cases/standalone/common/tql/explain.result @@ -0,0 +1,31 @@ +CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY); + +Affected Rows: 0 + +INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a"); + +Affected Rows: 3 + +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +TQL EXPLAIN (0, 10, '5s') test; + ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | +| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivide: tags=["k"] | +| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST | +| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] | +| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | +| | 
PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivideExec: tags=["k"] | +| | RepartitionExec: partitioning=REDACTED +| | ExecutionPlan(PlaceHolder) | +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ + +DROP TABLE test; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/tql/explain.sql b/tests/cases/standalone/common/tql/explain.sql new file mode 100644 index 0000000000..448c185c01 --- /dev/null +++ b/tests/cases/standalone/common/tql/explain.sql @@ -0,0 +1,10 @@ +CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY); + +-- insert two points at 1ms and one point at 2ms +INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a"); + +-- explain at 0s, 5s and 10s. No point at 0s. +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +TQL EXPLAIN (0, 10, '5s') test; + +DROP TABLE test;