From 711e27d9faa2e5925a5855fbd139bb00565db56a Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Tue, 5 Sep 2023 14:07:50 +0800 Subject: [PATCH] feat: distributed alter table in region server (#2311) * feat: distributed alter table in region server * rebase --- Cargo.lock | 2 +- Cargo.toml | 2 +- benchmarks/src/bin/nyc-taxi.rs | 59 +++-- src/api/src/v1/column_def.rs | 2 +- src/client/examples/logical.rs | 11 +- src/common/grpc-expr/src/alter.rs | 30 ++- src/common/grpc-expr/src/insert.rs | 32 ++- src/common/grpc-expr/src/util.rs | 8 +- src/common/meta/Cargo.toml | 2 +- src/common/meta/src/ddl/alter_table.rs | 246 ++++++++++++++------ src/common/meta/src/ddl/create_table.rs | 31 +-- src/common/meta/src/ddl_manager.rs | 59 ++--- src/common/meta/src/error.rs | 16 +- src/common/meta/src/metrics.rs | 1 + src/common/query/src/lib.rs | 4 +- src/datanode/Cargo.toml | 1 + src/datanode/src/instance/grpc.rs | 79 +++---- src/datanode/src/server/grpc.rs | 56 ++--- src/frontend/src/catalog.rs | 17 +- src/frontend/src/expr_factory.rs | 36 ++- src/frontend/src/inserter.rs | 1 - src/frontend/src/instance/distributed.rs | 6 +- src/meta-srv/src/error.rs | 2 +- src/meta-srv/src/procedure/tests.rs | 283 +++++++++++++++++------ src/meta-srv/src/procedure/utils.rs | 12 +- src/meta-srv/src/service/ddl.rs | 2 +- src/sql/src/statements.rs | 12 +- src/store-api/src/error.rs | 53 +++++ src/store-api/src/lib.rs | 1 + src/store-api/src/metadata.rs | 18 +- src/store-api/src/region_request.rs | 28 ++- src/store-api/src/storage/requests.rs | 169 +++++++++++++- src/table/src/error.rs | 2 +- 33 files changed, 902 insertions(+), 381 deletions(-) create mode 100644 src/store-api/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 822f3ee276..f334ab921b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4090,7 +4090,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b585955dbbade25951d725251b66457fdd77143f#b585955dbbade25951d725251b66457fdd77143f" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2#1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index 80c6f21673..bd8e99879c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b585955dbbade25951d725251b66457fdd77143f" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index 424bdcd7d6..1d24f5741d 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch; use clap::Parser; use client::api::v1::column::Values; use client::api::v1::{ - Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, + Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType, }; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; @@ -253,117 +253,136 @@ fn create_table_expr() -> CreateTableExpr { column_defs: vec![ ColumnDef { name: "VendorID".to_string(), - datatype: ColumnDataType::Int64 as i32, + data_type: ColumnDataType::Int64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, ColumnDef { name: "tpep_pickup_datetime".to_string(), - datatype: ColumnDataType::TimestampMicrosecond as i32, + data_type: ColumnDataType::TimestampMicrosecond as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ColumnDef { name: "tpep_dropoff_datetime".to_string(), - datatype: ColumnDataType::TimestampMicrosecond as i32, + data_type: ColumnDataType::TimestampMicrosecond as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "passenger_count".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "trip_distance".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "RatecodeID".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "store_and_fwd_flag".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "PULocationID".to_string(), - datatype: ColumnDataType::Int64 as i32, + data_type: ColumnDataType::Int64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "DOLocationID".to_string(), - datatype: ColumnDataType::Int64 as i32, + data_type: ColumnDataType::Int64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "payment_type".to_string(), - datatype: ColumnDataType::Int64 as i32, + data_type: ColumnDataType::Int64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "fare_amount".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "extra".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "mta_tax".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "tip_amount".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "tolls_amount".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "improvement_surcharge".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "total_amount".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "congestion_surcharge".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "airport_fee".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ], time_index: "tpep_pickup_datetime".to_string(), diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index aa15974c12..bd29d1d986 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -20,7 +20,7 @@ use crate::helper::ColumnDataTypeWrapper; use crate::v1::ColumnDef; pub fn try_as_column_schema(column_def: &ColumnDef) -> Result { - let data_type = ColumnDataTypeWrapper::try_new(column_def.datatype)?; + let data_type = ColumnDataTypeWrapper::try_new(column_def.data_type)?; let constraint = if column_def.default_constraint.is_empty() { None diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index b4d1bd6719..0524d562ec 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId}; +use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType, TableId}; use client::{Client, Database}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use prost::Message; @@ -41,21 +41,24 @@ async fn run() { column_defs: vec![ ColumnDef { name: "timestamp".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, + data_type: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ColumnDef { name: "key".to_string(), - datatype: ColumnDataType::Uint64 as i32, + data_type: ColumnDataType::Uint64 as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, ColumnDef { name: "value".to_string(), - datatype: ColumnDataType::Uint64 as i32, + data_type: ColumnDataType::Uint64 as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ], time_index: "timestamp".to_string(), diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index bc51fe85de..cb5cafb919 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::add_column::location::LocationType; -use api::v1::add_column::Location; +use api::v1::add_column_location::LocationType; use api::v1::alter_expr::Kind; -use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable}; +use api::v1::{ + column_def, AddColumnLocation as Location, AlterExpr, CreateTableExpr, DropColumns, + RenameTable, SemanticType, +}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::AddColumnLocation; use datatypes::schema::{ColumnSchema, RawSchema}; @@ -55,7 +57,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result Result) -> Result Self { Self { context, - data: AlterTableData::new(task, alter_table_request, table_info_value, cluster_id), + data: AlterTableData::new(task, table_info_value, cluster_id), } } @@ -70,45 +79,36 @@ impl AlterTableProcedure { // Checks whether the table exists. async fn on_prepare(&mut self) -> Result { - let request = &self.data.alter_table_request; + let alter_expr = &self.alter_expr(); + let catalog = &alter_expr.catalog_name; + let schema = &alter_expr.schema_name; let manager = &self.context.table_metadata_manager; - if let AlterKind::RenameTable { new_table_name } = &request.alter_kind { + if let Kind::RenameTable(RenameTable { new_table_name }) = self.alter_kind()? { + let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name); + let exist = manager .table_name_manager() - .exists(TableNameKey::new( - &request.catalog_name, - &request.schema_name, - new_table_name, - )) + .exists(new_table_name_key) .await?; ensure!( !exist, error::TableAlreadyExistsSnafu { - table_name: common_catalog::format_full_table_name( - &request.catalog_name, - &request.schema_name, - new_table_name, - ), + table_name: TableName::from(new_table_name_key).to_string(), } ) } - let exist = manager - .table_name_manager() - .exists(TableNameKey::new( - &request.catalog_name, - &request.schema_name, - &request.table_name, - )) - .await?; + let table_name_key = TableNameKey::new(catalog, schema, &alter_expr.table_name); + + let exist = manager.table_name_manager().exists(table_name_key).await?; ensure!( exist, error::TableNotFoundSnafu { - table_name: request.table_ref().to_string() + table_name: TableName::from(table_name_key).to_string() } ); @@ -117,6 +117,124 @@ impl AlterTableProcedure { Ok(Status::executing(true)) } + fn alter_expr(&self) -> &AlterExpr { + &self.data.task.alter_table + } + + fn alter_kind(&self) -> Result<&Kind> { + self.alter_expr() + .kind + .as_ref() + .context(InvalidProtoMsgSnafu { + err_msg: "'kind' is absent", + }) + } + + pub fn create_alter_region_request(&self, region_id: RegionId) -> Result { + let table_info = self.data.table_info(); + + let kind = + match self.alter_kind()? { + Kind::AddColumns(x) => { + let mut next_column_id = table_info.meta.next_column_id; + + let add_columns = + x.add_columns + .iter() + .map(|add_column| { + let column_def = add_column.column_def.as_ref().context( + InvalidProtoMsgSnafu { + err_msg: "'column_def' is absent", + }, + )?; + + let column_id = next_column_id; + next_column_id += 1; + + let column_def = RegionColumnDef { + column_def: Some(column_def.clone()), + column_id, + }; + + Ok(AddColumn { + column_def: Some(column_def), + location: add_column.location.clone(), + }) + }) + .collect::>>()?; + + alter_request::Kind::AddColumns(AddColumns { add_columns }) + } + Kind::DropColumns(x) => { + let drop_columns = x + .drop_columns + .iter() + .map(|x| DropColumn { + name: x.name.clone(), + }) + .collect::>(); + + alter_request::Kind::DropColumns(DropColumns { drop_columns }) + } + Kind::RenameTable(_) => unreachable!(), + }; + + Ok(AlterRequest { + region_id: region_id.as_u64(), + schema_version: table_info.ident.version, + kind: Some(kind), + }) + } + + pub async fn submit_alter_region_requests(&self) -> Result { + let table_id = self.data.table_id(); + let table_ref = self.data.table_ref(); + + let TableRouteValue { region_routes, .. } = self + .context + .table_metadata_manager + .table_route_manager() + .get(table_id) + .await? + .with_context(|| TableRouteNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + let leaders = find_leaders(®ion_routes); + let mut alter_region_tasks = Vec::with_capacity(leaders.len()); + + for datanode in leaders { + let datanode_manager = self.context.datanode_manager.clone(); + + let regions = find_leader_regions(®ion_routes, &datanode); + + alter_region_tasks.push(async move { + for region in regions { + let region_id = RegionId::new(table_id, region); + let request = self.create_alter_region_request(region_id)?; + let request = RegionRequest { + header: None, + body: Some(region_request::Body::Alter(request)), + }; + debug!("Submitting {request:?} to {datanode}"); + + let requester = datanode_manager.datanode(&datanode).await; + if let Err(e) = requester.handle(request).await { + return Err(handle_operate_region_error(datanode)(e)); + } + } + Ok(()) + }); + } + + future::join_all(alter_region_tasks) + .await + .into_iter() + .collect::>>()?; + + Ok(Status::Done) + } + /// Update table metadata for rename table operation. async fn on_update_metadata_for_rename(&self, new_table_name: String) -> Result<()> { let table_metadata_manager = &self.context.table_metadata_manager; @@ -148,7 +266,8 @@ impl AlterTableProcedure { let table_ref = self.data.table_ref(); - let request = &self.data.alter_table_request; + let request = alter_expr_to_request(self.data.table_id(), self.alter_expr().clone()) + .context(ConvertAlterTableRequestSnafu)?; let new_meta = table_info .meta @@ -172,11 +291,9 @@ impl AlterTableProcedure { /// Update table metadata. async fn on_update_metadata(&mut self) -> Result { - let request = &self.data.alter_table_request; let table_id = self.data.table_id(); let table_ref = self.data.table_ref(); let new_info = self.build_new_table_info()?; - let table_metadata_manager = &self.context.table_metadata_manager; debug!( "starting update table: {} metadata, new table info {:?}", @@ -184,7 +301,7 @@ impl AlterTableProcedure { new_info ); - if let AlterKind::RenameTable { new_table_name } = &request.alter_kind { + if let Kind::RenameTable(RenameTable { new_table_name }) = self.alter_kind()? { self.on_update_metadata_for_rename(new_table_name.to_string()) .await?; } else { @@ -193,27 +310,18 @@ impl AlterTableProcedure { info!("Updated table metadata for table {table_id}"); - let TableRouteValue { region_routes, .. } = table_metadata_manager - .table_route_manager() - .get(table_id) - .await? - .with_context(|| error::TableRouteNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - - self.data.region_routes = Some(region_routes); self.data.state = AlterTableState::InvalidateTableCache; Ok(Status::executing(true)) } /// Broadcasts the invalidating table cache instructions. async fn on_broadcast(&mut self) -> Result { - let table_name = self.data.table_name(); + let table_ref = self.data.table_ref(); let table_ident = TableIdent { - catalog: table_name.catalog_name, - schema: table_name.schema_name, - table: table_name.table_name, + catalog: table_ref.catalog.to_string(), + schema: table_ref.schema.to_string(), + table: table_ref.table.to_string(), table_id: self.data.table_id(), engine: self.data.table_info().meta.engine.to_string(), }; @@ -228,8 +336,14 @@ impl AlterTableProcedure { ) .await?; - self.data.state = AlterTableState::DatanodeAlterTable; - Ok(Status::executing(true)) + let alter_kind = self.alter_kind()?; + if matches!(alter_kind, Kind::RenameTable { .. }) { + Ok(Status::Done) + } else { + self.data.state = AlterTableState::SubmitAlterRegionRequests; + + Ok(Status::executing(true)) + } } fn lock_key_inner(&self) -> Vec { @@ -241,8 +355,7 @@ impl AlterTableProcedure { ); let mut lock_key = vec![table_key]; - if let AlterKind::RenameTable { new_table_name } = &self.data.alter_table_request.alter_kind - { + if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() { lock_key.push(common_catalog::format_full_table_name( table_ref.catalog, table_ref.schema, @@ -269,11 +382,18 @@ impl Procedure for AlterTableProcedure { } }; - match self.data.state { + let state = &self.data.state; + + let _timer = common_telemetry::timer!( + metrics::METRIC_META_PROCEDURE_ALTER_TABLE, + &[("step", state.as_ref().to_string())] + ); + + match state { AlterTableState::Prepare => self.on_prepare().await, AlterTableState::UpdateMetadata => self.on_update_metadata().await, AlterTableState::InvalidateTableCache => self.on_broadcast().await, - AlterTableState::DatanodeAlterTable => todo!(), + AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await, } .map_err(error_handler) } @@ -289,7 +409,7 @@ impl Procedure for AlterTableProcedure { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, AsRefStr)] enum AlterTableState { /// Prepares to alter the table Prepare, @@ -297,32 +417,22 @@ enum AlterTableState { UpdateMetadata, /// Broadcasts the invalidating table cache instruction. InvalidateTableCache, - /// Datanode alters the table. - DatanodeAlterTable, + SubmitAlterRegionRequests, } #[derive(Debug, Serialize, Deserialize)] pub struct AlterTableData { state: AlterTableState, task: AlterTableTask, - alter_table_request: AlterTableRequest, - region_routes: Option>, table_info_value: TableInfoValue, cluster_id: u64, } impl AlterTableData { - pub fn new( - task: AlterTableTask, - alter_table_request: AlterTableRequest, - table_info_value: TableInfoValue, - cluster_id: u64, - ) -> Self { + pub fn new(task: AlterTableTask, table_info_value: TableInfoValue, cluster_id: u64) -> Self { Self { state: AlterTableState::Prepare, task, - alter_table_request, - region_routes: None, table_info_value, cluster_id, } @@ -332,10 +442,6 @@ impl AlterTableData { self.task.table_ref() } - fn table_name(&self) -> TableName { - self.task.table_name() - } - fn table_id(&self) -> TableId { self.table_info().ident.table_id } diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 82f2903a7a..dbaf0f82ce 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -14,9 +14,9 @@ use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{ - ColumnDef, CreateRequest as PbCreateRegionRequest, RegionRequest, RegionRequestHeader, + CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, }; -use api::v1::SemanticType; +use api::v1::{ColumnDef, SemanticType}; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; @@ -42,7 +42,6 @@ pub struct CreateTableProcedure { pub creator: TableCreator, } -#[allow(dead_code)] impl CreateTableProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; @@ -124,13 +123,15 @@ impl CreateTableProcedure { SemanticType::Field }; - ColumnDef { - name: c.name.clone(), + RegionColumnDef { + column_def: Some(ColumnDef { + name: c.name.clone(), + data_type: c.data_type, + is_nullable: c.is_nullable, + default_constraint: c.default_constraint.clone(), + semantic_type: semantic_type as i32, + }), column_id: i as u32, - datatype: c.datatype, - is_nullable: c.is_nullable, - default_constraint: c.default_constraint.clone(), - semantic_type: semantic_type as i32, } }) .collect::>(); @@ -142,11 +143,13 @@ impl CreateTableProcedure { column_defs .iter() .find_map(|c| { - if &c.name == key { - Some(c.column_id) - } else { - None - } + c.column_def.as_ref().and_then(|x| { + if &x.name == key { + Some(c.column_id) + } else { + None + } + }) }) .context(error::PrimaryKeyNotFoundSnafu { key }) }) diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 2df1505b8b..569c213a20 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -14,11 +14,9 @@ use std::sync::Arc; -use common_grpc_expr::alter_expr_to_request; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::{error, info}; use snafu::{OptionExt, ResultExt}; -use table::requests::AlterTableRequest; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; @@ -27,10 +25,11 @@ use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::{DdlContext, DdlExecutor, ExecutorContext, TableCreatorContext, TableCreatorRef}; use crate::error::{ - self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, UnsupportedSnafu, - WaitProcedureSnafu, + self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu, + UnsupportedSnafu, WaitProcedureSnafu, }; use crate::key::table_info::TableInfoValue; +use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::key::TableMetadataManagerRef; use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable}; @@ -127,18 +126,12 @@ impl DdlManager { &self, cluster_id: u64, alter_table_task: AlterTableTask, - alter_table_request: AlterTableRequest, table_info_value: TableInfoValue, ) -> Result { let context = self.create_context(); - let procedure = AlterTableProcedure::new( - cluster_id, - alter_table_task, - alter_table_request, - table_info_value, - context, - ); + let procedure = + AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -255,23 +248,24 @@ async fn handle_truncate_table_task( async fn handle_alter_table_task( ddl_manager: &DdlManager, cluster_id: u64, - mut alter_table_task: AlterTableTask, + alter_table_task: AlterTableTask, ) -> Result { - let table_id = alter_table_task - .alter_table - .table_id - .as_ref() - .context(error::UnexpectedSnafu { - err_msg: "expected table id ", - })? - .id; - - let mut alter_table_request = - alter_expr_to_request(table_id, alter_table_task.alter_table.clone()) - .context(error::ConvertGrpcExprSnafu)?; - let table_ref = alter_table_task.table_ref(); + let table_id = ddl_manager + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + table_ref.catalog, + table_ref.schema, + table_ref.table, + )) + .await? + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })? + .table_id(); + let table_info_value = ddl_manager .table_metadata_manager() .table_info_manager() @@ -281,19 +275,8 @@ async fn handle_alter_table_task( table_name: table_ref.to_string(), })?; - let table_info = &table_info_value.table_info; - - // Sets alter_table's table_version - alter_table_task.alter_table.table_version = table_info.ident.version; - alter_table_request.table_version = Some(table_info.ident.version); - let id = ddl_manager - .submit_alter_table_task( - cluster_id, - alter_table_task, - alter_table_request, - table_info_value, - ) + .submit_alter_table_task(cluster_id, alter_table_task, table_info_value) .await?; info!("Table: {table_id} is altered via procedure_id {id:?}"); diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 6eab1bd79a..d7d021c3db 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -26,12 +26,6 @@ use crate::peer::Peer; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to convert grpc expr, source: {}", source))] - ConvertGrpcExpr { - location: Location, - source: common_grpc_expr::error::Error, - }, - #[snafu(display("Table info not found: {}", table_name))] TableInfoNotFound { table_name: String, @@ -146,7 +140,13 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid protobuf message, err: {}", err_msg))] + #[snafu(display("Failed to convert alter table request, source: {source}, at {location}"))] + ConvertAlterTableRequest { + source: common_grpc_expr::error::Error, + location: Location, + }, + + #[snafu(display("Invalid protobuf message: {err_msg}, at {location}"))] InvalidProtoMsg { err_msg: String, location: Location }, #[snafu(display("Unexpected: {err_msg}"))] @@ -265,7 +265,6 @@ impl ErrorExt for Error { | MoveRegion { .. } | Unexpected { .. } | External { .. } - | ConvertGrpcExpr { .. } | TableInfoNotFound { .. } | InvalidHeartbeatResponse { .. } => StatusCode::Unexpected, @@ -300,6 +299,7 @@ impl ErrorExt for Error { ExecuteDdl { source, .. } => source.status_code(), MetaSrv { source, .. } => source.status_code(), InvalidCatalogValue { source, .. } => source.status_code(), + ConvertAlterTableRequest { source, .. } => source.status_code(), } } diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 2f54e15a77..ed672a7721 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -16,3 +16,4 @@ pub const METRIC_META_TXN_REQUEST: &str = "meta.txn_request"; pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table"; pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table"; +pub(crate) const METRIC_META_PROCEDURE_ALTER_TABLE: &str = "meta.procedure.alter_table"; diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 1c65267219..5d7922ade2 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -14,8 +14,8 @@ use std::fmt::{Debug, Formatter}; -use api::greptime_proto::v1::add_column::location::LocationType; -use api::greptime_proto::v1::add_column::Location; +use api::greptime_proto::v1::add_column_location::LocationType; +use api::greptime_proto::v1::AddColumnLocation as Location; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use serde::{Deserialize, Serialize}; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index a6e36f792a..7fc1d0eec5 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -82,5 +82,6 @@ client = { workspace = true } common-query = { workspace = true } common-test-util = { workspace = true } datafusion-common.workspace = true +meta-srv = { workspace = true, features = ["mock"] } mito = { workspace = true, features = ["test"] } session = { workspace = true } diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index f6cd5dae45..2213ff8388 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -323,13 +323,13 @@ async fn new_dummy_catalog_list( #[cfg(test)] mod test { - use api::v1::add_column::location::LocationType; - use api::v1::add_column::Location; + use api::v1::add_column_location::LocationType; use api::v1::column::Values; use api::v1::{ - alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr, InsertRequest, - InsertRequests, QueryRequest, RenameTable, SemanticType, TableId, TruncateTableExpr, + alter_expr, AddColumn, AddColumnLocation as Location, AddColumns, AlterExpr, Column, + ColumnDataType, ColumnDef, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, + DropTableExpr, InsertRequest, InsertRequests, QueryRequest, RenameTable, SemanticType, + TableId, TruncateTableExpr, }; use common_catalog::consts::MITO_ENGINE; use common_error::ext::ErrorExt; @@ -378,15 +378,17 @@ mod test { column_defs: vec![ ColumnDef { name: "a".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, ColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, + data_type: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ], time_index: "ts".to_string(), @@ -432,15 +434,17 @@ mod test { column_defs: vec![ ColumnDef { name: "a".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, ColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, + data_type: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ], time_index: "ts".to_string(), @@ -500,15 +504,17 @@ mod test { column_defs: vec![ ColumnDef { name: "a".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, + data_type: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ], time_index: "ts".to_string(), @@ -531,22 +537,16 @@ mod test { add_columns: vec![AddColumn { column_def: Some(ColumnDef { name: "b".to_string(), - datatype: ColumnDataType::Int32 as i32, + data_type: ColumnDataType::Int32 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }), - is_key: true, location: None, }], })), - ..Default::default() })), }); - let output = instance - .do_query(query.clone(), QueryContext::arc()) - .await - .unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); assert!(matches!(output, Output::AffectedRows(0))); @@ -561,16 +561,14 @@ mod test { add_columns: vec![AddColumn { column_def: Some(ColumnDef { name: "b".to_string(), - datatype: ColumnDataType::Int32 as i32, + data_type: ColumnDataType::Int32 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }), - is_key: true, location: None, }], })), - table_version: 1, - ..Default::default() })), }); let err = instance @@ -582,7 +580,6 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_rename_table_twice() { - common_telemetry::init_default_ut_logging(); let instance = MockInstance::new("test_alter_table_twice").await; let instance = instance.inner(); @@ -605,15 +602,17 @@ mod test { column_defs: vec![ ColumnDef { name: "a".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, + data_type: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ], time_index: "ts".to_string(), @@ -636,8 +635,6 @@ mod test { kind: Some(alter_expr::Kind::RenameTable(RenameTable { new_table_name: "new_my_table".to_string(), })), - table_id: Some(TableId { id: 1025 }), - ..Default::default() })), }); let output = instance @@ -647,11 +644,8 @@ mod test { assert!(matches!(output, Output::AffectedRows(0))); // renames it again. - let output = instance - .do_query(query.clone(), QueryContext::arc()) - .await - .unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + let result = instance.do_query(query, QueryContext::arc()).await; + assert!(matches!(result, Err(error::Error::TableNotFound { .. }))); } #[tokio::test(flavor = "multi_thread")] @@ -678,15 +672,17 @@ mod test { column_defs: vec![ ColumnDef { name: "a".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, ColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, + data_type: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ], time_index: "ts".to_string(), @@ -707,21 +703,21 @@ mod test { AddColumn { column_def: Some(ColumnDef { name: "b".to_string(), - datatype: ColumnDataType::Int32 as i32, + data_type: ColumnDataType::Int32 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }), - is_key: true, location: None, }, AddColumn { column_def: Some(ColumnDef { name: "c".to_string(), - datatype: ColumnDataType::Int32 as i32, + data_type: ColumnDataType::Int32 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }), - is_key: true, location: Some(Location { location_type: LocationType::First.into(), after_column_name: "".to_string(), @@ -730,11 +726,11 @@ mod test { AddColumn { column_def: Some(ColumnDef { name: "d".to_string(), - datatype: ColumnDataType::Int32 as i32, + data_type: ColumnDataType::Int32 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }), - is_key: true, location: Some(Location { location_type: LocationType::After.into(), after_column_name: "a".to_string(), @@ -742,7 +738,6 @@ mod test { }, ], })), - ..Default::default() })), }); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 99ab743bed..12ddecca14 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -78,25 +78,20 @@ impl Instance { expr: AlterExpr, ctx: QueryContextRef, ) -> Result { - let table_id = match expr.table_id.as_ref() { - None => { - self.catalog_manager - .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name( - &expr.catalog_name, - &expr.schema_name, - &expr.table_name, - ), - })? - .table_info() - .ident - .table_id - } - Some(table_id) => table_id.id, // For requests from Metasrv. - }; + let table_id = self + .catalog_manager + .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &expr.catalog_name, + &expr.schema_name, + &expr.table_name, + ), + })? + .table_info() + .table_id(); let request = alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?; self.sql_handler() @@ -165,7 +160,7 @@ impl Instance { #[cfg(test)] mod tests { - use api::v1::{column_def, ColumnDataType, ColumnDef, TableId}; + use api::v1::{column_def, ColumnDataType, ColumnDef, SemanticType, TableId}; use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; use common_grpc_expr::create_table_schema; use datatypes::prelude::ConcreteDataType; @@ -218,9 +213,10 @@ mod tests { fn test_create_column_schema() { let column_def = ColumnDef { name: "a".to_string(), - datatype: 1024, + data_type: 1024, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }; let result = column_def::try_as_column_schema(&column_def); assert!(matches!( @@ -230,9 +226,10 @@ mod tests { let column_def = ColumnDef { name: "a".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }; let column_schema = column_def::try_as_column_schema(&column_def).unwrap(); assert_eq!(column_schema.name, "a"); @@ -242,9 +239,10 @@ mod tests { let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value")); let column_def = ColumnDef { name: "a".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: default_constraint.clone().try_into().unwrap(), + semantic_type: SemanticType::Tag as i32, }; let column_schema = column_def::try_as_column_schema(&column_def).unwrap(); assert_eq!(column_schema.name, "a"); @@ -260,27 +258,31 @@ mod tests { let column_defs = vec![ ColumnDef { name: "host".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, ColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, + data_type: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ColumnDef { name: "cpu".to_string(), - datatype: ColumnDataType::Float32 as i32, + data_type: ColumnDataType::Float32 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "memory".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ]; CreateTableExpr { diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 5019a2d576..022b6291f0 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -229,15 +229,16 @@ impl CatalogManager for FrontendCatalogManager { None } }) - .collect(); + .collect::>(); - let column_defs = expr_factory::column_schemas_to_defs(request.schema.column_schemas) - .map_err(|e| { - InvalidSystemTableDefSnafu { - err_msg: e.to_string(), - } - .build() - })?; + let column_defs = + expr_factory::column_schemas_to_defs(request.schema.column_schemas, &primary_keys) + .map_err(|e| { + InvalidSystemTableDefSnafu { + err_msg: e.to_string(), + } + .build() + })?; let mut create_table = CreateTableExpr { catalog_name: request.catalog_name, diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index 164550d4f1..65bad120dc 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -18,7 +18,7 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, CreateTableExpr, DropColumn, - DropColumns, RenameTable, + DropColumns, RenameTable, SemanticType, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; @@ -106,12 +106,14 @@ pub(crate) async fn create_external_expr( serde_json::to_string(&meta).context(EncodeJsonSnafu)?, ); + let primary_keys = vec![]; + let expr = CreateTableExpr { catalog_name, schema_name, table_name, desc: "".to_string(), - column_defs: column_schemas_to_defs(schema.column_schemas)?, + column_defs: column_schemas_to_defs(schema.column_schemas, &primary_keys)?, time_index: "".to_string(), primary_keys: vec![], create_if_not_exists: create.if_not_exists, @@ -135,14 +137,17 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul &TableOptions::try_from(&to_lowercase_options_map(&create.options)) .context(UnrecognizedTableOptionSnafu)?, ); + + let primary_keys = find_primary_keys(&create.columns, &create.constraints)?; + let expr = CreateTableExpr { catalog_name, schema_name, table_name, desc: "".to_string(), - column_defs: columns_to_expr(&create.columns, &time_index)?, + column_defs: columns_to_expr(&create.columns, &time_index, &primary_keys)?, time_index, - primary_keys: find_primary_keys(&create.columns, &create.constraints)?, + primary_keys, create_if_not_exists: create.if_not_exists, table_options, table_id: None, @@ -231,16 +236,22 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> Result { Ok(time_index.first().unwrap().to_string()) } -fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result> { +fn columns_to_expr( + column_defs: &[ColumnDef], + time_index: &str, + primary_keys: &[String], +) -> Result> { let column_schemas = column_defs .iter() .map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu)) .collect::>>()?; - column_schemas_to_defs(column_schemas) + + column_schemas_to_defs(column_schemas, primary_keys) } pub(crate) fn column_schemas_to_defs( column_schemas: Vec, + primary_keys: &[String], ) -> Result> { let column_datatypes = column_schemas .iter() @@ -255,9 +266,17 @@ pub(crate) fn column_schemas_to_defs( .iter() .zip(column_datatypes) .map(|(schema, datatype)| { + let semantic_type = if schema.is_time_index() { + SemanticType::Timestamp + } else if primary_keys.contains(&schema.name) { + SemanticType::Tag + } else { + SemanticType::Field + } as i32; + Ok(api::v1::ColumnDef { name: schema.name.clone(), - datatype: datatype as i32, + data_type: datatype as i32, is_nullable: schema.is_nullable(), default_constraint: match schema.default_constraint() { None => vec![], @@ -269,6 +288,7 @@ pub(crate) fn column_schemas_to_defs( })? } }, + semantic_type, }) }) .collect() @@ -300,7 +320,6 @@ pub(crate) fn to_alter_expr( .map_err(BoxedError::new) .context(ExternalSnafu)?, ), - is_key: false, location: location.as_ref().map(From::from), }], }), @@ -319,7 +338,6 @@ pub(crate) fn to_alter_expr( schema_name, table_name, kind: Some(kind), - ..Default::default() }) } diff --git a/src/frontend/src/inserter.rs b/src/frontend/src/inserter.rs index a5d06173cf..9dd8f72c6a 100644 --- a/src/frontend/src/inserter.rs +++ b/src/frontend/src/inserter.rs @@ -184,7 +184,6 @@ impl<'a> Inserter<'a> { schema_name: schema_name.to_string(), table_name: table_name.to_string(), kind: Some(Kind::AddColumns(add_columns)), - ..Default::default() }; let req = Request::Ddl(DdlRequest { diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 4a31b2e623..ef42d43e34 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -407,7 +407,7 @@ impl DistInstance { Ok(()) } - async fn handle_alter_table(&self, mut expr: AlterExpr) -> Result { + async fn handle_alter_table(&self, expr: AlterExpr) -> Result { let catalog_name = if expr.catalog_name.is_empty() { DEFAULT_CATALOG_NAME } else { @@ -432,8 +432,6 @@ impl DistInstance { })?; let table_id = table.table_info().ident.table_id; - expr.table_id = Some(api::v1::TableId { id: table_id }); - self.verify_alter(table_id, table.table_info(), expr.clone())?; let req = SubmitDdlTaskRequest { @@ -824,7 +822,7 @@ fn find_partition_entries( for column in column_defs { let column_name = &column.name; let data_type = ConcreteDataType::from( - ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?, + ColumnDataTypeWrapper::try_new(column.data_type).context(ColumnDataTypeSnafu)?, ); column_name_and_type.push((column_name, data_type)); } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 9a0a1fb2ea..a664f1770e 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -279,7 +279,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Unexpected, violated: {}", violated))] + #[snafu(display("Unexpected, violated: {violated}, at {location}"))] Unexpected { violated: String, location: Location, diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index bea5138549..eb89954af8 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -15,24 +15,31 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; +use api::v1::add_column_location::LocationType; +use api::v1::alter_expr::Kind; use api::v1::region::region_request::{self, Body as PbRegionRequest}; -use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest}; -use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr, SemanticType}; +use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; +use api::v1::{ + region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ColumnDataType, + ColumnDef as PbColumnDef, CreateTableExpr, DropColumn, DropColumns, SemanticType, +}; use client::client_manager::DatanodeClients; use common_catalog::consts::MITO2_ENGINE; +use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::ddl::alter_table::AlterTableProcedure; use common_meta::ddl::create_table::*; use common_meta::ddl::drop_table::DropTableProcedure; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; -use common_meta::rpc::ddl::{CreateTableTask, DropTableTask}; -use common_meta::rpc::router::find_leaders; +use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask}; +use common_meta::rpc::router::{find_leaders, RegionRoute}; use common_procedure::Status; use store_api::storage::RegionId; use crate::procedure::utils::mock::EchoRegionServer; use crate::procedure::utils::test_data; -fn create_table_procedure() -> CreateTableProcedure { +fn create_table_task() -> CreateTableTask { let create_table_expr = CreateTableExpr { catalog_name: "my_catalog".to_string(), schema_name: "my_schema".to_string(), @@ -41,27 +48,31 @@ fn create_table_procedure() -> CreateTableProcedure { column_defs: vec![ PbColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, + data_type: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, PbColumnDef { name: "my_tag1".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, PbColumnDef { name: "my_tag2".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, PbColumnDef { name: "my_field_column".to_string(), - datatype: ColumnDataType::Int32 as i32, + data_type: ColumnDataType::Int32 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ], time_index: "ts".to_string(), @@ -73,17 +84,17 @@ fn create_table_procedure() -> CreateTableProcedure { engine: MITO2_ENGINE.to_string(), }; - CreateTableProcedure::new( - 1, - CreateTableTask::new(create_table_expr, vec![], test_data::new_table_info()), - test_data::new_region_routes(), - test_data::new_ddl_context(), - ) + CreateTableTask::new(create_table_expr, vec![], test_data::new_table_info()) } #[test] fn test_create_region_request_template() { - let procedure = create_table_procedure(); + let procedure = CreateTableProcedure::new( + 1, + create_table_task(), + test_data::new_region_routes(), + test_data::new_ddl_context(Arc::new(DatanodeClients::default())), + ); let template = procedure.create_region_request_template().unwrap(); @@ -91,37 +102,45 @@ fn test_create_region_request_template() { region_id: 0, engine: MITO2_ENGINE.to_string(), column_defs: vec![ - ColumnDef { - name: "ts".to_string(), + RegionColumnDef { + column_def: Some(PbColumnDef { + name: "ts".to_string(), + data_type: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, + }), column_id: 0, - datatype: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Timestamp as i32, }, - ColumnDef { - name: "my_tag1".to_string(), + RegionColumnDef { + column_def: Some(PbColumnDef { + name: "my_tag1".to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + }), column_id: 1, - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, }, - ColumnDef { - name: "my_tag2".to_string(), + RegionColumnDef { + column_def: Some(PbColumnDef { + name: "my_tag2".to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + }), column_id: 2, - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, }, - ColumnDef { - name: "my_field_column".to_string(), + RegionColumnDef { + column_def: Some(PbColumnDef { + name: "my_field_column".to_string(), + data_type: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + }), column_id: 3, - datatype: ColumnDataType::Int32 as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, }, ], primary_key: vec![2, 1], @@ -133,20 +152,33 @@ fn test_create_region_request_template() { assert_eq!(template, expected); } -#[tokio::test] -async fn test_on_datanode_create_regions() { - let mut procedure = create_table_procedure(); +async fn new_datanode_manager( + region_server: &EchoRegionServer, + region_routes: &[RegionRoute], +) -> DatanodeManagerRef { + let clients = DatanodeClients::default(); - let (region_server, mut rx) = EchoRegionServer::new(); - - let datanodes = find_leaders(&procedure.creator.data.region_routes); - let datanode_clients = DatanodeClients::default(); - for peer in datanodes { - let client = region_server.new_client(&peer); - datanode_clients.insert_client(peer, client).await; + let datanodes = find_leaders(region_routes); + for datanode in datanodes { + let client = region_server.new_client(&datanode); + clients.insert_client(datanode, client).await; } - procedure.context.datanode_manager = Arc::new(datanode_clients); + Arc::new(clients) +} + +#[tokio::test] +async fn test_on_datanode_create_regions() { + let (region_server, mut rx) = EchoRegionServer::new(); + let region_routes = test_data::new_region_routes(); + let datanode_manager = new_datanode_manager(®ion_server, ®ion_routes).await; + + let mut procedure = CreateTableProcedure::new( + 1, + create_table_task(), + region_routes, + test_data::new_ddl_context(datanode_manager), + ); let expected_created_regions = Arc::new(Mutex::new(HashSet::from([ RegionId::new(42, 1), @@ -190,24 +222,18 @@ async fn test_on_datanode_drop_regions() { table: "my_table".to_string(), table_id: 42, }; - let mut procedure = DropTableProcedure::new( - 1, - drop_table_task, - TableRouteValue::new(test_data::new_region_routes()), - TableInfoValue::new(test_data::new_table_info()), - test_data::new_ddl_context(), - ); let (region_server, mut rx) = EchoRegionServer::new(); + let region_routes = test_data::new_region_routes(); + let datanode_manager = new_datanode_manager(®ion_server, ®ion_routes).await; - let datanodes = find_leaders(&procedure.data.table_route_value.region_routes); - let datanode_clients = DatanodeClients::default(); - for peer in datanodes { - let client = region_server.new_client(&peer); - datanode_clients.insert_client(peer, client).await; - } - - procedure.context.datanode_manager = Arc::new(datanode_clients); + let procedure = DropTableProcedure::new( + 1, + drop_table_task, + TableRouteValue::new(region_routes), + TableInfoValue::new(test_data::new_table_info()), + test_data::new_ddl_context(datanode_manager), + ); let expected_dropped_regions = Arc::new(Mutex::new(HashSet::from([ RegionId::new(42, 1), @@ -238,3 +264,128 @@ async fn test_on_datanode_drop_regions() { assert!(expected_dropped_regions.lock().unwrap().is_empty()); } + +#[test] +fn test_create_alter_region_request() { + let alter_table_task = AlterTableTask { + alter_table: AlterExpr { + catalog_name: "my_catalog".to_string(), + schema_name: "my_schema".to_string(), + table_name: "my_table".to_string(), + kind: Some(Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some(PbColumnDef { + name: "my_tag3".to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: b"hello".to_vec(), + semantic_type: SemanticType::Tag as i32, + }), + location: Some(AddColumnLocation { + location_type: LocationType::After as i32, + after_column_name: "my_tag2".to_string(), + }), + }], + })), + }, + }; + + let procedure = AlterTableProcedure::new( + 1, + alter_table_task, + TableInfoValue::new(test_data::new_table_info()), + test_data::new_ddl_context(Arc::new(DatanodeClients::default())), + ); + + let region_id = RegionId::new(42, 1); + let alter_region_request = procedure.create_alter_region_request(region_id).unwrap(); + assert_eq!(alter_region_request.region_id, region_id.as_u64()); + assert_eq!(alter_region_request.schema_version, 1); + assert_eq!( + alter_region_request.kind, + Some(region::alter_request::Kind::AddColumns( + region::AddColumns { + add_columns: vec![region::AddColumn { + column_def: Some(RegionColumnDef { + column_def: Some(PbColumnDef { + name: "my_tag3".to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: b"hello".to_vec(), + semantic_type: SemanticType::Tag as i32, + }), + column_id: 3, + }), + location: Some(AddColumnLocation { + location_type: LocationType::After as i32, + after_column_name: "my_tag2".to_string(), + }), + }] + } + )) + ); +} + +#[tokio::test] +async fn test_submit_alter_region_requests() { + let alter_table_task = AlterTableTask { + alter_table: AlterExpr { + catalog_name: "my_catalog".to_string(), + schema_name: "my_schema".to_string(), + table_name: "my_table".to_string(), + kind: Some(Kind::DropColumns(DropColumns { + drop_columns: vec![DropColumn { + name: "my_field_column".to_string(), + }], + })), + }, + }; + + let (region_server, mut rx) = EchoRegionServer::new(); + let region_routes = test_data::new_region_routes(); + let datanode_manager = new_datanode_manager(®ion_server, ®ion_routes).await; + + let context = test_data::new_ddl_context(datanode_manager); + let table_info = test_data::new_table_info(); + context + .table_metadata_manager + .create_table_metadata(table_info.clone(), region_routes.clone()) + .await + .unwrap(); + + let procedure = AlterTableProcedure::new( + 1, + alter_table_task, + TableInfoValue::new(table_info), + context, + ); + + let expected_altered_regions = Arc::new(Mutex::new(HashSet::from([ + RegionId::new(42, 1), + RegionId::new(42, 2), + RegionId::new(42, 3), + ]))); + let handle = tokio::spawn({ + let expected_altered_regions = expected_altered_regions.clone(); + let mut max_recv = expected_altered_regions.lock().unwrap().len(); + async move { + while let Some(region_request::Body::Alter(request)) = rx.recv().await { + let region_id = RegionId::from_u64(request.region_id); + + expected_altered_regions.lock().unwrap().remove(®ion_id); + + max_recv -= 1; + if max_recv == 0 { + break; + } + } + } + }); + + let status = procedure.submit_alter_region_requests().await.unwrap(); + assert!(matches!(status, Status::Done)); + + handle.await.unwrap(); + + assert!(expected_altered_regions.lock().unwrap().is_empty()); +} diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 90608516c2..3297225218 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -104,8 +104,8 @@ pub mod test_data { use std::sync::Arc; use chrono::DateTime; - use client::client_manager::DatanodeClients; use common_catalog::consts::MITO2_ENGINE; + use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::DdlContext; use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; @@ -138,7 +138,10 @@ pub mod test_data { pub fn new_table_info() -> RawTableInfo { RawTableInfo { - ident: TableIdent::new(42), + ident: TableIdent { + table_id: 42, + version: 1, + }, name: "my_table".to_string(), desc: Some("blabla".to_string()), catalog_name: "my_catalog".to_string(), @@ -184,16 +187,15 @@ pub mod test_data { } } - pub(crate) fn new_ddl_context() -> DdlContext { + pub(crate) fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { let kv_store = Arc::new(MemStore::new()); let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone()); let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); let kv_backend = KvBackendAdapter::wrap(kv_store); - let clients = Arc::new(DatanodeClients::default()); DdlContext { - datanode_manager: clients, + datanode_manager, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( mailbox, MetasrvInfo { diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs index fd5afbe979..9f82f8ec4d 100644 --- a/src/meta-srv/src/service/ddl.rs +++ b/src/meta-srv/src/service/ddl.rs @@ -22,7 +22,7 @@ use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; use super::GrpcResult; -use crate::error::{self}; +use crate::error; use crate::metasrv::MetaSrv; #[async_trait::async_trait] diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index ef67c6e81e..c251c26dfd 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -29,8 +29,8 @@ pub mod truncate; use std::str::FromStr; use api::helper::ColumnDataTypeWrapper; -use api::v1::add_column::location::LocationType; -use api::v1::add_column::Location; +use api::v1::add_column_location::LocationType; +use api::v1::{AddColumnLocation as Location, SemanticType}; use common_base::bytes::Bytes; use common_query::AddColumnLocation; use common_time::Timestamp; @@ -338,9 +338,11 @@ pub fn sql_column_def_to_grpc_column_def(col: &ColumnDef) -> Result Resu pub fn sql_location_to_grpc_add_column_location( location: &Option, -) -> Option { +) -> Option { match location { Some(AddColumnLocation::First) => Some(Location { location_type: LocationType::First.into(), @@ -741,7 +743,7 @@ mod tests { assert_eq!("col", grpc_column_def.name); assert!(grpc_column_def.is_nullable); // nullable when options are empty - assert_eq!(ColumnDataType::Float64 as i32, grpc_column_def.datatype); + assert_eq!(ColumnDataType::Float64 as i32, grpc_column_def.data_type); assert!(grpc_column_def.default_constraint.is_empty()); // test not null diff --git a/src/store-api/src/error.rs b/src/store-api/src/error.rs new file mode 100644 index 0000000000..d285e288cc --- /dev/null +++ b/src/store-api/src/error.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use snafu::{Location, Snafu}; + +use crate::storage::ColumnDescriptorBuilderError; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Invalid raw region request: {err}, at {location}"))] + InvalidRawRegionRequest { err: String, location: Location }, + + #[snafu(display("Invalid default constraint: {constraint}, source: {source}, at {location}"))] + InvalidDefaultConstraint { + constraint: String, + source: datatypes::error::Error, + location: Location, + }, + + #[snafu(display("Failed to build column descriptor: {source}, at {location}"))] + BuildColumnDescriptor { + source: ColumnDescriptorBuilderError, + location: Location, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidRawRegionRequest { .. } => StatusCode::InvalidArguments, + Error::InvalidDefaultConstraint { source, .. } => source.status_code(), + Error::BuildColumnDescriptor { .. } => StatusCode::Internal, + } + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs index d703794598..46b165ba2a 100644 --- a/src/store-api/src/lib.rs +++ b/src/store-api/src/lib.rs @@ -16,6 +16,7 @@ //! Storage related APIs pub mod data_source; +mod error; pub mod logstore; pub mod manifest; pub mod metadata; diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 863d3411c3..716dc647c3 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::region::ColumnDef; +use api::v1::region::RegionColumnDef; use api::v1::SemanticType; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -49,10 +49,17 @@ pub struct ColumnMetadata { impl ColumnMetadata { /// Construct `Self` from protobuf struct [ColumnDef] - pub fn try_from_column_def(column_def: ColumnDef) -> Result { - let semantic_type = column_def.semantic_type(); + pub fn try_from_column_def(column_def: RegionColumnDef) -> Result { let column_id = column_def.column_id; + let column_def = column_def + .column_def + .context(InvalidRawRegionRequestSnafu { + err: "column_def is absent", + })?; + + let semantic_type = column_def.semantic_type(); + let default_constrain = if column_def.default_constraint.is_empty() { None } else { @@ -61,7 +68,7 @@ impl ColumnMetadata { .context(ConvertDatatypesSnafu)?, ) }; - let data_type = ColumnDataTypeWrapper::new(column_def.datatype()).into(); + let data_type = ColumnDataTypeWrapper::new(column_def.data_type()).into(); let column_schema = ColumnSchema::new(column_def.name, data_type, column_def.is_nullable) .with_default_constraint(default_constrain) .context(ConvertDatatypesSnafu)?; @@ -498,6 +505,9 @@ pub enum MetadataError { location: Location, source: datatypes::error::Error, }, + + #[snafu(display("Invalid raw region request: {err}, at {location}"))] + InvalidRawRegionRequest { err: String, location: Location }, } impl ErrorExt for MetadataError { diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 197c1e3a3f..9d34727ef6 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -16,8 +16,9 @@ use std::collections::HashMap; use api::v1::region::region_request; use api::v1::Rows; +use snafu::OptionExt; -use crate::metadata::{ColumnMetadata, MetadataError}; +use crate::metadata::{ColumnMetadata, InvalidRawRegionRequestSnafu, MetadataError}; use crate::path_utils::region_dir; use crate::storage::{AlterRequest, ColumnId, RegionId, ScanRequest}; @@ -102,12 +103,25 @@ impl RegionRequest { close.region_id.into(), Self::Close(RegionCloseRequest {}), )]), - region_request::Body::Alter(alter) => Ok(vec![( - alter.region_id.into(), - Self::Alter(RegionAlterRequest { - request: unimplemented!(), - }), - )]), + region_request::Body::Alter(alter) => { + let kind = alter.kind.context(InvalidRawRegionRequestSnafu { + err: "'kind' is absent", + })?; + Ok(vec![( + alter.region_id.into(), + Self::Alter(RegionAlterRequest { + request: AlterRequest { + operation: kind.try_into().map_err(|e| { + InvalidRawRegionRequestSnafu { + err: format!("{e}"), + } + .build() + })?, + version: alter.schema_version as _, + }, + }), + )]) + } region_request::Body::Flush(flush) => Ok(vec![( flush.region_id.into(), Self::Flush(RegionFlushRequest {}), diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index dbb495a576..5bd58eadc0 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -14,12 +14,19 @@ use std::collections::{HashMap, HashSet}; +use api::helper::ColumnDataTypeWrapper; +use api::v1::region::{alter_request, AddColumn as PbAddColumn}; +use api::v1::SemanticType; use common_error::ext::ErrorExt; use common_query::logical_plan::Expr; use common_recordbatch::OrderOption; use datatypes::vectors::VectorRef; +use snafu::{OptionExt, ResultExt}; -use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber}; +use crate::error::{ + BuildColumnDescriptorSnafu, Error, InvalidDefaultConstraintSnafu, InvalidRawRegionRequestSnafu, +}; +use crate::storage::{ColumnDescriptor, ColumnDescriptorBuilder, RegionDescriptor, SequenceNumber}; /// Write request holds a collection of updates to apply to a region. /// @@ -129,6 +136,83 @@ impl AlterOperation { } } +impl TryFrom for AddColumn { + type Error = Error; + + fn try_from(add_column: PbAddColumn) -> Result { + let column_def = add_column + .column_def + .context(InvalidRawRegionRequestSnafu { + err: "'column_def' is absent", + })?; + let column_id = column_def.column_id; + + let column_def = column_def + .column_def + .context(InvalidRawRegionRequestSnafu { + err: "'column_def' is absent", + })?; + + let data_type = column_def.data_type; + let data_type = ColumnDataTypeWrapper::try_new(data_type) + .map_err(|_| { + InvalidRawRegionRequestSnafu { + err: format!("unknown raw column datatype: {data_type}"), + } + .build() + })? + .into(); + + let constraint = column_def.default_constraint.as_slice(); + let constraint = if constraint.is_empty() { + None + } else { + Some( + constraint + .try_into() + .context(InvalidDefaultConstraintSnafu { + constraint: String::from_utf8_lossy(constraint), + })?, + ) + }; + + let desc = ColumnDescriptorBuilder::new(column_id, column_def.name.clone(), data_type) + .is_nullable(column_def.is_nullable) + .is_time_index(column_def.semantic_type() == SemanticType::Timestamp) + .default_constraint(constraint) + .build() + .context(BuildColumnDescriptorSnafu)?; + + Ok(AddColumn { + desc, + is_key: column_def.semantic_type() == SemanticType::Tag, + // TODO(ruihang & yingwen): support alter column's "location" + }) + } +} + +impl TryFrom for AlterOperation { + type Error = Error; + + fn try_from(kind: alter_request::Kind) -> Result { + let operation = match kind { + alter_request::Kind::AddColumns(x) => { + let columns = x + .add_columns + .into_iter() + .map(|x| x.try_into()) + .collect::, Self::Error>>()?; + AlterOperation::AddColumns { columns } + } + alter_request::Kind::DropColumns(x) => { + let names = x.drop_columns.into_iter().map(|x| x.name).collect(); + AlterOperation::DropColumns { names } + } + }; + Ok(operation) + } +} + /// Alter region request. #[derive(Debug)] pub struct AlterRequest { @@ -140,7 +224,12 @@ pub struct AlterRequest { #[cfg(test)] mod tests { + use api::v1::region::{ + AddColumn as PbAddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef, + }; + use api::v1::{ColumnDataType, ColumnDef}; use datatypes::prelude::*; + use datatypes::schema::ColumnDefaultConstraint; use super::*; use crate::storage::{ @@ -214,4 +303,82 @@ mod tests { assert_eq!(1, desc.row_key.columns.len()); assert_eq!(1, desc.default_cf.columns.len()); } + + #[test] + fn test_try_from_raw_alter_kind() { + let kind = alter_request::Kind::AddColumns(AddColumns { + add_columns: vec![ + PbAddColumn { + column_def: Some(RegionColumnDef { + column_def: Some(ColumnDef { + name: "my_tag".to_string(), + data_type: ColumnDataType::Int32 as _, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Tag as _, + }), + column_id: 1, + }), + location: None, + }, + PbAddColumn { + column_def: Some(RegionColumnDef { + column_def: Some(ColumnDef { + name: "my_field".to_string(), + data_type: ColumnDataType::String as _, + is_nullable: true, + default_constraint: ColumnDefaultConstraint::Value("hello".into()) + .try_into() + .unwrap(), + semantic_type: SemanticType::Field as _, + }), + column_id: 2, + }), + location: None, + }, + ], + }); + + let AlterOperation::AddColumns { columns } = AlterOperation::try_from(kind).unwrap() else { + unreachable!() + }; + assert_eq!(2, columns.len()); + + let desc = &columns[0].desc; + assert_eq!(desc.id, 1); + assert_eq!(&desc.name, "my_tag"); + assert_eq!(desc.data_type, ConcreteDataType::int32_datatype()); + assert!(!desc.is_nullable()); + assert!(!desc.is_time_index()); + assert_eq!(desc.default_constraint(), None); + assert!(columns[0].is_key); + + let desc = &columns[1].desc; + assert_eq!(desc.id, 2); + assert_eq!(&desc.name, "my_field"); + assert_eq!(desc.data_type, ConcreteDataType::string_datatype()); + assert!(desc.is_nullable()); + assert!(!desc.is_time_index()); + assert_eq!( + desc.default_constraint(), + Some(&ColumnDefaultConstraint::Value("hello".into())) + ); + assert!(!columns[1].is_key); + + let kind = alter_request::Kind::DropColumns(DropColumns { + drop_columns: vec![ + DropColumn { + name: "c1".to_string(), + }, + DropColumn { + name: "c2".to_string(), + }, + ], + }); + + let AlterOperation::DropColumns { names } = AlterOperation::try_from(kind).unwrap() else { + unreachable!() + }; + assert_eq!(names, vec!["c1", "c2"]); + } } diff --git a/src/table/src/error.rs b/src/table/src/error.rs index d5e01f8568..8028d53c60 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -58,7 +58,7 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Column {} already exists in table {}", column_name, table_name))] + #[snafu(display("Column {column_name} already exists in table {table_name}, at {location}"))] ColumnExists { column_name: String, table_name: String,