feat: invoke TQL via SQL interface (#1047)

* feat: impl TQL parser in sqlparser

* feat: impl invoking TQL via SQL

* chore: remove src/sql/src/tql_parser.rs

* chore: fix typo

* test: add tql test

* chore: carry type

Co-authored-by: LFC <bayinamine@gmail.com>

* chore: cr comments

---------

Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
dennis zhuang
2023-02-22 11:28:09 +08:00
committed by GitHub
parent c6f2db8ae0
commit bcd44b90c1
11 changed files with 367 additions and 4 deletions

View File

@@ -30,6 +30,7 @@ use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use table::engine::TableReference;
use table::requests::{CopyTableRequest, CreateDatabaseRequest, DropTableRequest};
@@ -198,6 +199,33 @@ impl Instance {
.execute(SqlRequest::CopyTable(req), query_ctx)
.await
}
QueryStatement::Sql(Statement::Tql(tql)) => self.execute_tql(tql, query_ctx).await,
}
}
pub(crate) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
match tql {
Tql::Eval(eval) => {
let promql = PromQuery {
start: eval.start,
end: eval.end,
step: eval.step,
query: eval.query,
};
let stmt = QueryLanguageParser::parse_promql(&promql).context(ExecuteSqlSnafu)?;
let logical_plan = self
.query_engine
.statement_to_plan(stmt, query_ctx)
.context(ExecuteSqlSnafu)?;
self.query_engine
.execute(&logical_plan)
.await
.context(ExecuteSqlSnafu)
}
Tql::Explain(_explain) => {
todo!("waiting for promql-parser ast adding a explain node")
}
}
}

View File

