feat: Implement SHOW CREATE FLOW (#4040)

* feat: Implement SHOW CREATE FLOW

* fmt

* stmt for display

* Update src/operator/src/statement.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* test: add sqlness test

* fix test

* parse query in parser

* test: move test to standalone

* reuse ParserContext::new()

* Update tests/cases/standalone/show_create_flow.result

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* add line breaks

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
irenjj
2024-06-07 11:24:56 +08:00
committed by GitHub
parent 4719569e4f
commit 9c42825f5d
14 changed files with 318 additions and 18 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod flow_info;
pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;

View File

@@ -141,6 +141,26 @@ impl FlowInfoValue {
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
}
pub fn flow_name(&self) -> &String {
&self.flow_name
}
pub fn sink_table_name(&self) -> &TableName {
&self.sink_table_name
}
pub fn raw_sql(&self) -> &String {
&self.raw_sql
}
pub fn expire_after(&self) -> Option<i64> {
self.expire_after
}
pub fn comment(&self) -> &String {
&self.comment
}
}
pub type FlowInfoManagerRef = Arc<FlowInfoManager>;

View File

@@ -514,6 +514,9 @@ pub fn check_permission(
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
Statement::ShowCreateFlow(stmt) => {
validate_param(&stmt.flow_name, query_ctx)?;
}
Statement::CreateExternalTable(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}

View File

@@ -239,6 +239,44 @@ impl StatementExecutor {
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
Statement::ShowCreateFlow(show) => {
let obj_name = &show.flow_name;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
[catalog, table] => (catalog.value.clone(), table.value.clone()),
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
),
}
.fail()
}
};
let flow_name_val = self
.flow_metadata_manager
.flow_name_manager()
.get(&catalog_name, &flow_name)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::FlowNotFoundSnafu {
flow_name: &flow_name,
})?;
let flow_val = self
.flow_metadata_manager
.flow_info_manager()
.get(flow_name_val.flow_id())
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::FlowNotFoundSnafu {
flow_name: &flow_name,
})?;
self.show_create_flow(obj_name.clone(), flow_val, query_ctx)
.await
}
Statement::SetVariables(set_var) => {
let var_name = set_var.variable.to_string().to_uppercase();
match var_name.as_str() {

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::Output;
use common_telemetry::tracing;
use partition::manager::PartitionInfo;
@@ -23,6 +24,7 @@ use sql::statements::create::Partitions;
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
};
use sqlparser::ast::ObjectName;
use table::metadata::TableType;
use table::table_name::TableName;
use table::TableRef;
@@ -105,6 +107,17 @@ impl StatementExecutor {
.context(error::ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub async fn show_create_flow(
&self,
flow_name: ObjectName,
flow_val: FlowInfoValue,
query_ctx: QueryContextRef,
) -> Result<Output> {
query::sql::show_create_flow(flow_name, flow_val, query_ctx)
.context(error::ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)

View File

@@ -31,6 +31,7 @@ use common_datasource::file_format::{infer_schemas, FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
@@ -49,10 +50,13 @@ use regex::Regex;
use session::context::QueryContextRef;
pub use show_create_table::create_table_stmt;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::create::{CreateFlow, Partitions};
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
};
use sqlparser::ast::ObjectName;
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
use table::TableRef;
@@ -134,6 +138,13 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
]))
});
static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
]))
});
fn null() -> Expr {
lit(ScalarValue::Null)
}
@@ -606,6 +617,46 @@ pub fn show_create_table(
Ok(Output::new_with_record_batches(records))
}
pub fn show_create_flow(
flow_name: ObjectName,
flow_val: FlowInfoValue,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut parser_ctx =
ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
let query = parser_ctx.parser_query().context(error::SqlSnafu)?;
let comment = if flow_val.comment().is_empty() {
None
} else {
Some(flow_val.comment().clone())
};
let stmt = CreateFlow {
flow_name,
sink_table_name: ObjectName(vec![Ident {
value: flow_val.sink_table_name().table_name.clone(),
quote_style: None,
}]),
or_replace: true,
if_not_exists: true,
expire_after: flow_val.expire_after(),
comment,
query,
};
let sql = format!("{}", stmt);
let columns = vec![
Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(records))
}
pub fn describe_table(table: TableRef) -> Result<Output> {
let table_info = table.table_info();
let columns_schemas = table_info.meta.schema.column_schemas();

View File

@@ -141,6 +141,9 @@ pub enum Error {
#[snafu(display("Invalid table name: {}", name))]
InvalidTableName { name: String },
#[snafu(display("Invalid flow name: {}", name))]
InvalidFlowName { name: String },
#[snafu(display("Invalid default constraint, column: {}", column))]
InvalidDefault {
column: String,
@@ -274,6 +277,7 @@ impl ErrorExt for Error {
| InvalidDatabaseOption { .. }
| ColumnTypeMismatch { .. }
| InvalidTableName { .. }
| InvalidFlowName { .. }
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use snafu::ResultExt;
use sqlparser::ast::Ident;
use sqlparser::ast::{Ident, Query};
use sqlparser::dialect::Dialect;
use sqlparser::keywords::Keyword;
use sqlparser::parser::{Parser, ParserError, ParserOptions};
@@ -38,6 +38,21 @@ pub struct ParserContext<'a> {
}
impl<'a> ParserContext<'a> {
/// Construct a new ParserContext.
pub fn new(dialect: &'a dyn Dialect, sql: &'a str) -> Result<ParserContext<'a>> {
let parser = Parser::new(dialect)
.with_options(ParserOptions::new().with_trailing_commas(true))
.try_with_sql(sql)
.context(SyntaxSnafu)?;
Ok(ParserContext { parser, sql })
}
/// Parses parser context to Query.
pub fn parser_query(&mut self) -> Result<Box<Query>> {
Ok(Box::new(self.parser.parse_query().context(SyntaxSnafu)?))
}
/// Parses SQL with given dialect
pub fn create_with_dialect(
sql: &'a str,
@@ -46,11 +61,7 @@ impl<'a> ParserContext<'a> {
) -> Result<Vec<Statement>> {
let mut stmts: Vec<Statement> = Vec::new();
let parser = Parser::new(dialect)
.with_options(ParserOptions::new().with_trailing_commas(true))
.try_with_sql(sql)
.context(SyntaxSnafu)?;
let mut parser_ctx = ParserContext { sql, parser };
let mut parser_ctx = ParserContext::new(dialect, sql)?;
let mut expecting_statement_delimiter = false;
loop {

View File

@@ -16,11 +16,13 @@ use snafu::{ensure, ResultExt};
use sqlparser::keywords::Keyword;
use sqlparser::tokenizer::Token;
use crate::error::{self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result};
use crate::error::{
self, InvalidDatabaseNameSnafu, InvalidFlowNameSnafu, InvalidTableNameSnafu, Result,
};
use crate::parser::ParserContext;
use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus, ShowTables,
ShowVariables,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus,
ShowTables, ShowVariables,
};
use crate::statements::statement::Statement;
@@ -62,6 +64,8 @@ impl<'a> ParserContext<'a> {
} else if self.consume_token("CREATE") {
if self.consume_token("TABLE") {
self.parse_show_create_table()
} else if self.consume_token("FLOW") {
self.parse_show_create_flow()
} else {
self.unsupported(self.peek_token_as_string())
}
@@ -109,6 +113,24 @@ impl<'a> ParserContext<'a> {
Ok(Statement::ShowCreateTable(ShowCreateTable { table_name }))
}
fn parse_show_create_flow(&mut self) -> Result<Statement> {
let raw_flow_name = self
.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "a flow name",
actual: self.peek_token_as_string(),
})?;
let flow_name = Self::canonicalize_object_name(raw_flow_name);
ensure!(
!flow_name.0.is_empty(),
InvalidFlowNameSnafu {
name: flow_name.to_string(),
}
);
Ok(Statement::ShowCreateFlow(ShowCreateFlow { flow_name }))
}
fn parse_show_table_name(&mut self) -> Result<String> {
self.parser.next_token();
let table_name = self

View File

@@ -269,17 +269,17 @@ impl Display for CreateFlow {
if self.or_replace {
write!(f, "OR REPLACE ")?;
}
write!(f, "TASK ")?;
write!(f, "FLOW ")?;
if self.if_not_exists {
write!(f, "IF NOT EXISTS ")?;
}
write!(f, "{} ", &self.flow_name)?;
write!(f, "OUTPUT AS {} ", &self.sink_table_name)?;
writeln!(f, "{}", &self.flow_name)?;
writeln!(f, "SINK TO {}", &self.sink_table_name)?;
if let Some(expire_after) = &self.expire_after {
write!(f, "EXPIRE AFTER {} ", expire_after)?;
writeln!(f, "EXPIRE AFTER {} ", expire_after)?;
}
if let Some(comment) = &self.comment {
write!(f, "COMMENT '{}' ", comment)?;
writeln!(f, "COMMENT '{}'", comment)?;
}
write!(f, "AS {}", &self.query)
}
@@ -604,4 +604,37 @@ WITH(
}
}
}
#[test]
fn test_display_create_flow() {
let sql = r"CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input where number > 10;";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, result.len());
match &result[0] {
Statement::CreateFlow(c) => {
let new_sql = format!("\n{}", c);
assert_eq!(
r#"
CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input WHERE number > 10"#,
&new_sql
);
let new_result = ParserContext::create_with_dialect(
&new_sql,
&GreptimeDbDialect {},
ParseOptions::default(),
)
.unwrap();
assert_eq!(result, new_result);
}
_ => unreachable!(),
}
}
}

View File

@@ -132,6 +132,19 @@ impl Display for ShowCreateTable {
}
}
/// SQL structure for `SHOW CREATE FLOW`.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct ShowCreateFlow {
pub flow_name: ObjectName,
}
impl Display for ShowCreateFlow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let flow_name = &self.flow_name;
write!(f, "SHOW CREATE FLOW {flow_name}")
}
}
/// SQL structure for `SHOW VARIABLES xxx`.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct ShowVariables {
@@ -241,6 +254,35 @@ mod tests {
.is_err());
}
#[test]
pub fn test_show_create_flow() {
let sql = "SHOW CREATE FLOW test";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::ShowCreateFlow { .. });
match &stmts[0] {
Statement::ShowCreateFlow(show) => {
let flow_name = show.flow_name.to_string();
assert_eq!(flow_name, "test");
}
_ => {
unreachable!();
}
}
}
#[test]
pub fn test_show_create_missing_flow() {
let sql = "SHOW CREATE FLOW";
assert!(ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default()
)
.is_err());
}
#[test]
fn test_display_show_variables() {
let sql = r"show variables v1;";

View File

@@ -31,8 +31,8 @@ use crate::statements::insert::Insert;
use crate::statements::query::Query;
use crate::statements::set_variables::SetVariables;
use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus, ShowTables,
ShowVariables,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus,
ShowTables, ShowVariables,
};
use crate::statements::tql::Tql;
use crate::statements::truncate::TruncateTable;
@@ -81,6 +81,8 @@ pub enum Statement {
ShowIndex(ShowIndex),
// SHOW CREATE TABLE
ShowCreateTable(ShowCreateTable),
// SHOW CREATE FLOW
ShowCreateFlow(ShowCreateFlow),
// SHOW STATUS
ShowStatus(ShowStatus),
// DESCRIBE TABLE
@@ -118,6 +120,7 @@ impl Display for Statement {
Statement::ShowColumns(s) => s.fmt(f),
Statement::ShowIndex(s) => s.fmt(f),
Statement::ShowCreateTable(s) => s.fmt(f),
Statement::ShowCreateFlow(s) => s.fmt(f),
Statement::ShowStatus(s) => s.fmt(f),
Statement::DescribeTable(s) => s.fmt(f),
Statement::Explain(s) => s.fmt(f),

View File

@@ -0,0 +1,41 @@
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
create table out_num_cnt (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
Affected Rows: 0
SHOW CREATE FLOW filter_numbers;
+----------------+-------------------------------------------------------+
| Flow | Create Flow |
+----------------+-------------------------------------------------------+
| filter_numbers | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers |
| | SINK TO out_num_cnt |
| | AS SELECT number FROM numbers_input WHERE number > 10 |
+----------------+-------------------------------------------------------+
drop flow filter_numbers;
Affected Rows: 0
drop table out_num_cnt;
Affected Rows: 0
drop table numbers_input;
Affected Rows: 0

View File

@@ -0,0 +1,19 @@
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
create table out_num_cnt (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
SHOW CREATE FLOW filter_numbers;
drop flow filter_numbers;
drop table out_num_cnt;
drop table numbers_input;