feat: mito table supports RemoveColumns alter kind (#395)

* feat: Support removing columns from mito table

Implements dropping columns for the mito table engine, and adjusts the
execution order of altering a table: the table manifest is persisted first,
then the schema of the region is altered.
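
A minimal sketch of the adjusted ordering (AlterSteps and its methods are
illustrative stand-ins, not the actual mito API; the real flow is in the
table.rs hunk below):

// Hypothetical trait standing in for the mito table internals.
trait AlterSteps {
    fn persist_manifest(&self, new_info: &str) -> Result<(), String>;
    fn alter_region(&self, new_info: &str) -> Result<(), String>;
    fn set_table_info(&self, new_info: String);
}

fn alter<T: AlterSteps>(table: &T, new_info: String) -> Result<(), String> {
    // 1. Persist the new table info to the manifest first, so the intended
    //    schema is durable before anything else changes.
    table.persist_manifest(&new_info)?;
    // 2. Then alter the schema of the underlying region.
    table.alter_region(&new_info)?;
    // 3. Finally publish the new info to in-memory readers.
    table.set_table_info(new_info);
    Ok(())
}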

* feat(storage): Remove duplicate table_info() impl

Table already provides a table_info() now, so some downcasts in tests are
no longer needed.
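
In sketch form, test call sites change from a downcast to the concrete type
into a direct trait-method call, exactly as the test hunks below show:

// Before: downcast to the concrete MitoTable to reach table_info().
let table_info = table
    .as_any()
    .downcast_ref::<MitoTable<MockRegion>>()
    .unwrap()
    .table_info();
// After: the Table trait provides table_info() directly.
let table_info = table.table_info();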

* test: Add tests for add/remove columns

* style(table): Fix clippy

* fix: Find timestamp index by its column name

The previous implementation updated the timestamp index too early, which
caused the check that compares the index of the column to remove against
the timestamp index to fail.
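
In sketch form, the fix locates the timestamp column among the remaining
columns by name rather than reusing the pre-removal index (a free-standing
version of the logic in metadata.rs below; the minimal ColumnSchema here is
a stand-in for datatypes::schema::ColumnSchema):

// Stand-in for datatypes::schema::ColumnSchema; only the name is needed here.
struct ColumnSchema {
    name: String,
}

// Recompute the timestamp index against the post-removal column list, since
// removing columns that precede the timestamp column shifts its position.
fn timestamp_index_after_removal(
    remaining: &[ColumnSchema],
    ts_name: Option<&str>,
) -> Option<usize> {
    let ts_name = ts_name?;
    remaining.iter().position(|c| c.name == ts_name)
}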

* chore: Remove generated comment in Cargo.toml

* chore: Rename alter to builder_with_alter_kind

* refactor: Alloc new column from TableMeta

* style: Fix clippy
Yingwen
2022-11-09 11:50:02 +08:00
committed by GitHub
parent 2c9bcbe885
commit cf4e876e51
7 changed files with 691 additions and 277 deletions


@@ -2,7 +2,6 @@
name = "benchmarks"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = "10"


@@ -19,6 +19,7 @@ use table::Result as TableResult;
use table::{
metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion},
table::TableRef,
Table,
};
use tokio::sync::Mutex;
@@ -740,10 +741,6 @@ mod tests {
let (_engine, table_engine, table, _object_store, _dir) =
test_util::setup_mock_engine_and_table().await;
let table = table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
let table_info = table.table_info();
let request = CreateTableRequest {
@@ -760,14 +757,7 @@ mod tests {
};
let created_table = table_engine.create_table(&ctx, request).await.unwrap();
assert_eq!(
table_info,
created_table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap()
.table_info()
);
assert_eq!(table_info, created_table.table_info());
// test create_if_not_exists=false
let request = CreateTableRequest {
@@ -826,10 +816,6 @@ mod tests {
.unwrap();
assert_eq!(table.schema(), reopened.schema());
let table = table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
let reopened = reopened
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
@@ -850,30 +836,92 @@ mod tests {
assert_eq!(18446744069414584330, region_id(u32::MAX, 10));
}
fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> AlterTableRequest {
AlterTableRequest {
catalog_name: None,
schema_name: None,
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
column_schema: new_tag.clone(),
is_key: true,
},
AddColumnRequest {
column_schema: new_field.clone(),
is_key: false,
},
],
},
}
}
#[tokio::test]
async fn test_alter_table_add_column() {
let (_engine, table_engine, table, _object_store, _dir) =
test_util::setup_mock_engine_and_table().await;
let table = table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
let table_info = table.table_info();
let old_info = (*table_info).clone();
let old_info = table.table_info();
let old_meta = &old_info.meta;
let old_schema = &old_meta.schema;
let new_column = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
let req = new_add_columns_req(&new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
.unwrap();
let new_info = table.table_info();
let new_meta = &new_info.meta;
let new_schema = &new_meta.schema;
assert_eq!(&[0, 4], &new_meta.primary_key_indices[..]);
assert_eq!(&[1, 2, 3, 5], &new_meta.value_indices[..]);
assert_eq!(new_schema.num_columns(), old_schema.num_columns() + 2);
assert_eq!(
&new_schema.column_schemas()[..old_schema.num_columns()],
old_schema.column_schemas()
);
assert_eq!(
&new_schema.column_schemas()[old_schema.num_columns()],
&new_tag
);
assert_eq!(
&new_schema.column_schemas()[old_schema.num_columns() + 1],
&new_field
);
assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column());
assert_eq!(new_schema.version(), old_schema.version() + 1);
assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2);
}
#[tokio::test]
async fn test_alter_table_remove_column() {
let (_engine, table_engine, _table, _object_store, _dir) =
test_util::setup_mock_engine_and_table().await;
// Add two columns to the table first.
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
let req = new_add_columns_req(&new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
.unwrap();
let old_info = table.table_info();
let old_meta = &old_info.meta;
let old_schema = &old_meta.schema;
// Then remove memory and my_field from the table.
let req = AlterTableRequest {
catalog_name: None,
schema_name: None,
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: new_column.clone(),
is_key: false,
}],
alter_kind: AlterKind::RemoveColumns {
names: vec![String::from("memory"), String::from("my_field")],
},
};
let table = table_engine
@@ -881,21 +929,20 @@ mod tests {
.await
.unwrap();
let table = table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
let new_info = table.table_info();
let new_meta = &new_info.meta;
let new_schema = &new_meta.schema;
assert_eq!(new_schema.num_columns(), old_schema.num_columns() + 1);
assert_eq!(
new_schema.column_schemas().split_last().unwrap(),
(&new_column, old_schema.column_schemas())
);
assert_eq!(new_schema.num_columns(), old_schema.num_columns() - 2);
let remaining_names: Vec<String> = new_schema
.column_schemas()
.iter()
.map(|column_schema| column_schema.name.clone())
.collect();
assert_eq!(&["host", "cpu", "ts", "my_tag"], &remaining_names[..]);
assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
assert_eq!(&[1, 2], &new_meta.value_indices[..]);
assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column());
assert_eq!(new_schema.version(), old_schema.version() + 1);
assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 1);
}
}


@@ -136,13 +136,6 @@ pub enum Error {
table_name: String,
},
#[snafu(display("Column {} already exists in table {}", column_name, table_name))]
ColumnExists {
backtrace: Backtrace,
column_name: String,
table_name: String,
},
#[snafu(display("Columns {} not exist in table {}", column_names.join(","), table_name))]
ColumnsNotExist {
backtrace: Backtrace,
@@ -150,13 +143,6 @@ pub enum Error {
table_name: String,
},
#[snafu(display("Failed to build schema, msg: {}, source: {}", msg, source))]
SchemaBuild {
#[snafu(backtrace)]
source: datatypes::error::Error,
msg: String,
},
#[snafu(display("Failed to alter table {}, source: {}", table_name, source))]
AlterTable {
table_name: String,
@@ -206,8 +192,6 @@ impl ErrorExt for Error {
AlterTable { source, .. } => source.status_code(),
SchemaBuild { source, .. } => source.status_code(),
BuildRowKeyDescriptor { .. }
| BuildColumnDescriptor { .. }
| BuildColumnFamilyDescriptor { .. }
@@ -220,8 +204,6 @@ impl ErrorExt for Error {
| UnsupportedDefaultConstraint { .. }
| TableNotFound { .. } => StatusCode::InvalidArguments,
ColumnExists { .. } => StatusCode::TableColumnExists,
ColumnsNotExist { .. } => StatusCode::TableColumnNotFound,
TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected,


@@ -12,7 +12,7 @@ use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream};
use common_telemetry::logging;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use datatypes::schema::ColumnSchema;
use datatypes::vectors::VectorRef;
use futures::task::{Context, Poll};
use futures::Stream;
@@ -20,23 +20,22 @@ use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, ChunkReader, ColumnDescriptorBuilder, PutOperation,
ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
AddColumn, AlterOperation, AlterRequest, ChunkReader, PutOperation, ReadContext, Region,
RegionMeta, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder};
use table::metadata::{FilterPushDownType, TableInfoRef};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest};
use table::table::scan::SimpleTableScan;
use table::{
metadata::{RawTableInfo, TableInfo, TableType},
metadata::{RawTableInfo, TableInfo, TableMeta, TableType},
table::Table,
};
use tokio::sync::Mutex;
use crate::error::{
self, ColumnsNotExistSnafu, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu,
SchemaBuildSnafu, TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu,
UpdateTableManifestSnafu,
TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
@@ -186,100 +185,26 @@ impl<R: Region> Table for MitoTable<R> {
Ok(Arc::new(SimpleTableScan::new(stream)))
}
// Alter table changes the schemas of the table. The altering happens by cloning the schema,
// changing the clone, and swapping out the old one. Though we could change the schema in
// place, considering the complex intertwining of the schema's inner data representation, it's
// safer to change it like this and avoid partial inconsistency during the alteration. For
// example, the schema's `name_to_index` field must be changed synchronously with
// `column_schemas`. If we add or remove columns from `column_schemas` *and then* update
// `name_to_index`, there's a small time window in which the two fields are inconsistent,
// which might bring some hard-to-trace concurrency bugs or failures. (Of course we could
// introduce a guard such as a read-write lock to protect the consistency of schema altering,
// but that would hurt the performance of schema reads, and reads are the dominant operation
// on schemas. After all, altering is performed far less frequently. A standalone sketch of
// this clone-and-swap pattern follows this function.)
/// Alter table changes the schemas of the table.
async fn alter(&self, req: AlterTableRequest) -> TableResult<()> {
let _lock = self.alter_lock.lock().await;
let table_info = self.table_info();
let table_name = &table_info.name;
let table_meta = &table_info.meta;
let (alter_op, table_schema, new_columns_num) = match &req.alter_kind {
AlterKind::AddColumns {
columns: new_columns,
} => {
let columns = new_columns
.iter()
.enumerate()
.map(|(i, add_column)| {
let new_column = &add_column.column_schema;
let desc = ColumnDescriptorBuilder::new(
table_meta.next_column_id + i as u32,
&new_column.name,
new_column.data_type.clone(),
)
.is_nullable(new_column.is_nullable())
.default_constraint(new_column.default_constraint().cloned())
.build()
.context(error::BuildColumnDescriptorSnafu {
table_name,
column_name: &new_column.name,
})?;
Ok(AddColumn {
desc,
is_key: add_column.is_key,
})
})
.collect::<Result<Vec<_>>>()?;
let alter_op = AlterOperation::AddColumns { columns };
// TODO(yingwen): [alter] Better way to alter the schema struct. In fact the column order
// in table schema could differ from the region schema, so we could just push this column
// to the back of the schema (as last column).
let table_schema = build_table_schema_with_new_columns(
table_name,
&table_meta.schema,
new_columns,
)?;
(alter_op, table_schema, new_columns.len() as u32)
}
// TODO(dennis): support removing columns etc.
_ => unimplemented!(),
};
let new_meta = TableMetaBuilder::default()
.schema(table_schema.clone())
.engine(&table_meta.engine)
.next_column_id(table_meta.next_column_id + new_columns_num) // Bump next column id.
.primary_key_indices(table_meta.primary_key_indices.clone())
let mut new_meta = table_meta
.builder_with_alter_kind(table_name, &req.alter_kind)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
let alter_op = create_alter_operation(table_name, &req.alter_kind, &mut new_meta)?;
let mut new_info = TableInfo::clone(&*table_info);
// Increase version of the table.
new_info.ident.version = table_info.ident.version + 1;
new_info.meta = new_meta;
// FIXME(yingwen): [alter] Alter the region, now this is a temporary implementation.
let region = self.region();
let region_meta = region.in_memory_metadata();
let alter_req = AlterRequest {
operation: alter_op,
version: region_meta.version(),
};
logging::debug!(
"start altering region {} of table {}, with request {:?}",
region.name(),
table_name,
alter_req,
);
region.alter(alter_req).await.map_err(TableError::new)?;
// then alter table info
// Persist the alteration to the manifest.
logging::debug!(
"start updating the manifest of table {} with new table info {:?}",
table_name,
@@ -295,13 +220,27 @@ impl<R: Region> Table for MitoTable<R> {
.context(UpdateTableManifestSnafu {
table_name: &self.table_info().name,
})?;
// TODO(yingwen): Error handling. Maybe the region needs to provide a method to
// validate the request first.
let region = self.region();
let region_meta = region.in_memory_metadata();
let alter_req = AlterRequest {
operation: alter_op,
version: region_meta.version(),
};
// Alter the region.
logging::debug!(
"start altering region {} of table {}, with request {:?}",
region.name(),
table_name,
alter_req,
);
region.alter(alter_req).await.map_err(TableError::new)?;
// Update in memory metadata of the table.
self.set_table_info(new_info);
// TODO(LFC): Think of a way to properly handle the metadata integrity between region and table.
// Currently there are no "transactions" to alter the metadata of region and table together;
// they are altered in sequence. That means there might be cases where the metadata of the
// region is altered while the table's is not, so the metadata integrity between region and
// table cannot be guaranteed.
Ok(())
}
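
The clone-modify-swap pattern described in the comment above can be sketched
on its own. A minimal illustration, assuming an arc_swap::ArcSwap-style
holder (the load()/swap() calls on table_info elsewhere in this file suggest
one); String stands in for TableInfo, and the alter lock that serializes
writers is omitted:

use std::sync::Arc;

use arc_swap::ArcSwap;

// Readers always load a complete, immutable snapshot; a writer builds a
// fully consistent copy and publishes it atomically, so no reader can
// observe a half-updated schema.
struct InfoHolder {
    info: ArcSwap<String>,
}

impl InfoHolder {
    fn read(&self) -> Arc<String> {
        self.info.load_full()
    }

    fn alter(&self, f: impl FnOnce(&str) -> String) {
        let current = self.info.load();
        let updated = f(current.as_str());
        self.info.store(Arc::new(updated));
    }
}

fn main() {
    let holder = InfoHolder {
        info: ArcSwap::from_pointee(String::from("schema-v1")),
    };
    holder.alter(|old| format!("{}-altered", old));
    assert_eq!("schema-v1-altered", *holder.read());
}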
@@ -310,47 +249,6 @@ impl<R: Region> Table for MitoTable<R> {
}
}
fn build_table_schema_with_new_columns(
table_name: &str,
table_schema: &SchemaRef,
new_columns: &[AddColumnRequest],
) -> Result<SchemaRef> {
let mut columns = table_schema.column_schemas().to_vec();
for add_column in new_columns {
let new_column = &add_column.column_schema;
if table_schema
.column_schema_by_name(&new_column.name)
.is_some()
{
return error::ColumnExistsSnafu {
column_name: &new_column.name,
table_name,
}
.fail()?;
}
columns.push(new_column.clone());
}
// Right now we don't support adding a column
// before or after some specific column, so we just clone a new schema like this.
// TODO(LFC): support adding a column before or after some column
let mut builder = SchemaBuilder::try_from_columns(columns)
.context(SchemaBuildSnafu {
msg: "Failed to convert column schemas into table schema",
})?
.timestamp_index(table_schema.timestamp_index())
.version(table_schema.version() + 1);
for (k, v) in table_schema.arrow_schema().metadata.iter() {
builder = builder.add_metadata(k, v);
}
let new_schema = Arc::new(builder.build().with_context(|_| error::SchemaBuildSnafu {
msg: format!("cannot add new columns {:?}", new_columns),
})?);
Ok(new_schema)
}
struct ChunkStream {
schema: SchemaRef,
stream: Pin<Box<dyn Stream<Item = RecordBatchResult<RecordBatch>> + Send>>,
@@ -449,7 +347,7 @@ impl<R: Region> MitoTable<R> {
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store);
// TODO(dennis): save manifest version into catalog?
let _manifest_version = manifest
.update(TableMetaActionList::with_action(TableMetaAction::Change(
Box::new(TableChange {
@@ -543,11 +441,6 @@ impl<R: Region> MitoTable<R> {
&self.region
}
#[inline]
pub fn table_info(&self) -> TableInfoRef {
Arc::clone(&self.table_info.load())
}
pub fn set_table_info(&self, table_info: TableInfo) {
self.table_info.swap(Arc::new(table_info));
}
@@ -563,57 +456,50 @@ impl<R: Region> MitoTable<R> {
}
}
/// Create [`AlterOperation`] according to given `alter_kind`.
fn create_alter_operation(
table_name: &str,
alter_kind: &AlterKind,
table_meta: &mut TableMeta,
) -> TableResult<AlterOperation> {
match alter_kind {
AlterKind::AddColumns { columns } => {
create_add_columns_operation(table_name, columns, table_meta)
}
AlterKind::RemoveColumns { names } => Ok(AlterOperation::DropColumns {
names: names.to_vec(),
}),
}
}
fn create_add_columns_operation(
table_name: &str,
requests: &[AddColumnRequest],
table_meta: &mut TableMeta,
) -> TableResult<AlterOperation> {
let columns = requests
.iter()
.map(|request| {
let new_column = &request.column_schema;
let desc = table_meta.alloc_new_column(table_name, new_column)?;
Ok(AddColumn {
desc,
is_key: request.is_key,
})
})
.collect::<TableResult<Vec<_>>>()?;
Ok(AlterOperation::AddColumns { columns })
}
#[cfg(test)]
mod tests {
use datatypes::prelude::ConcreteDataType;
use super::*;
use crate::table::test_util;
#[test]
fn test_table_manifest_dir() {
assert_eq!("demo/manifest/", table_manifest_dir("demo"));
assert_eq!("numbers/manifest/", table_manifest_dir("numbers"));
}
#[test]
fn test_build_table_schema_with_new_column() {
let table_info = test_util::build_test_table_info();
let table_name = &table_info.name;
let table_meta = &table_info.meta;
let table_schema = &table_meta.schema;
let new_column = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true);
let new_columns = vec![AddColumnRequest {
column_schema: new_column,
is_key: false,
}];
let result = build_table_schema_with_new_columns(table_name, table_schema, &new_columns);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Column host already exists in table demo"));
let new_column = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_columns = vec![AddColumnRequest {
column_schema: new_column.clone(),
is_key: false,
}];
let new_schema =
build_table_schema_with_new_columns(table_name, table_schema, &new_columns).unwrap();
assert_eq!(new_schema.num_columns(), table_schema.num_columns() + 1);
assert_eq!(
new_schema.column_schemas().split_last().unwrap(),
(&new_column, table_schema.column_schemas())
);
assert_eq!(
new_schema.timestamp_column(),
table_schema.timestamp_column()
);
assert_eq!(new_schema.version(), table_schema.version() + 1);
}
}


@@ -50,7 +50,8 @@ pub fn build_test_table_info() -> TableInfo {
.schema(Arc::new(schema_for_test()))
.engine(MITO_ENGINE)
.next_column_id(1)
.primary_key_indices(vec![0, 1])
// host is primary key column.
.primary_key_indices(vec![0])
.build()
.unwrap();
@@ -71,6 +72,21 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
(dir, ObjectStore::new(accessor))
}
fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
CreateTableRequest {
id: 1,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
desc: Some("a test table".to_string()),
schema,
region_numbers: vec![0],
create_if_not_exists: true,
primary_key_indices: vec![0],
table_options: HashMap::new(),
}
}
pub async fn setup_test_engine_and_table() -> (
MitoEngine<EngineImpl<NoopLogStore>>,
TableRef,
@@ -93,18 +109,7 @@ pub async fn setup_test_engine_and_table() -> (
let table = table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
id: 1,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
desc: Some("a test table".to_string()),
schema: schema.clone(),
create_if_not_exists: true,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
region_numbers: vec![0],
},
new_create_request(schema.clone()),
)
.await
.unwrap();
@@ -124,21 +129,7 @@ pub async fn setup_mock_engine_and_table(
let schema = Arc::new(schema_for_test());
let table = table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
id: 1,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
desc: None,
schema: schema.clone(),
create_if_not_exists: true,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
region_numbers: vec![0],
},
)
.create_table(&EngineContext::default(), new_create_request(schema))
.await
.unwrap();


@@ -51,6 +51,51 @@ pub enum InnerError {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Column {} already exists in table {}", column_name, table_name))]
ColumnExists {
column_name: String,
table_name: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to build schema, msg: {}, source: {}", msg, source))]
SchemaBuild {
#[snafu(backtrace)]
source: datatypes::error::Error,
msg: String,
},
#[snafu(display("Column {} not exists in table {}", column_name, table_name))]
ColumnNotExists {
column_name: String,
table_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Not allowed to remove index column {} from table {}",
column_name,
table_name
))]
RemoveColumnInIndex {
column_name: String,
table_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to build column descriptor for table: {}, column: {}, source: {}",
table_name,
column_name,
source,
))]
BuildColumnDescriptor {
source: store_api::storage::ColumnDescriptorBuilderError,
table_name: String,
column_name: String,
backtrace: Backtrace,
},
}
impl ErrorExt for InnerError {
@@ -60,8 +105,13 @@ impl ErrorExt for InnerError {
| InnerError::PollStream { .. }
| InnerError::SchemaConversion { .. }
| InnerError::TableProjection { .. } => StatusCode::EngineExecuteQuery,
InnerError::MissingColumn { .. } => StatusCode::InvalidArguments,
InnerError::MissingColumn { .. }
| InnerError::RemoveColumnInIndex { .. }
| InnerError::BuildColumnDescriptor { .. } => StatusCode::InvalidArguments,
InnerError::TablesRecordBatch { .. } => StatusCode::Unexpected,
InnerError::ColumnExists { .. } => StatusCode::TableColumnExists,
InnerError::SchemaBuild { source, .. } => source.status_code(),
InnerError::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
}
}


@@ -1,12 +1,16 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use chrono::{DateTime, Utc};
pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
use datatypes::schema::{RawSchema, Schema, SchemaRef};
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use store_api::storage::ColumnId;
use snafu::{ensure, ResultExt};
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
use crate::error::{self, Result};
use crate::requests::{AddColumnRequest, AlterKind};
pub type TableId = u32;
pub type TableVersion = u64;
@@ -42,7 +46,10 @@ pub enum TableType {
/// Identifier of the table.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
/// Unique id of this table.
pub table_id: TableId,
/// Version of the table, bumped when metadata (such as schema) of the table
/// being changed.
pub version: TableVersion,
}
@@ -50,6 +57,8 @@ pub struct TableIdent {
#[builder(pattern = "mutable")]
pub struct TableMeta {
pub schema: SchemaRef,
/// The indices of columns in primary key. Note that the index of timestamp column
/// is not included in these indices.
pub primary_key_indices: Vec<usize>,
#[builder(default = "self.default_value_indices()?")]
pub value_indices: Vec<usize>,
@@ -69,7 +78,7 @@ pub struct TableMeta {
}
impl TableMetaBuilder {
fn default_value_indices(&self) -> Result<Vec<usize>, String> {
fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
match (&self.primary_key_indices, &self.schema) {
(Some(v), Some(schema)) => {
let column_schemas = schema.column_schemas();
@@ -96,13 +105,230 @@ impl TableMeta {
.iter()
.map(|idx| &columns_schemas[*idx].name)
}
/// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
///
/// The returned builder would derive the next column id of this meta.
pub fn builder_with_alter_kind(
&self,
table_name: &str,
alter_kind: &AlterKind,
) -> Result<TableMetaBuilder> {
match alter_kind {
AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
AlterKind::RemoveColumns { names } => self.remove_columns(table_name, names),
}
}
/// Allocate a new column for the table.
///
/// This method would bump the `next_column_id` of the meta.
pub fn alloc_new_column(
&mut self,
table_name: &str,
new_column: &ColumnSchema,
) -> Result<ColumnDescriptor> {
let desc = ColumnDescriptorBuilder::new(
self.next_column_id as ColumnId,
&new_column.name,
new_column.data_type.clone(),
)
.is_nullable(new_column.is_nullable())
.default_constraint(new_column.default_constraint().cloned())
.build()
.context(error::BuildColumnDescriptorSnafu {
table_name,
column_name: &new_column.name,
})?;
// Bump next column id.
self.next_column_id += 1;
Ok(desc)
}
fn new_meta_builder(&self) -> TableMetaBuilder {
let mut builder = TableMetaBuilder::default();
builder
.engine(&self.engine)
.engine_options(self.engine_options.clone())
.options(self.options.clone())
.created_on(self.created_on)
.next_column_id(self.next_column_id);
builder
}
fn add_columns(
&self,
table_name: &str,
requests: &[AddColumnRequest],
) -> Result<TableMetaBuilder> {
let table_schema = &self.schema;
let mut meta_builder = self.new_meta_builder();
// Check whether the columns to add already exist.
for request in requests {
let column_name = &request.column_schema.name;
ensure!(
table_schema.column_schema_by_name(column_name).is_none(),
error::ColumnExistsSnafu {
column_name,
table_name,
}
);
}
// Collect names of columns to add for error message.
let mut column_names = Vec::with_capacity(requests.len());
let mut primary_key_indices = self.primary_key_indices.clone();
let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
columns.extend_from_slice(table_schema.column_schemas());
// Append new columns to the end of column list.
for request in requests {
column_names.push(request.column_schema.name.clone());
if request.is_key {
// If a key column is added, we also need to store its index in primary_key_indices.
primary_key_indices.push(columns.len());
}
columns.push(request.column_schema.clone());
}
let mut builder = SchemaBuilder::try_from(columns)
.with_context(|_| error::SchemaBuildSnafu {
msg: format!(
"Failed to convert column schemas into schema for table {}",
table_name
),
})?
.timestamp_index(table_schema.timestamp_index())
// Also bump the schema version.
.version(table_schema.version() + 1);
for (k, v) in table_schema.metadata().iter() {
builder = builder.add_metadata(k, v);
}
let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
msg: format!(
"Table {} cannot add new columns {:?}",
table_name, column_names
),
})?;
// value_indices would be generated automatically.
meta_builder
.schema(Arc::new(new_schema))
.primary_key_indices(primary_key_indices);
Ok(meta_builder)
}
fn remove_columns(
&self,
table_name: &str,
column_names: &[String],
) -> Result<TableMetaBuilder> {
let table_schema = &self.schema;
let column_names: HashSet<_> = column_names.iter().collect();
let mut meta_builder = self.new_meta_builder();
let timestamp_index = table_schema.timestamp_index();
// Check that the columns exist and are not part of the primary key or the timestamp index.
for column_name in &column_names {
if let Some(index) = table_schema.column_index_by_name(column_name) {
// This is a linear search, but since there won't be too many columns, the performance should
// be acceptable.
ensure!(
!self.primary_key_indices.contains(&index),
error::RemoveColumnInIndexSnafu {
column_name: *column_name,
table_name,
}
);
if let Some(ts_index) = timestamp_index {
// Not allowed to remove the column at the timestamp index.
ensure!(
index != ts_index,
error::RemoveColumnInIndexSnafu {
column_name: table_schema.column_name_by_index(ts_index),
table_name,
}
);
}
} else {
return error::ColumnNotExistsSnafu {
column_name: *column_name,
table_name,
}
.fail()?;
}
}
// Collect columns after removal.
let columns: Vec<_> = table_schema
.column_schemas()
.iter()
.filter(|column_schema| !column_names.contains(&column_schema.name))
.cloned()
.collect();
// Find the index of the timestamp column.
let timestamp_column = table_schema.timestamp_column();
let timestamp_index = columns.iter().enumerate().find_map(|(idx, column_schema)| {
let is_timestamp = timestamp_column
.map(|c| c.name == column_schema.name)
.unwrap_or(false);
if is_timestamp {
Some(idx)
} else {
None
}
});
let mut builder = SchemaBuilder::try_from_columns(columns)
.with_context(|_| error::SchemaBuildSnafu {
msg: format!(
"Failed to convert column schemas into schema for table {}",
table_name
),
})?
// Need to use the newly computed timestamp index.
.timestamp_index(timestamp_index)
// Also bump the schema version.
.version(table_schema.version() + 1);
for (k, v) in table_schema.metadata().iter() {
builder = builder.add_metadata(k, v);
}
let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
msg: format!(
"Table {} cannot add remove columns {:?}",
table_name, column_names
),
})?;
// Rebuild the indices of primary key columns.
let primary_key_indices = self
.primary_key_indices
.iter()
.map(|idx| table_schema.column_name_by_index(*idx))
// This unwrap is safe since we don't allow removing a primary key column.
.map(|name| new_schema.column_index_by_name(name).unwrap())
.collect();
meta_builder
.schema(Arc::new(new_schema))
.primary_key_indices(primary_key_indices);
Ok(meta_builder)
}
}
#[derive(Clone, Debug, PartialEq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
/// Id and version of the table.
#[builder(default, setter(into))]
pub ident: TableIdent,
/// Name of the table.
#[builder(setter(into))]
pub name: String,
/// Comment of the table.
@@ -248,6 +474,7 @@ impl TryFrom<RawTableInfo> for TableInfo {
#[cfg(test)]
mod tests {
use common_error::prelude::*;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
@@ -257,6 +484,7 @@ mod tests {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
SchemaBuilder::try_from(column_schemas)
.unwrap()
@@ -271,10 +499,9 @@ mod tests {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![1])
.value_indices(vec![0])
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(2)
.next_column_id(3)
.build()
.unwrap();
let info = TableInfoBuilder::default()
@@ -290,4 +517,236 @@ mod tests {
assert_eq!(info, info_new);
}
fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
let alter_kind = AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
column_schema: new_tag,
is_key: true,
},
AddColumnRequest {
column_schema: new_field,
is_key: false,
},
],
};
let builder = meta
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap();
builder.build().unwrap()
}
#[test]
fn test_add_columns() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
let new_meta = add_columns_to_meta(&meta);
let names: Vec<String> = new_meta
.schema
.column_schemas()
.iter()
.map(|column_schema| column_schema.name.clone())
.collect();
assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
}
#[test]
fn test_remove_columns() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema.clone())
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
// Add more columns so we have enough candidate columns to remove.
let meta = add_columns_to_meta(&meta);
let alter_kind = AlterKind::RemoveColumns {
names: vec![String::from("col2"), String::from("my_field")],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
let names: Vec<String> = new_meta
.schema
.column_schemas()
.iter()
.map(|column_schema| column_schema.name.clone())
.collect();
assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
assert_eq!(&[1], &new_meta.value_indices[..]);
assert_eq!(
schema.timestamp_column(),
new_meta.schema.timestamp_column()
);
}
#[test]
fn test_remove_multiple_columns_before_timestamp() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
];
let schema = Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(3))
.version(123)
.build()
.unwrap(),
);
let meta = TableMetaBuilder::default()
.schema(schema.clone())
.primary_key_indices(vec![1])
.engine("engine")
.next_column_id(4)
.build()
.unwrap();
// Remove columns in reverse order to test whether timestamp index is valid.
let alter_kind = AlterKind::RemoveColumns {
names: vec![String::from("col3"), String::from("col1")],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
let names: Vec<String> = new_meta
.schema
.column_schemas()
.iter()
.map(|column_schema| column_schema.name.clone())
.collect();
assert_eq!(&["col2", "ts"], &names[..]);
assert_eq!(&[0], &new_meta.primary_key_indices[..]);
assert_eq!(&[1], &new_meta.value_indices[..]);
assert_eq!(
schema.timestamp_column(),
new_meta.schema.timestamp_column()
);
}
#[test]
fn test_add_existing_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
is_key: false,
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnExists, err.status_code());
}
#[test]
fn test_remove_unknown_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
let alter_kind = AlterKind::RemoveColumns {
names: vec![String::from("unknown")],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
}
#[test]
fn test_remove_key_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
// Remove column in primary key.
let alter_kind = AlterKind::RemoveColumns {
names: vec![String::from("col1")],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
// Remove timestamp column.
let alter_kind = AlterKind::RemoveColumns {
names: vec![String::from("ts")],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_alloc_new_column() {
let schema = Arc::new(new_test_schema());
let mut meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
assert_eq!(3, meta.next_column_id);
let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
assert_eq!(4, meta.next_column_id);
assert_eq!(column_schema.name, desc.name);
}
}