From bcd44b90c1a333e81a5d081cb53567f8b44d96b9 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Wed, 22 Feb 2023 11:28:09 +0800 Subject: [PATCH] feat: invoke TQL via SQL interface (#1047) * feat: impl TQL parser in sqlparser * feat: impl invoking TQL via SQL * chore: remove src/sql/src/tql_parser.rs * chore: fix typo * test: add tql test * chore: carry type Co-authored-by: LFC * chore: cr comments --------- Co-authored-by: LFC --- src/datanode/src/instance/sql.rs | 28 ++++ src/datanode/src/tests/promql_test.rs | 68 ++++++++ src/frontend/src/instance.rs | 5 +- src/query/src/datafusion/planner.rs | 2 + src/sql/src/parser.rs | 7 + src/sql/src/parsers.rs | 1 + src/sql/src/parsers/alter_parser.rs | 6 +- src/sql/src/parsers/tql_parser.rs | 220 ++++++++++++++++++++++++++ src/sql/src/statements.rs | 1 + src/sql/src/statements/statement.rs | 2 + src/sql/src/statements/tql.rs | 31 ++++ 11 files changed, 367 insertions(+), 4 deletions(-) create mode 100644 src/sql/src/parsers/tql_parser.rs create mode 100644 src/sql/src/statements/tql.rs diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 79710e9c15..9158624c53 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -30,6 +30,7 @@ use session::context::{QueryContext, QueryContextRef}; use snafu::prelude::*; use sql::ast::ObjectName; use sql::statements::statement::Statement; +use sql::statements::tql::Tql; use table::engine::TableReference; use table::requests::{CopyTableRequest, CreateDatabaseRequest, DropTableRequest}; @@ -198,6 +199,33 @@ impl Instance { .execute(SqlRequest::CopyTable(req), query_ctx) .await } + QueryStatement::Sql(Statement::Tql(tql)) => self.execute_tql(tql, query_ctx).await, + } + } + + pub(crate) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { + match tql { + Tql::Eval(eval) => { + let promql = PromQuery { + start: eval.start, + end: eval.end, + step: eval.step, + query: eval.query, + }; + let stmt = 
QueryLanguageParser::parse_promql(&promql).context(ExecuteSqlSnafu)?; + let logical_plan = self + .query_engine + .statement_to_plan(stmt, query_ctx) + .context(ExecuteSqlSnafu)?; + + self.query_engine + .execute(&logical_plan) + .await + .context(ExecuteSqlSnafu) + } + Tql::Explain(_explain) => { + todo!("waiting for promql-parser ast adding a explain node") + } } } diff --git a/src/datanode/src/tests/promql_test.rs b/src/datanode/src/tests/promql_test.rs index cfd31ca81b..b64b08896d 100644 --- a/src/datanode/src/tests/promql_test.rs +++ b/src/datanode/src/tests/promql_test.rs @@ -53,6 +53,74 @@ async fn create_insert_query_assert( check_unordered_output_stream(query_output, expected).await; } +#[allow(clippy::too_many_arguments)] +async fn create_insert_tql_assert(create: &str, insert: &str, tql: &str, expected: &str) { + let instance = setup_test_instance("test_execute_insert").await; + let query_ctx = QueryContext::arc(); + instance + .inner() + .execute_sql(create, query_ctx.clone()) + .await + .unwrap(); + + instance + .inner() + .execute_sql(insert, query_ctx.clone()) + .await + .unwrap(); + + let query_output = instance + .inner() + .execute_sql(tql, query_ctx.clone()) + .await + .unwrap(); + let expected = String::from(expected); + check_unordered_output_stream(query_output, expected).await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn sql_insert_tql_query_ceil() { + create_insert_tql_assert( + r#"create table http_requests_total ( + host string, + cpu double, + memory double, + ts timestamp TIME INDEX, + PRIMARY KEY (host), + );"#, + r#"insert into http_requests_total(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 0), + ('host1', 66.6, 2048, 2000), + ('host1', 66.6, 4096, 5000), + ('host1', 43.1, 8192, 7000), + ('host1', 19.1, 10240, 9000), + ('host1', 99.1, 20480, 10000), + ('host1', 999.9, 40960, 21000), + ('host1', 31.9, 8192, 22000), + ('host1', 95.4, 333.3, 32000), + ('host1', 12423.1, 1333.3, 49000), + ('host1', 0, 2333.3, 80000), 
+ ('host1', 49, 3333.3, 99000); + "#, + "TQL EVAL (0,100,10) ceil(http_requests_total{host=\"host1\"})", + "+---------------------+-------------------------------+----------------------------------+-------+\ + \n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\ + \n+---------------------+-------------------------------+----------------------------------+-------+\ + \n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\ + \n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\ + \n| 1970-01-01T00:00:20 | 100 | 20480 | host1 |\ + \n| 1970-01-01T00:00:30 | 32 | 8192 | host1 |\ + \n| 1970-01-01T00:00:40 | 96 | 334 | host1 |\ + \n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\ + \n| 1970-01-01T00:01:00 | 12424 | 1334 | host1 |\ + \n| 1970-01-01T00:01:10 | 12424 | 1334 | host1 |\ + \n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\ + \n| 1970-01-01T00:01:30 | 0 | 2334 | host1 |\ + \n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\ + \n+---------------------+-------------------------------+----------------------------------+-------+") + .await; +} + #[tokio::test(flavor = "multi_thread")] async fn sql_insert_promql_query_ceil() { create_insert_query_assert( diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 63a27e1088..feb4a8d326 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -397,6 +397,7 @@ impl Instance { | Statement::Delete(_) | Statement::Alter(_) | Statement::DropTable(_) + | Statement::Tql(_) | Statement::Copy(_) => self.sql_handler.do_statement_query(stmt, query_ctx).await, Statement::Use(db) => self.handle_use(db, query_ctx), Statement::ShowCreateTable(_) => NotSupportedSnafu { @@ -560,8 +561,8 @@ pub fn check_permission( } match stmt { - // query and explain will be checked in QueryEngineState - Statement::Query(_) | Statement::Explain(_) => {} + // query,explain and tql will be checked in QueryEngineState + Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) => {} // database ops 
won't be checked Statement::CreateDatabase(_) | Statement::ShowDatabases(_) | Statement::Use(_) => {} // show create table and alter are not supported yet diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index a7493caf9c..4a3f4878a1 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -93,6 +93,8 @@ where match statement { Statement::Query(qb) => self.query_to_plan(qb), Statement::Explain(explain) => self.explain_to_plan(explain), + // TQL has its own dedicated planner + Statement::Tql(_tql) => unreachable!(), Statement::ShowTables(_) | Statement::Delete(_) | Statement::ShowDatabases(_) diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 8ad5fcff0d..454666bf6f 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -21,6 +21,7 @@ use sqlparser::tokenizer::{Token, Tokenizer}; use crate::error::{ self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result, SyntaxSnafu, TokenizerSnafu, }; +use crate::parsers::tql_parser; use crate::statements::describe::DescribeTable; use crate::statements::drop::DropTable; use crate::statements::explain::Explain; @@ -119,6 +120,12 @@ impl<'a> ParserContext<'a> { Keyword::COPY => self.parse_copy(), + Keyword::NoKeyword + if w.value.to_uppercase() == tql_parser::TQL && w.quote_style.is_none() => + { + self.parse_tql() + } + // todo(hl) support more statements. 
_ => self.unsupported(self.peek_token_as_string()), } diff --git a/src/sql/src/parsers.rs b/src/sql/src/parsers.rs index fb9dfc75de..d6f128bd62 100644 --- a/src/sql/src/parsers.rs +++ b/src/sql/src/parsers.rs @@ -18,3 +18,4 @@ pub(crate) mod create_parser; pub(crate) mod delete_parser; pub(crate) mod insert_parser; pub(crate) mod query_parser; +pub(crate) mod tql_parser; diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs index 06e97394c8..cbd752d387 100644 --- a/src/sql/src/parsers/alter_parser.rs +++ b/src/sql/src/parsers/alter_parser.rs @@ -23,11 +23,13 @@ use crate::statements::statement::Statement; impl<'a> ParserContext<'a> { pub(crate) fn parse_alter(&mut self) -> Result { - let alter_table = self.parse().context(error::SyntaxSnafu { sql: self.sql })?; + let alter_table = self + .parse_alter_table() + .context(error::SyntaxSnafu { sql: self.sql })?; Ok(Statement::Alter(alter_table)) } - fn parse(&mut self) -> std::result::Result { + fn parse_alter_table(&mut self) -> std::result::Result { let parser = &mut self.parser; parser.expect_keywords(&[Keyword::ALTER, Keyword::TABLE])?; diff --git a/src/sql/src/parsers/tql_parser.rs b/src/sql/src/parsers/tql_parser.rs new file mode 100644 index 0000000000..7d72664073 --- /dev/null +++ b/src/sql/src/parsers/tql_parser.rs @@ -0,0 +1,220 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use snafu::ResultExt; +use sqlparser::keywords::Keyword; +use sqlparser::parser::ParserError; +use sqlparser::tokenizer::Token; + +use crate::error::{self, Result}; +use crate::parser::ParserContext; +use crate::statements::statement::Statement; +use crate::statements::tql::{Tql, TqlEval, TqlExplain}; + +pub const TQL: &str = "TQL"; +const EVAL: &str = "EVAL"; +const EVALUATE: &str = "EVALUATE"; +const EXPLAIN: &str = "EXPLAIN"; +use sqlparser::parser::Parser; + +/// TQL extension parser, including: +/// - TQL EVAL +/// - TQL EXPLAIN +impl<'a> ParserContext<'a> { + pub(crate) fn parse_tql(&mut self) -> Result { + self.parser.next_token(); + + match self.parser.peek_token() { + Token::Word(w) => { + let uppercase = w.value.to_uppercase(); + match w.keyword { + Keyword::NoKeyword + if (uppercase == EVAL || uppercase == EVALUATE) + && w.quote_style.is_none() => + { + self.parser.next_token(); + self.parse_tql_eval() + .context(error::SyntaxSnafu { sql: self.sql }) + } + + Keyword::EXPLAIN => { + self.parser.next_token(); + self.parse_tql_explain() + } + + _ => self.unsupported(self.peek_token_as_string()), + } + } + unexpected => self.unsupported(unexpected.to_string()), + } + } + + fn parse_tql_eval(&mut self) -> std::result::Result { + let parser = &mut self.parser; + parser.expect_token(&Token::LParen)?; + let start = Self::parse_string_or_number(parser, Token::Comma)?; + let end = Self::parse_string_or_number(parser, Token::Comma)?; + let step = Self::parse_string_or_number(parser, Token::RParen)?; + let query = Self::parse_tql_query(parser, self.sql, ")")?; + + Ok(Statement::Tql(Tql::Eval(TqlEval { + start, + end, + step, + query, + }))) + } + + fn parse_string_or_number( + parser: &mut Parser, + token: Token, + ) -> std::result::Result { + let value = match parser.next_token() { + Token::Number(n, _) => n, + Token::DoubleQuotedString(s) | Token::SingleQuotedString(s) => s, + unexpected => { + return Err(ParserError::ParserError(format!( + "Expect number or 
string, but is {unexpected:?}" + ))); + } + }; + parser.expect_token(&token)?; + + Ok(value) + } + + fn parse_tql_query( + parser: &mut Parser, + sql: &str, + delimiter: &str, + ) -> std::result::Result { + let index = sql.to_uppercase().find(delimiter); + + if let Some(index) = index { + let index = index + delimiter.len() + 1; + if index >= sql.len() { + return Err(ParserError::ParserError("empty TQL query".to_string())); + } + + let query = &sql[index..]; + + while parser.next_token() != Token::EOF { + // consume all tokens + // TODO(dennis): supports multi TQL statements separated by ';'? + } + + Ok(query.trim().to_string()) + } else { + Err(ParserError::ParserError(format!("{delimiter} not found",))) + } + } + + fn parse_tql_explain(&mut self) -> Result { + let query = Self::parse_tql_query(&mut self.parser, self.sql, EXPLAIN) + .context(error::SyntaxSnafu { sql: self.sql })?; + + Ok(Statement::Tql(Tql::Explain(TqlExplain { query }))) + } +} + +#[cfg(test)] +mod tests { + use sqlparser::dialect::GenericDialect; + + use super::*; + #[test] + fn test_parse_tql() { + let sql = "TQL EVAL (1676887657, 1676887659, '1m') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + match statement { + Statement::Tql(Tql::Eval(eval)) => { + assert_eq!(eval.start, "1676887657"); + assert_eq!(eval.end, "1676887659"); + assert_eq!(eval.step, "1m"); + assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + } + _ => unreachable!(), + } + + let sql = "TQL EVAL (1676887657.1, 1676887659.5, 30.3) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(1, 
result.len()); + + let statement = result.remove(0); + match statement.clone() { + Statement::Tql(Tql::Eval(eval)) => { + assert_eq!(eval.start, "1676887657.1"); + assert_eq!(eval.end, "1676887659.5"); + assert_eq!(eval.step, "30.3"); + assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + } + _ => unreachable!(), + } + + let sql = "TQL EVALUATE (1676887657.1, 1676887659.5, 30.3) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(1, result.len()); + + let statement2 = result.remove(0); + assert_eq!(statement, statement2); + + let sql = "tql eval ('2015-07-01T20:10:30.781Z', '2015-07-01T20:11:00.781Z', '30s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + match statement { + Statement::Tql(Tql::Eval(eval)) => { + assert_eq!(eval.start, "2015-07-01T20:10:30.781Z"); + assert_eq!(eval.end, "2015-07-01T20:11:00.781Z"); + assert_eq!(eval.step, "30s"); + assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + } + _ => unreachable!(), + } + + let sql = "TQL EXPLAIN http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + + let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + match statement { + Statement::Tql(Tql::Explain(explain)) => { + assert_eq!(explain.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"); + } + _ => unreachable!(), + } + } + + 
#[test] + fn test_parse_tql_error() { + // Invalid duration + let sql = "TQL EVAL (1676887657, 1676887659, 1m) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap_err(); + assert!(result.to_string().contains("Expected ), found: m")); + + // missing end + let sql = "TQL EVAL (1676887657, '1m') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap_err(); + assert!(result.to_string().contains("Expected ,, found: )")); + } +} diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 593b746cb1..fd2486f724 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -23,6 +23,7 @@ pub mod insert; pub mod query; pub mod show; pub mod statement; +pub mod tql; use std::str::FromStr; diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 51e5c1e8bc..ff57921253 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -22,6 +22,7 @@ use crate::statements::explain::Explain; use crate::statements::insert::Insert; use crate::statements::query::Query; use crate::statements::show::{ShowCreateTable, ShowDatabases, ShowTables}; +use crate::statements::tql::Tql; /// Tokens parsed by `DFParser` are converted into these values. #[allow(clippy::large_enum_variant)] @@ -54,6 +55,7 @@ pub enum Statement { Use(String), // COPY Copy(CopyTable), + Tql(Tql), } /// Comment hints from SQL. 
diff --git a/src/sql/src/statements/tql.rs b/src/sql/src/statements/tql.rs new file mode 100644 index 0000000000..6967c46fb7 --- /dev/null +++ b/src/sql/src/statements/tql.rs @@ -0,0 +1,31 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Tql { + Eval(TqlEval), + Explain(TqlExplain), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TqlEval { + pub start: String, + pub end: String, + pub step: String, + pub query: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TqlExplain { + pub query: String, +}