feat: Support adding a column at the first position or after an existing column (#1621)

* feat: Support adding a column at the first position or after an existing column

* Update src/common/query/Cargo.toml

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
Zheming Li
2023-06-01 10:13:00 +08:00
committed by GitHub
parent 70e17ead68
commit 5467ea496f
19 changed files with 797 additions and 46 deletions

3
Cargo.lock generated
View File

@@ -1844,6 +1844,7 @@ dependencies = [
name = "common-query"
version = "0.2.0"
dependencies = [
"api",
"async-trait",
"common-base",
"common-error",
@@ -1853,6 +1854,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datatypes",
"serde",
"snafu",
"statrs",
"tokio",
@@ -8739,6 +8741,7 @@ dependencies = [
"common-catalog",
"common-datasource",
"common-error",
"common-query",
"common-time",
"datafusion-sql",
"datatypes",

View File

@@ -23,4 +23,5 @@ pub mod prometheus {
pub mod v1;
pub use greptime_proto;
pub use prost::DecodeError;

View File

@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::add_column::location::LocationType;
use api::v1::add_column::Location;
use api::v1::alter_expr::Kind;
use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, RawSchema};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
@@ -24,9 +27,12 @@ use table::requests::{
use crate::error::{
ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu,
Result, UnrecognizedTableOptionSnafu,
Result, UnknownLocationTypeSnafu, UnrecognizedTableOptionSnafu,
};
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
@@ -50,6 +56,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
Ok(AddColumnRequest {
column_schema: schema,
is_key: ac.is_key,
location: parse_location(ac.location)?,
})
})
.collect::<Result<Vec<_>>>()?;
@@ -186,8 +193,26 @@ pub fn create_expr_to_request(
})
}
/// Converts the optional gRPC [`Location`] of an added column into an optional
/// [`AddColumnLocation`].
///
/// Returns `Ok(None)` when no location is given, and an `UnknownLocationType` error when
/// `location_type` is neither `LOCATION_TYPE_FIRST` nor `LOCATION_TYPE_AFTER`.
fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {
    location
        .map(|loc| match loc.location_type {
            LOCATION_TYPE_FIRST => Ok(AddColumnLocation::First),
            LOCATION_TYPE_AFTER => Ok(AddColumnLocation::After {
                column_name: loc.after_cloumn_name,
            }),
            location_type => UnknownLocationTypeSnafu { location_type }.fail(),
        })
        // Option<Result<T>> -> Result<Option<T>>, so a missing location stays `Ok(None)`.
        .transpose()
}
#[cfg(test)]
mod tests {
use api::v1::add_column::location::LocationType;
use api::v1::{AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn};
use datatypes::prelude::ConcreteDataType;
@@ -229,6 +254,80 @@ mod tests {
ConcreteDataType::float64_datatype(),
add_column.column_schema.data_type
);
assert_eq!(None, add_column.location);
}
#[test]
fn test_alter_expr_with_location_to_request() {
    // Both new columns are non-key, non-nullable Float64 columns; they differ only in
    // their name and requested location.
    let float_column = |name: &str, location: Location| AddColumn {
        column_def: Some(ColumnDef {
            name: name.to_string(),
            datatype: ColumnDataType::Float64 as i32,
            is_nullable: false,
            default_constraint: vec![],
        }),
        is_key: false,
        location: Some(location),
    };
    let at_first = Location {
        location_type: LocationType::First.into(),
        after_cloumn_name: "".to_string(),
    };
    let after_ts = Location {
        location_type: LocationType::After.into(),
        after_cloumn_name: "ts".to_string(),
    };
    let expr = AlterExpr {
        catalog_name: "".to_string(),
        schema_name: "".to_string(),
        table_name: "monitor".to_string(),
        kind: Some(Kind::AddColumns(AddColumns {
            add_columns: vec![
                float_column("mem_usage", at_first),
                float_column("cpu_usage", after_ts),
            ],
        })),
    };

    let alter_request = alter_expr_to_request(expr).unwrap();
    assert_eq!(alter_request.catalog_name, "");
    assert_eq!(alter_request.schema_name, "");
    assert_eq!("monitor".to_string(), alter_request.table_name);

    let columns = match alter_request.alter_kind {
        AlterKind::AddColumns { columns } => columns,
        _ => unreachable!(),
    };
    // Inspect the converted requests from the last one back to the first one.
    let mut last_to_first = columns.into_iter().rev();

    let add_column = last_to_first.next().unwrap();
    assert!(!add_column.is_key);
    assert_eq!("cpu_usage", add_column.column_schema.name);
    assert_eq!(
        ConcreteDataType::float64_datatype(),
        add_column.column_schema.data_type
    );
    assert_eq!(
        Some(AddColumnLocation::After {
            column_name: "ts".to_string()
        }),
        add_column.location
    );

    let add_column = last_to_first.next().unwrap();
    assert!(!add_column.is_key);
    assert_eq!("mem_usage", add_column.column_schema.name);
    assert_eq!(
        ConcreteDataType::float64_datatype(),
        add_column.column_schema.data_type
    );
    assert_eq!(Some(AddColumnLocation::First), add_column.location);
}
#[test]

View File

@@ -83,6 +83,12 @@ pub enum Error {
#[snafu(display("The column name already exists, column: {}", column))]
ColumnAlreadyExists { column: String, location: Location },
#[snafu(display("Unknown location type: {}", location_type))]
UnknownLocationType {
location_type: i32,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -103,9 +109,9 @@ impl ErrorExt for Error {
Error::MissingField { .. } => StatusCode::InvalidArguments,
Error::InvalidColumnDef { source, .. } => source.status_code(),
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
Error::UnexpectedValuesLength { .. } | Error::ColumnAlreadyExists { .. } => {
StatusCode::InvalidArguments
}
Error::UnexpectedValuesLength { .. }
| Error::ColumnAlreadyExists { .. }
| Error::UnknownLocationType { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
api = { path = "../../api" }
async-trait.workspace = true
common-error = { path = "../error" }
common-recordbatch = { path = "../recordbatch" }
@@ -13,6 +14,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes = { path = "../../datatypes" }
serde.workspace = true
snafu.workspace = true
statrs = "0.16"

View File

@@ -14,7 +14,10 @@
use std::fmt::{Debug, Formatter};
use api::greptime_proto::v1::add_column::location::LocationType;
use api::greptime_proto::v1::add_column::Location;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use serde::{Deserialize, Serialize};
pub mod columnar_value;
pub mod error;
@@ -44,3 +47,24 @@ impl Debug for Output {
}
pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan;
/// Requested position of a newly added column relative to the table's existing columns.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddColumnLocation {
/// Place the new column before the existing columns.
First,
/// Place the new column after the existing column named `column_name`.
After { column_name: String },
}
/// Builds the gRPC [`Location`] message that corresponds to an [`AddColumnLocation`].
impl From<&AddColumnLocation> for Location {
    fn from(value: &AddColumnLocation) -> Self {
        // `First` carries no anchor column, so its name field is left empty.
        let (location_type, anchor) = match value {
            AddColumnLocation::First => (LocationType::First, "".to_string()),
            AddColumnLocation::After { column_name } => {
                (LocationType::After, column_name.to_string())
            }
        };
        Location {
            location_type: location_type.into(),
            after_cloumn_name: anchor,
        }
    }
}

View File

@@ -291,6 +291,8 @@ async fn new_dummy_catalog_list(
#[cfg(test)]
mod test {
use api::v1::add_column::location::LocationType;
use api::v1::add_column::Location;
use api::v1::column::{SemanticType, Values};
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
@@ -364,16 +366,44 @@ mod test {
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
location: None,
}],
add_columns: vec![
AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
location: None,
},
AddColumn {
column_def: Some(ColumnDef {
name: "c".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
location: Some(Location {
location_type: LocationType::First.into(),
after_cloumn_name: "".to_string(),
}),
},
AddColumn {
column_def: Some(ColumnDef {
name: "d".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
location: Some(Location {
location_type: LocationType::After.into(),
after_cloumn_name: "a".to_string(),
}),
},
],
})),
})),
});
@@ -389,15 +419,15 @@ mod test {
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let output = exec_selection(instance, "SELECT ts, a, b FROM my_database.my_table").await;
let output = exec_selection(instance, "SELECT * FROM my_database.my_table").await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2022-12-30T07:09:00 | s | 1 |
+---------------------+---+---+";
+---+---+---+---------------------+---+
| c | a | d | ts | b |
+---+---+---+---------------------+---+
| | s | | 2022-12-30T07:09:00 | 1 |
+---+---+---+---------------------+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}

