diff --git a/Cargo.lock b/Cargo.lock index b6bf3838e9..1bec2b7a53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1045,7 +1045,7 @@ dependencies = [ "common-base", "common-error", "common-grpc", - "common-insert", + "common-grpc-expr", "common-query", "common-recordbatch", "common-time", @@ -1213,12 +1213,13 @@ dependencies = [ ] [[package]] -name = "common-insert" +name = "common-grpc-expr" version = "0.1.0" dependencies = [ "api", "async-trait", "common-base", + "common-catalog", "common-error", "common-query", "common-telemetry", @@ -1740,7 +1741,7 @@ dependencies = [ "common-catalog", "common-error", "common-grpc", - "common-insert", + "common-grpc-expr", "common-query", "common-recordbatch", "common-runtime", @@ -2119,7 +2120,7 @@ dependencies = [ "common-catalog", "common-error", "common-grpc", - "common-insert", + "common-grpc-expr", "common-query", "common-recordbatch", "common-runtime", diff --git a/Cargo.toml b/Cargo.toml index be37ca3790..a0a067178f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ members = [ "src/common/recordbatch", "src/common/runtime", "src/common/substrait", - "src/common/insert", + "src/common/grpc-expr", "src/common/telemetry", "src/common/time", "src/datanode", diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index 73aa6c4363..d6c415d8cf 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod column_def; pub mod error; pub mod helper; pub mod prometheus; diff --git a/src/api/src/v1.rs b/src/api/src/v1.rs index 4438ce7870..380e810f09 100644 --- a/src/api/src/v1.rs +++ b/src/api/src/v1.rs @@ -21,4 +21,5 @@ pub mod codec { tonic::include_proto!("greptime.v1.codec"); } +mod column_def; pub mod meta; diff --git a/src/api/src/column_def.rs b/src/api/src/v1/column_def.rs similarity index 100% rename from src/api/src/column_def.rs rename to src/api/src/v1/column_def.rs diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 826cebe35c..14476803b4 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -13,7 +13,7 @@ common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } -common-insert = { path = "../common/insert" } +common-grpc-expr = { path = "../common/grpc-expr" } common-time = { path = "../common/time" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ "simd", diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 9cea7d5d85..3228a74cf8 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -23,7 +23,7 @@ use api::v1::{ }; use common_error::status_code::StatusCode; use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl}; -use common_insert::column_to_vector; +use common_grpc_expr::column_to_vector; use common_query::Output; use common_recordbatch::{RecordBatch, RecordBatches}; use datafusion::physical_plan::ExecutionPlan; diff --git a/src/client/src/error.rs b/src/client/src/error.rs index add1a0989e..953fcb44f9 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -103,7 +103,7 @@ pub enum Error { #[snafu(display("Failed to convert column to vector, source: {}", source))] ColumnToVector { #[snafu(backtrace)] - source: common_insert::error::Error, + source: common_grpc_expr::error::Error, }, } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 
cedc7ba19c..7856c66e16 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -97,10 +97,7 @@ mod tests {
     #[test]
     fn test_start_node_error() {
         fn throw_datanode_error() -> StdResult<(), datanode::error::Error> {
-            datanode::error::MissingFieldSnafu {
-                field: "test_field",
-            }
-            .fail()
+            datanode::error::MissingNodeIdSnafu {}.fail()
         }
 
         let e = throw_datanode_error()
diff --git a/src/common/insert/Cargo.toml b/src/common/grpc-expr/Cargo.toml
similarity index 87%
rename from src/common/insert/Cargo.toml
rename to src/common/grpc-expr/Cargo.toml
index 8dca21eb9a..5adef32823 100644
--- a/src/common/insert/Cargo.toml
+++ b/src/common/grpc-expr/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "common-insert"
+name = "common-grpc-expr"
 version = "0.1.0"
 edition = "2021"
 license = "Apache-2.0"
@@ -11,6 +11,7 @@ common-base = { path = "../base" }
 common-error = { path = "../error" }
 common-telemetry = { path = "../telemetry" }
 common-time = { path = "../time" }
+common-catalog = { path = "../catalog" }
 common-query = { path = "../query" }
 datatypes = { path = "../../datatypes" }
 snafu = { version = "0.7", features = ["backtraces"] }
diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs
new file mode 100644
index 0000000000..cdef37cbcb
--- /dev/null
+++ b/src/common/grpc-expr/src/alter.rs
@@ -0,0 +1,234 @@
+// Copyright 2022 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use api::v1::alter_expr::Kind;
+use api::v1::{AlterExpr, CreateExpr, DropColumns};
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
+use snafu::{ensure, OptionExt, ResultExt};
+use table::metadata::TableId;
+use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
+
+use crate::error::{
+    ColumnNotFoundSnafu, CreateSchemaSnafu, InvalidColumnDefSnafu, MissingFieldSnafu,
+    MissingTimestampColumnSnafu, Result,
+};
+
+/// Convert an [`AlterExpr`] to an optional [`AlterTableRequest`].
+pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
+    match expr.kind {
+        Some(Kind::AddColumns(add_columns)) => {
+            let add_column_requests = add_columns
+                .add_columns
+                .into_iter()
+                .map(|ac| {
+                    let column_def = ac.column_def.context(MissingFieldSnafu {
+                        field: "column_def",
+                    })?;
+
+                    let schema =
+                        column_def
+                            .try_as_column_schema()
+                            .context(InvalidColumnDefSnafu {
+                                column: &column_def.name,
+                            })?;
+                    Ok(AddColumnRequest {
+                        column_schema: schema,
+                        is_key: ac.is_key,
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let alter_kind = AlterKind::AddColumns {
+                columns: add_column_requests,
+            };
+
+            let request = AlterTableRequest {
+                catalog_name: expr.catalog_name,
+                schema_name: expr.schema_name,
+                table_name: expr.table_name,
+                alter_kind,
+            };
+            Ok(Some(request))
+        }
+        Some(Kind::DropColumns(DropColumns { drop_columns })) => {
+            let alter_kind = AlterKind::DropColumns {
+                names: drop_columns.into_iter().map(|c| c.name).collect(),
+            };
+
+            let request = AlterTableRequest {
+                catalog_name: expr.catalog_name,
+                schema_name: expr.schema_name,
+                table_name: expr.table_name,
+                alter_kind,
+            };
+            Ok(Some(request))
+        }
+        None => Ok(None),
+    }
+}
+
+pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
+    let column_schemas = expr
+        .column_defs
+        .iter()
+        .map(|x| {
+            x.try_as_column_schema()
+                .context(InvalidColumnDefSnafu { column: &x.name })
+        })
+        .collect::<Result<Vec<ColumnSchema>>>()?;
+
+    ensure!(
+        column_schemas
+            .iter()
+            .any(|column| column.name == expr.time_index),
+        MissingTimestampColumnSnafu {
+            msg: format!("CreateExpr: {:?}", expr)
+        }
+    );
+
+    let column_schemas = column_schemas
+        .into_iter()
+        .map(|column_schema| {
+            if column_schema.name == expr.time_index {
+                column_schema.with_time_index(true)
+            } else {
+                column_schema
+            }
+        })
+        .collect::<Vec<_>>();
+
+    Ok(Arc::new(
+        SchemaBuilder::try_from(column_schemas)
+            .context(CreateSchemaSnafu)?
+            .build()
+            .context(CreateSchemaSnafu)?,
+    ))
+}
+
+pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
+    let schema = create_table_schema(&expr)?;
+    let primary_key_indices = expr
+        .primary_keys
+        .iter()
+        .map(|key| {
+            schema
+                .column_index_by_name(key)
+                .context(ColumnNotFoundSnafu {
+                    column_name: key,
+                    table_name: &expr.table_name,
+                })
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    let catalog_name = expr
+        .catalog_name
+        .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
+    let schema_name = expr
+        .schema_name
+        .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
+
+    let region_ids = if expr.region_ids.is_empty() {
+        vec![0]
+    } else {
+        expr.region_ids
+    };
+
+    Ok(CreateTableRequest {
+        id: table_id,
+        catalog_name,
+        schema_name,
+        table_name: expr.table_name,
+        desc: expr.desc,
+        schema,
+        region_numbers: region_ids,
+        primary_key_indices,
+        create_if_not_exists: expr.create_if_not_exists,
+        table_options: expr.table_options,
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use api::v1::{AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn};
+    use datatypes::prelude::ConcreteDataType;
+
+    use super::*;
+
+    #[test]
+    fn test_alter_expr_to_request() {
+        let expr = AlterExpr {
+            catalog_name: None,
+            schema_name: None,
+            table_name: "monitor".to_string(),
+
+            kind: Some(Kind::AddColumns(AddColumns {
+                add_columns: vec![AddColumn {
+                    column_def: Some(ColumnDef {
+                        name: "mem_usage".to_string(),
+                        datatype: ColumnDataType::Float64 as i32,
+                        is_nullable: false,
+                        default_constraint: None,
+                    }),
+                    is_key: false,
+                }],
+            })),
+        };
+
+        let alter_request = alter_expr_to_request(expr).unwrap().unwrap();
+        assert_eq!(None, alter_request.catalog_name);
+        assert_eq!(None, alter_request.schema_name);
+        assert_eq!("monitor".to_string(), alter_request.table_name);
+        let add_column = match alter_request.alter_kind {
+            AlterKind::AddColumns { mut columns } => columns.pop().unwrap(),
+            _ => unreachable!(),
+        };
+
+        assert!(!add_column.is_key);
+        assert_eq!("mem_usage", add_column.column_schema.name);
+        assert_eq!(
+            ConcreteDataType::float64_datatype(),
+            add_column.column_schema.data_type
+        );
+    }
+
+    #[test]
+    fn test_drop_column_expr() {
+        let expr = AlterExpr {
+            catalog_name: Some("test_catalog".to_string()),
+            schema_name: Some("test_schema".to_string()),
+            table_name: "monitor".to_string(),
+
+            kind: Some(Kind::DropColumns(DropColumns {
+                drop_columns: vec![DropColumn {
+                    name: "mem_usage".to_string(),
+                }],
+            })),
+        };
+
+        let alter_request = alter_expr_to_request(expr).unwrap().unwrap();
+        assert_eq!(Some("test_catalog".to_string()), alter_request.catalog_name);
+        assert_eq!(Some("test_schema".to_string()), alter_request.schema_name);
+        assert_eq!("monitor".to_string(), alter_request.table_name);
+
+        let mut drop_names = match alter_request.alter_kind {
+            AlterKind::DropColumns { names } => names,
+            _ => unreachable!(),
+        };
+        assert_eq!(1, drop_names.len());
+        assert_eq!("mem_usage".to_string(), drop_names.pop().unwrap());
+    }
+}
diff --git a/src/common/insert/src/error.rs b/src/common/grpc-expr/src/error.rs
similarity index 74%
rename from src/common/insert/src/error.rs
rename to src/common/grpc-expr/src/error.rs
index dbd455b2ec..dc0df10c46 100644
--- a/src/common/insert/src/error.rs
+++ b/src/common/grpc-expr/src/error.rs
@@ -22,7 +22,7 @@ use snafu::{Backtrace, ErrorCompat};
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum Error {
-    #[snafu(display("Column {} not found in table {}", column_name, table_name))]
+    #[snafu(display("Column `{}` not found in table `{}`", column_name, table_name))]
     ColumnNotFound {
         column_name: String,
         table_name: String,
@@ -57,8 +57,8 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
-    #[snafu(display("Missing timestamp column in request"))]
-    MissingTimestampColumn { backtrace: Backtrace },
+    #[snafu(display("Missing timestamp column, msg: {}", msg))]
+    MissingTimestampColumn { msg: String, backtrace: Backtrace },
 
     #[snafu(display("Invalid column proto: {}", err_msg))]
     InvalidColumnProto {
@@ -70,6 +70,26 @@ pub enum Error {
         #[snafu(backtrace)]
         source: datatypes::error::Error,
     },
+
+    #[snafu(display("Missing required field in protobuf, field: {}", field))]
+    MissingField { field: String, backtrace: Backtrace },
+
+    #[snafu(display("Invalid column default constraint, source: {}", source))]
+    ColumnDefaultConstraint {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display(
+        "Invalid column proto definition, column: {}, source: {}",
+        column,
+        source
+    ))]
+    InvalidColumnDef {
+        column: String,
+        #[snafu(backtrace)]
+        source: api::error::Error,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -87,6 +107,9 @@ impl ErrorExt for Error {
             | Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments,
             Error::InvalidColumnProto { .. } => StatusCode::InvalidArguments,
             Error::CreateVector { .. } => StatusCode::InvalidArguments,
+            Error::MissingField { .. } => StatusCode::InvalidArguments,
+            Error::ColumnDefaultConstraint { source, .. } => source.status_code(),
+            Error::InvalidColumnDef { source, .. } => source.status_code(),
         }
     }
     fn backtrace_opt(&self) -> Option<&Backtrace> {
diff --git a/src/common/insert/src/insert.rs b/src/common/grpc-expr/src/insert.rs
similarity index 99%
rename from src/common/insert/src/insert.rs
rename to src/common/grpc-expr/src/insert.rs
index 4f597d3d37..e37139e037 100644
--- a/src/common/insert/src/insert.rs
+++ b/src/common/grpc-expr/src/insert.rs
@@ -245,7 +245,10 @@ pub fn build_create_expr_from_insertion(
         }
     }
 
-    ensure!(timestamp_index != usize::MAX, MissingTimestampColumnSnafu);
+    ensure!(
+        timestamp_index != usize::MAX,
+        MissingTimestampColumnSnafu { msg: table_name }
+    );
 
     let timestamp_field_name = columns[timestamp_index].column_name.clone();
     let primary_keys = primary_key_indices
diff --git a/src/common/insert/src/lib.rs b/src/common/grpc-expr/src/lib.rs
similarity index 86%
rename from src/common/insert/src/lib.rs
rename to src/common/grpc-expr/src/lib.rs
index 3bac3e0969..2c3beea6ff 100644
--- a/src/common/insert/src/lib.rs
+++ b/src/common/grpc-expr/src/lib.rs
@@ -1,3 +1,4 @@
+#![feature(assert_matches)]
 // Copyright 2022 Greptime Team
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,8 +13,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+mod alter; pub mod error; mod insert; + +pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema}; pub use insert::{ build_alter_table_request, build_create_expr_from_insertion, column_to_vector, find_new_columns, insert_batches, insertion_expr_to_request, diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 5538239b11..c673947a47 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -23,7 +23,7 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -common-insert = { path = "../common/insert" } +common-grpc-expr = { path = "../common/grpc-expr" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ "simd", ] } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a6ecd963a4..a15ee9e385 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -82,9 +82,6 @@ pub enum Error { table_name: String, }, - #[snafu(display("Missing required field in protobuf, field: {}", field))] - MissingField { field: String, backtrace: Backtrace }, - #[snafu(display("Missing timestamp column in request"))] MissingTimestampColumn { backtrace: Backtrace }, @@ -202,21 +199,16 @@ pub enum Error { source: common_grpc::Error, }, - #[snafu(display("Column datatype error, source: {}", source))] - ColumnDataType { + #[snafu(display("Failed to convert alter expr to request: {}", source))] + AlterExprToRequest { #[snafu(backtrace)] - source: api::error::Error, + source: common_grpc_expr::error::Error, }, - #[snafu(display( - "Invalid column proto definition, column: {}, source: {}", - column, - source - ))] - InvalidColumnDef { - column: String, + #[snafu(display("Failed to convert create expr to request: {}", source))] + CreateExprToRequest { #[snafu(backtrace)] - source: api::error::Error, + source: common_grpc_expr::error::Error, }, #[snafu(display("Failed to parse SQL, source: {}", source))] @@ -263,7 +255,7 @@ pub enum Error { #[snafu(display("Failed to insert data, source: {}", source))] InsertData { #[snafu(backtrace)] - source: common_insert::error::Error, + source: common_grpc_expr::error::Error, }, #[snafu(display("Insert batch is empty"))] @@ -316,6 +308,8 @@ impl ErrorExt for Error { source.status_code() } + Error::AlterExprToRequest { source, .. } + | Error::CreateExprToRequest { source, .. } => source.status_code(), Error::CreateSchema { source, .. } | Error::ConvertSchema { source, .. } | Error::VectorComputation { source } => source.status_code(), @@ -324,7 +318,6 @@ impl ErrorExt for Error { | Error::InvalidSql { .. } | Error::KeyColumnNotFound { .. } | Error::InvalidPrimaryKey { .. } - | Error::MissingField { .. } | Error::MissingTimestampColumn { .. } | Error::CatalogNotFound { .. } | Error::SchemaNotFound { .. } @@ -343,10 +336,6 @@ impl ErrorExt for Error { | Error::UnsupportedExpr { .. } | Error::Catalog { .. } => StatusCode::Internal, - Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => { - source.status_code() - } - Error::InitBackend { .. 
} => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), Error::StartScriptManager { source } => source.status_code(), diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index c863743d29..e9e0a48f19 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -22,7 +22,7 @@ use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_grpc::select::to_object_result; -use common_insert::insertion_expr_to_request; +use common_grpc_expr::insertion_expr_to_request; use common_query::Output; use query::plan::LogicalPlan; use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler}; @@ -56,7 +56,7 @@ impl Instance { .context(SchemaNotFoundSnafu { name: schema_name })?; let insert_batches = - common_insert::insert_batches(&values.values).context(InsertDataSnafu)?; + common_grpc_expr::insert_batches(&values.values).context(InsertDataSnafu)?; ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu); diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 7a3980c6f6..8c5a888ee6 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -12,22 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use api::result::AdminResultBuilder; -use api::v1::alter_expr::Kind; -use api::v1::{AdminResult, AlterExpr, CreateExpr, DropColumns}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use api::v1::{AdminResult, AlterExpr, CreateExpr}; use common_error::prelude::{ErrorExt, StatusCode}; +use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; use common_query::Output; use common_telemetry::{error, info}; -use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use futures::TryFutureExt; use snafu::prelude::*; -use table::metadata::TableId; -use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest}; -use crate::error::{self, BumpTableIdSnafu, MissingFieldSnafu, Result}; +use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu}; use crate::instance::Instance; use crate::sql::SqlRequest; @@ -75,7 +69,7 @@ impl Instance { } }; - let request = create_expr_to_request(table_id, expr).await; + let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu); let result = futures::future::ready(request) .and_then(|request| self.sql_handler().execute(SqlRequest::CreateTable(request))) .await; @@ -94,14 +88,17 @@ impl Instance { } pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult { - let request = match alter_expr_to_request(expr).transpose() { - Some(req) => req, + let request = match alter_expr_to_request(expr) + .context(AlterExprToRequestSnafu) + .transpose() + { None => { return AdminResultBuilder::default() .status_code(StatusCode::Success as u32) .mutate_result(0, 0) .build() } + Some(req) => req, }; let result = futures::future::ready(request) @@ -121,154 +118,25 @@ impl Instance { } } -async fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result { - let schema = create_table_schema(&expr)?; - let primary_key_indices = expr - .primary_keys - .iter() - .map(|key| { - schema - .column_index_by_name(key) - .context(error::KeyColumnNotFoundSnafu { name: key }) - }) - .collect::>>()?; - - let catalog_name = expr - 
.catalog_name - .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()); - let schema_name = expr - .schema_name - .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); - - let region_ids = if expr.region_ids.is_empty() { - vec![0] - } else { - expr.region_ids - }; - - Ok(CreateTableRequest { - id: table_id, - catalog_name, - schema_name, - table_name: expr.table_name, - desc: expr.desc, - schema, - region_numbers: region_ids, - primary_key_indices, - create_if_not_exists: expr.create_if_not_exists, - table_options: expr.table_options, - }) -} - -fn alter_expr_to_request(expr: AlterExpr) -> Result> { - match expr.kind { - Some(Kind::AddColumns(add_columns)) => { - let mut add_column_requests = vec![]; - for add_column_expr in add_columns.add_columns { - let column_def = add_column_expr.column_def.context(MissingFieldSnafu { - field: "column_def", - })?; - - let schema = - column_def - .try_as_column_schema() - .context(error::InvalidColumnDefSnafu { - column: &column_def.name, - })?; - add_column_requests.push(AddColumnRequest { - column_schema: schema, - is_key: add_column_expr.is_key, - }) - } - - let alter_kind = AlterKind::AddColumns { - columns: add_column_requests, - }; - - let request = AlterTableRequest { - catalog_name: expr.catalog_name, - schema_name: expr.schema_name, - table_name: expr.table_name, - alter_kind, - }; - Ok(Some(request)) - } - Some(Kind::DropColumns(DropColumns { drop_columns })) => { - let alter_kind = AlterKind::DropColumns { - names: drop_columns.into_iter().map(|c| c.name).collect(), - }; - - let request = AlterTableRequest { - catalog_name: expr.catalog_name, - schema_name: expr.schema_name, - table_name: expr.table_name, - alter_kind, - }; - Ok(Some(request)) - } - None => Ok(None), - } -} - -fn create_table_schema(expr: &CreateExpr) -> Result { - let column_schemas = expr - .column_defs - .iter() - .map(|x| { - x.try_as_column_schema() - .context(error::InvalidColumnDefSnafu { column: &x.name }) - }) - .collect::>>()?; - - ensure!( - column_schemas - .iter() - .any(|column| column.name == expr.time_index), - error::KeyColumnNotFoundSnafu { - name: &expr.time_index, - } - ); - - let column_schemas = column_schemas - .into_iter() - .map(|column_schema| { - if column_schema.name == expr.time_index { - column_schema.with_time_index(true) - } else { - column_schema - } - }) - .collect::>(); - - Ok(Arc::new( - SchemaBuilder::try_from(column_schemas) - .context(error::CreateSchemaSnafu)? 
- .build() - .context(error::CreateSchemaSnafu)?, - )) -} - #[cfg(test)] mod tests { + use std::sync::Arc; + use api::v1::ColumnDef; use common_catalog::consts::MIN_USER_TABLE_ID; + use common_grpc_expr::create_table_schema; use datatypes::prelude::ConcreteDataType; - use datatypes::schema::ColumnDefaultConstraint; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef}; use datatypes::value::Value; use super::*; - use crate::tests::test_util; #[tokio::test(flavor = "multi_thread")] async fn test_create_expr_to_request() { common_telemetry::init_default_ut_logging(); - let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("create_expr_to_request"); - let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); - instance.start().await.unwrap(); - let expr = testing_create_expr(); - let request = create_expr_to_request(1024, expr).await.unwrap(); - assert_eq!(request.id, common_catalog::consts::MIN_USER_TABLE_ID); + let request = create_expr_to_request(1024, expr).unwrap(); + assert_eq!(request.id, MIN_USER_TABLE_ID); assert_eq!(request.catalog_name, "greptime".to_string()); assert_eq!(request.schema_name, "public".to_string()); assert_eq!(request.table_name, "my-metrics"); @@ -279,12 +147,13 @@ mod tests { let mut expr = testing_create_expr(); expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; - let result = create_expr_to_request(1025, expr).await; - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Specified timestamp key or primary key column not found: not-exist-column")); + let result = create_expr_to_request(1025, expr); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"), + "{}", + err_msg + ); } #[test] @@ -295,11 +164,12 @@ mod tests { expr.time_index = "not-exist-column".to_string(); let result = create_table_schema(&expr); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Specified timestamp key or primary key column not found: not-exist-column")); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Missing timestamp column"), + "actual: {}", + err_msg + ); } #[test] diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 401ea70aaa..a56affcf17 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -20,7 +20,7 @@ common-catalog = { path = "../common/catalog" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -common-insert = { path = "../common/insert" } +common-grpc-expr = { path = "../common/grpc-expr" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ "simd", ] } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 9a23a2320a..2e56517db8 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -277,25 +277,25 @@ pub enum Error { #[snafu(display("Failed to build CreateExpr on insertion: {}", source))] BuildCreateExprOnInsertion { #[snafu(backtrace)] - source: common_insert::error::Error, + source: common_grpc_expr::error::Error, }, #[snafu(display("Failed to find new columns on insertion: {}", source))] FindNewColumnsOnInsertion { #[snafu(backtrace)] - source: common_insert::error::Error, + source: common_grpc_expr::error::Error, }, #[snafu(display("Failed to deserialize insert batching: {}", 
source))] DeserializeInsertBatch { #[snafu(backtrace)] - source: common_insert::error::Error, + source: common_grpc_expr::error::Error, }, #[snafu(display("Failed to deserialize insert batching: {}", source))] InsertBatchToRequest { #[snafu(backtrace)] - source: common_insert::error::Error, + source: common_grpc_expr::error::Error, }, #[snafu(display("Failed to find catalog by name: {}", catalog_name))] @@ -427,6 +427,15 @@ pub enum Error { #[snafu(display("Missing meta_client_opts section in config"))] MissingMetasrvOpts { backtrace: Backtrace }, + + #[snafu(display("Failed to convert AlterExpr to AlterRequest, source: {}", source))] + AlterExprToRequest { + #[snafu(backtrace)] + source: common_grpc_expr::error::Error, + }, + + #[snafu(display("Failed to find leaders when altering table, table: {}", table))] + LeaderNotFound { table: String, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -514,6 +523,8 @@ impl ErrorExt for Error { source.status_code() } Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments, + Error::AlterExprToRequest { source, .. } => source.status_code(), + Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index c8c7646c2c..99532257de 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -61,7 +61,7 @@ impl CreateExprFactory for DefaultCreateExprFactory { batch: &[InsertBatch], ) -> Result { let table_id = None; - let create_expr = common_insert::build_create_expr_from_insertion( + let create_expr = common_grpc_expr::build_create_expr_from_insertion( catalog_name, schema_name, table_id, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 57dd9a634f..8d49fc32a3 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -268,11 +268,15 @@ impl Instance { /// Handle alter expr pub async fn handle_alter(&self, expr: AlterExpr) -> Result { - self.admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME)) - .alter(expr) - .await - .and_then(admin_result_to_output) - .context(AlterTableSnafu) + match &self.dist_instance { + Some(dist_instance) => dist_instance.handle_alter_table(expr).await, + None => self + .admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME)) + .alter(expr) + .await + .and_then(admin_result_to_output) + .context(AlterTableSnafu), + } } /// Handle batch inserts @@ -333,8 +337,8 @@ impl Instance { region_number: u32, values: &insert_expr::Values, ) -> Result { - let insert_batches = - common_insert::insert_batches(&values.values).context(DeserializeInsertBatchSnafu)?; + let insert_batches = common_grpc_expr::insert_batches(&values.values) + .context(DeserializeInsertBatchSnafu)?; self.create_or_alter_table_on_demand( catalog_name, schema_name, @@ -399,8 +403,9 @@ impl Instance { } Some(table) => { let schema = table.schema(); - if let Some(add_columns) = common_insert::find_new_columns(&schema, insert_batches) - .context(FindNewColumnsOnInsertionSnafu)? + if let Some(add_columns) = + common_grpc_expr::find_new_columns(&schema, insert_batches) + .context(FindNewColumnsOnInsertionSnafu)? 
{ info!( "Find new columns {:?} on insertion, try to alter table: {}.{}.{}", diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index f3b133096f..b5b4a6a9af 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -16,7 +16,8 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{CreateDatabaseExpr, CreateExpr}; +use api::v1::{AlterExpr, CreateDatabaseExpr, CreateExpr}; +use catalog::CatalogList; use chrono::DateTime; use client::admin::{admin_result_to_output, Admin}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -42,10 +43,12 @@ use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ - self, CatalogEntrySerdeSnafu, ColumnDataTypeSnafu, PrimaryKeyNotFoundSnafu, RequestMetaSnafu, - Result, StartMetaClientSnafu, + self, CatalogEntrySerdeSnafu, CatalogNotFoundSnafu, CatalogSnafu, ColumnDataTypeSnafu, + PrimaryKeyNotFoundSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu, + TableNotFoundSnafu, }; use crate::partitioning::{PartitionBound, PartitionDef}; +use crate::table::DistTable; #[derive(Clone)] pub(crate) struct DistInstance { @@ -166,6 +169,34 @@ impl DistInstance { Ok(Output::AffectedRows(1)) } + pub async fn handle_alter_table(&self, expr: AlterExpr) -> Result { + let catalog_name = expr.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME); + let schema_name = expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); + let table_name = expr.table_name.as_str(); + let table = self + .catalog_manager + .catalog(catalog_name) + .context(CatalogSnafu)? + .context(CatalogNotFoundSnafu { catalog_name })? + .schema(schema_name) + .context(CatalogSnafu)? + .context(SchemaNotFoundSnafu { + schema_info: format!("{}.{}", catalog_name, schema_name), + })? + .table(table_name) + .context(CatalogSnafu)? 
+ .context(TableNotFoundSnafu { + table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name), + })?; + + let dist_table = table + .as_any() + .downcast_ref::() + .expect("Table impl must be DistTable in distributed mode"); + dist_table.alter_by_expr(expr).await?; + Ok(Output::AffectedRows(0)) + } + async fn create_table_in_meta( &self, create_table: &CreateExpr, diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 846d4f4527..8441094d50 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -20,7 +20,7 @@ use api::v1::InsertExpr; use async_trait::async_trait; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; -use common_insert::column_to_vector; +use common_grpc_expr::column_to_vector; use servers::influxdb::InfluxdbRequest; use servers::query_handler::InfluxdbLineProtocolHandler; use servers::{error as server_error, Mode}; @@ -66,7 +66,7 @@ impl Instance { for insert in inserts { let self_clone = self.clone(); let insert_batches = match &insert.expr.unwrap() { - Expr::Values(values) => common_insert::insert_batches(&values.values) + Expr::Values(values) => common_grpc_expr::insert_batches(&values.values) .context(DeserializeInsertBatchSnafu)?, Expr::Sql(_) => unreachable!(), }; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 0c07fd14f7..91211f88d9 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -18,13 +18,17 @@ use std::any::Any; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use api::v1::AlterExpr; use async_trait::async_trait; +use client::admin::Admin; use client::Database; +use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_query::error::Result as QueryResult; use common_query::logical_plan::Expr; use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef}; use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; +use common_telemetry::debug; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr as DfExpr; use datafusion::physical_plan::{ @@ -43,7 +47,7 @@ use table::Table; use tokio::sync::RwLock; use crate::datanode::DatanodeClients; -use crate::error::{self, Error, Result}; +use crate::error::{self, Error, LeaderNotFoundSnafu, RequestDatanodeSnafu, Result}; use crate::partitioning::columns::RangeColumnsPartitionRule; use crate::partitioning::range::RangePartitionRule; use crate::partitioning::{ @@ -348,6 +352,36 @@ impl DistTable { }; Ok(partition_rule) } + + /// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between + /// [`table::requests::AlterTableRequest`] and [`AlterExpr`]. 
+    pub(crate) async fn alter_by_expr(&self, expr: AlterExpr) -> Result<()> {
+        let table_routes = self.table_routes.get_route(&self.table_name).await?;
+        let leaders = table_routes.find_leaders();
+        ensure!(
+            !leaders.is_empty(),
+            LeaderNotFoundSnafu {
+                table: format!(
+                    "{:?}.{:?}.{}",
+                    expr.catalog_name, expr.schema_name, expr.table_name
+                )
+            }
+        );
+        for datanode in leaders {
+            let admin = Admin::new(
+                DEFAULT_CATALOG_NAME,
+                self.datanode_clients.get_client(&datanode).await,
+            );
+            debug!("Sending alter table {:?} to {:?}", expr, admin);
+            let result = admin
+                .alter(expr.clone())
+                .await
+                .context(RequestDatanodeSnafu)?;
+            debug!("Alter table result: {:?}", result);
+            // TODO(hl): We should further check and track alter result in some global DDL task tracker
+        }
+        Ok(())
+    }
 }
 
 fn project_schema(table_schema: SchemaRef, projection: &Option<Vec<usize>>) -> SchemaRef {
diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index f3ba11245b..9aff8a061f 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -69,7 +69,8 @@ pub trait Table: Send + Sync {
         Ok(FilterPushDownType::Unsupported)
     }
 
-    async fn alter(&self, _request: AlterTableRequest) -> Result<()> {
+    async fn alter(&self, request: AlterTableRequest) -> Result<()> {
+        let _ = request;
         unimplemented!()
     }
 }
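Reviewer note: a minimal usage sketch of the relocated `common_grpc_expr::alter_expr_to_request` helper, mirroring `test_alter_expr_to_request` in this patch. The `example` wrapper function is hypothetical and only illustrates the call flow.

```rust
use api::v1::alter_expr::Kind;
use api::v1::{AddColumn, AddColumns, AlterExpr, ColumnDataType, ColumnDef};
use common_grpc_expr::alter_expr_to_request;

fn example() -> common_grpc_expr::error::Result<()> {
    // Build a gRPC AlterExpr that adds one float64 column to table "monitor".
    let expr = AlterExpr {
        catalog_name: None,
        schema_name: None,
        table_name: "monitor".to_string(),
        kind: Some(Kind::AddColumns(AddColumns {
            add_columns: vec![AddColumn {
                column_def: Some(ColumnDef {
                    name: "mem_usage".to_string(),
                    datatype: ColumnDataType::Float64 as i32,
                    is_nullable: false,
                    default_constraint: None,
                }),
                is_key: false,
            }],
        })),
    };

    // `None` is returned only when the expr carries no `kind`.
    let request = alter_expr_to_request(expr)?.expect("kind is set");
    assert_eq!("monitor", request.table_name);
    Ok(())
}
```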