From 628cdb89e83ae01f88095dd3c18cd0265605f79e Mon Sep 17 00:00:00 2001 From: Morranto <56924967+Morranto@users.noreply.github.com> Date: Sat, 10 Sep 2022 21:50:21 +0800 Subject: [PATCH] feat: Add grpc implementation for alter table opeartions (#239) * feat: grpc-alter impl * fix: format * fix cr * Update src/datanode/src/error.rs Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> * Update src/datanode/src/server/grpc/ddl.rs Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> * fix bug * Update src/datanode/src/server/grpc/ddl.rs Co-authored-by: Ning Sun * fix:format * fix bug Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> Co-authored-by: Ning Sun --- src/client/src/admin.rs | 11 ++++ src/datanode/src/error.rs | 4 ++ src/datanode/src/instance.rs | 1 + src/datanode/src/server/grpc.rs | 2 +- .../src/server/grpc/{create.rs => ddl.rs} | 54 +++++++++++++++++-- src/datanode/src/tests/grpc_test.rs | 22 +++++++- 6 files changed, 89 insertions(+), 5 deletions(-) rename src/datanode/src/server/grpc/{create.rs => ddl.rs} (81%) diff --git a/src/client/src/admin.rs b/src/client/src/admin.rs index 7c71d83083..fa4c3146b7 100644 --- a/src/client/src/admin.rs +++ b/src/client/src/admin.rs @@ -32,6 +32,17 @@ impl Admin { Ok(self.do_request(vec![expr]).await?.remove(0)) } + pub async fn alter(&self, expr: AlterExpr) -> Result { + let header = ExprHeader { + version: PROTOCOL_VERSION, + }; + let expr = AdminExpr { + header: Some(header), + expr: Some(admin_expr::Expr::Alter(expr)), + }; + Ok(self.do_request(vec![expr]).await?.remove(0)) + } + /// Invariants: the lengths of input vec (`Vec`) and output vec (`Vec`) are equal. async fn do_request(&self, exprs: Vec) -> Result> { let expr_count = exprs.len(); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a4c22290a3..ef922fe66a 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -58,6 +58,9 @@ pub enum Error { table_name: String, }, + #[snafu(display("Missing required field in protobuf, field: {}", field))] + MissingField { field: String, backtrace: Backtrace }, + #[snafu(display( "Columns and values number mismatch, columns: {}, values: {}", columns, @@ -214,6 +217,7 @@ impl ErrorExt for Error { | Error::SqlTypeNotSupported { .. } | Error::CreateSchema { .. } | Error::KeyColumnNotFound { .. } + | Error::MissingField { .. } | Error::ConstraintNotSupported { .. } | Error::InvalidColumnDef { .. } => StatusCode::InvalidArguments, // TODO(yingwen): Further categorize http error. diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 0acf68ce4f..1900a4cc03 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -328,6 +328,7 @@ impl GrpcAdminHandler for Instance { async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result { let admin_resp = match expr.expr { Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await, + Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await, other => { return servers::error::NotSupportedSnafu { feat: format!("{:?}", other), diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index c6c9ffb732..9b089c722b 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -1,4 +1,4 @@ -mod create; +mod ddl; pub(crate) mod handler; pub(crate) mod insert; pub(crate) mod plan; diff --git a/src/datanode/src/server/grpc/create.rs b/src/datanode/src/server/grpc/ddl.rs similarity index 81% rename from src/datanode/src/server/grpc/create.rs rename to src/datanode/src/server/grpc/ddl.rs index 94c7b56435..dc9397dbf1 100644 --- a/src/datanode/src/server/grpc/create.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -1,15 +1,15 @@ use std::sync::Arc; -use api::v1::{AdminResult, ColumnDataType, ColumnDef, CreateExpr}; +use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDataType, ColumnDef, CreateExpr}; use common_error::prelude::{ErrorExt, StatusCode}; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use futures::TryFutureExt; use query::Output; use snafu::prelude::*; -use table::requests::CreateTableRequest; +use table::requests::{AlterKind, AlterTableRequest, CreateTableRequest}; -use crate::error::{self, Result}; +use crate::error::{self, MissingFieldSnafu, Result}; use crate::instance::Instance; use crate::server::grpc::handler::AdminResultBuilder; use crate::sql::SqlRequest; @@ -34,6 +34,33 @@ impl Instance { } } + pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult { + let request = match self.alter_expr_to_request(expr).transpose() { + Some(req) => req, + None => { + return AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(0, 0) + .build() + } + }; + + let result = futures::future::ready(request) + .and_then(|request| self.sql_handler().execute(SqlRequest::Alter(request))) + .await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + Ok(Output::RecordBatch(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } + fn create_expr_to_request(&self, expr: CreateExpr) -> Result { let schema = create_table_schema(&expr)?; @@ -61,6 +88,27 @@ impl Instance { table_options: expr.table_options, }) } + + fn alter_expr_to_request(&self, expr: AlterExpr) -> Result> { + match expr.kind { + Some(Kind::AddColumn(add_column)) => { + let column_def = add_column.column_def.context(MissingFieldSnafu { + field: "column_def", + })?; + let alter_kind = AlterKind::AddColumn { + new_column: create_column_schema(&column_def)?, + }; + let request = AlterTableRequest { + catalog_name: expr.catalog_name, + schema_name: expr.schema_name, + table_name: expr.table_name, + alter_kind, + }; + Ok(Some(request)) + } + None => Ok(None), + } + } } fn create_table_schema(expr: &CreateExpr) -> Result { diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 3785ceb3ae..bce0c7e831 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -4,8 +4,10 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use api::v1::ColumnDataType; use api::v1::{ - admin_result, codec::InsertBatch, column, Column, ColumnDef, CreateExpr, MutateResult, + admin_result, alter_expr::Kind, codec::InsertBatch, column, AddColumn, AlterExpr, Column, + ColumnDef, CreateExpr, MutateResult, }; use client::admin::Admin; use client::{Client, Database, ObjectResult}; @@ -86,6 +88,24 @@ async fn test_insert_and_select() { })) ); + //alter + let add_column = ColumnDef { + name: "test_column".to_string(), + data_type: ColumnDataType::Int64.into(), + is_nullable: true, + }; + let kind = Kind::AddColumn(AddColumn { + column_def: Some(add_column), + }); + let expr = AlterExpr { + table_name: "test_table".to_string(), + catalog_name: None, + schema_name: None, + kind: Some(kind), + }; + let result = admin.alter(expr).await.unwrap(); + assert_eq!(result.result, None); + // insert let values = vec![InsertBatch { columns: vec![