View File

@@ -68,12 +68,16 @@ impl SqlHandler {
}
.fail()
}
AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumns {
AlterTableOperation::AddColumn {
column_def,
location,
} => AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: column_def_to_schema(column_def, false)
.context(error::ParseSqlSnafu)?,
// FIXME(dennis): supports adding key column
is_key: false,
location: location.clone(),
}],
},
AlterTableOperation::DropColumn { name } => AlterKind::DropColumns {

View File

@@ -291,7 +291,10 @@ pub(crate) fn to_alter_expr(
}
.fail();
}
AlterTableOperation::AddColumn { column_def } => Kind::AddColumns(AddColumns {
AlterTableOperation::AddColumn {
column_def,
location,
} => Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(
sql_column_def_to_grpc_column_def(column_def)
@@ -299,7 +302,7 @@ pub(crate) fn to_alter_expr(
.context(ExternalSnafu)?,
),
is_key: false,
location: None,
location: location.as_ref().map(From::from),
}],
}),
AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {

View File

@@ -321,6 +321,7 @@ mod tests {
use super::*;
use crate::engine::procedure::procedure_test_util::{self, TestEnv};
use crate::engine::tests::new_add_columns_req_with_location;
use crate::table::test_util;
fn new_add_columns_req() -> AlterTableRequest {
@@ -331,10 +332,12 @@ mod tests {
AddColumnRequest {
column_schema: new_tag,
is_key: true,
location: None,
},
AddColumnRequest {
column_schema: new_field,
is_key: false,
location: None,
},
],
};
@@ -394,6 +397,44 @@ mod tests {
assert!(new_schema.column_schema_by_name("my_field").is_some());
assert_eq!(new_schema.version(), schema.version() + 1);
assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2);
// Alter the table.
let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new(
"my_field_after_ts",
ConcreteDataType::string_datatype(),
true,
);
let request = new_add_columns_req_with_location(&new_tag, &new_field);
let mut procedure = table_engine
.alter_table_procedure(&engine_ctx, request.clone())
.unwrap();
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// Validate.
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.unwrap()
.unwrap();
let new_info = table.table_info();
let new_meta = &new_info.meta;
let new_schema = &new_meta.schema;
assert_eq!(&[0, 1, 6], &new_meta.primary_key_indices[..]);
assert_eq!(&[2, 3, 4, 5, 7], &new_meta.value_indices[..]);
assert!(new_schema.column_schema_by_name("my_tag_first").is_some());
assert!(new_schema
.column_schema_by_name("my_field_after_ts")
.is_some());
assert_eq!(new_schema.version(), schema.version() + 2);
assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 4);
assert_eq!(new_schema.column_index_by_name("my_tag_first").unwrap(), 0);
assert_eq!(
new_schema
.column_index_by_name("my_field_after_ts")
.unwrap(),
new_schema.column_index_by_name("ts").unwrap() + 1
);
}
#[tokio::test]

