From 36c41d129cb62e973a3f9ee05dfb1e593f747ef4 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Sat, 11 May 2024 20:01:48 +0900 Subject: [PATCH] feat: support to create & drop flow via grpc (#3915) * feat: support to create & drop flow via grpc * chore: apply suggestions from CR * chore: apply suggestions from CR * chore: apply suggestions from CR --- src/frontend/src/instance/grpc.rs | 19 ++++++++++++++----- src/operator/src/error.rs | 10 +++++++++- src/operator/src/expr_factory.rs | 11 +++++------ src/operator/src/statement.rs | 16 +++++++++------- src/operator/src/statement/ddl.rs | 19 ++++++++++++++++--- 5 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index a9eee1527e..c3b650a4ce 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -15,7 +15,7 @@ use api::v1::ddl_request::{Expr as DdlExpr, Expr}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; -use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests}; +use api::v1::{DeleteRequests, DropFlowExpr, InsertRequests, RowDeleteRequests, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_meta::table_name::TableName; @@ -143,11 +143,20 @@ impl GrpcQueryHandler for Instance { .truncate_table(table_name, ctx.clone()) .await? } - DdlExpr::CreateFlow(_) => { - unimplemented!() + DdlExpr::CreateFlow(expr) => { + self.statement_executor + .create_flow_inner(expr, ctx.clone()) + .await? } - DdlExpr::DropFlow(_) => { - unimplemented!() + DdlExpr::DropFlow(DropFlowExpr { + catalog_name, + flow_name, + drop_if_exists, + .. + }) => { + self.statement_executor + .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone()) + .await? } } } diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 8564ceefe8..40bce2101f 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -137,6 +137,13 @@ pub enum Error { source: datatypes::error::Error, }, + #[snafu(display("Failed to convert expr to struct"))] + InvalidExpr { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Invalid SQL, error: {}", err_msg))] InvalidSql { err_msg: String, @@ -707,7 +714,8 @@ impl ErrorExt for Error { | Error::SchemaIncompatible { .. } | Error::UnsupportedRegionRequest { .. } | Error::InvalidTableName { .. } - | Error::ConvertIdentifier { .. } => StatusCode::InvalidArguments, + | Error::ConvertIdentifier { .. } + | Error::InvalidExpr { .. } => StatusCode::InvalidArguments, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index d4ff07fe4f..e48a9ed193 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -18,12 +18,11 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, Column, ColumnDataType, - ColumnDataTypeExtension, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, + ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, + SemanticType, TableName, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; -use common_meta::rpc::ddl::CreateFlowTask; -use common_meta::table_name::TableName; use common_time::Timezone; use datafusion::sql::planner::object_name_to_table_reference; use datatypes::schema::{ColumnSchema, COMMENT_KEY}; @@ -517,7 +516,7 @@ pub(crate) fn to_alter_expr( pub fn to_create_flow_task_expr( create_flow: CreateFlow, query_ctx: &QueryContextRef, -) -> Result { +) -> Result { // retrieve sink table name let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true) @@ -561,11 +560,11 @@ pub fn to_create_flow_task_expr( }) .collect::>>()?; - Ok(CreateFlowTask { + Ok(CreateFlowExpr { catalog_name: query_ctx.current_catalog().to_string(), flow_name: create_flow.flow_name.to_string(), source_table_names, - sink_table_name, + sink_table_name: Some(sink_table_name), or_replace: create_flow.or_replace, create_if_not_exists: create_flow.if_not_exists, expire_when: create_flow diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index a803f4d080..27292ea672 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -167,13 +167,15 @@ impl StatementExecutor { let _ = self.create_external_table(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } - Statement::CreateFlow(stmt) => { - self.create_flow(stmt, query_ctx).await?; - Ok(Output::new_with_affected_rows(0)) - } - Statement::DropFlow(_stmt) => { - // TODO(weny): implement it. - unimplemented!() + Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await, + Statement::DropFlow(stmt) => { + self.drop_flow( + query_ctx.current_catalog().to_string(), + format_raw_object_name(stmt.flow_name()), + stmt.drop_if_exists(), + query_ctx, + ) + .await } Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, Statement::DropTable(stmt) => { diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 5299b9b523..61afb4827e 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -16,7 +16,8 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{column_def, AlterExpr, CreateTableExpr}; +use api::v1::meta::CreateFlowTask as PbCreateFlowTask; +use api::v1::{column_def, AlterExpr, CreateFlowExpr, CreateTableExpr}; use catalog::CatalogManagerRef; use chrono::Utc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -341,18 +342,30 @@ impl StatementExecutor { // TODO(ruihang): do some verification let expr = expr_factory::to_create_flow_task_expr(stmt, &query_context)?; + self.create_flow_inner(expr, query_context).await + } + + pub async fn create_flow_inner( + &self, + expr: CreateFlowExpr, + query_context: QueryContextRef, + ) -> Result { self.create_flow_procedure(expr, query_context).await?; Ok(Output::new_with_affected_rows(0)) } async fn create_flow_procedure( &self, - expr: CreateFlowTask, + expr: CreateFlowExpr, query_context: QueryContextRef, ) -> Result { + let task = CreateFlowTask::try_from(PbCreateFlowTask { + create_flow: Some(expr), + }) + .context(error::InvalidExprSnafu)?; let request = SubmitDdlTaskRequest { query_context, - task: DdlTask::new_create_flow(expr), + task: DdlTask::new_create_flow(task), }; self.procedure_executor