feat(tql): add initial implementation for explain & analyze (#1427)

* feat(tql): resolve conflicts after merge, fix formatting and clippy issues, add sqlness tests, adjust explain to accept start, end, step

* feat(tql): adjust sqlness assertions
This commit is contained in:
Eugene Tolbakov
2023-05-16 00:28:24 +01:00
committed by GitHub
parent 2fd1075c4f
commit 122bd5f0ab
12 changed files with 347 additions and 32 deletions

View File

@@ -171,9 +171,7 @@ impl Instance {
) -> Result<Output> {
let query = PromQuery {
query: promql.to_string(),
start: "0".to_string(),
end: "0".to_string(),
step: "5m".to_string(),
..PromQuery::default()
};
let mut stmt = QueryLanguageParser::parse_promql(&query).context(ExecuteSqlSnafu)?;
match &mut stmt {

View File

@@ -12,20 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_query::Output;
use query::parser::{PromQuery, QueryLanguageParser};
use query::parser::{PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, EXPLAIN_NODE_NAME};
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::tql::Tql;
use crate::error::{
ExecLogicalPlanSnafu, NotSupportedSnafu, ParseQuerySnafu, PlanStatementSnafu, Result,
};
use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Result};
use crate::statement::StatementExecutor;
impl StatementExecutor {
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
let plan = match tql {
let stmt = match tql {
Tql::Eval(eval) => {
let promql = PromQuery {
start: eval.start,
@@ -33,20 +33,39 @@ impl StatementExecutor {
step: eval.step,
query: eval.query,
};
let stmt = QueryLanguageParser::parse_promql(&promql).context(ParseQuerySnafu)?;
self.query_engine
.planner()
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?
QueryLanguageParser::parse_promql(&promql).context(ParseQuerySnafu)?
}
Tql::Explain(_) => {
return NotSupportedSnafu {
feat: "TQL EXPLAIN",
}
.fail()
Tql::Explain(explain) => {
let promql = PromQuery {
query: explain.query,
..PromQuery::default()
};
let params = HashMap::from([("name".to_string(), EXPLAIN_NODE_NAME.to_string())]);
QueryLanguageParser::parse_promql(&promql)
.context(ParseQuerySnafu)?
.post_process(params)
.unwrap()
}
Tql::Analyze(tql_analyze) => {
let promql = PromQuery {
start: tql_analyze.start,
end: tql_analyze.end,
step: tql_analyze.step,
query: tql_analyze.query,
};
let params = HashMap::from([("name".to_string(), ANALYZE_NODE_NAME.to_string())]);
QueryLanguageParser::parse_promql(&promql)
.context(ParseQuerySnafu)?
.post_process(params)
.unwrap()
}
};
let plan = self
.query_engine
.planner()
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;
self.query_engine
.execute(plan, query_ctx)
.await

View File

@@ -393,10 +393,30 @@ impl PromPlanner {
.build()
.context(DataFusionPlanningSnafu)?
}
PromExpr::Extension(_) => UnsupportedExprSnafu {
name: "Prom Extension",
PromExpr::Extension(promql_parser::parser::ast::Extension { expr }) => {
let children = expr.children();
let plan = self.prom_expr_to_plan(children[0].clone()).await?;
// Wrapper for the explanation/analyze of the existing plan
// https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
// if `analyze` is true, runs the actual plan and produces
// information about metrics during run.
// if `verbose` is true, prints out additional details when VERBOSE keyword is specified
match expr.name() {
"ANALYZE" => LogicalPlanBuilder::from(plan)
.explain(false, true)
.unwrap()
.build()
.context(DataFusionPlanningSnafu)?,
"EXPLAIN" => LogicalPlanBuilder::from(plan)
.explain(false, false)
.unwrap()
.build()
.context(DataFusionPlanningSnafu)?,
_ => LogicalPlanBuilder::empty(true)
.build()
.context(DataFusionPlanningSnafu)?,
}
}
.fail()?,
};
Ok(res)
}
@@ -559,7 +579,7 @@ impl PromPlanner {
Ok(logical_plan)
}
/// Convert [AggModifier] to [Column] exprs for aggregation.
/// Convert [LabelModifier] to [Column] exprs for aggregation.
/// Timestamp column and tag columns will be included.
///
/// # Side effect

View File

@@ -26,6 +26,12 @@ pub enum Error {
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String, location: Location },
#[snafu(display("Operation {} not implemented yet", operation))]
Unimplemented {
operation: String,
location: Location,
},
#[snafu(display("General catalog error: {}", source))]
Catalog {
#[snafu(backtrace)]
@@ -183,6 +189,7 @@ impl ErrorExt for Error {
match self {
QueryParse { .. } | MultipleStatements { .. } => StatusCode::InvalidSyntax,
UnsupportedExpr { .. }
| Unimplemented { .. }
| CatalogNotFound { .. }
| SchemaNotFound { .. }
| TableNotFound { .. }

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use chrono::DateTime;
@@ -19,7 +22,9 @@ use common_error::ext::PlainError;
use common_error::prelude::BoxedError;
use common_error::status_code::StatusCode;
use common_telemetry::timer;
use promql_parser::parser::EvalStmt;
use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
use promql_parser::parser::Expr::Extension;
use promql_parser::parser::{EvalStmt, Expr, ValueType};
use snafu::ResultExt;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
@@ -27,10 +32,13 @@ use sql::statements::statement::Statement;
use crate::error::{
MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result,
UnimplementedSnafu,
};
use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED};
const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
#[derive(Debug, Clone)]
pub enum QueryStatement {
@@ -38,6 +46,43 @@ pub enum QueryStatement {
Promql(EvalStmt),
}
impl QueryStatement {
    /// Returns a copy of this statement whose PromQL expression is wrapped in
    /// an extension node selected by `params["name"]`.
    ///
    /// Recognized names are [`ANALYZE_NODE_NAME`] and [`EXPLAIN_NODE_NAME`].
    /// The evaluation window (`start`, `end`, `interval`, `lookback_delta`)
    /// is carried over unchanged from the original statement.
    ///
    /// # Errors
    ///
    /// Returns an `Unimplemented` error when
    /// - the statement is SQL (post processing is only defined for PromQL), or
    /// - `params` has no `"name"` entry, or the name is not a known extension
    ///   node. This case is reported as an error rather than a panic so that
    ///   callers can surface it to the user.
    pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
        match self {
            QueryStatement::Sql(_) => UnimplementedSnafu {
                operation: "sql post process",
            }
            .fail(),
            QueryStatement::Promql(eval_stmt) => {
                // A missing "name" is treated like an unknown one: the empty
                // string matches no extension node below.
                let node_name = match params.get("name") {
                    Some(name) => name.as_str(),
                    None => "",
                };
                let extension_node = match Self::create_extension_node(node_name, &eval_stmt.expr)
                {
                    Some(node) => node,
                    None => {
                        return UnimplementedSnafu {
                            operation: format!(
                                "promql post process for extension node '{}'",
                                node_name
                            ),
                        }
                        .fail()
                    }
                };
                Ok(QueryStatement::Promql(EvalStmt {
                    expr: Extension(extension_node),
                    start: eval_stmt.start,
                    end: eval_stmt.end,
                    interval: eval_stmt.interval,
                    lookback_delta: eval_stmt.lookback_delta,
                }))
            }
        }
    }

    /// Builds the extension node registered under `node_name`, wrapping a
    /// clone of `expr` as its only child.
    ///
    /// Returns `None` when `node_name` is neither [`ANALYZE_NODE_NAME`] nor
    /// [`EXPLAIN_NODE_NAME`].
    fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
        match node_name {
            ANALYZE_NODE_NAME => Some(NodeExtension {
                expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
            }),
            EXPLAIN_NODE_NAME => Some(NodeExtension {
                expr: Arc::new(ExplainExpr { expr: expr.clone() }),
            }),
            _ => None,
        }
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromQuery {
pub query: String,
@@ -46,6 +91,17 @@ pub struct PromQuery {
pub step: String,
}
impl Default for PromQuery {
fn default() -> Self {
PromQuery {
query: String::new(),
start: String::from("0"),
end: String::from("0"),
step: String::from("5m"),
}
}
}
pub struct QueryLanguageParser {}
impl QueryLanguageParser {
@@ -66,7 +122,6 @@ impl QueryLanguageParser {
}
}
// TODO(ruihang): implement this method when parser is ready.
pub fn parse_promql(query: &PromQuery) -> Result<QueryStatement> {
let _timer = timer!(METRIC_PARSE_PROMQL_ELAPSED);
@@ -142,6 +197,51 @@ fn max_system_timestamp() -> SystemTime {
.unwrap()
}
/// Generates a pair of types for one PromQL AST extension node.
///
/// Arguments:
/// - `$name`: wrapper struct holding the node behind an `Arc`.
/// - `$name_expr`: the node struct itself, which implements `ExtensionExpr`.
/// - `$expr_type`: type of the single wrapped child expression.
/// - `$extension_name`: string returned from `ExtensionExpr::name`.
macro_rules! define_node_ast_extension {
($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
// NOTE(review): metavariables are not expanded inside doc comments, so the
// generated documentation most likely shows the literal text `$name_expr`.
/// The implementation of the `$name_expr` extension AST node
#[derive(Debug, Clone)]
pub struct $name_expr {
pub expr: $expr_type,
}
impl ExtensionExpr for $name_expr {
// Exposes the concrete node so callers can downcast it.
fn as_any(&self) -> &dyn Any {
self
}
// The name identifies which extension this node is.
fn name(&self) -> &str {
$extension_name
}
// The node has the same value type as the expression it wraps.
fn value_type(&self) -> ValueType {
self.expr.value_type()
}
// The wrapped expression is the only child; view it as a one-element slice.
fn children(&self) -> &[Expr] {
std::slice::from_ref(&self.expr)
}
}
// Wrapper that holds the node behind an `Arc`.
// NOTE(review): the reason for allowing `broken_intra_doc_links` is not visible here; confirm it is still needed.
#[allow(rustdoc::broken_intra_doc_links)]
#[derive(Debug, Clone)]
pub struct $name {
pub expr: Arc<$name_expr>,
}
impl $name {
// Wraps `expr` in a freshly allocated node.
pub fn new(expr: $expr_type) -> Self {
Self {
expr: Arc::new($name_expr { expr }),
}
}
}
};
}
// Instantiate the ANALYZE and EXPLAIN extension nodes, each wrapping a PromQL `Expr`.
define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
#[cfg(test)]
mod test {
use super::*;

View File

@@ -20,7 +20,7 @@ use sqlparser::tokenizer::Token;
use crate::error::{self, Result};
use crate::parser::ParserContext;
use crate::statements::statement::Statement;
use crate::statements::tql::{Tql, TqlEval, TqlExplain};
use crate::statements::tql::{Tql, TqlAnalyze, TqlEval, TqlExplain};
pub const TQL: &str = "TQL";
const EVAL: &str = "EVAL";
@@ -31,6 +31,7 @@ use sqlparser::parser::Parser;
/// TQL extension parser, including:
/// - TQL EVAL <query>
/// - TQL EXPLAIN [(<start>, <end>, <step>)] <query>
/// - TQL ANALYZE (<start>, <end>, <step>) <query>
impl<'a> ParserContext<'a> {
pub(crate) fn parse_tql(&mut self) -> Result<Statement> {
self.parser.next_token();
@@ -53,6 +54,11 @@ impl<'a> ParserContext<'a> {
self.parse_tql_explain()
}
Keyword::ANALYZE => {
self.parser.next_token();
self.parse_tql_analyze()
.context(error::SyntaxSnafu { sql: self.sql })
}
_ => self.unsupported(self.peek_token_as_string()),
}
}
@@ -122,10 +128,39 @@ impl<'a> ParserContext<'a> {
}
fn parse_tql_explain(&mut self) -> Result<Statement> {
let query = Self::parse_tql_query(&mut self.parser, self.sql, EXPLAIN)
let parser = &mut self.parser;
let delimiter = match parser.expect_token(&Token::LParen) {
Ok(_) => ")",
Err(_) => EXPLAIN,
};
let start = Self::parse_string_or_number(parser, Token::Comma).unwrap_or("0".to_string());
let end = Self::parse_string_or_number(parser, Token::Comma).unwrap_or("0".to_string());
let step = Self::parse_string_or_number(parser, Token::RParen).unwrap_or("5m".to_string());
let query = Self::parse_tql_query(parser, self.sql, delimiter)
.context(error::SyntaxSnafu { sql: self.sql })?;
Ok(Statement::Tql(Tql::Explain(TqlExplain { query })))
Ok(Statement::Tql(Tql::Explain(TqlExplain {
query,
start,
end,
step,
})))
}
// TODO code reuse from `parse_tql_eval`
/// Parses what follows the `ANALYZE` keyword: a mandatory parenthesized
/// `(start, end, step)` list and then the query text.
///
/// Any parser failure is returned as the raw `ParserError`.
fn parse_tql_analyze(&mut self) -> std::result::Result<Statement, ParserError> {
    let tokens = &mut self.parser;
    // Unlike EXPLAIN, the parameter list is required here.
    tokens.expect_token(&Token::LParen)?;
    // Each value is read together with the token that follows it
    // (`,` `,` `)`), so the three calls must stay in this order.
    let range_start = Self::parse_string_or_number(tokens, Token::Comma)?;
    let range_end = Self::parse_string_or_number(tokens, Token::Comma)?;
    let interval = Self::parse_string_or_number(tokens, Token::RParen)?;
    let promql = Self::parse_tql_query(tokens, self.sql, ")")?;
    let analyze = TqlAnalyze {
        start: range_start,
        end: range_end,
        step: interval,
        query: promql,
    };
    Ok(Statement::Tql(Tql::Analyze(analyze)))
}
}
@@ -135,7 +170,7 @@ mod tests {
use super::*;
#[test]
fn test_parse_tql() {
fn test_parse_tql_eval() {
let sql = "TQL EVAL (1676887657, 1676887659, '1m') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
@@ -191,7 +226,10 @@ mod tests {
}
_ => unreachable!(),
}
}
#[test]
fn test_parse_tql_explain() {
let sql = "TQL EXPLAIN http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
@@ -201,6 +239,42 @@ mod tests {
match statement {
Statement::Tql(Tql::Explain(explain)) => {
assert_eq!(explain.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m");
assert_eq!(explain.start, "0");
assert_eq!(explain.end, "0");
assert_eq!(explain.step, "5m");
}
_ => unreachable!(),
}
let sql = "TQL EXPLAIN (20,100,10) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
match statement {
Statement::Tql(Tql::Explain(explain)) => {
assert_eq!(explain.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m");
assert_eq!(explain.start, "20");
assert_eq!(explain.end, "100");
assert_eq!(explain.step, "10");
}
_ => unreachable!(),
}
}
#[test]
fn test_parse_tql_analyze() {
let sql = "TQL ANALYZE (1676887657.1, 1676887659.5, 30.3) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
match statement {
Statement::Tql(Tql::Analyze(analyze)) => {
assert_eq!(analyze.start, "1676887657.1");
assert_eq!(analyze.end, "1676887659.5");
assert_eq!(analyze.step, "30.3");
assert_eq!(analyze.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m");
}
_ => unreachable!(),
}

View File

@@ -15,6 +15,7 @@
pub enum Tql {
Eval(TqlEval),
Explain(TqlExplain),
Analyze(TqlAnalyze),
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -25,7 +26,20 @@ pub struct TqlEval {
pub query: String,
}
/// TQL EXPLAIN (like SQL EXPLAIN): doesn't execute the query but tells how the query would be executed.
///
/// Every field holds text taken from the statement; no numeric or duration
/// parsing happens at this level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TqlExplain {
/// Start of the evaluation range, as text (e.g. `"20"`).
pub start: String,
/// End of the evaluation range, as text (e.g. `"100"`).
pub end: String,
/// Evaluation step, as text (e.g. `"10"` or `"5m"`).
pub step: String,
/// The query text to explain.
pub query: String,
}
/// TQL ANALYZE (like SQL ANALYZE): executes the plan and tells the detailed per-step execution time.
///
/// Every field holds text taken from the statement; no numeric or duration
/// parsing happens at this level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TqlAnalyze {
/// Start of the evaluation range, as text (e.g. `"1676887657.1"`).
pub start: String,
/// End of the evaluation range, as text (e.g. `"1676887659.5"`).
pub end: String,
/// Evaluation step, as text (e.g. `"30.3"` or `"5s"`).
pub step: String,
/// The query text to analyze.
pub query: String,
}

View File

@@ -42,9 +42,7 @@ async fn create_insert_query_assert(
let query = PromQuery {
query: promql.to_string(),
start: "0".to_string(),
end: "0".to_string(),
step: "5m".to_string(),
..PromQuery::default()
};
let QueryStatement::Promql(mut eval_stmt) = QueryLanguageParser::parse_promql(&query).unwrap() else { unreachable!() };
eval_stmt.start = start;

View File

@@ -0,0 +1,31 @@
CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY);
Affected Rows: 0
INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
Affected Rows: 3
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
TQL ANALYZE (0, 10, '5s') test;
+-+-+
| plan_type_| plan_|
+-+-+
| Plan with Metrics | CoalescePartitionsExec, REDACTED
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_ExecutionPlan(PlaceHolder), REDACTED
|_|_|
+-+-+
DROP TABLE test;
Affected Rows: 1

View File

@@ -0,0 +1,13 @@
CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY);
-- insert two points at 1ms and one point at 2ms
INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
-- analyze at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
TQL ANALYZE (0, 10, '5s') test;
DROP TABLE test;

View File

@@ -0,0 +1,31 @@
CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY);
Affected Rows: 0
INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
Affected Rows: 3
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
TQL EXPLAIN (0, 10, '5s') test;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST |
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | RepartitionExec: partitioning=REDACTED
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
DROP TABLE test;
Affected Rows: 1

View File

@@ -0,0 +1,10 @@
CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY);
-- insert two points at 1ms and one point at 2ms
INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
-- explain at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
TQL EXPLAIN (0, 10, '5s') test;
DROP TABLE test;