View File

@@ -547,10 +547,39 @@ fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> Alte
AddColumnRequest {
column_schema: new_tag.clone(),
is_key: true,
location: None,
},
AddColumnRequest {
column_schema: new_field.clone(),
is_key: false,
location: None,
},
],
},
}
}
pub(crate) fn new_add_columns_req_with_location(
new_tag: &ColumnSchema,
new_field: &ColumnSchema,
) -> AlterTableRequest {
AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
column_schema: new_tag.clone(),
is_key: true,
location: Some(common_query::AddColumnLocation::First),
},
AddColumnRequest {
column_schema: new_field.clone(),
is_key: false,
location: Some(common_query::AddColumnLocation::After {
column_name: "ts".to_string(),
}),
},
],
},
@@ -597,6 +626,29 @@ async fn test_alter_table_add_column() {
assert_eq!(new_schema.version(), old_schema.version() + 1);
assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2);
assert_eq!(new_meta.region_numbers, old_meta.region_numbers);
let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new(
"my_field_after_ts",
ConcreteDataType::string_datatype(),
true,
);
let req = new_add_columns_req_with_location(&new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
.unwrap();
let new_info = table.table_info();
let new_meta = &new_info.meta;
let new_schema = &new_meta.schema;
assert_eq!(&[0, 1, 6], &new_meta.primary_key_indices[..]);
assert_eq!(&[2, 3, 4, 5, 7], &new_meta.value_indices[..]);
assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column());
assert_eq!(new_schema.version(), old_schema.version() + 2);
assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 4);
assert_eq!(new_meta.region_numbers, old_meta.region_numbers);
}
#[tokio::test]

