feat: support to create & drop flow via grpc (#3915)

* feat: support to create & drop flow via grpc

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-05-11 20:01:48 +09:00
committed by GitHub
parent 89da42dbc1
commit 36c41d129c
5 changed files with 53 additions and 22 deletions

View File

@@ -15,7 +15,7 @@
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
use api::v1::{DeleteRequests, DropFlowExpr, InsertRequests, RowDeleteRequests, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_meta::table_name::TableName;
@@ -143,11 +143,20 @@ impl GrpcQueryHandler for Instance {
.truncate_table(table_name, ctx.clone())
.await?
}
DdlExpr::CreateFlow(_) => {
unimplemented!()
DdlExpr::CreateFlow(expr) => {
self.statement_executor
.create_flow_inner(expr, ctx.clone())
.await?
}
DdlExpr::DropFlow(_) => {
unimplemented!()
DdlExpr::DropFlow(DropFlowExpr {
catalog_name,
flow_name,
drop_if_exists,
..
}) => {
self.statement_executor
.drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
.await?
}
}
}

View File

@@ -137,6 +137,13 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display("Failed to convert expr to struct"))]
InvalidExpr {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Invalid SQL, error: {}", err_msg))]
InvalidSql {
err_msg: String,
@@ -707,7 +714,8 @@ impl ErrorExt for Error {
| Error::SchemaIncompatible { .. }
| Error::UnsupportedRegionRequest { .. }
| Error::InvalidTableName { .. }
| Error::ConvertIdentifier { .. } => StatusCode::InvalidArguments,
| Error::ConvertIdentifier { .. }
| Error::InvalidExpr { .. } => StatusCode::InvalidArguments,
Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,

View File

@@ -18,12 +18,11 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, Column, ColumnDataType,
ColumnDataTypeExtension, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType,
ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable,
SemanticType, TableName,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use common_meta::rpc::ddl::CreateFlowTask;
use common_meta::table_name::TableName;
use common_time::Timezone;
use datafusion::sql::planner::object_name_to_table_reference;
use datatypes::schema::{ColumnSchema, COMMENT_KEY};
@@ -517,7 +516,7 @@ pub(crate) fn to_alter_expr(
pub fn to_create_flow_task_expr(
create_flow: CreateFlow,
query_ctx: &QueryContextRef,
) -> Result<CreateFlowTask> {
) -> Result<CreateFlowExpr> {
// retrieve sink table name
let sink_table_ref =
object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
@@ -561,11 +560,11 @@ pub fn to_create_flow_task_expr(
})
.collect::<Result<Vec<_>>>()?;
Ok(CreateFlowTask {
Ok(CreateFlowExpr {
catalog_name: query_ctx.current_catalog().to_string(),
flow_name: create_flow.flow_name.to_string(),
source_table_names,
sink_table_name,
sink_table_name: Some(sink_table_name),
or_replace: create_flow.or_replace,
create_if_not_exists: create_flow.if_not_exists,
expire_when: create_flow

View File

@@ -167,13 +167,15 @@ impl StatementExecutor {
let _ = self.create_external_table(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::CreateFlow(stmt) => {
self.create_flow(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::DropFlow(_stmt) => {
// TODO(weny): implement it.
unimplemented!()
Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
Statement::DropFlow(stmt) => {
self.drop_flow(
query_ctx.current_catalog().to_string(),
format_raw_object_name(stmt.flow_name()),
stmt.drop_if_exists(),
query_ctx,
)
.await
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {

View File

@@ -16,7 +16,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{column_def, AlterExpr, CreateTableExpr};
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{column_def, AlterExpr, CreateFlowExpr, CreateTableExpr};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -341,18 +342,30 @@ impl StatementExecutor {
// TODO(ruihang): do some verification
let expr = expr_factory::to_create_flow_task_expr(stmt, &query_context)?;
self.create_flow_inner(expr, query_context).await
}
pub async fn create_flow_inner(
&self,
expr: CreateFlowExpr,
query_context: QueryContextRef,
) -> Result<Output> {
self.create_flow_procedure(expr, query_context).await?;
Ok(Output::new_with_affected_rows(0))
}
async fn create_flow_procedure(
&self,
expr: CreateFlowTask,
expr: CreateFlowExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let task = CreateFlowTask::try_from(PbCreateFlowTask {
create_flow: Some(expr),
})
.context(error::InvalidExprSnafu)?;
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_flow(expr),
task: DdlTask::new_create_flow(task),
};
self.procedure_executor