From 5467ea496f4d40cb3271226cefe3b778feb4653f Mon Sep 17 00:00:00 2001 From: Zheming Li Date: Thu, 1 Jun 2023 10:13:00 +0800 Subject: [PATCH] feat: Add column supports at first or after the existing columns (#1621) * feat: Add column supports at first or after the existing columns * Update src/common/query/Cargo.toml --------- Co-authored-by: dennis zhuang --- Cargo.lock | 3 + src/api/src/lib.rs | 1 + src/common/grpc-expr/src/alter.rs | 101 +++++++++- src/common/grpc-expr/src/error.rs | 12 +- src/common/query/Cargo.toml | 2 + src/common/query/src/lib.rs | 24 +++ src/datanode/src/instance/grpc.rs | 62 ++++-- src/datanode/src/sql/alter.rs | 6 +- src/frontend/src/expr_factory.rs | 7 +- src/mito/src/engine/procedure/alter.rs | 41 ++++ src/mito/src/engine/tests.rs | 52 +++++ src/sql/Cargo.toml | 1 + src/sql/src/parsers/alter_parser.rs | 101 +++++++++- src/sql/src/statements.rs | 19 ++ src/sql/src/statements/alter.rs | 8 +- src/table/src/metadata.rs | 180 ++++++++++++++++-- src/table/src/requests.rs | 2 + .../alter/alter_table_first_after.result | 174 +++++++++++++++++ .../common/alter/alter_table_first_after.sql | 47 +++++ 19 files changed, 797 insertions(+), 46 deletions(-) create mode 100644 tests/cases/standalone/common/alter/alter_table_first_after.result create mode 100644 tests/cases/standalone/common/alter/alter_table_first_after.sql diff --git a/Cargo.lock b/Cargo.lock index 3ffce3fa2c..1f3564a2a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1844,6 +1844,7 @@ dependencies = [ name = "common-query" version = "0.2.0" dependencies = [ + "api", "async-trait", "common-base", "common-error", @@ -1853,6 +1854,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datatypes", + "serde", "snafu", "statrs", "tokio", @@ -8739,6 +8741,7 @@ dependencies = [ "common-catalog", "common-datasource", "common-error", + "common-query", "common-time", "datafusion-sql", "datatypes", diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index f642aea3f6..406f96df77 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -23,4 +23,5 @@ pub mod prometheus { pub mod v1; +pub use greptime_proto; pub use prost::DecodeError; diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index fe31c3f3a0..1d9b7ca905 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::add_column::location::LocationType; +use api::v1::add_column::Location; use api::v1::alter_expr::Kind; use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_query::AddColumnLocation; use datatypes::schema::{ColumnSchema, RawSchema}; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; @@ -24,9 +27,12 @@ use table::requests::{ use crate::error::{ ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, - Result, UnrecognizedTableOptionSnafu, + Result, UnknownLocationTypeSnafu, UnrecognizedTableOptionSnafu, }; +const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32; +const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32; + /// Convert an [`AlterExpr`] to an [`AlterTableRequest`] pub fn alter_expr_to_request(expr: AlterExpr) -> Result { let catalog_name = expr.catalog_name; @@ -50,6 +56,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result { Ok(AddColumnRequest { column_schema: schema, is_key: ac.is_key, + location: parse_location(ac.location)?, }) }) .collect::>>()?; @@ -186,8 +193,26 @@ pub fn create_expr_to_request( }) } +fn parse_location(location: Option) -> Result> { + match location { + Some(Location { + location_type: LOCATION_TYPE_FIRST, + .. + }) => Ok(Some(AddColumnLocation::First)), + Some(Location { + location_type: LOCATION_TYPE_AFTER, + after_cloumn_name, + }) => Ok(Some(AddColumnLocation::After { + column_name: after_cloumn_name, + })), + Some(Location { location_type, .. }) => UnknownLocationTypeSnafu { location_type }.fail(), + None => Ok(None), + } +} + #[cfg(test)] mod tests { + use api::v1::add_column::location::LocationType; use api::v1::{AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn}; use datatypes::prelude::ConcreteDataType; @@ -229,6 +254,80 @@ mod tests { ConcreteDataType::float64_datatype(), add_column.column_schema.data_type ); + assert_eq!(None, add_column.location); + } + + #[test] + fn test_alter_expr_with_location_to_request() { + let expr = AlterExpr { + catalog_name: "".to_string(), + schema_name: "".to_string(), + table_name: "monitor".to_string(), + + kind: Some(Kind::AddColumns(AddColumns { + add_columns: vec![ + AddColumn { + column_def: Some(ColumnDef { + name: "mem_usage".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: false, + default_constraint: vec![], + }), + is_key: false, + location: Some(Location { + location_type: LocationType::First.into(), + after_cloumn_name: "".to_string(), + }), + }, + AddColumn { + column_def: Some(ColumnDef { + name: "cpu_usage".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: false, + default_constraint: vec![], + }), + is_key: false, + location: Some(Location { + location_type: LocationType::After.into(), + after_cloumn_name: "ts".to_string(), + }), + }, + ], + })), + }; + + let alter_request = alter_expr_to_request(expr).unwrap(); + assert_eq!(alter_request.catalog_name, ""); + assert_eq!(alter_request.schema_name, ""); + assert_eq!("monitor".to_string(), alter_request.table_name); + + let mut add_columns = match alter_request.alter_kind { + AlterKind::AddColumns { columns } => columns, + _ => unreachable!(), + }; + + let add_column = add_columns.pop().unwrap(); + assert!(!add_column.is_key); + assert_eq!("cpu_usage", add_column.column_schema.name); + assert_eq!( + ConcreteDataType::float64_datatype(), + add_column.column_schema.data_type + ); + assert_eq!( + Some(AddColumnLocation::After { + column_name: "ts".to_string() + }), + add_column.location + ); + + let add_column = add_columns.pop().unwrap(); + assert!(!add_column.is_key); + assert_eq!("mem_usage", add_column.column_schema.name); + assert_eq!( + ConcreteDataType::float64_datatype(), + add_column.column_schema.data_type + ); + assert_eq!(Some(AddColumnLocation::First), add_column.location); } #[test] diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index 39f7b30ec0..42417c6adc 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -83,6 +83,12 @@ pub enum Error { #[snafu(display("The column name already exists, column: {}", column))] ColumnAlreadyExists { column: String, location: Location }, + + #[snafu(display("Unknown location type: {}", location_type))] + UnknownLocationType { + location_type: i32, + location: Location, + }, } pub type Result = std::result::Result; @@ -103,9 +109,9 @@ impl ErrorExt for Error { Error::MissingField { .. } => StatusCode::InvalidArguments, Error::InvalidColumnDef { source, .. } => source.status_code(), Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, - Error::UnexpectedValuesLength { .. } | Error::ColumnAlreadyExists { .. } => { - StatusCode::InvalidArguments - } + Error::UnexpectedValuesLength { .. } + | Error::ColumnAlreadyExists { .. } + | Error::UnknownLocationType { .. } => StatusCode::InvalidArguments, } } diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml index 0fefe81b45..1db8665260 100644 --- a/src/common/query/Cargo.toml +++ b/src/common/query/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +api = { path = "../../api" } async-trait.workspace = true common-error = { path = "../error" } common-recordbatch = { path = "../recordbatch" } @@ -13,6 +14,7 @@ datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datatypes = { path = "../../datatypes" } +serde.workspace = true snafu.workspace = true statrs = "0.16" diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 61e90a1aba..8050be67a9 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -14,7 +14,10 @@ use std::fmt::{Debug, Formatter}; +use api::greptime_proto::v1::add_column::location::LocationType; +use api::greptime_proto::v1::add_column::Location; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; +use serde::{Deserialize, Serialize}; pub mod columnar_value; pub mod error; @@ -44,3 +47,24 @@ impl Debug for Output { } pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum AddColumnLocation { + First, + After { column_name: String }, +} + +impl From<&AddColumnLocation> for Location { + fn from(value: &AddColumnLocation) -> Self { + match value { + AddColumnLocation::First => Location { + location_type: LocationType::First.into(), + after_cloumn_name: "".to_string(), + }, + AddColumnLocation::After { column_name } => Location { + location_type: LocationType::After.into(), + after_cloumn_name: column_name.to_string(), + }, + } + } +} diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index b38e9257e7..8f4a9fe7f0 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -291,6 +291,8 @@ async fn new_dummy_catalog_list( #[cfg(test)] mod test { + use api::v1::add_column::location::LocationType; + use api::v1::add_column::Location; use api::v1::column::{SemanticType, Values}; use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, @@ -364,16 +366,44 @@ mod test { schema_name: "my_database".to_string(), table_name: "my_table".to_string(), kind: Some(alter_expr::Kind::AddColumns(AddColumns { - add_columns: vec![AddColumn { - column_def: Some(ColumnDef { - name: "b".to_string(), - datatype: ColumnDataType::Int32 as i32, - is_nullable: true, - default_constraint: vec![], - }), - is_key: true, - location: None, - }], + add_columns: vec![ + AddColumn { + column_def: Some(ColumnDef { + name: "b".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }), + is_key: true, + location: None, + }, + AddColumn { + column_def: Some(ColumnDef { + name: "c".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }), + is_key: true, + location: Some(Location { + location_type: LocationType::First.into(), + after_cloumn_name: "".to_string(), + }), + }, + AddColumn { + column_def: Some(ColumnDef { + name: "d".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }), + is_key: true, + location: Some(Location { + location_type: LocationType::After.into(), + after_cloumn_name: "a".to_string(), + }), + }, + ], })), })), }); @@ -389,15 +419,15 @@ mod test { .unwrap(); assert!(matches!(output, Output::AffectedRows(1))); - let output = exec_selection(instance, "SELECT ts, a, b FROM my_database.my_table").await; + let output = exec_selection(instance, "SELECT * FROM my_database.my_table").await; let Output::Stream(stream) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2022-12-30T07:09:00 | s | 1 | -+---------------------+---+---+"; ++---+---+---+---------------------+---+ +| c | a | d | ts | b | ++---+---+---+---------------------+---+ +| | s | | 2022-12-30T07:09:00 | 1 | ++---+---+---+---------------------+---+"; assert_eq!(recordbatches.pretty_print().unwrap(), expected); } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index fe1c6974ef..32537af9ea 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -68,12 +68,16 @@ impl SqlHandler { } .fail() } - AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumns { + AlterTableOperation::AddColumn { + column_def, + location, + } => AlterKind::AddColumns { columns: vec![AddColumnRequest { column_schema: column_def_to_schema(column_def, false) .context(error::ParseSqlSnafu)?, // FIXME(dennis): supports adding key column is_key: false, + location: location.clone(), }], }, AlterTableOperation::DropColumn { name } => AlterKind::DropColumns { diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index 65bf64fd01..fbc8d6926b 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -291,7 +291,10 @@ pub(crate) fn to_alter_expr( } .fail(); } - AlterTableOperation::AddColumn { column_def } => Kind::AddColumns(AddColumns { + AlterTableOperation::AddColumn { + column_def, + location, + } => Kind::AddColumns(AddColumns { add_columns: vec![AddColumn { column_def: Some( sql_column_def_to_grpc_column_def(column_def) @@ -299,7 +302,7 @@ pub(crate) fn to_alter_expr( .context(ExternalSnafu)?, ), is_key: false, - location: None, + location: location.as_ref().map(From::from), }], }), AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns { diff --git a/src/mito/src/engine/procedure/alter.rs b/src/mito/src/engine/procedure/alter.rs index a22a1c17d5..6f53ee5585 100644 --- a/src/mito/src/engine/procedure/alter.rs +++ b/src/mito/src/engine/procedure/alter.rs @@ -321,6 +321,7 @@ mod tests { use super::*; use crate::engine::procedure::procedure_test_util::{self, TestEnv}; + use crate::engine::tests::new_add_columns_req_with_location; use crate::table::test_util; fn new_add_columns_req() -> AlterTableRequest { @@ -331,10 +332,12 @@ mod tests { AddColumnRequest { column_schema: new_tag, is_key: true, + location: None, }, AddColumnRequest { column_schema: new_field, is_key: false, + location: None, }, ], }; @@ -394,6 +397,44 @@ mod tests { assert!(new_schema.column_schema_by_name("my_field").is_some()); assert_eq!(new_schema.version(), schema.version() + 1); assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2); + + // Alter the table. + let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true); + let new_field = ColumnSchema::new( + "my_field_after_ts", + ConcreteDataType::string_datatype(), + true, + ); + let request = new_add_columns_req_with_location(&new_tag, &new_field); + let mut procedure = table_engine + .alter_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Validate. + let table = table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .unwrap(); + let new_info = table.table_info(); + let new_meta = &new_info.meta; + let new_schema = &new_meta.schema; + + assert_eq!(&[0, 1, 6], &new_meta.primary_key_indices[..]); + assert_eq!(&[2, 3, 4, 5, 7], &new_meta.value_indices[..]); + assert!(new_schema.column_schema_by_name("my_tag_first").is_some()); + assert!(new_schema + .column_schema_by_name("my_field_after_ts") + .is_some()); + assert_eq!(new_schema.version(), schema.version() + 2); + assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 4); + assert_eq!(new_schema.column_index_by_name("my_tag_first").unwrap(), 0); + assert_eq!( + new_schema + .column_index_by_name("my_field_after_ts") + .unwrap(), + new_schema.column_index_by_name("ts").unwrap() + 1 + ); } #[tokio::test] diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 117c560c15..e7f0178279 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -547,10 +547,39 @@ fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> Alte AddColumnRequest { column_schema: new_tag.clone(), is_key: true, + location: None, }, AddColumnRequest { column_schema: new_field.clone(), is_key: false, + location: None, + }, + ], + }, + } +} + +pub(crate) fn new_add_columns_req_with_location( + new_tag: &ColumnSchema, + new_field: &ColumnSchema, +) -> AlterTableRequest { + AlterTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TABLE_NAME.to_string(), + alter_kind: AlterKind::AddColumns { + columns: vec![ + AddColumnRequest { + column_schema: new_tag.clone(), + is_key: true, + location: Some(common_query::AddColumnLocation::First), + }, + AddColumnRequest { + column_schema: new_field.clone(), + is_key: false, + location: Some(common_query::AddColumnLocation::After { + column_name: "ts".to_string(), + }), }, ], }, @@ -597,6 +626,29 @@ async fn test_alter_table_add_column() { assert_eq!(new_schema.version(), old_schema.version() + 1); assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2); assert_eq!(new_meta.region_numbers, old_meta.region_numbers); + + let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true); + let new_field = ColumnSchema::new( + "my_field_after_ts", + ConcreteDataType::string_datatype(), + true, + ); + let req = new_add_columns_req_with_location(&new_tag, &new_field); + let table = table_engine + .alter_table(&EngineContext::default(), req) + .await + .unwrap(); + + let new_info = table.table_info(); + let new_meta = &new_info.meta; + let new_schema = &new_meta.schema; + + assert_eq!(&[0, 1, 6], &new_meta.primary_key_indices[..]); + assert_eq!(&[2, 3, 4, 5, 7], &new_meta.value_indices[..]); + assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column()); + assert_eq!(new_schema.version(), old_schema.version() + 2); + assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 4); + assert_eq!(new_meta.region_numbers, old_meta.region_numbers); } #[tokio::test] diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 669471d558..90528fd262 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -10,6 +10,7 @@ common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } common-datasource = { path = "../common/datasource" } common-error = { path = "../common/error" } +common-query = { path = "../common/query" } common-time = { path = "../common/time" } datafusion-sql.workspace = true datatypes = { path = "../datatypes" } diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs index d71e9772c8..2448f383f1 100644 --- a/src/sql/src/parsers/alter_parser.rs +++ b/src/sql/src/parsers/alter_parser.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_query::AddColumnLocation; use snafu::ResultExt; use sqlparser::keywords::Keyword; use sqlparser::parser::ParserError; +use sqlparser::tokenizer::Token; use crate::error::{self, Result}; use crate::parser::ParserContext; @@ -41,7 +43,25 @@ impl<'a> ParserContext<'a> { } else { let _ = parser.parse_keyword(Keyword::COLUMN); let column_def = parser.parse_column_def()?; - AlterTableOperation::AddColumn { column_def } + let location = if parser.parse_keyword(Keyword::FIRST) { + Some(AddColumnLocation::First) + } else if let Token::Word(word) = parser.peek_token().token { + if word.value.to_ascii_uppercase() == "AFTER" { + parser.next_token(); + let name = parser.parse_identifier()?; + Some(AddColumnLocation::After { + column_name: name.value, + }) + } else { + None + } + } else { + None + }; + AlterTableOperation::AddColumn { + column_def, + location, + } } } else if parser.parse_keyword(Keyword::DROP) { if parser.parse_keyword(Keyword::COLUMN) { @@ -98,13 +118,90 @@ mod tests { let alter_operation = alter_table.alter_operation(); assert_matches!(alter_operation, AlterTableOperation::AddColumn { .. }); match alter_operation { - AlterTableOperation::AddColumn { column_def } => { + AlterTableOperation::AddColumn { + column_def, + location, + } => { assert_eq!("tagk_i", column_def.name.value); assert_eq!(DataType::String, column_def.data_type); assert!(column_def .options .iter() .any(|o| matches!(o.option, ColumnOption::Null))); + assert_eq!(&None, location); + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } + } + + #[test] + fn test_parse_alter_add_column_with_first() { + let sql = "ALTER TABLE my_metric_1 ADD tagk_i STRING Null FIRST;"; + let mut result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + assert_matches!(statement, Statement::Alter { .. }); + match statement { + Statement::Alter(alter_table) => { + assert_eq!("my_metric_1", alter_table.table_name().0[0].value); + + let alter_operation = alter_table.alter_operation(); + assert_matches!(alter_operation, AlterTableOperation::AddColumn { .. }); + match alter_operation { + AlterTableOperation::AddColumn { + column_def, + location, + } => { + assert_eq!("tagk_i", column_def.name.value); + assert_eq!(DataType::String, column_def.data_type); + assert!(column_def + .options + .iter() + .any(|o| matches!(o.option, ColumnOption::Null))); + assert_eq!(&Some(AddColumnLocation::First), location); + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } + } + + #[test] + fn test_parse_alter_add_column_with_after() { + let sql = "ALTER TABLE my_metric_1 ADD tagk_i STRING Null AFTER ts;"; + let mut result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + assert_matches!(statement, Statement::Alter { .. }); + match statement { + Statement::Alter(alter_table) => { + assert_eq!("my_metric_1", alter_table.table_name().0[0].value); + + let alter_operation = alter_table.alter_operation(); + assert_matches!(alter_operation, AlterTableOperation::AddColumn { .. }); + match alter_operation { + AlterTableOperation::AddColumn { + column_def, + location, + } => { + assert_eq!("tagk_i", column_def.name.value); + assert_eq!(DataType::String, column_def.data_type); + assert!(column_def + .options + .iter() + .any(|o| matches!(o.option, ColumnOption::Null))); + assert_eq!( + &Some(AddColumnLocation::After { + column_name: "ts".to_string() + }), + location + ); } _ => unreachable!(), } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 9e23559ea0..8ac02ce16c 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -28,7 +28,10 @@ pub mod tql; use std::str::FromStr; use api::helper::ColumnDataTypeWrapper; +use api::v1::add_column::location::LocationType; +use api::v1::add_column::Location; use common_base::bytes::Bytes; +use common_query::AddColumnLocation; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, COMMENT_KEY}; @@ -397,6 +400,22 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu } } +pub fn sql_location_to_grpc_add_column_location( + location: &Option, +) -> Option { + match location { + Some(AddColumnLocation::First) => Some(Location { + location_type: LocationType::First.into(), + after_cloumn_name: "".to_string(), + }), + Some(AddColumnLocation::After { column_name }) => Some(Location { + location_type: LocationType::After.into(), + after_cloumn_name: column_name.to_string(), + }), + None => None, + } +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; diff --git a/src/sql/src/statements/alter.rs b/src/sql/src/statements/alter.rs index 73723b9101..132f9368c1 100644 --- a/src/sql/src/statements/alter.rs +++ b/src/sql/src/statements/alter.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_query::AddColumnLocation; use sqlparser::ast::{ColumnDef, Ident, ObjectName, TableConstraint}; #[derive(Debug, Clone, PartialEq, Eq)] @@ -41,8 +42,11 @@ impl AlterTable { pub enum AlterTableOperation { /// `ADD ` AddConstraint(TableConstraint), - /// `ADD [ COLUMN ] ` - AddColumn { column_def: ColumnDef }, + /// `ADD [ COLUMN ] [location]` + AddColumn { + column_def: ColumnDef, + location: Option, + }, /// `DROP COLUMN ` DropColumn { name: Ident }, /// `RENAME ` diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 5958c477df..8e5b183f2a 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_query::AddColumnLocation; use datafusion_expr::TableProviderFilterPushDown; pub use datatypes::error::{Error as ConvertError, Result as ConvertResult}; use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; @@ -140,6 +141,18 @@ impl TableMetaBuilder { } } +/// The result after splitting requests by column location info. +struct SplitResult<'a> { + /// column requests should be added at first place. + columns_at_first: Vec<&'a AddColumnRequest>, + /// column requests should be added after already exist columns. + columns_at_after: HashMap>, + /// column requests should be added at last place. + columns_at_last: Vec<&'a AddColumnRequest>, + /// all column names should be added. + column_names: Vec, +} + impl TableMeta { pub fn row_key_column_names(&self) -> impl Iterator { let columns_schemas = &self.schema.column_schemas(); @@ -231,33 +244,49 @@ impl TableMeta { ) -> Result { let table_schema = &self.schema; let mut meta_builder = self.new_meta_builder(); + let original_primary_key_indices: HashSet<&usize> = + self.primary_key_indices.iter().collect(); - // Check whether columns to add are already existing. - for request in requests { - let column_name = &request.column_schema.name; - ensure!( - table_schema.column_schema_by_name(column_name).is_none(), - error::ColumnExistsSnafu { - column_name, - table_name, - } - ); - } - - // Collect names of columns to add for error message. - let mut column_names = Vec::with_capacity(requests.len()); - let mut primary_key_indices = self.primary_key_indices.clone(); + let SplitResult { + columns_at_first, + columns_at_after, + columns_at_last, + column_names, + } = self.split_requests_by_column_location(table_name, requests)?; + let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len()); let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len()); - columns.extend_from_slice(table_schema.column_schemas()); - // Append new columns to the end of column list. - for request in requests { - column_names.push(request.column_schema.name.clone()); + // add new columns with FIRST, and in reverse order of requests. + columns_at_first.iter().rev().for_each(|request| { if request.is_key { // If a key column is added, we also need to store its index in primary_key_indices. primary_key_indices.push(columns.len()); } columns.push(request.column_schema.clone()); + }); + // add existed columns in original order and handle new columns with AFTER. + for (index, column_schema) in table_schema.column_schemas().iter().enumerate() { + if original_primary_key_indices.contains(&index) { + primary_key_indices.push(columns.len()); + } + columns.push(column_schema.clone()); + if let Some(requests) = columns_at_after.get(&column_schema.name) { + requests.iter().rev().for_each(|request| { + if request.is_key { + // If a key column is added, we also need to store its index in primary_key_indices. + primary_key_indices.push(columns.len()); + } + columns.push(request.column_schema.clone()); + }); + } } + // add new columns without location info to last. + columns_at_last.iter().for_each(|request| { + if request.is_key { + // If a key column is added, we also need to store its index in primary_key_indices. + primary_key_indices.push(columns.len()); + } + columns.push(request.column_schema.clone()); + }); let mut builder = SchemaBuilder::try_from(columns) .with_context(|_| error::SchemaBuildSnafu { @@ -358,6 +387,58 @@ impl TableMeta { Ok(meta_builder) } + + /// Split requests into different groups using column location info. + fn split_requests_by_column_location<'a>( + &self, + table_name: &str, + requests: &'a [AddColumnRequest], + ) -> Result> { + let table_schema = &self.schema; + let mut columns_at_first = Vec::new(); + let mut columns_at_after = HashMap::new(); + let mut columns_at_last = Vec::new(); + let mut column_names = Vec::with_capacity(requests.len()); + for request in requests { + // Check whether columns to add are already existing. + let column_name = &request.column_schema.name; + column_names.push(column_name.clone()); + ensure!( + table_schema.column_schema_by_name(column_name).is_none(), + error::ColumnExistsSnafu { + column_name, + table_name, + } + ); + match request.location.as_ref() { + Some(AddColumnLocation::First) => { + columns_at_first.push(request); + } + Some(AddColumnLocation::After { column_name }) => { + ensure!( + table_schema.column_schema_by_name(column_name).is_some(), + error::ColumnNotExistsSnafu { + column_name, + table_name, + } + ); + columns_at_after + .entry(column_name.clone()) + .or_insert(Vec::new()) + .push(request); + } + None => { + columns_at_last.push(request); + } + } + } + Ok(SplitResult { + columns_at_first, + columns_at_after, + columns_at_last, + column_names, + }) + } } #[derive(Clone, Debug, PartialEq, Eq, Builder)] @@ -568,10 +649,42 @@ mod tests { AddColumnRequest { column_schema: new_tag, is_key: true, + location: None, }, AddColumnRequest { column_schema: new_field, is_key: false, + location: None, + }, + ], + }; + + let builder = meta + .builder_with_alter_kind("my_table", &alter_kind) + .unwrap(); + builder.build().unwrap() + } + + fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta { + let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true); + let new_field = ColumnSchema::new( + "my_field_after_ts", + ConcreteDataType::string_datatype(), + true, + ); + let alter_kind = AlterKind::AddColumns { + columns: vec![ + AddColumnRequest { + column_schema: new_tag, + is_key: true, + location: Some(AddColumnLocation::First), + }, + AddColumnRequest { + column_schema: new_field, + is_key: false, + location: Some(AddColumnLocation::After { + column_name: "ts".to_string(), + }), }, ], }; @@ -714,6 +827,7 @@ mod tests { columns: vec![AddColumnRequest { column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true), is_key: false, + location: None, }], }; @@ -798,4 +912,32 @@ mod tests { assert_eq!(4, meta.next_column_id); assert_eq!(column_schema.name, desc.name); } + + #[test] + fn test_add_columns_with_location() { + let schema = Arc::new(new_test_schema()); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .build() + .unwrap(); + + let new_meta = add_columns_to_meta_with_location(&meta); + assert_eq!(meta.region_numbers, new_meta.region_numbers); + + let names: Vec = new_meta + .schema + .column_schemas() + .iter() + .map(|column_schema| column_schema.name.clone()) + .collect(); + assert_eq!( + &["my_tag_first", "col1", "ts", "my_field_after_ts", "col2"], + &names[..] + ); + assert_eq!(&[0, 1], &new_meta.primary_key_indices[..]); + assert_eq!(&[2, 3, 4], &new_meta.value_indices[..]); + } } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index fed3ccc868..1519581464 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -19,6 +19,7 @@ use std::str::FromStr; use std::time::Duration; use common_base::readable_size::ReadableSize; +use common_query::AddColumnLocation; use datatypes::prelude::VectorRef; use datatypes::schema::{ColumnSchema, RawSchema}; use serde::{Deserialize, Serialize}; @@ -207,6 +208,7 @@ impl AlterTableRequest { pub struct AddColumnRequest { pub column_schema: ColumnSchema, pub is_key: bool, + pub location: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/tests/cases/standalone/common/alter/alter_table_first_after.result b/tests/cases/standalone/common/alter/alter_table_first_after.result new file mode 100644 index 0000000000..b9ad3786cc --- /dev/null +++ b/tests/cases/standalone/common/alter/alter_table_first_after.result @@ -0,0 +1,174 @@ +CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX); + +Affected Rows: 0 + +DESC TABLE t; + ++-------+-------+------+---------+---------------+ +| Field | Type | Null | Default | Semantic Type | ++-------+-------+------+---------+---------------+ +| i | Int32 | YES | | FIELD | +| j | Int64 | NO | | TIME INDEX | ++-------+-------+------+---------+---------------+ + +ALTER TABLE t ADD COLUMN k INTEGER; + +Affected Rows: 0 + +DESC TABLE t; + ++-------+-------+------+---------+---------------+ +| Field | Type | Null | Default | Semantic Type | ++-------+-------+------+---------+---------------+ +| i | Int32 | YES | | FIELD | +| j | Int64 | NO | | TIME INDEX | +| k | Int32 | YES | | FIELD | ++-------+-------+------+---------+---------------+ + +-- SQLNESS ARG restart=true +DESC TABLE t; + ++-------+-------+------+---------+---------------+ +| Field | Type | Null | Default | Semantic Type | ++-------+-------+------+---------+---------------+ +| i | Int32 | YES | | FIELD | +| j | Int64 | NO | | TIME INDEX | +| k | Int32 | YES | | FIELD | ++-------+-------+------+---------+---------------+ + +ALTER TABLE t ADD COLUMN m INTEGER; + +Affected Rows: 0 + +DESC TABLE t; + ++-------+-------+------+---------+---------------+ +| Field | Type | Null | Default | Semantic Type | ++-------+-------+------+---------+---------------+ +| i | Int32 | YES | | FIELD | +| j | Int64 | NO | | TIME INDEX | +| k | Int32 | YES | | FIELD | +| m | Int32 | YES | | FIELD | ++-------+-------+------+---------+---------------+ + +INSERT INTO t VALUES (1, 2, 3, 4); + +Affected Rows: 1 + +SELECT * FROM t; + ++---+---+---+---+ +| i | j | k | m | ++---+---+---+---+ +| 1 | 2 | 3 | 4 | ++---+---+---+---+ + +ALTER TABLE t ADD COLUMN n INTEGER FIRST; + +Affected Rows: 0 + +DESC TABLE t; + ++-------+-------+------+---------+---------------+ +| Field | Type | Null | Default | Semantic Type | ++-------+-------+------+---------+---------------+ +| n | Int32 | YES | | FIELD | +| i | Int32 | YES | | FIELD | +| j | Int64 | NO | | TIME INDEX | +| k | Int32 | YES | | FIELD | +| m | Int32 | YES | | FIELD | ++-------+-------+------+---------+---------------+ + +SELECT * FROM t; + ++---+---+---+---+---+ +| n | i | j | k | m | ++---+---+---+---+---+ +| | 1 | 2 | 3 | 4 | ++---+---+---+---+---+ + +INSERT INTO t VALUES (2, 3, 4, 5, 6); + +Affected Rows: 1 + +ALTER TABLE t ADD COLUMN y INTEGER AFTER j; + +Affected Rows: 0 + +DESC TABLE t; + ++-------+-------+------+---------+---------------+ +| Field | Type | Null | Default | Semantic Type | ++-------+-------+------+---------+---------------+ +| n | Int32 | YES | | FIELD | +| i | Int32 | YES | | FIELD | +| j | Int64 | NO | | TIME INDEX | +| y | Int32 | YES | | FIELD | +| k | Int32 | YES | | FIELD | +| m | Int32 | YES | | FIELD | ++-------+-------+------+---------+---------------+ + +SELECT * FROM t; + ++---+---+---+---+---+---+ +| n | i | j | y | k | m | ++---+---+---+---+---+---+ +| | 1 | 2 | | 3 | 4 | +| 2 | 3 | 4 | | 5 | 6 | ++---+---+---+---+---+---+ + +-- SQLNESS ARG restart=true +ALTER TABLE t ADD COLUMN a INTEGER FIRST; + +Affected Rows: 0 + +DESC TABLE t; + ++-------+-------+------+---------+---------------+ +| Field | Type | Null | Default | Semantic Type | ++-------+-------+------+---------+---------------+ +| a | Int32 | YES | | FIELD | +| n | Int32 | YES | | FIELD | +| i | Int32 | YES | | FIELD | +| j | Int64 | NO | | TIME INDEX | +| y | Int32 | YES | | FIELD | +| k | Int32 | YES | | FIELD | +| m | Int32 | YES | | FIELD | ++-------+-------+------+---------+---------------+ + +ALTER TABLE t ADD COLUMN b INTEGER AFTER j; + +Affected Rows: 0 + +DESC TABLE t; + ++-------+-------+------+---------+---------------+ +| Field | Type | Null | Default | Semantic Type | ++-------+-------+------+---------+---------------+ +| a | Int32 | YES | | FIELD | +| n | Int32 | YES | | FIELD | +| i | Int32 | YES | | FIELD | +| j | Int64 | NO | | TIME INDEX | +| b | Int32 | YES | | FIELD | +| y | Int32 | YES | | FIELD | +| k | Int32 | YES | | FIELD | +| m | Int32 | YES | | FIELD | ++-------+-------+------+---------+---------------+ + +SELECT * FROM t; + ++---+---+---+---+---+---+---+---+ +| a | n | i | j | b | y | k | m | ++---+---+---+---+---+---+---+---+ +| | | 1 | 2 | | | 3 | 4 | +| | 2 | 3 | 4 | | | 5 | 6 | ++---+---+---+---+---+---+---+---+ + +ALTER TABLE t ADD COLUMN x int xxx; + +Error: 1001(Unsupported), SQL statement is not supported: ALTER TABLE t ADD COLUMN x int xxx;, keyword: xxx + +DROP TABLE t; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/alter/alter_table_first_after.sql b/tests/cases/standalone/common/alter/alter_table_first_after.sql new file mode 100644 index 0000000000..399f89ff5a --- /dev/null +++ b/tests/cases/standalone/common/alter/alter_table_first_after.sql @@ -0,0 +1,47 @@ +CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX); + +DESC TABLE t; + +ALTER TABLE t ADD COLUMN k INTEGER; + +DESC TABLE t; + +-- SQLNESS ARG restart=true +DESC TABLE t; + +ALTER TABLE t ADD COLUMN m INTEGER; + +DESC TABLE t; + +INSERT INTO t VALUES (1, 2, 3, 4); + +SELECT * FROM t; + +ALTER TABLE t ADD COLUMN n INTEGER FIRST; + +DESC TABLE t; + +SELECT * FROM t; + +INSERT INTO t VALUES (2, 3, 4, 5, 6); + +ALTER TABLE t ADD COLUMN y INTEGER AFTER j; + +DESC TABLE t; + +SELECT * FROM t; + +-- SQLNESS ARG restart=true +ALTER TABLE t ADD COLUMN a INTEGER FIRST; + +DESC TABLE t; + +ALTER TABLE t ADD COLUMN b INTEGER AFTER j; + +DESC TABLE t; + +SELECT * FROM t; + +ALTER TABLE t ADD COLUMN x int xxx; + +DROP TABLE t;