View File

@@ -10,6 +10,7 @@ common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-datasource = { path = "../common/datasource" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-time = { path = "../common/time" }
datafusion-sql.workspace = true
datatypes = { path = "../datatypes" }

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::AddColumnLocation;
use snafu::ResultExt;
use sqlparser::keywords::Keyword;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::Token;
use crate::error::{self, Result};
use crate::parser::ParserContext;
@@ -41,7 +43,25 @@ impl<'a> ParserContext<'a> {
} else {
let _ = parser.parse_keyword(Keyword::COLUMN);
let column_def = parser.parse_column_def()?;
AlterTableOperation::AddColumn { column_def }
let location = if parser.parse_keyword(Keyword::FIRST) {
Some(AddColumnLocation::First)
} else if let Token::Word(word) = parser.peek_token().token {
if word.value.to_ascii_uppercase() == "AFTER" {
parser.next_token();
let name = parser.parse_identifier()?;
Some(AddColumnLocation::After {
column_name: name.value,
})
} else {
None
}
} else {
None
};
AlterTableOperation::AddColumn {
column_def,
location,
}
}
} else if parser.parse_keyword(Keyword::DROP) {
if parser.parse_keyword(Keyword::COLUMN) {
@@ -98,13 +118,90 @@ mod tests {
let alter_operation = alter_table.alter_operation();
assert_matches!(alter_operation, AlterTableOperation::AddColumn { .. });
match alter_operation {
AlterTableOperation::AddColumn { column_def } => {
AlterTableOperation::AddColumn {
column_def,
location,
} => {
assert_eq!("tagk_i", column_def.name.value);
assert_eq!(DataType::String, column_def.data_type);
assert!(column_def
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null)));
assert_eq!(&None, location);
}
_ => unreachable!(),
}
}
_ => unreachable!(),
}
}
#[test]
fn test_parse_alter_add_column_with_first() {
    let sql = "ALTER TABLE my_metric_1 ADD tagk_i STRING Null FIRST;";
    let mut statements =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
    assert_eq!(1, statements.len());

    let statement = statements.remove(0);
    assert_matches!(statement, Statement::Alter { .. });
    let alter_table = match statement {
        Statement::Alter(alter_table) => alter_table,
        _ => unreachable!(),
    };
    assert_eq!("my_metric_1", alter_table.table_name().0[0].value);

    let alter_operation = alter_table.alter_operation();
    assert_matches!(alter_operation, AlterTableOperation::AddColumn { .. });
    let (column_def, location) = match alter_operation {
        AlterTableOperation::AddColumn {
            column_def,
            location,
        } => (column_def, location),
        _ => unreachable!(),
    };

    // The column definition is parsed as before; the trailing FIRST keyword becomes the location.
    assert_eq!("tagk_i", column_def.name.value);
    assert_eq!(DataType::String, column_def.data_type);
    assert!(column_def
        .options
        .iter()
        .any(|o| matches!(o.option, ColumnOption::Null)));
    assert_eq!(&Some(AddColumnLocation::First), location);
}
#[test]
fn test_parse_alter_add_column_with_after() {
let sql = "ALTER TABLE my_metric_1 ADD tagk_i STRING Null AFTER ts;";
let mut result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
assert_matches!(statement, Statement::Alter { .. });
match statement {
Statement::Alter(alter_table) => {
assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
let alter_operation = alter_table.alter_operation();
assert_matches!(alter_operation, AlterTableOperation::AddColumn { .. });
match alter_operation {
AlterTableOperation::AddColumn {
column_def,
location,
} => {
assert_eq!("tagk_i", column_def.name.value);
assert_eq!(DataType::String, column_def.data_type);
assert!(column_def
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null)));
assert_eq!(
&Some(AddColumnLocation::After {
column_name: "ts".to_string()
}),
location
);
}
_ => unreachable!(),
}

View File

@@ -28,7 +28,10 @@ pub mod tql;
use std::str::FromStr;
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column::location::LocationType;
use api::v1::add_column::Location;
use common_base::bytes::Bytes;
use common_query::AddColumnLocation;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, COMMENT_KEY};
@@ -397,6 +400,22 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu
}
}
/// Converts the optional column location of a SQL `ADD COLUMN` operation into the optional
/// gRPC [`Location`] message; `None` stays `None`.
pub fn sql_location_to_grpc_add_column_location(
    location: &Option<AddColumnLocation>,
) -> Option<api::v1::add_column::Location> {
    location.as_ref().map(|location| {
        // `First` has no anchor column, so its name field is left empty.
        let (location_type, anchor) = match location {
            AddColumnLocation::First => (LocationType::First, "".to_string()),
            AddColumnLocation::After { column_name } => {
                (LocationType::After, column_name.to_string())
            }
        };
        Location {
            location_type: location_type.into(),
            after_cloumn_name: anchor,
        }
    })
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::AddColumnLocation;
use sqlparser::ast::{ColumnDef, Ident, ObjectName, TableConstraint};
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -41,8 +42,11 @@ impl AlterTable {
pub enum AlterTableOperation {
/// `ADD <table_constraint>`
AddConstraint(TableConstraint),
/// `ADD [ COLUMN ] <column_def>`
AddColumn { column_def: ColumnDef },
/// `ADD [ COLUMN ] <column_def> [location]`
AddColumn {
column_def: ColumnDef,
location: Option<AddColumnLocation>,
},
/// `DROP COLUMN <name>`
DropColumn { name: Ident },
/// `RENAME <new_table_name>`

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::AddColumnLocation;
use datafusion_expr::TableProviderFilterPushDown;
pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
@@ -140,6 +141,18 @@ impl TableMetaBuilder {
}
}
/// The result of splitting add-column requests by their column location info.
struct SplitResult<'a> {
/// Requests whose columns should be added before the existing columns.
columns_at_first: Vec<&'a AddColumnRequest>,
/// Requests whose columns should be added after an existing column, keyed by the name
/// of that existing column.
columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
/// Requests without location info; their columns should be added at the end.
columns_at_last: Vec<&'a AddColumnRequest>,
/// Names of all columns to be added.
column_names: Vec<String>,
}
impl TableMeta {
pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
let columns_schemas = &self.schema.column_schemas();
@@ -231,33 +244,49 @@ impl TableMeta {
) -> Result<TableMetaBuilder> {
let table_schema = &self.schema;
let mut meta_builder = self.new_meta_builder();
let original_primary_key_indices: HashSet<&usize> =
self.primary_key_indices.iter().collect();
// Check whether columns to add are already existing.
for request in requests {
let column_name = &request.column_schema.name;
ensure!(
table_schema.column_schema_by_name(column_name).is_none(),
error::ColumnExistsSnafu {
column_name,
table_name,
}
);
}
// Collect names of columns to add for error message.
let mut column_names = Vec::with_capacity(requests.len());
let mut primary_key_indices = self.primary_key_indices.clone();
let SplitResult {
columns_at_first,
columns_at_after,
columns_at_last,
column_names,
} = self.split_requests_by_column_location(table_name, requests)?;
let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
columns.extend_from_slice(table_schema.column_schemas());
// Append new columns to the end of column list.
for request in requests {
column_names.push(request.column_schema.name.clone());
// add new columns with FIRST, and in reverse order of requests.
columns_at_first.iter().rev().for_each(|request| {
if request.is_key {
// If a key column is added, we also need to store its index in primary_key_indices.
primary_key_indices.push(columns.len());
}
columns.push(request.column_schema.clone());
});
// add existed columns in original order and handle new columns with AFTER.
for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
if original_primary_key_indices.contains(&index) {
primary_key_indices.push(columns.len());
}
columns.push(column_schema.clone());
if let Some(requests) = columns_at_after.get(&column_schema.name) {
requests.iter().rev().for_each(|request| {
if request.is_key {
// If a key column is added, we also need to store its index in primary_key_indices.
primary_key_indices.push(columns.len());
}
columns.push(request.column_schema.clone());
});
}
}
// add new columns without location info to last.
columns_at_last.iter().for_each(|request| {
if request.is_key {
// If a key column is added, we also need to store its index in primary_key_indices.
primary_key_indices.push(columns.len());
}
columns.push(request.column_schema.clone());
});
let mut builder = SchemaBuilder::try_from(columns)
.with_context(|_| error::SchemaBuildSnafu {
@@ -358,6 +387,58 @@ impl TableMeta {
Ok(meta_builder)
}
/// Groups the add-column `requests` by where each new column should be placed.
///
/// Returns a `ColumnExists` error if a column to add is already present in the table
/// schema, and a `ColumnNotExists` error if an `AFTER` location names a column that is
/// not present in the table schema.
fn split_requests_by_column_location<'a>(
    &self,
    table_name: &str,
    requests: &'a [AddColumnRequest],
) -> Result<SplitResult<'a>> {
    let schema = &self.schema;
    let mut split = SplitResult {
        columns_at_first: Vec::new(),
        columns_at_after: HashMap::new(),
        columns_at_last: Vec::new(),
        column_names: Vec::with_capacity(requests.len()),
    };

    for request in requests {
        let new_column = &request.column_schema.name;
        split.column_names.push(new_column.clone());
        // A column to add must not exist in the table yet.
        ensure!(
            schema.column_schema_by_name(new_column).is_none(),
            error::ColumnExistsSnafu {
                column_name: new_column,
                table_name,
            }
        );

        // Pick the group this request belongs to, then append it there.
        let group = match &request.location {
            Some(AddColumnLocation::First) => &mut split.columns_at_first,
            Some(AddColumnLocation::After { column_name }) => {
                // The anchor column must already exist in the table.
                ensure!(
                    schema.column_schema_by_name(column_name).is_some(),
                    error::ColumnNotExistsSnafu {
                        column_name,
                        table_name,
                    }
                );
                split
                    .columns_at_after
                    .entry(column_name.clone())
                    .or_insert_with(Vec::new)
            }
            None => &mut split.columns_at_last,
        };
        group.push(request);
    }

    Ok(split)
}
}
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
@@ -568,10 +649,42 @@ mod tests {
AddColumnRequest {
column_schema: new_tag,
is_key: true,
location: None,
},
AddColumnRequest {
column_schema: new_field,
is_key: false,
location: None,
},
],
};
let builder = meta
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap();
builder.build().unwrap()
}
fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new(
"my_field_after_ts",
ConcreteDataType::string_datatype(),
true,
);
let alter_kind = AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
column_schema: new_tag,
is_key: true,
location: Some(AddColumnLocation::First),
},
AddColumnRequest {
column_schema: new_field,
is_key: false,
location: Some(AddColumnLocation::After {
column_name: "ts".to_string(),
}),
},
],
};
@@ -714,6 +827,7 @@ mod tests {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
is_key: false,
location: None,
}],
};
@@ -798,4 +912,32 @@ mod tests {
assert_eq!(4, meta.next_column_id);
assert_eq!(column_schema.name, desc.name);
}
#[test]
fn test_add_columns_with_location() {
    let meta = TableMetaBuilder::default()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let new_meta = add_columns_to_meta_with_location(&meta);
    assert_eq!(meta.region_numbers, new_meta.region_numbers);

    // The FIRST column leads the schema and the AFTER column directly follows `ts`.
    let names: Vec<&str> = new_meta
        .schema
        .column_schemas()
        .iter()
        .map(|column_schema| column_schema.name.as_str())
        .collect();
    assert_eq!(
        names,
        ["my_tag_first", "col1", "ts", "my_field_after_ts", "col2"]
    );
    assert_eq!(new_meta.primary_key_indices[..], [0, 1]);
    assert_eq!(new_meta.value_indices[..], [2, 3, 4]);
}
}

