feat: add ChangeColumnType for AlterKind (#3757)

* feat: add `ModifyColumn` for `AlterKind`

* chore: additional code comments for `AlterKind::ModifyColumns`

* fix: add nullable check on `ModifyColumn`

* style: codefmt

* style: fix the code based on review suggestions

* style: fix the code based on review suggestions

* style: rename `ModifyColumn` -> `ChangeColumnType`

* style: code fmt

* style: `change_columns_type` -> `change_column_types`
This commit is contained in:
Kould
2024-04-24 12:27:23 +08:00
committed by GitHub
parent 4685b59ef1
commit b619950c70
5 changed files with 404 additions and 23 deletions

View File

@@ -51,7 +51,7 @@ impl AlterTableProcedure {
AlterKind::RenameTable { new_table_name } => {
new_info.name = new_table_name.to_string();
}
AlterKind::DropColumns { .. } => {}
AlterKind::DropColumns { .. } | AlterKind::ChangeColumnTypes { .. } => {}
}
Ok(new_info)

View File

@@ -23,7 +23,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::region::RegionColumnDef;
use api::v1::SemanticType;
use api::v1::{ColumnDef, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
@@ -33,7 +33,7 @@ use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
use crate::region_request::{AddColumn, AddColumnLocation, AlterKind};
use crate::region_request::{AddColumn, AddColumnLocation, AlterKind, ChangeColumnType};
use crate::storage::consts::is_internal_column;
use crate::storage::{ColumnId, RegionId};
@@ -61,18 +61,7 @@ impl fmt::Debug for ColumnMetadata {
}
impl ColumnMetadata {
/// Construct `Self` from protobuf struct [RegionColumnDef]
pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
let column_id = column_def.column_id;
let column_def = column_def
.column_def
.context(InvalidRawRegionRequestSnafu {
err: "column_def is absent",
})?;
let semantic_type = column_def.semantic_type();
fn inner_try_from_column_def(column_def: ColumnDef) -> Result<ColumnSchema> {
let default_constrain = if column_def.default_constraint.is_empty() {
None
} else {
@@ -86,9 +75,22 @@ impl ColumnMetadata {
column_def.datatype_extension.clone(),
)
.into();
let column_schema = ColumnSchema::new(column_def.name, data_type, column_def.is_nullable)
ColumnSchema::new(column_def.name, data_type, column_def.is_nullable)
.with_default_constraint(default_constrain)
.context(ConvertDatatypesSnafu)?;
.context(ConvertDatatypesSnafu)
}
/// Construct `Self` from protobuf struct [RegionColumnDef]
pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
let column_id = column_def.column_id;
let column_def = column_def
.column_def
.context(InvalidRawRegionRequestSnafu {
err: "column_def is absent",
})?;
let semantic_type = column_def.semantic_type();
let column_schema = Self::inner_try_from_column_def(column_def)?;
Ok(Self {
column_schema,
semantic_type,
@@ -535,6 +537,7 @@ impl RegionMetadataBuilder {
match kind {
AlterKind::AddColumns { columns } => self.add_columns(columns)?,
AlterKind::DropColumns { names } => self.drop_columns(&names),
AlterKind::ChangeColumnTypes { columns } => self.change_column_types(columns),
}
Ok(self)
}
@@ -615,6 +618,25 @@ impl RegionMetadataBuilder {
self.column_metadatas
.retain(|col| !name_set.contains(&col.column_schema.name));
}
/// Changes columns type to the metadata if exist.
fn change_column_types(&mut self, columns: Vec<ChangeColumnType>) {
let mut change_type_map: HashMap<_, _> = columns
.into_iter()
.map(
|ChangeColumnType {
column_name,
target_type,
}| (column_name, target_type),
)
.collect();
for column_meta in self.column_metadatas.iter_mut() {
if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
column_meta.column_schema.data_type = target_type;
}
}
}
}
/// Fields skipped in serialization.
@@ -707,6 +729,13 @@ pub enum MetadataError {
#[snafu(display("Time index column not found"))]
TimeIndexNotFound { location: Location },
#[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
ChangeColumnNotFound {
column_name: String,
region_id: RegionId,
location: Location,
},
}
impl ErrorExt for MetadataError {
@@ -1112,7 +1141,7 @@ mod test {
let metadata = builder.build().unwrap();
check_columns(&metadata, &["a", "b", "f", "c", "d"]);
let mut builder = RegionMetadataBuilder::from_existing(metadata);
let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
builder
.alter(AlterKind::DropColumns {
names: vec!["a".to_string()],
@@ -1121,6 +1150,24 @@ mod test {
// Build returns error as the primary key contains a.
let err = builder.build().unwrap_err();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
let mut builder = RegionMetadataBuilder::from_existing(metadata);
builder
.alter(AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnType {
column_name: "b".to_string(),
target_type: ConcreteDataType::string_datatype(),
}],
})
.unwrap();
let metadata = builder.build().unwrap();
check_columns(&metadata, &["a", "b", "f", "c", "d"]);
let b_type = &metadata
.column_by_name("b")
.unwrap()
.column_schema
.data_type;
assert_eq!(ConcreteDataType::string_datatype(), *b_type);
}
#[test]

View File

@@ -23,6 +23,7 @@ use api::v1::region::{
};
use api::v1::{self, Rows, SemanticType};
pub use common_base::AffectedRows;
use datatypes::data_type::ConcreteDataType;
use snafu::{ensure, OptionExt};
use strum::IntoStaticStr;
@@ -332,6 +333,11 @@ pub enum AlterKind {
/// Name of columns to drop.
names: Vec<String>,
},
/// Change columns datatype form the region, only fields are allowed to change.
ChangeColumnTypes {
/// Columns to change.
columns: Vec<ChangeColumnType>,
},
}
impl AlterKind {
@@ -350,6 +356,11 @@ impl AlterKind {
Self::validate_column_to_drop(name, metadata)?;
}
}
AlterKind::ChangeColumnTypes { columns } => {
for col_to_change in columns {
col_to_change.validate(metadata)?;
}
}
}
Ok(())
}
@@ -364,6 +375,9 @@ impl AlterKind {
AlterKind::DropColumns { names } => names
.iter()
.any(|name| metadata.column_by_name(name).is_some()),
AlterKind::ChangeColumnTypes { columns } => columns
.iter()
.any(|col_to_change| col_to_change.need_alter(metadata)),
}
}
@@ -501,6 +515,56 @@ impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
}
}
/// Change a column's datatype.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ChangeColumnType {
/// Schema of the column to modify.
pub column_name: String,
/// Column will be changed to this type.
pub target_type: ConcreteDataType,
}
impl ChangeColumnType {
/// Returns an error if the column's datatype to change is invalid.
pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
let column_meta = metadata
.column_by_name(&self.column_name)
.with_context(|| InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!("column {} not found", self.column_name),
})?;
ensure!(
matches!(column_meta.semantic_type, SemanticType::Field),
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: "'timestamp' or 'tag' column cannot change type".to_string()
}
);
ensure!(
column_meta
.column_schema
.data_type
.can_arrow_type_cast_to(&self.target_type),
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column '{}' cannot be cast automatically to type '{}'",
self.column_name, self.target_type
),
}
);
Ok(())
}
/// Returns true if no column's datatype to change to the region.
pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
debug_assert!(self.validate(metadata).is_ok());
metadata.column_by_name(&self.column_name).is_some()
}
}
#[derive(Debug, Default)]
pub struct RegionFlushRequest {
pub row_group_size: Option<usize>,
@@ -678,6 +742,15 @@ mod tests {
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_1",
ConcreteDataType::boolean_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 4,
})
.primary_key(vec![2]);
builder.build().unwrap()
}
@@ -790,6 +863,55 @@ mod tests {
assert!(kind.need_alter(&metadata));
}
#[test]
fn test_validate_change_column_type() {
let metadata = new_metadata();
AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnType {
column_name: "xxxx".to_string(),
target_type: ConcreteDataType::string_datatype(),
}],
}
.validate(&metadata)
.unwrap_err();
AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnType {
column_name: "field_1".to_string(),
target_type: ConcreteDataType::date_datatype(),
}],
}
.validate(&metadata)
.unwrap_err();
AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnType {
column_name: "ts".to_string(),
target_type: ConcreteDataType::date_datatype(),
}],
}
.validate(&metadata)
.unwrap_err();
AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnType {
column_name: "tag_0".to_string(),
target_type: ConcreteDataType::date_datatype(),
}],
}
.validate(&metadata)
.unwrap_err();
let kind = AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnType {
column_name: "field_0".to_string(),
target_type: ConcreteDataType::int32_datatype(),
}],
};
kind.validate(&metadata).unwrap();
assert!(kind.need_alter(&metadata));
}
#[test]
fn test_validate_schema_version() {
let mut metadata = new_metadata();

View File

@@ -23,11 +23,11 @@ pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
use crate::error::{self, Result};
use crate::requests::{AddColumnRequest, AlterKind, TableOptions};
use crate::requests::{AddColumnRequest, AlterKind, ChangeColumnTypeRequest, TableOptions};
pub type TableId = u32;
pub type TableVersion = u64;
@@ -195,6 +195,9 @@ impl TableMeta {
self.add_columns(table_name, columns, add_if_not_exists)
}
AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
AlterKind::ChangeColumnTypes { columns } => {
self.change_column_types(table_name, columns)
}
// No need to rebuild table meta when renaming tables.
AlterKind::RenameTable { .. } => {
let mut meta_builder = TableMetaBuilder::default();
@@ -458,6 +461,133 @@ impl TableMeta {
Ok(meta_builder)
}
fn change_column_types(
&self,
table_name: &str,
requests: &[ChangeColumnTypeRequest],
) -> Result<TableMetaBuilder> {
let table_schema = &self.schema;
let mut meta_builder = self.new_meta_builder();
let mut change_column_types = HashMap::with_capacity(requests.len());
let timestamp_index = table_schema.timestamp_index();
for col_to_change in requests {
let change_column_name = &col_to_change.column_name;
let index = table_schema
.column_index_by_name(change_column_name)
.with_context(|| error::ColumnNotExistsSnafu {
column_name: change_column_name,
table_name,
})?;
let column = &table_schema.column_schemas()[index];
ensure!(
!self.primary_key_indices.contains(&index),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"Not allowed to change primary key index column '{}'",
column.name
)
}
);
if let Some(ts_index) = timestamp_index {
// Not allowed to change column datatype in timestamp index.
ensure!(
index != ts_index,
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"Not allowed to change timestamp index column '{}' datatype",
column.name
)
}
);
}
ensure!(
change_column_types
.insert(&col_to_change.column_name, col_to_change)
.is_none(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"change column datatype {} more than once",
col_to_change.column_name
),
}
);
ensure!(
column
.data_type
.can_arrow_type_cast_to(&col_to_change.target_type),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"column '{}' cannot be cast automatically to type '{}'",
col_to_change.column_name, col_to_change.target_type,
),
}
);
ensure!(
column.is_nullable(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"column '{}' must be nullable to ensure safe conversion.",
col_to_change.column_name,
),
}
);
}
// Collect columns after changed.
let columns: Vec<_> = table_schema
.column_schemas()
.iter()
.cloned()
.map(|mut column| {
if let Some(change_column) = change_column_types.get(&column.name) {
column.data_type = change_column.target_type.clone();
}
column
})
.collect();
let mut builder = SchemaBuilder::try_from_columns(columns)
.with_context(|_| error::SchemaBuildSnafu {
msg: format!("Failed to convert column schemas into schema for table {table_name}"),
})?
// Also bump the schema version.
.version(table_schema.version() + 1);
for (k, v) in table_schema.metadata().iter() {
builder = builder.add_metadata(k, v);
}
let new_schema = builder.build().with_context(|_| {
let column_names: Vec<_> = requests
.iter()
.map(|request| &request.column_name)
.collect();
error::SchemaBuildSnafu {
msg: format!(
"Table {table_name} cannot change datatype with columns {column_names:?}"
),
}
})?;
let _ = meta_builder
.schema(Arc::new(new_schema))
.primary_key_indices(self.primary_key_indices.clone());
Ok(meta_builder)
}
/// Split requests into different groups using column location info.
fn split_requests_by_column_location<'a>(
&self,
@@ -1027,6 +1157,31 @@ mod tests {
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
}
#[test]
fn test_change_unknown_column_data_type() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
let alter_kind = AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnTypeRequest {
column_name: "unknown".to_string(),
target_type: ConcreteDataType::string_datatype(),
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
}
#[test]
fn test_remove_key_column() {
let schema = Arc::new(new_test_schema());
@@ -1061,6 +1216,46 @@ mod tests {
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_change_key_column_data_type() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
// Remove column in primary key.
let alter_kind = AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnTypeRequest {
column_name: "col1".to_string(),
target_type: ConcreteDataType::string_datatype(),
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
// Remove timestamp column.
let alter_kind = AlterKind::ChangeColumnTypes {
columns: vec![ChangeColumnTypeRequest {
column_name: "ts".to_string(),
target_type: ConcreteDataType::string_datatype(),
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_alloc_new_column() {
let schema = Arc::new(new_test_schema());

View File

@@ -22,6 +22,7 @@ use common_base::readable_size::ReadableSize;
use common_datasource::object_store::s3::is_supported_in_s3;
use common_query::AddColumnLocation;
use common_time::range::TimestampRange;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::ColumnSchema;
use serde::{Deserialize, Serialize};
@@ -164,11 +165,27 @@ pub struct AddColumnRequest {
pub location: Option<AddColumnLocation>,
}
/// Change column datatype request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeColumnTypeRequest {
pub column_name: String,
pub target_type: ConcreteDataType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlterKind {
AddColumns { columns: Vec<AddColumnRequest> },
DropColumns { names: Vec<String> },
RenameTable { new_table_name: String },
AddColumns {
columns: Vec<AddColumnRequest>,
},
DropColumns {
names: Vec<String>,
},
ChangeColumnTypes {
columns: Vec<ChangeColumnTypeRequest>,
},
RenameTable {
new_table_name: String,
},
}
#[derive(Debug)]