feat: Add grpc implementation for alter table opeartions (#239)

* feat: grpc-alter impl

* fix: format

* fix cr

* Update src/datanode/src/error.rs

Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>

* Update src/datanode/src/server/grpc/ddl.rs

Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>

* fix bug

* Update src/datanode/src/server/grpc/ddl.rs

Co-authored-by: Ning Sun <sunng@protonmail.com>

* fix:format

* fix bug

Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
This commit is contained in:
Morranto
2022-09-10 21:50:21 +08:00
committed by GitHub
parent d52d1eb122
commit 628cdb89e8
6 changed files with 89 additions and 5 deletions

View File

@@ -32,6 +32,17 @@ impl Admin {
Ok(self.do_request(vec![expr]).await?.remove(0))
}
pub async fn alter(&self, expr: AlterExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::Alter(expr)),
};
Ok(self.do_request(vec![expr]).await?.remove(0))
}
/// Invariants: the lengths of input vec (`Vec<AdminExpr>`) and output vec (`Vec<AdminResult>`) are equal.
async fn do_request(&self, exprs: Vec<AdminExpr>) -> Result<Vec<AdminResult>> {
let expr_count = exprs.len();

View File

@@ -58,6 +58,9 @@ pub enum Error {
table_name: String,
},
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display(
"Columns and values number mismatch, columns: {}, values: {}",
columns,
@@ -214,6 +217,7 @@ impl ErrorExt for Error {
| Error::SqlTypeNotSupported { .. }
| Error::CreateSchema { .. }
| Error::KeyColumnNotFound { .. }
| Error::MissingField { .. }
| Error::ConstraintNotSupported { .. }
| Error::InvalidColumnDef { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.

View File

@@ -328,6 +328,7 @@ impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),

View File

@@ -1,4 +1,4 @@
mod create;
mod ddl;
pub(crate) mod handler;
pub(crate) mod insert;
pub(crate) mod plan;

View File

@@ -1,15 +1,15 @@
use std::sync::Arc;
use api::v1::{AdminResult, ColumnDataType, ColumnDef, CreateExpr};
use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDataType, ColumnDef, CreateExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::TryFutureExt;
use query::Output;
use snafu::prelude::*;
use table::requests::CreateTableRequest;
use table::requests::{AlterKind, AlterTableRequest, CreateTableRequest};
use crate::error::{self, Result};
use crate::error::{self, MissingFieldSnafu, Result};
use crate::instance::Instance;
use crate::server::grpc::handler::AdminResultBuilder;
use crate::sql::SqlRequest;
@@ -34,6 +34,33 @@ impl Instance {
}
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult {
let request = match self.alter_expr_to_request(expr).transpose() {
Some(req) => req,
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(0, 0)
.build()
}
};
let result = futures::future::ready(request)
.and_then(|request| self.sql_handler().execute(SqlRequest::Alter(request)))
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Ok(Output::RecordBatch(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
fn create_expr_to_request(&self, expr: CreateExpr) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
@@ -61,6 +88,27 @@ impl Instance {
table_options: expr.table_options,
})
}
fn alter_expr_to_request(&self, expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
match expr.kind {
Some(Kind::AddColumn(add_column)) => {
let column_def = add_column.column_def.context(MissingFieldSnafu {
field: "column_def",
})?;
let alter_kind = AlterKind::AddColumn {
new_column: create_column_schema(&column_def)?,
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
}
None => Ok(None),
}
}
}
fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {

View File

@@ -4,8 +4,10 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::ColumnDataType;
use api::v1::{
admin_result, codec::InsertBatch, column, Column, ColumnDef, CreateExpr, MutateResult,
admin_result, alter_expr::Kind, codec::InsertBatch, column, AddColumn, AlterExpr, Column,
ColumnDef, CreateExpr, MutateResult,
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
@@ -86,6 +88,24 @@ async fn test_insert_and_select() {
}))
);
//alter
let add_column = ColumnDef {
name: "test_column".to_string(),
data_type: ColumnDataType::Int64.into(),
is_nullable: true,
};
let kind = Kind::AddColumn(AddColumn {
column_def: Some(add_column),
});
let expr = AlterExpr {
table_name: "test_table".to_string(),
catalog_name: None,
schema_name: None,
kind: Some(kind),
};
let result = admin.alter(expr).await.unwrap();
assert_eq!(result.result, None);
// insert
let values = vec![InsertBatch {
columns: vec![