View File

@@ -19,6 +19,7 @@ use std::str::FromStr;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_query::AddColumnLocation;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, RawSchema};
use serde::{Deserialize, Serialize};
@@ -207,6 +208,7 @@ impl AlterTableRequest {
/// A request to add one column to a table.
pub struct AddColumnRequest {
/// Schema of the column to add.
pub column_schema: ColumnSchema,
/// Whether the new column is a key column.
pub is_key: bool,
/// Where to place the new column; `None` carries no location info.
pub location: Option<AddColumnLocation>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -0,0 +1,174 @@
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
Affected Rows: 0
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
+-------+-------+------+---------+---------------+
ALTER TABLE t ADD COLUMN k INTEGER;
Affected Rows: 0
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
| k | Int32 | YES | | FIELD |
+-------+-------+------+---------+---------------+
-- SQLNESS ARG restart=true
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
| k | Int32 | YES | | FIELD |
+-------+-------+------+---------+---------------+
ALTER TABLE t ADD COLUMN m INTEGER;
Affected Rows: 0
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
| k | Int32 | YES | | FIELD |
| m | Int32 | YES | | FIELD |
+-------+-------+------+---------+---------------+
INSERT INTO t VALUES (1, 2, 3, 4);
Affected Rows: 1
SELECT * FROM t;
+---+---+---+---+
| i | j | k | m |
+---+---+---+---+
| 1 | 2 | 3 | 4 |
+---+---+---+---+
ALTER TABLE t ADD COLUMN n INTEGER FIRST;
Affected Rows: 0
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| n | Int32 | YES | | FIELD |
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
| k | Int32 | YES | | FIELD |
| m | Int32 | YES | | FIELD |
+-------+-------+------+---------+---------------+
SELECT * FROM t;
+---+---+---+---+---+
| n | i | j | k | m |
+---+---+---+---+---+
| | 1 | 2 | 3 | 4 |
+---+---+---+---+---+
INSERT INTO t VALUES (2, 3, 4, 5, 6);
Affected Rows: 1
ALTER TABLE t ADD COLUMN y INTEGER AFTER j;
Affected Rows: 0
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| n | Int32 | YES | | FIELD |
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
| y | Int32 | YES | | FIELD |
| k | Int32 | YES | | FIELD |
| m | Int32 | YES | | FIELD |
+-------+-------+------+---------+---------------+
SELECT * FROM t;
+---+---+---+---+---+---+
| n | i | j | y | k | m |
+---+---+---+---+---+---+
| | 1 | 2 | | 3 | 4 |
| 2 | 3 | 4 | | 5 | 6 |
+---+---+---+---+---+---+
-- SQLNESS ARG restart=true
ALTER TABLE t ADD COLUMN a INTEGER FIRST;
Affected Rows: 0
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| a | Int32 | YES | | FIELD |
| n | Int32 | YES | | FIELD |
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
| y | Int32 | YES | | FIELD |
| k | Int32 | YES | | FIELD |
| m | Int32 | YES | | FIELD |
+-------+-------+------+---------+---------------+
ALTER TABLE t ADD COLUMN b INTEGER AFTER j;
Affected Rows: 0
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| a | Int32 | YES | | FIELD |
| n | Int32 | YES | | FIELD |
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
| b | Int32 | YES | | FIELD |
| y | Int32 | YES | | FIELD |
| k | Int32 | YES | | FIELD |
| m | Int32 | YES | | FIELD |
+-------+-------+------+---------+---------------+
SELECT * FROM t;
+---+---+---+---+---+---+---+---+
| a | n | i | j | b | y | k | m |
+---+---+---+---+---+---+---+---+
| | | 1 | 2 | | | 3 | 4 |
| | 2 | 3 | 4 | | | 5 | 6 |
+---+---+---+---+---+---+---+---+
ALTER TABLE t ADD COLUMN x int xxx;
Error: 1001(Unsupported), SQL statement is not supported: ALTER TABLE t ADD COLUMN x int xxx;, keyword: xxx
DROP TABLE t;
Affected Rows: 1

View File

@@ -0,0 +1,47 @@
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
DESC TABLE t;
ALTER TABLE t ADD COLUMN k INTEGER;
DESC TABLE t;
-- SQLNESS ARG restart=true
DESC TABLE t;
ALTER TABLE t ADD COLUMN m INTEGER;
DESC TABLE t;
INSERT INTO t VALUES (1, 2, 3, 4);
SELECT * FROM t;
ALTER TABLE t ADD COLUMN n INTEGER FIRST;
DESC TABLE t;
SELECT * FROM t;
INSERT INTO t VALUES (2, 3, 4, 5, 6);
ALTER TABLE t ADD COLUMN y INTEGER AFTER j;
DESC TABLE t;
SELECT * FROM t;
-- SQLNESS ARG restart=true
ALTER TABLE t ADD COLUMN a INTEGER FIRST;
DESC TABLE t;
ALTER TABLE t ADD COLUMN b INTEGER AFTER j;
DESC TABLE t;
SELECT * FROM t;
ALTER TABLE t ADD COLUMN x int xxx;
DROP TABLE t;