@@ -53,6 +53,74 @@ async fn create_insert_query_assert(
check_unordered_output_stream(query_output, expected).await;
}
#[allow(clippy::too_many_arguments)]
async fn create_insert_tql_assert(create: &str, insert: &str, tql: &str, expected: &str) {
let instance = setup_test_instance("test_execute_insert").await;
let query_ctx = QueryContext::arc();
instance
.inner()
.execute_sql(create, query_ctx.clone())
.await
.unwrap();
instance
.inner()
.execute_sql(insert, query_ctx.clone())
.await
.unwrap();
let query_output = instance
.inner()
.execute_sql(tql, query_ctx.clone())
.await
.unwrap();
let expected = String::from(expected);
check_unordered_output_stream(query_output, expected).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn sql_insert_tql_query_ceil() {
create_insert_tql_assert(
r#"create table http_requests_total (
host string,
cpu double,
memory double,
ts timestamp TIME INDEX,
PRIMARY KEY (host),
);"#,
r#"insert into http_requests_total(host, cpu, memory, ts) values
('host1', 66.6, 1024, 0),
('host1', 66.6, 2048, 2000),
('host1', 66.6, 4096, 5000),
('host1', 43.1, 8192, 7000),
('host1', 19.1, 10240, 9000),
('host1', 99.1, 20480, 10000),
('host1', 999.9, 40960, 21000),
('host1', 31.9, 8192, 22000),
('host1', 95.4, 333.3, 32000),
('host1', 12423.1, 1333.3, 49000),
('host1', 0, 2333.3, 80000),
('host1', 49, 3333.3, 99000);
"#,
"TQL EVAL (0,100,10) ceil(http_requests_total{host=\"host1\"})",
"+---------------------+-------------------------------+----------------------------------+-------+\
\n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\
\n+---------------------+-------------------------------+----------------------------------+-------+\
\n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\
\n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:00:20 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:00:30 | 32 | 8192 | host1 |\
\n| 1970-01-01T00:00:40 | 96 | 334 | host1 |\
\n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:00 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:10 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:30 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\
\n+---------------------+-------------------------------+----------------------------------+-------+")
.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn sql_insert_promql_query_ceil() {
create_insert_query_assert(

View File

@@ -397,6 +397,7 @@ impl Instance {
| Statement::Delete(_)
| Statement::Alter(_)
| Statement::DropTable(_)
| Statement::Tql(_)
| Statement::Copy(_) => self.sql_handler.do_statement_query(stmt, query_ctx).await,
Statement::Use(db) => self.handle_use(db, query_ctx),
Statement::ShowCreateTable(_) => NotSupportedSnafu {
@@ -560,8 +561,8 @@ pub fn check_permission(
}
match stmt {
// query and explain will be checked in QueryEngineState
Statement::Query(_) | Statement::Explain(_) => {}
// query,explain and tql will be checked in QueryEngineState
Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) => {}
// database ops won't be checked
Statement::CreateDatabase(_) | Statement::ShowDatabases(_) | Statement::Use(_) => {}
// show create table and alter are not supported yet

View File

@@ -93,6 +93,8 @@ where
match statement {
Statement::Query(qb) => self.query_to_plan(qb),
Statement::Explain(explain) => self.explain_to_plan(explain),
// The TQL has it's a dedicated planner
Statement::Tql(_tql) => unreachable!(),
Statement::ShowTables(_)
| Statement::Delete(_)
| Statement::ShowDatabases(_)

View File

@@ -21,6 +21,7 @@ use sqlparser::tokenizer::{Token, Tokenizer};
use crate::error::{
self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result, SyntaxSnafu, TokenizerSnafu,
};
use crate::parsers::tql_parser;
use crate::statements::describe::DescribeTable;
use crate::statements::drop::DropTable;
use crate::statements::explain::Explain;
@@ -119,6 +120,12 @@ impl<'a> ParserContext<'a> {
Keyword::COPY => self.parse_copy(),
Keyword::NoKeyword
if w.value.to_uppercase() == tql_parser::TQL && w.quote_style.is_none() =>
{
self.parse_tql()
}
// todo(hl) support more statements.
_ => self.unsupported(self.peek_token_as_string()),
}

View File

@@ -18,3 +18,4 @@ pub(crate) mod create_parser;
pub(crate) mod delete_parser;
pub(crate) mod insert_parser;
pub(crate) mod query_parser;
pub(crate) mod tql_parser;

View File

@@ -23,11 +23,13 @@ use crate::statements::statement::Statement;
impl<'a> ParserContext<'a> {
pub(crate) fn parse_alter(&mut self) -> Result<Statement> {
let alter_table = self.parse().context(error::SyntaxSnafu { sql: self.sql })?;
let alter_table = self
.parse_alter_table()
.context(error::SyntaxSnafu { sql: self.sql })?;
Ok(Statement::Alter(alter_table))
}
fn parse(&mut self) -> std::result::Result<AlterTable, ParserError> {
fn parse_alter_table(&mut self) -> std::result::Result<AlterTable, ParserError> {
let parser = &mut self.parser;
parser.expect_keywords(&[Keyword::ALTER, Keyword::TABLE])?;

View File

@@ -0,0 +1,220 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::ResultExt;
use sqlparser::keywords::Keyword;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::Token;
use crate::error::{self, Result};
use crate::parser::ParserContext;
use crate::statements::statement::Statement;
use crate::statements::tql::{Tql, TqlEval, TqlExplain};
pub const TQL: &str = "TQL";
const EVAL: &str = "EVAL";
const EVALUATE: &str = "EVALUATE";
const EXPLAIN: &str = "EXPLAIN";
use sqlparser::parser::Parser;
/// TQL extension parser, including:
/// - TQL EVAL <query>
/// - TQL EXPLAIN <query>
impl<'a> ParserContext<'a> {
pub(crate) fn parse_tql(&mut self) -> Result<Statement> {
self.parser.next_token();
match self.parser.peek_token() {
Token::Word(w) => {
let uppercase = w.value.to_uppercase();
match w.keyword {
Keyword::NoKeyword
if (uppercase == EVAL || uppercase == EVALUATE)
&& w.quote_style.is_none() =>
{
self.parser.next_token();
self.parse_tql_eval()
.context(error::SyntaxSnafu { sql: self.sql })
}
Keyword::EXPLAIN => {
self.parser.next_token();
self.parse_tql_explain()
}
_ => self.unsupported(self.peek_token_as_string()),
}
}
unexpected => self.unsupported(unexpected.to_string()),
}
}
fn parse_tql_eval(&mut self) -> std::result::Result<Statement, ParserError> {
let parser = &mut self.parser;
parser.expect_token(&Token::LParen)?;
let start = Self::parse_string_or_number(parser, Token::Comma)?;
let end = Self::parse_string_or_number(parser, Token::Comma)?;
let step = Self::parse_string_or_number(parser, Token::RParen)?;
let query = Self::parse_tql_query(parser, self.sql, ")")?;
Ok(Statement::Tql(Tql::Eval(TqlEval {
start,
end,
step,
query,
})))
}
fn parse_string_or_number(
parser: &mut Parser,
token: Token,
) -> std::result::Result<String, ParserError> {
let value = match parser.next_token() {
Token::Number(n, _) => n,
Token::DoubleQuotedString(s) | Token::SingleQuotedString(s) => s,
unexpected => {
return Err(ParserError::ParserError(format!(
"Expect number or string, but is {unexpected:?}"
)));
}
};
parser.expect_token(&token)?;
Ok(value)
}
fn parse_tql_query(
parser: &mut Parser,
sql: &str,
delimiter: &str,
) -> std::result::Result<String, ParserError> {
let index = sql.to_uppercase().find(delimiter);
if let Some(index) = index {
let index = index + delimiter.len() + 1;
if index >= sql.len() {
return Err(ParserError::ParserError("empty TQL query".to_string()));
}
let query = &sql[index..];
while parser.next_token() != Token::EOF {
// consume all tokens
// TODO(dennis): supports multi TQL statements separated by ';'?
}
Ok(query.trim().to_string())
} else {
Err(ParserError::ParserError(format!("{delimiter} not found",)))
}
}
fn parse_tql_explain(&mut self) -> Result<Statement> {
let query = Self::parse_tql_query(&mut self.parser, self.sql, EXPLAIN)
.context(error::SyntaxSnafu { sql: self.sql })?;
Ok(Statement::Tql(Tql::Explain(TqlExplain { query })))
}
}
#[cfg(test)]
mod tests {
use sqlparser::dialect::GenericDialect;
use super::*;
#[test]
fn test_parse_tql() {
let sql = "TQL EVAL (1676887657, 1676887659, '1m') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
match statement {
Statement::Tql(Tql::Eval(eval)) => {
assert_eq!(eval.start, "1676887657");
assert_eq!(eval.end, "1676887659");
assert_eq!(eval.step, "1m");
assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m");
}
_ => unreachable!(),
}
let sql = "TQL EVAL (1676887657.1, 1676887659.5, 30.3) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
match statement.clone() {
Statement::Tql(Tql::Eval(eval)) => {
assert_eq!(eval.start, "1676887657.1");
assert_eq!(eval.end, "1676887659.5");
assert_eq!(eval.step, "30.3");
assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m");
}
_ => unreachable!(),
}
let sql = "TQL EVALUATE (1676887657.1, 1676887659.5, 30.3) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement2 = result.remove(0);
assert_eq!(statement, statement2);
let sql = "tql eval ('2015-07-01T20:10:30.781Z', '2015-07-01T20:11:00.781Z', '30s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
match statement {
Statement::Tql(Tql::Eval(eval)) => {
assert_eq!(eval.start, "2015-07-01T20:10:30.781Z");
assert_eq!(eval.end, "2015-07-01T20:11:00.781Z");
assert_eq!(eval.step, "30s");
assert_eq!(eval.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m");
}
_ => unreachable!(),
}
let sql = "TQL EXPLAIN http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
match statement {
Statement::Tql(Tql::Explain(explain)) => {
assert_eq!(explain.query, "http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m");
}
_ => unreachable!(),
}
}
#[test]
fn test_parse_tql_error() {
// Invalid duration
let sql = "TQL EVAL (1676887657, 1676887659, 1m) http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap_err();
assert!(result.to_string().contains("Expected ), found: m"));
// missing end
let sql = "TQL EVAL (1676887657, '1m') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap_err();
assert!(result.to_string().contains("Expected ,, found: )"));
}
}

View File

@@ -23,6 +23,7 @@ pub mod insert;
pub mod query;
pub mod show;
pub mod statement;
pub mod tql;
use std::str::FromStr;

View File

@@ -22,6 +22,7 @@ use crate::statements::explain::Explain;
use crate::statements::insert::Insert;
use crate::statements::query::Query;
use crate::statements::show::{ShowCreateTable, ShowDatabases, ShowTables};
use crate::statements::tql::Tql;
/// Tokens parsed by `DFParser` are converted into these values.
#[allow(clippy::large_enum_variant)]
@@ -54,6 +55,7 @@ pub enum Statement {
Use(String),
// COPY
Copy(CopyTable),
Tql(Tql),
}
/// Comment hints from SQL.

View File

@@ -0,0 +1,31 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Tql {
Eval(TqlEval),
Explain(TqlExplain),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TqlEval {
pub start: String,
pub end: String,
pub step: String,
pub query: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TqlExplain {
pub query: String,
}