From b619950c701f41557e56f66d10366d36354d7df3 Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 24 Apr 2024 12:27:23 +0800 Subject: [PATCH] feat: add `ChangeColumnType` for `AlterKind` (#3757) * feat: add `ModifyColumn` for `AlterKind` * chore: additional code comments for `AlterKind::ModifyColumns` * fix: add nullable check on `ModifyColumn` * style: codefmt * style: fix the code based on review suggestions * style: fix the code based on review suggestions * style: rename `ModifyColumn` -> `ChangeColumnType` * style: code fmt * style: `change_columns_type` -> `change_column_types` --- .../src/ddl/alter_table/update_metadata.rs | 2 +- src/store-api/src/metadata.rs | 81 +++++-- src/store-api/src/region_request.rs | 122 +++++++++++ src/table/src/metadata.rs | 199 +++++++++++++++++- src/table/src/requests.rs | 23 +- 5 files changed, 404 insertions(+), 23 deletions(-) diff --git a/src/common/meta/src/ddl/alter_table/update_metadata.rs b/src/common/meta/src/ddl/alter_table/update_metadata.rs index a2bc444860..6ef6e2e7fc 100644 --- a/src/common/meta/src/ddl/alter_table/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_table/update_metadata.rs @@ -51,7 +51,7 @@ impl AlterTableProcedure { AlterKind::RenameTable { new_table_name } => { new_info.name = new_table_name.to_string(); } - AlterKind::DropColumns { .. } => {} + AlterKind::DropColumns { .. } | AlterKind::ChangeColumnTypes { .. } => {} } Ok(new_info) diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 1c9c970011..5874f23da4 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::region::RegionColumnDef; -use api::v1::SemanticType; +use api::v1::{ColumnDef, SemanticType}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; @@ -33,7 +33,7 @@ use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use snafu::{ensure, Location, OptionExt, ResultExt, Snafu}; -use crate::region_request::{AddColumn, AddColumnLocation, AlterKind}; +use crate::region_request::{AddColumn, AddColumnLocation, AlterKind, ChangeColumnType}; use crate::storage::consts::is_internal_column; use crate::storage::{ColumnId, RegionId}; @@ -61,18 +61,7 @@ impl fmt::Debug for ColumnMetadata { } impl ColumnMetadata { - /// Construct `Self` from protobuf struct [RegionColumnDef] - pub fn try_from_column_def(column_def: RegionColumnDef) -> Result { - let column_id = column_def.column_id; - - let column_def = column_def - .column_def - .context(InvalidRawRegionRequestSnafu { - err: "column_def is absent", - })?; - - let semantic_type = column_def.semantic_type(); - + fn inner_try_from_column_def(column_def: ColumnDef) -> Result { let default_constrain = if column_def.default_constraint.is_empty() { None } else { @@ -86,9 +75,22 @@ impl ColumnMetadata { column_def.datatype_extension.clone(), ) .into(); - let column_schema = ColumnSchema::new(column_def.name, data_type, column_def.is_nullable) + ColumnSchema::new(column_def.name, data_type, column_def.is_nullable) .with_default_constraint(default_constrain) - .context(ConvertDatatypesSnafu)?; + .context(ConvertDatatypesSnafu) + } + + /// Construct `Self` from protobuf struct [RegionColumnDef] + pub fn try_from_column_def(column_def: RegionColumnDef) -> Result { + let column_id = column_def.column_id; + let column_def = column_def + .column_def + .context(InvalidRawRegionRequestSnafu { + err: "column_def is absent", + })?; + let semantic_type = column_def.semantic_type(); + let column_schema = Self::inner_try_from_column_def(column_def)?; + Ok(Self { column_schema, semantic_type, @@ -535,6 +537,7 @@ impl RegionMetadataBuilder { match kind { AlterKind::AddColumns { columns } => self.add_columns(columns)?, AlterKind::DropColumns { names } => self.drop_columns(&names), + AlterKind::ChangeColumnTypes { columns } => self.change_column_types(columns), } Ok(self) } @@ -615,6 +618,25 @@ impl RegionMetadataBuilder { self.column_metadatas .retain(|col| !name_set.contains(&col.column_schema.name)); } + + /// Changes columns type to the metadata if exist. + fn change_column_types(&mut self, columns: Vec) { + let mut change_type_map: HashMap<_, _> = columns + .into_iter() + .map( + |ChangeColumnType { + column_name, + target_type, + }| (column_name, target_type), + ) + .collect(); + + for column_meta in self.column_metadatas.iter_mut() { + if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) { + column_meta.column_schema.data_type = target_type; + } + } + } } /// Fields skipped in serialization. @@ -707,6 +729,13 @@ pub enum MetadataError { #[snafu(display("Time index column not found"))] TimeIndexNotFound { location: Location }, + + #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))] + ChangeColumnNotFound { + column_name: String, + region_id: RegionId, + location: Location, + }, } impl ErrorExt for MetadataError { @@ -1112,7 +1141,7 @@ mod test { let metadata = builder.build().unwrap(); check_columns(&metadata, &["a", "b", "f", "c", "d"]); - let mut builder = RegionMetadataBuilder::from_existing(metadata); + let mut builder = RegionMetadataBuilder::from_existing(metadata.clone()); builder .alter(AlterKind::DropColumns { names: vec!["a".to_string()], @@ -1121,6 +1150,24 @@ mod test { // Build returns error as the primary key contains a. let err = builder.build().unwrap_err(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnType { + column_name: "b".to_string(), + target_type: ConcreteDataType::string_datatype(), + }], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["a", "b", "f", "c", "d"]); + let b_type = &metadata + .column_by_name("b") + .unwrap() + .column_schema + .data_type; + assert_eq!(ConcreteDataType::string_datatype(), *b_type); } #[test] diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index f07b240b2a..b98c3951d5 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -23,6 +23,7 @@ use api::v1::region::{ }; use api::v1::{self, Rows, SemanticType}; pub use common_base::AffectedRows; +use datatypes::data_type::ConcreteDataType; use snafu::{ensure, OptionExt}; use strum::IntoStaticStr; @@ -332,6 +333,11 @@ pub enum AlterKind { /// Name of columns to drop. names: Vec, }, + /// Change columns datatype form the region, only fields are allowed to change. + ChangeColumnTypes { + /// Columns to change. + columns: Vec, + }, } impl AlterKind { @@ -350,6 +356,11 @@ impl AlterKind { Self::validate_column_to_drop(name, metadata)?; } } + AlterKind::ChangeColumnTypes { columns } => { + for col_to_change in columns { + col_to_change.validate(metadata)?; + } + } } Ok(()) } @@ -364,6 +375,9 @@ impl AlterKind { AlterKind::DropColumns { names } => names .iter() .any(|name| metadata.column_by_name(name).is_some()), + AlterKind::ChangeColumnTypes { columns } => columns + .iter() + .any(|col_to_change| col_to_change.need_alter(metadata)), } } @@ -501,6 +515,56 @@ impl TryFrom for AddColumnLocation { } } +/// Change a column's datatype. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ChangeColumnType { + /// Schema of the column to modify. + pub column_name: String, + /// Column will be changed to this type. + pub target_type: ConcreteDataType, +} + +impl ChangeColumnType { + /// Returns an error if the column's datatype to change is invalid. + pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> { + let column_meta = metadata + .column_by_name(&self.column_name) + .with_context(|| InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!("column {} not found", self.column_name), + })?; + + ensure!( + matches!(column_meta.semantic_type, SemanticType::Field), + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: "'timestamp' or 'tag' column cannot change type".to_string() + } + ); + ensure!( + column_meta + .column_schema + .data_type + .can_arrow_type_cast_to(&self.target_type), + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "column '{}' cannot be cast automatically to type '{}'", + self.column_name, self.target_type + ), + } + ); + + Ok(()) + } + + /// Returns true if no column's datatype to change to the region. + pub fn need_alter(&self, metadata: &RegionMetadata) -> bool { + debug_assert!(self.validate(metadata).is_ok()); + metadata.column_by_name(&self.column_name).is_some() + } +} + #[derive(Debug, Default)] pub struct RegionFlushRequest { pub row_group_size: Option, @@ -678,6 +742,15 @@ mod tests { semantic_type: SemanticType::Field, column_id: 3, }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_1", + ConcreteDataType::boolean_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 4, + }) .primary_key(vec![2]); builder.build().unwrap() } @@ -790,6 +863,55 @@ mod tests { assert!(kind.need_alter(&metadata)); } + #[test] + fn test_validate_change_column_type() { + let metadata = new_metadata(); + AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnType { + column_name: "xxxx".to_string(), + target_type: ConcreteDataType::string_datatype(), + }], + } + .validate(&metadata) + .unwrap_err(); + + AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnType { + column_name: "field_1".to_string(), + target_type: ConcreteDataType::date_datatype(), + }], + } + .validate(&metadata) + .unwrap_err(); + + AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnType { + column_name: "ts".to_string(), + target_type: ConcreteDataType::date_datatype(), + }], + } + .validate(&metadata) + .unwrap_err(); + + AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnType { + column_name: "tag_0".to_string(), + target_type: ConcreteDataType::date_datatype(), + }], + } + .validate(&metadata) + .unwrap_err(); + + let kind = AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnType { + column_name: "field_0".to_string(), + target_type: ConcreteDataType::int32_datatype(), + }], + }; + kind.validate(&metadata).unwrap(); + assert!(kind.need_alter(&metadata)); + } + #[test] fn test_validate_schema_version() { let mut metadata = new_metadata(); diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 4746a17bcf..6e91b37fd8 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -23,11 +23,11 @@ pub use datatypes::error::{Error as ConvertError, Result as ConvertResult}; use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; use derive_builder::Builder; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId}; use crate::error::{self, Result}; -use crate::requests::{AddColumnRequest, AlterKind, TableOptions}; +use crate::requests::{AddColumnRequest, AlterKind, ChangeColumnTypeRequest, TableOptions}; pub type TableId = u32; pub type TableVersion = u64; @@ -195,6 +195,9 @@ impl TableMeta { self.add_columns(table_name, columns, add_if_not_exists) } AlterKind::DropColumns { names } => self.remove_columns(table_name, names), + AlterKind::ChangeColumnTypes { columns } => { + self.change_column_types(table_name, columns) + } // No need to rebuild table meta when renaming tables. AlterKind::RenameTable { .. } => { let mut meta_builder = TableMetaBuilder::default(); @@ -458,6 +461,133 @@ impl TableMeta { Ok(meta_builder) } + fn change_column_types( + &self, + table_name: &str, + requests: &[ChangeColumnTypeRequest], + ) -> Result { + let table_schema = &self.schema; + let mut meta_builder = self.new_meta_builder(); + + let mut change_column_types = HashMap::with_capacity(requests.len()); + let timestamp_index = table_schema.timestamp_index(); + + for col_to_change in requests { + let change_column_name = &col_to_change.column_name; + + let index = table_schema + .column_index_by_name(change_column_name) + .with_context(|| error::ColumnNotExistsSnafu { + column_name: change_column_name, + table_name, + })?; + + let column = &table_schema.column_schemas()[index]; + + ensure!( + !self.primary_key_indices.contains(&index), + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "Not allowed to change primary key index column '{}'", + column.name + ) + } + ); + + if let Some(ts_index) = timestamp_index { + // Not allowed to change column datatype in timestamp index. + ensure!( + index != ts_index, + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "Not allowed to change timestamp index column '{}' datatype", + column.name + ) + } + ); + } + + ensure!( + change_column_types + .insert(&col_to_change.column_name, col_to_change) + .is_none(), + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "change column datatype {} more than once", + col_to_change.column_name + ), + } + ); + + ensure!( + column + .data_type + .can_arrow_type_cast_to(&col_to_change.target_type), + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "column '{}' cannot be cast automatically to type '{}'", + col_to_change.column_name, col_to_change.target_type, + ), + } + ); + + ensure!( + column.is_nullable(), + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "column '{}' must be nullable to ensure safe conversion.", + col_to_change.column_name, + ), + } + ); + } + // Collect columns after changed. + let columns: Vec<_> = table_schema + .column_schemas() + .iter() + .cloned() + .map(|mut column| { + if let Some(change_column) = change_column_types.get(&column.name) { + column.data_type = change_column.target_type.clone(); + } + column + }) + .collect(); + + let mut builder = SchemaBuilder::try_from_columns(columns) + .with_context(|_| error::SchemaBuildSnafu { + msg: format!("Failed to convert column schemas into schema for table {table_name}"), + })? + // Also bump the schema version. + .version(table_schema.version() + 1); + for (k, v) in table_schema.metadata().iter() { + builder = builder.add_metadata(k, v); + } + let new_schema = builder.build().with_context(|_| { + let column_names: Vec<_> = requests + .iter() + .map(|request| &request.column_name) + .collect(); + + error::SchemaBuildSnafu { + msg: format!( + "Table {table_name} cannot change datatype with columns {column_names:?}" + ), + } + })?; + + let _ = meta_builder + .schema(Arc::new(new_schema)) + .primary_key_indices(self.primary_key_indices.clone()); + + Ok(meta_builder) + } + /// Split requests into different groups using column location info. fn split_requests_by_column_location<'a>( &self, @@ -1027,6 +1157,31 @@ mod tests { assert_eq!(StatusCode::TableColumnNotFound, err.status_code()); } + #[test] + fn test_change_unknown_column_data_type() { + let schema = Arc::new(new_test_schema()); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .build() + .unwrap(); + + let alter_kind = AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnTypeRequest { + column_name: "unknown".to_string(), + target_type: ConcreteDataType::string_datatype(), + }], + }; + + let err = meta + .builder_with_alter_kind("my_table", &alter_kind, false) + .err() + .unwrap(); + assert_eq!(StatusCode::TableColumnNotFound, err.status_code()); + } + #[test] fn test_remove_key_column() { let schema = Arc::new(new_test_schema()); @@ -1061,6 +1216,46 @@ mod tests { assert_eq!(StatusCode::InvalidArguments, err.status_code()); } + #[test] + fn test_change_key_column_data_type() { + let schema = Arc::new(new_test_schema()); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .build() + .unwrap(); + + // Remove column in primary key. + let alter_kind = AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnTypeRequest { + column_name: "col1".to_string(), + target_type: ConcreteDataType::string_datatype(), + }], + }; + + let err = meta + .builder_with_alter_kind("my_table", &alter_kind, false) + .err() + .unwrap(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + + // Remove timestamp column. + let alter_kind = AlterKind::ChangeColumnTypes { + columns: vec![ChangeColumnTypeRequest { + column_name: "ts".to_string(), + target_type: ConcreteDataType::string_datatype(), + }], + }; + + let err = meta + .builder_with_alter_kind("my_table", &alter_kind, false) + .err() + .unwrap(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + } + #[test] fn test_alloc_new_column() { let schema = Arc::new(new_test_schema()); diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 0c1e322952..9472491bcb 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -22,6 +22,7 @@ use common_base::readable_size::ReadableSize; use common_datasource::object_store::s3::is_supported_in_s3; use common_query::AddColumnLocation; use common_time::range::TimestampRange; +use datatypes::data_type::ConcreteDataType; use datatypes::prelude::VectorRef; use datatypes::schema::ColumnSchema; use serde::{Deserialize, Serialize}; @@ -164,11 +165,27 @@ pub struct AddColumnRequest { pub location: Option, } +/// Change column datatype request +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChangeColumnTypeRequest { + pub column_name: String, + pub target_type: ConcreteDataType, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum AlterKind { - AddColumns { columns: Vec }, - DropColumns { names: Vec }, - RenameTable { new_table_name: String }, + AddColumns { + columns: Vec, + }, + DropColumns { + names: Vec, + }, + ChangeColumnTypes { + columns: Vec, + }, + RenameTable { + new_table_name: String, + }, } #[derive(Debug)]