feat: implement alter table (#218)

* feat: implement alter table

* Currently we have no plans to support altering primary keys (and maybe never will), so the related code has been removed.

* make `alter` a trait function in table

* address other CR comments

* cleanup

* rebase develop

* resolve code review comments

Co-authored-by: luofucong <luofucong@greptime.com>
Author: LFC
Date: 2022-09-06 13:44:34 +08:00
Committed by: GitHub
Parent: 119ff2fc2e
Commit: 5e67301c00
28 changed files with 956 additions and 216 deletions

Cargo.lock (generated)

@@ -1356,6 +1356,7 @@ dependencies = [
"common-telemetry",
"common-time",
"datafusion",
"datafusion-common",
"datatypes",
"futures",
"hyper",


@@ -58,6 +58,7 @@ axum-test-helper = "0.1"
client = { path = "../client" }
common-query = { path = "../common/query" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
tempdir = "0.3"
[dev-dependencies.arrow]


@@ -1,7 +1,6 @@
use std::any::Any;
use api::serde::DecodeError;
use common_error::ext::BoxedError;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
use storage::error::Error as StorageError;
@@ -40,7 +39,14 @@ pub enum Error {
GetTable {
table_name: String,
#[snafu(backtrace)]
source: BoxedError,
source: TableError,
},
#[snafu(display("Failed to alter table {}, source: {}", table_name, source))]
AlterTable {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},
#[snafu(display("Table not found: {}", table_name))]
@@ -135,8 +141,8 @@ pub enum Error {
source: common_runtime::error::Error,
},
#[snafu(display("Invalid CREATE TABLE sql statement, cause: {}", msg))]
InvalidCreateTableSql { msg: String, backtrace: Backtrace },
#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to create schema when creating table, source: {}", source))]
CreateSchema {
@@ -192,8 +198,9 @@ impl ErrorExt for Error {
Error::ExecuteSql { source } => source.status_code(),
Error::ExecutePhysicalPlan { source } => source.status_code(),
Error::NewCatalog { source } => source.status_code(),
Error::CreateTable { source, .. } => source.status_code(),
Error::GetTable { source, .. } => source.status_code(),
Error::CreateTable { source, .. }
| Error::GetTable { source, .. }
| Error::AlterTable { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
Error::ConvertSchema { source, .. } => source.status_code(),
Error::TableNotFound { .. } => StatusCode::TableNotFound,
@@ -203,7 +210,7 @@ impl ErrorExt for Error {
| Error::ColumnTypeMismatch { .. }
| Error::IllegalInsertData { .. }
| Error::DecodeInsert { .. }
| Error::InvalidCreateTableSql { .. }
| Error::InvalidSql { .. }
| Error::SqlTypeNotSupported { .. }
| Error::CreateSchema { .. }
| Error::KeyColumnNotFound { .. }
@@ -243,6 +250,7 @@ impl From<Error> for tonic::Status {
#[cfg(test)]
mod tests {
use common_error::ext::BoxedError;
use common_error::mock::MockError;
use super::*;


@@ -39,7 +39,7 @@ type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
pub struct Instance {
// Query service
query_engine: QueryEngineRef,
sql_handler: SqlHandler<DefaultEngine>,
sql_handler: SqlHandler,
// Catalog list
catalog_manager: CatalogManagerRef,
physical_planner: PhysicalPlanner,
@@ -62,8 +62,10 @@ impl Instance {
),
object_store,
);
let table_engine = Arc::new(table_engine);
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone()))
catalog::LocalCatalogManager::try_new(table_engine.clone())
.await
.context(NewCatalogSnafu)?,
);
@@ -150,7 +152,10 @@ impl Instance {
self.sql_handler.execute(SqlRequest::Create(request)).await
}
Statement::Alter(alter_table) => {
let req = self.sql_handler.alter_to_request(alter_table)?;
self.sql_handler.execute(SqlRequest::Alter(req)).await
}
_ => unimplemented!(),
}
}
@@ -198,7 +203,7 @@ impl Instance {
}
}
pub fn sql_handler(&self) -> &SqlHandler<DefaultEngine> {
pub fn sql_handler(&self) -> &SqlHandler {
&self.sql_handler
}
@@ -207,6 +212,43 @@ impl Instance {
}
}
#[cfg(test)]
impl Instance {
pub async fn new_mock() -> Result<Self> {
use table_engine::table::test_util::new_test_object_store;
use table_engine::table::test_util::MockEngine;
use table_engine::table::test_util::MockMitoEngine;
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = MockMitoEngine::new(
TableEngineConfig::default(),
MockEngine::default(),
object_store,
);
let mock_engine = Arc::new(mock_engine);
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let sql_handler = SqlHandler::new(mock_engine, catalog_manager.clone());
let physical_planner = PhysicalPlanner::new(query_engine.clone());
let script_executor = ScriptExecutor::new(query_engine.clone());
Ok(Self {
query_engine,
sql_handler,
catalog_manager,
physical_planner,
script_executor,
})
}
}
async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
// TODO(dennis): supports other backend
let data_dir = util::normalize_dir(match store_config {


@@ -1,17 +1,19 @@
//! sql handler
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::types::DateTimeType;
use query::query_engine::Output;
use snafu::{OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngine};
use sql::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName};
use table::engine::{EngineContext, TableEngineRef};
use table::requests::*;
use table::TableRef;
use crate::error::{GetTableSnafu, Result, TableNotFoundSnafu};
use crate::error::{self, GetTableSnafu, Result, TableNotFoundSnafu};
mod alter;
mod create;
mod insert;
@@ -19,18 +21,19 @@ mod insert;
pub enum SqlRequest {
Insert(InsertRequest),
Create(CreateTableRequest),
Alter(AlterTableRequest),
}
// Handler to execute SQL statements other than queries
pub struct SqlHandler<Engine: TableEngine> {
table_engine: Arc<Engine>,
pub struct SqlHandler {
table_engine: TableEngineRef,
catalog_manager: CatalogManagerRef,
}
impl<Engine: TableEngine> SqlHandler<Engine> {
pub fn new(table_engine: Engine, catalog_manager: CatalogManagerRef) -> Self {
impl SqlHandler {
pub fn new(table_engine: TableEngineRef, catalog_manager: CatalogManagerRef) -> Self {
Self {
table_engine: Arc::new(table_engine),
table_engine,
catalog_manager,
}
}
@@ -39,22 +42,92 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
match request {
SqlRequest::Insert(req) => self.insert(req).await,
SqlRequest::Create(req) => self.create(req).await,
SqlRequest::Alter(req) => self.alter(req).await,
}
}
pub(crate) fn get_table(&self, table_name: &str) -> Result<TableRef> {
self.table_engine
.get_table(&EngineContext::default(), table_name)
.map_err(BoxedError::new)
.context(GetTableSnafu { table_name })?
.context(TableNotFoundSnafu { table_name })
}
pub fn table_engine(&self) -> Arc<Engine> {
pub fn table_engine(&self) -> TableEngineRef {
self.table_engine.clone()
}
}
/// Converts a possibly fully-qualified table name (`<catalog>.<schema>.<table>`, or just
/// `<table>` when catalog and schema are the defaults) to a tuple.
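/// E.g. `greptime.public.demo` maps to `(Some("greptime"), Some("public"), "demo")`,
/// and a bare `demo` maps to `(None, None, "demo")` (illustrative names).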
fn table_idents_to_full_name(
obj_name: &ObjectName,
) -> Result<(Option<String>, Option<String>, String)> {
match &obj_name.0[..] {
[table] => Ok((None, None, table.value.clone())),
[catalog, schema, table] => Ok((
Some(catalog.value.clone()),
Some(schema.value.clone()),
table.value.clone(),
)),
_ => error::InvalidSqlSnafu {
msg: format!(
"expect table name to be <catalog>.<schema>.<table> or <table>, actual: {}",
obj_name
),
}
.fail(),
}
}
fn column_def_to_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
let is_nullable = column_def
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null));
Ok(ColumnSchema {
name: column_def.name.value.clone(),
data_type: sql_data_type_to_concrete_data_type(&column_def.data_type)?,
is_nullable,
})
}
fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<ConcreteDataType> {
match data_type {
SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()),
SqlDataType::Int(_) => Ok(ConcreteDataType::int32_datatype()),
SqlDataType::SmallInt(_) => Ok(ConcreteDataType::int16_datatype()),
SqlDataType::Char(_)
| SqlDataType::Varchar(_)
| SqlDataType::Text
| SqlDataType::String => Ok(ConcreteDataType::string_datatype()),
SqlDataType::Float(_) => Ok(ConcreteDataType::float32_datatype()),
SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()),
SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()),
SqlDataType::Date => Ok(ConcreteDataType::date_datatype()),
SqlDataType::Custom(obj_name) => match &obj_name.0[..] {
[type_name] => {
if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) {
Ok(ConcreteDataType::datetime_datatype())
} else {
error::SqlTypeNotSupportedSnafu {
t: data_type.clone(),
}
.fail()
}
}
_ => error::SqlTypeNotSupportedSnafu {
t: data_type.clone(),
}
.fail(),
},
_ => error::SqlTypeNotSupportedSnafu {
t: data_type.clone(),
}
.fail(),
}
}
#[cfg(test)]
mod tests {
use std::any::Any;
@@ -165,9 +238,10 @@ mod tests {
),
object_store,
);
let table_engine = Arc::new(table_engine);
let catalog_list = Arc::new(
catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone()))
catalog::LocalCatalogManager::try_new(table_engine.clone())
.await
.unwrap(),
);


@@ -0,0 +1,91 @@
use query::query_engine::Output;
use snafu::prelude::*;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use table::engine::EngineContext;
use table::requests::{AlterKind, AlterTableRequest};
use crate::error::{self, Result};
use crate::sql::{column_def_to_schema, table_idents_to_full_name, SqlHandler};
impl SqlHandler {
pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result<Output> {
let ctx = EngineContext {};
let table_name = &req.table_name.clone();
if !self.table_engine.table_exists(&ctx, table_name) {
return error::TableNotFoundSnafu { table_name }.fail();
}
self.table_engine
.alter_table(&ctx, req)
.await
.context(error::AlterTableSnafu { table_name })?;
// Tried in MySQL: it really does print "Affected Rows: 0".
Ok(Output::AffectedRows(0))
}
pub(crate) fn alter_to_request(&self, alter_table: AlterTable) -> Result<AlterTableRequest> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(alter_table.table_name())?;
let alter_kind = match alter_table.alter_operation() {
AlterTableOperation::AddConstraint(table_constraint) => {
return error::InvalidSqlSnafu {
msg: format!("unsupported table constraint {}", table_constraint),
}
.fail()
}
AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumn {
new_column: column_def_to_schema(column_def)?,
},
};
Ok(AlterTableRequest {
catalog_name,
schema_name,
table_name,
alter_kind,
})
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use datatypes::prelude::ConcreteDataType;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use super::*;
use crate::tests::test_util::create_mock_sql_handler;
fn parse_sql(sql: &str) -> AlterTable {
let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, stmt.len());
let stmt = stmt.remove(0);
assert_matches!(stmt, Statement::Alter(_));
match stmt {
Statement::Alter(alter_table) => alter_table,
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_alter_to_request_with_adding_column() {
let handler = create_mock_sql_handler().await;
let alter_table = parse_sql("ALTER TABLE my_metric_1 ADD tagk_i STRING Null;");
let req = handler.alter_to_request(alter_table).unwrap();
assert_eq!(req.catalog_name, None);
assert_eq!(req.schema_name, None);
assert_eq!(req.table_name, "my_metric_1");
let alter_kind = req.alter_kind;
assert_matches!(alter_kind, AlterKind::AddColumn { .. });
match alter_kind {
AlterKind::AddColumn { new_column } => {
assert_eq!(new_column.name, "tagk_i");
assert!(new_column.is_nullable);
assert_eq!(new_column.data_type, ConcreteDataType::string_datatype());
}
}
}
}
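Taken together, the handler above completes the path from SQL text to an engine call. A minimal sketch of that path, assuming the crate-internal APIs shown in this diff (`ParserContext`, `SqlHandler::alter_to_request`, `SqlRequest::Alter`):

use query::query_engine::Output;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use crate::error::Result;
use crate::sql::{SqlHandler, SqlRequest};

async fn alter_demo_table(handler: &SqlHandler) -> Result<Output> {
    // Parse the ALTER TABLE statement, convert it to an AlterTableRequest,
    // then let the handler dispatch it to the table engine.
    let sql = "ALTER TABLE demo ADD COLUMN my_tag STRING NULL";
    let mut stmts = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
    match stmts.remove(0) {
        Statement::Alter(alter_table) => {
            let req = handler.alter_to_request(alter_table)?;
            handler.execute(SqlRequest::Alter(req)).await
        }
        _ => unreachable!(),
    }
}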


@@ -3,25 +3,23 @@ use std::sync::Arc;
use catalog::RegisterTableRequest;
use common_telemetry::tracing::info;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use datatypes::types::DateTimeType;
use datatypes::schema::SchemaBuilder;
use query::query_engine::Output;
use snafu::{OptionExt, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName, TableConstraint};
use sql::ast::TableConstraint;
use sql::statements::create_table::CreateTable;
use store_api::storage::consts::TIME_INDEX_NAME;
use table::engine::{EngineContext, TableEngine};
use table::engine::EngineContext;
use table::metadata::TableId;
use table::requests::*;
use crate::error::{
ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu, InsertSystemCatalogSnafu,
InvalidCreateTableSqlSnafu, KeyColumnNotFoundSnafu, Result, SqlTypeNotSupportedSnafu,
self, ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu,
InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, Result,
};
use crate::sql::SqlHandler;
use crate::sql::{column_def_to_schema, table_idents_to_full_name, SqlHandler};
impl<Engine: TableEngine> SqlHandler<Engine> {
impl SqlHandler {
pub(crate) async fn create(&self, req: CreateTableRequest) -> Result<Output> {
let ctx = EngineContext {};
let catalog_name = req.catalog_name.clone();
@@ -63,7 +61,7 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
let mut ts_index = usize::MAX;
let mut primary_keys = vec![];
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(stmt.name)?;
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&stmt.name)?;
let col_map = stmt
.columns
@@ -88,7 +86,7 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
},
)?;
} else {
return InvalidCreateTableSqlSnafu {
return error::InvalidSqlSnafu {
msg: format!("Cannot recognize named UNIQUE constraint: {}", name),
}
.fail();
@@ -102,7 +100,7 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
)?);
}
} else {
return InvalidCreateTableSqlSnafu {
return error::InvalidSqlSnafu {
msg: format!(
"Unrecognized non-primary unnamed UNIQUE constraint: {:?}",
name
@@ -156,93 +154,21 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
}
}
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>` or `<table>` when catalog and schema are default)
/// to tuples
fn table_idents_to_full_name(
obj_name: ObjectName,
) -> Result<(Option<String>, Option<String>, String)> {
match &obj_name.0[..] {
[table] => Ok((None, None, table.value.clone())),
[catalog, schema, table] => Ok((
Some(catalog.value.clone()),
Some(schema.value.clone()),
table.value.clone(),
)),
_ => InvalidCreateTableSqlSnafu {
msg: format!(
"table name can only be <catalog>.<schema>.<table> or <table>, but found: {}",
obj_name
),
}
.fail(),
}
}
fn column_def_to_schema(def: &ColumnDef) -> Result<ColumnSchema> {
let is_nullable = def
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null));
Ok(ColumnSchema {
name: def.name.value.clone(),
data_type: sql_data_type_to_concrete_data_type(&def.data_type)?,
is_nullable,
})
}
fn sql_data_type_to_concrete_data_type(t: &SqlDataType) -> Result<ConcreteDataType> {
match t {
SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()),
SqlDataType::Int(_) => Ok(ConcreteDataType::int32_datatype()),
SqlDataType::SmallInt(_) => Ok(ConcreteDataType::int16_datatype()),
SqlDataType::Char(_)
| SqlDataType::Varchar(_)
| SqlDataType::Text
| SqlDataType::String => Ok(ConcreteDataType::string_datatype()),
SqlDataType::Float(_) => Ok(ConcreteDataType::float32_datatype()),
SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()),
SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()),
SqlDataType::Date => Ok(ConcreteDataType::date_datatype()),
SqlDataType::Custom(obj_name) => match &obj_name.0[..] {
[type_name] => {
if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) {
Ok(ConcreteDataType::datetime_datatype())
} else {
SqlTypeNotSupportedSnafu { t: t.clone() }.fail()
}
}
_ => SqlTypeNotSupportedSnafu { t: t.clone() }.fail(),
},
_ => SqlTypeNotSupportedSnafu { t: t.clone() }.fail(),
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use datatypes::prelude::ConcreteDataType;
use sql::ast::Ident;
use sql::ast::{DataType as SqlDataType, ObjectName};
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use table_engine::config::EngineConfig;
use table_engine::engine::MitoEngine;
use table_engine::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use super::*;
use crate::error::Error;
async fn create_mock_sql_handler() -> SqlHandler<MitoEngine<MockEngine>> {
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine =
MockMitoEngine::new(EngineConfig::default(), MockEngine::default(), object_store);
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(Arc::new(mock_engine.clone()))
.await
.unwrap(),
);
SqlHandler::new(mock_engine, catalog_manager)
}
use crate::sql::sql_data_type_to_concrete_data_type;
use crate::tests::test_util::create_mock_sql_handler;
fn sql_to_statement(sql: &str) -> CreateTable {
let mut res = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();


@@ -10,7 +10,6 @@ use snafu::OptionExt;
use snafu::ResultExt;
use sql::ast::Value as SqlValue;
use sql::statements::insert::Insert;
use table::engine::TableEngine;
use table::requests::*;
use crate::error::{
@@ -19,7 +18,7 @@ use crate::error::{
};
use crate::sql::{SqlHandler, SqlRequest};
impl<Engine: TableEngine> SqlHandler<Engine> {
impl SqlHandler {
pub(crate) async fn insert(&self, req: InsertRequest) -> Result<Output> {
let table_name = &req.table_name.to_string();
let table = self.get_table(table_name)?;


@@ -1,5 +1,7 @@
use arrow::array::UInt64Array;
use common_recordbatch::util;
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use query::Output;
use crate::instance::Instance;
@@ -61,8 +63,6 @@ pub async fn test_execute_create() {
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
let output = instance
.execute_sql(
r#"create table test_table(
@@ -78,3 +78,63 @@ pub async fn test_execute_create() {
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
}
#[tokio::test]
async fn test_alter_table() {
common_telemetry::init_default_ut_logging();
// TODO(LFC) Use real Mito engine when we can alter its region schema,
// and delete the `new_mock` method.
let instance = Instance::new_mock().await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
// make sure table insertion works before altering the table
instance
.execute_sql("insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)")
.await
.unwrap();
let output = instance
.execute_sql("alter table demo add my_tag string null")
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let output = instance
.execute_sql(
"insert into demo(host, cpu, memory, ts, my_tag) values ('host2', 2.2, 200, 2000, 'hello')",
)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql("insert into demo(host, cpu, memory, ts) values ('host3', 3.3, 300, 3000)")
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance.execute_sql("select * from demo").await.unwrap();
match output {
Output::RecordBatch(stream) => {
let recordbatches = util::collect(stream).await.unwrap();
let recordbatch = recordbatches
.into_iter()
.map(|r| r.df_recordbatch)
.collect::<Vec<DfRecordBatch>>();
let pretty_print = arrow_print::write(&recordbatch);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![
"+-------+------+-----+--------+--------+",
"| host | ts | cpu | memory | my_tag |",
"+-------+------+-----+--------+--------+",
"| host1 | 1000 | 1.1 | 100 | |",
"| host2 | 2000 | 2.2 | 200 | hello |",
"| host3 | 3000 | 3.3 | 300 | |",
"+-------+------+-----+--------+--------+",
];
assert_eq!(pretty_print, expected);
}
_ => unreachable!(),
}
}


@@ -8,11 +8,14 @@ use snafu::ResultExt;
use table::engine::EngineContext;
use table::engine::TableEngineRef;
use table::requests::CreateTableRequest;
use table_engine::config::EngineConfig;
use table_engine::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use tempdir::TempDir;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{CreateTableSnafu, Result};
use crate::instance::Instance;
use crate::sql::SqlHandler;
/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
/// Only for test.
@@ -66,7 +69,7 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
.expect("ts is expected to be timestamp column"),
),
create_if_not_exists: true,
primary_key_indices: Vec::default(),
primary_key_indices: vec![3, 0], // "host" and "ts" are primary keys
table_options: HashMap::new(),
},
)
@@ -84,3 +87,18 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
.unwrap();
Ok(())
}
pub async fn create_mock_sql_handler() -> SqlHandler {
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(MockMitoEngine::new(
EngineConfig::default(),
MockEngine::default(),
object_store,
));
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);
SqlHandler::new(mock_engine, catalog_manager)
}


@@ -49,7 +49,7 @@ where
todo!("Currently not supported")
}
Statement::Query(qb) => self.query_to_plan(qb),
Statement::Create(_) | Statement::Insert(_) => unreachable!(),
Statement::Create(_) | Statement::Alter(_) | Statement::Insert(_) => unreachable!(),
}
}
}


@@ -77,6 +77,8 @@ impl<'a> ParserContext<'a> {
Keyword::SELECT | Keyword::WITH | Keyword::VALUES => self.parse_query(),
Keyword::ALTER => self.parse_alter(),
// todo(hl) support more statements.
_ => self.unsupported(self.peek_token_as_string()),
}


@@ -1,3 +1,4 @@
mod alter_parser;
pub(crate) mod create_parser;
pub(crate) mod insert_parser;
pub(crate) mod query_parser;


@@ -0,0 +1,79 @@
use snafu::ResultExt;
use sqlparser::keywords::Keyword;
use sqlparser::parser::ParserError;
use crate::error;
use crate::parser::ParserContext;
use crate::parser::Result;
use crate::statements::alter::{AlterTable, AlterTableOperation};
use crate::statements::statement::Statement;
impl<'a> ParserContext<'a> {
pub(crate) fn parse_alter(&mut self) -> Result<Statement> {
let alter_table = self.parse().context(error::SyntaxSnafu { sql: self.sql })?;
Ok(Statement::Alter(alter_table))
}
fn parse(&mut self) -> std::result::Result<AlterTable, ParserError> {
let parser = &mut self.parser;
parser.expect_keywords(&[Keyword::ALTER, Keyword::TABLE])?;
let table_name = parser.parse_object_name()?;
let alter_operation = if parser.parse_keyword(Keyword::ADD) {
if let Some(constraint) = parser.parse_optional_table_constraint()? {
AlterTableOperation::AddConstraint(constraint)
} else {
let _ = parser.parse_keyword(Keyword::COLUMN);
let column_def = parser.parse_column_def()?;
AlterTableOperation::AddColumn { column_def }
}
} else {
return Err(ParserError::ParserError(format!(
"expect ADD or DROP after ALTER TABLE, found {}",
parser.peek_token()
)));
};
Ok(AlterTable::new(table_name, alter_operation))
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use sqlparser::ast::{ColumnOption, DataType};
use sqlparser::dialect::GenericDialect;
use super::*;
#[test]
fn test_parse_alter_add_column() {
let sql = "ALTER TABLE my_metric_1 ADD tagk_i STRING Null;";
let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
assert_matches!(statement, Statement::Alter { .. });
match statement {
Statement::Alter(alter_table) => {
assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
let alter_operation = alter_table.alter_operation();
assert_matches!(alter_operation, AlterTableOperation::AddColumn { .. });
match alter_operation {
AlterTableOperation::AddColumn { column_def } => {
assert_eq!("tagk_i", column_def.name.value);
assert_eq!(DataType::String, column_def.data_type);
assert!(column_def
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null)));
}
_ => unreachable!(),
}
}
_ => unreachable!(),
}
}
}
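For contrast, a statement without an ADD clause takes the ParserError branch above. A small sketch in the same test style (hypothetical test name, same harness):

#[test]
fn test_parse_alter_missing_operation() {
    // No ADD follows the table name, so `parse` returns the ParserError above
    // and `create_with_dialect` surfaces it as a syntax error.
    let sql = "ALTER TABLE my_metric_1";
    let result = ParserContext::create_with_dialect(sql, &GenericDialect {});
    assert!(result.is_err());
}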


@@ -22,7 +22,6 @@ impl<'a> ParserContext<'a> {
mod tests {
use sqlparser::dialect::GenericDialect;
use super::*;
use crate::parser::ParserContext;
#[test]
@@ -36,19 +35,13 @@ mod tests {
}
#[test]
pub fn test_parser_invalid_query() {
// sql without selection
let sql = "SELECT FROM table_1";
let parser = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
match &parser[0] {
Statement::ShowDatabases(_) => {
panic!("Not expected to be a show database statement")
}
Statement::Insert(_) => {
panic!("Not expected to be a show database statement")
}
Statement::Create(_) | Statement::Query(_) => {}
}
pub fn test_parse_invalid_query() {
let sql = "SELECT * FROM table_1 WHERE";
let result = ParserContext::create_with_dialect(sql, &GenericDialect {});
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Expected an expression"));
}
}


@@ -1,3 +1,4 @@
pub mod alter;
pub mod create_table;
pub mod insert;
pub mod query;


@@ -0,0 +1,32 @@
use sqlparser::ast::{ColumnDef, ObjectName, TableConstraint};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AlterTable {
table_name: ObjectName,
alter_operation: AlterTableOperation,
}
impl AlterTable {
pub(crate) fn new(table_name: ObjectName, alter_operation: AlterTableOperation) -> Self {
Self {
table_name,
alter_operation,
}
}
pub fn table_name(&self) -> &ObjectName {
&self.table_name
}
pub fn alter_operation(&self) -> &AlterTableOperation {
&self.alter_operation
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlterTableOperation {
/// `ADD <table_constraint>`
AddConstraint(TableConstraint),
/// `ADD [ COLUMN ] <column_def>`
AddColumn { column_def: ColumnDef },
}
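The statement can also be built programmatically. A sketch, assuming the `ColumnDef` field layout of the sqlparser-rs version this repo pins (treat the exact fields as illustrative):

use sqlparser::ast::{ColumnDef, DataType, Ident, ObjectName};

fn add_column_stmt() -> AlterTable {
    // Equivalent to parsing "ALTER TABLE demo ADD COLUMN my_tag STRING".
    let column_def = ColumnDef {
        name: Ident::new("my_tag"),
        data_type: DataType::String,
        collation: None,
        options: vec![],
    };
    AlterTable::new(
        ObjectName(vec![Ident::new("demo")]),
        AlterTableOperation::AddColumn { column_def },
    )
}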


@@ -1,6 +1,7 @@
use sqlparser::ast::Statement as SpStatement;
use sqlparser::parser::ParserError;
use crate::statements::alter::AlterTable;
use crate::statements::create_table::CreateTable;
use crate::statements::insert::Insert;
use crate::statements::query::Query;
@@ -20,6 +21,9 @@ pub enum Statement {
/// CREATE TABLE
Create(CreateTable),
/// ALTER TABLE
Alter(AlterTable),
}
/// Converts Statement to sqlparser statement
@@ -33,7 +37,7 @@ impl TryFrom<Statement> for SpStatement {
)),
Statement::Query(s) => Ok(SpStatement::Query(Box::new(s.inner))),
Statement::Insert(i) => Ok(i.inner),
Statement::Create(_) => unimplemented!(),
Statement::Create(_) | Statement::Alter(_) => unimplemented!(),
}
}
}


@@ -12,7 +12,7 @@ pub mod memtable;
pub mod metadata;
pub mod proto;
mod read;
mod region;
pub mod region;
pub mod schema;
mod snapshot;
mod sst;


@@ -52,6 +52,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
type WriteRequest = WriteBatch;
type Snapshot = SnapshotImpl;
fn id(&self) -> RegionId {
self.inner.shared.id
}
fn name(&self) -> &str {
&self.inner.shared.name
}


@@ -26,6 +26,7 @@ use crate::storage::metadata::RegionMeta;
use crate::storage::requests::WriteRequest;
use crate::storage::responses::WriteResponse;
use crate::storage::snapshot::{ReadContext, Snapshot};
use crate::storage::{RegionDescriptor, RegionId};
/// Chunks of rows in storage engine.
#[async_trait]
@@ -35,6 +36,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
type WriteRequest: WriteRequest;
type Snapshot: Snapshot;
fn id(&self) -> RegionId;
/// Returns name of the region.
fn name(&self) -> &str;
@@ -53,6 +56,10 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
/// Create write request
fn write_request(&self) -> Self::WriteRequest;
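/// Alters the region's metadata to match the given descriptor (used by
/// `ALTER TABLE`). The default implementation panics; engines that support
/// altering override it.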
fn alter(&self, _descriptor: RegionDescriptor) -> Result<(), Self::Error> {
unimplemented!()
}
}
/// Context for write operations.


@@ -5,12 +5,13 @@ use std::sync::RwLock;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::logging;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{
self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, OpenOptions, RegionDescriptorBuilder, RegionId, RowKeyDescriptor,
RowKeyDescriptorBuilder, StorageEngine,
ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, EngineContext as StorageEngineContext, OpenOptions, RegionDescriptorBuilder,
RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{EngineContext, TableEngine};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
@@ -29,7 +30,7 @@ use crate::error::{
use crate::table::MitoTable;
pub const MITO_ENGINE: &str = "mito";
const INIT_COLUMN_ID: ColumnId = 0;
pub const INIT_COLUMN_ID: ColumnId = 0;
const INIT_TABLE_VERSION: TableVersion = 0;
/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}"
@@ -89,18 +90,18 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
async fn alter_table(
&self,
_ctx: &EngineContext,
_request: AlterTableRequest,
ctx: &EngineContext,
req: AlterTableRequest,
) -> TableResult<TableRef> {
unimplemented!();
Ok(self.inner.alter_table(ctx, req).await?)
}
fn get_table(&self, _ctx: &EngineContext, name: &str) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(name))
}
fn table_exists(&self, _ctx: &EngineContext, _name: &str) -> bool {
unimplemented!();
fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool {
self.inner.get_table(name).is_some()
}
async fn drop_table(
@@ -125,18 +126,17 @@ struct MitoEngineInner<S: StorageEngine> {
table_mutex: Mutex<()>,
}
fn build_row_key_desc_from_schema(
pub(crate) fn build_row_key_desc(
mut column_id: ColumnId,
request: &CreateTableRequest,
table_name: &str,
table_schema: &SchemaRef,
primary_key_indices: &Vec<usize>,
) -> Result<(ColumnId, RowKeyDescriptor)> {
let ts_column_schema =
request
.schema
.timestamp_column()
.context(MissingTimestampIndexSnafu {
table_name: &request.table_name,
})?;
let timestamp_index = request.schema.timestamp_index().unwrap();
let ts_column_schema = table_schema
.timestamp_column()
.context(MissingTimestampIndexSnafu { table_name })?;
// `unwrap` is safe because we've checked the `timestamp_column` above
let timestamp_index = table_schema.timestamp_index().unwrap();
let ts_column = ColumnDescriptorBuilder::new(
column_id,
@@ -147,16 +147,16 @@ fn build_row_key_desc_from_schema(
.build()
.context(BuildColumnDescriptorSnafu {
column_name: &ts_column_schema.name,
table_name: &request.table_name,
table_name,
})?;
column_id += 1;
let column_schemas = &request.schema.column_schemas();
let column_schemas = &table_schema.column_schemas();
//TODO(boyan): enable version column by table option?
let mut builder = RowKeyDescriptorBuilder::new(ts_column);
for index in &request.primary_key_indices {
for index in primary_key_indices {
if *index == timestamp_index {
continue;
}
@@ -172,7 +172,7 @@ fn build_row_key_desc_from_schema(
.build()
.context(BuildColumnDescriptorSnafu {
column_name: &column_schema.name,
table_name: &request.table_name,
table_name,
})?;
builder = builder.push_column(column);
@@ -181,27 +181,24 @@ fn build_row_key_desc_from_schema(
Ok((
column_id,
builder.build().context(BuildRowKeyDescriptorSnafu {
table_name: &request.table_name,
})?,
builder
.build()
.context(BuildRowKeyDescriptorSnafu { table_name })?,
))
}
fn build_column_family_from_request(
pub(crate) fn build_column_family(
mut column_id: ColumnId,
request: &CreateTableRequest,
table_name: &str,
table_schema: &SchemaRef,
primary_key_indices: &[usize],
) -> Result<(ColumnId, ColumnFamilyDescriptor)> {
let mut builder = ColumnFamilyDescriptorBuilder::default();
let primary_key_indices = &request.primary_key_indices;
let ts_index = request
.schema
let ts_index = table_schema
.timestamp_index()
.context(MissingTimestampIndexSnafu {
table_name: &request.table_name,
})?;
let column_schemas = request
.schema
.context(MissingTimestampIndexSnafu { table_name })?;
let column_schemas = table_schema
.column_schemas()
.iter()
.enumerate()
@@ -217,7 +214,7 @@ fn build_column_family_from_request(
.build()
.context(BuildColumnDescriptorSnafu {
column_name: &column_schema.name,
table_name: &request.table_name,
table_name,
})?;
builder = builder.push_column(column);
@@ -226,9 +223,9 @@ fn build_column_family_from_request(
Ok((
column_id,
builder.build().context(BuildColumnFamilyDescriptorSnafu {
table_name: &request.table_name,
})?,
builder
.build()
.context(BuildColumnFamilyDescriptorSnafu { table_name })?,
))
}
@@ -248,9 +245,20 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}
}
let (next_column_id, default_cf) =
build_column_family_from_request(INIT_COLUMN_ID, &request)?;
let (next_column_id, row_key) = build_row_key_desc_from_schema(next_column_id, &request)?;
let table_schema = &request.schema;
let primary_key_indices = &request.primary_key_indices;
let (next_column_id, default_cf) = build_column_family(
INIT_COLUMN_ID,
table_name,
table_schema,
primary_key_indices,
)?;
let (next_column_id, row_key) = build_row_key_desc(
next_column_id,
table_name,
table_schema,
primary_key_indices,
)?;
let table_id = request.id;
// TODO(dennis): supports multi regions;
@@ -285,7 +293,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let region = self
.storage_engine
.create_region(&storage::EngineContext::default(), region_descriptor, &opts)
.create_region(&StorageEngineContext::default(), region_descriptor, &opts)
.await
.map_err(BoxedError::new)
.context(error::CreateRegionSnafu)?;
@@ -340,7 +348,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
return Ok(Some(table));
}
let engine_ctx = storage::EngineContext::default();
let engine_ctx = StorageEngineContext::default();
let opts = OpenOptions {
parent_dir: table_dir(table_name),
};
@@ -379,6 +387,20 @@ impl<S: StorageEngine> MitoEngineInner<S> {
fn get_table(&self, name: &str) -> Option<TableRef> {
self.tables.read().unwrap().get(name).cloned()
}
async fn alter_table(&self, _ctx: &EngineContext, req: AlterTableRequest) -> Result<TableRef> {
let table_name = &req.table_name.clone();
let table = self
.get_table(table_name)
.context(error::TableNotFoundSnafu { table_name })?;
logging::info!("start altering table {} with request {:?}", table_name, req);
table
.alter(req)
.await
.context(error::AlterTableSnafu { table_name })?;
Ok(table)
}
}
impl<S: StorageEngine> MitoEngineInner<S> {
@@ -397,13 +419,15 @@ mod tests {
use common_recordbatch::util;
use datafusion_common::field_util::FieldExt;
use datafusion_common::field_util::SchemaExt;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::*;
use store_api::manifest::Manifest;
use table::requests::InsertRequest;
use table::requests::{AlterKind, InsertRequest};
use super::*;
use crate::table::test_util;
use crate::table::test_util::MockRegion;
use crate::table::test_util::{MockRegion, TABLE_NAME};
#[test]
fn test_region_name() {
@@ -617,4 +641,50 @@ mod tests {
assert_eq!(8589934602, region_id(2, 10));
assert_eq!(18446744069414584330, region_id(u32::MAX, 10));
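// region_id packs (table_id, region_number) into a u64:
// (2u64 << 32) | 10 = 8589934602, and ((u32::MAX as u64) << 32) | 10 = 18446744069414584330.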
}
#[tokio::test]
async fn test_alter_table_add_column() {
let (_engine, table_engine, table, _object_store, _dir) =
test_util::setup_mock_engine_and_table().await;
let table = table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
let table_info = table.table_info();
let old_info = (*table_info).clone();
let old_meta = &old_info.meta;
let old_schema = &old_meta.schema;
let new_column = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let req = AlterTableRequest {
catalog_name: None,
schema_name: None,
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::AddColumn {
new_column: new_column.clone(),
},
};
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
.unwrap();
let table = table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
let new_info = table.table_info();
let new_meta = &new_info.meta;
let new_schema = &new_meta.schema;
assert_eq!(new_schema.num_columns(), old_schema.num_columns() + 1);
assert_eq!(
new_schema.column_schemas().split_last().unwrap(),
(&new_column, old_schema.column_schemas())
);
assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column());
assert_eq!(new_schema.version(), old_schema.version() + 1);
assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 1);
}
}


@@ -130,6 +130,33 @@ pub enum Error {
table_name: String,
},
#[snafu(display("Table not found: {}", table_name))]
TableNotFound {
backtrace: Backtrace,
table_name: String,
},
#[snafu(display("Column {} already exists in table {}", column_name, table_name))]
ColumnExists {
backtrace: Backtrace,
column_name: String,
table_name: String,
},
#[snafu(display("Failed to build schema, msg: {}, source: {}", msg, source))]
SchemaBuild {
#[snafu(backtrace)]
source: datatypes::error::Error,
msg: String,
},
#[snafu(display("Failed to alter table {}, source: {}", table_name, source))]
AlterTable {
table_name: String,
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display(
"Projected columnd not found in region, column: {}",
column_qualified_name
@@ -155,6 +182,10 @@ impl ErrorExt for Error {
match self {
CreateRegion { source, .. } | OpenRegion { source, .. } => source.status_code(),
AlterTable { source, .. } => source.status_code(),
SchemaBuild { source, .. } => source.status_code(),
BuildRowKeyDescriptor { .. }
| BuildColumnDescriptor { .. }
| BuildColumnFamilyDescriptor { .. }
@@ -162,8 +193,10 @@ impl ErrorExt for Error {
| BuildTableInfo { .. }
| BuildRegionDescriptor { .. }
| TableExists { .. }
| ColumnExists { .. }
| ProjectedColumnNotFound { .. }
| MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
| MissingTimestampIndex { .. }
| TableNotFound { .. } => StatusCode::InvalidArguments,
TableInfoNotFound { .. } => StatusCode::Unexpected,


@@ -3,31 +3,38 @@ pub mod test_util;
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::logging;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use futures::task::{Context, Poll};
use futures::Stream;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::RegionDescriptorBuilder;
use store_api::storage::{
ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
WriteContext, WriteRequest,
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
use table::requests::InsertRequest;
use table::metadata::{TableInfoRef, TableMetaBuilder};
use table::requests::{AlterKind, AlterTableRequest, InsertRequest};
use table::{
metadata::{TableInfo, TableType},
table::Table,
};
use tokio::sync::Mutex;
use crate::engine::{build_column_family, build_row_key_desc, INIT_COLUMN_ID};
use crate::error::{
ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
@@ -41,9 +48,11 @@ fn table_manifest_dir(table_name: &str) -> String {
/// [Table] implementation.
pub struct MitoTable<R: Region> {
manifest: TableManifest,
table_info: TableInfo,
// guarded by `self.alter_lock`
table_info: ArcSwap<TableInfo>,
// TODO(dennis): a table contains multi regions
region: R,
alter_lock: Mutex<()>,
}
#[async_trait]
@@ -53,7 +62,7 @@ impl<R: Region> Table for MitoTable<R> {
}
fn schema(&self) -> SchemaRef {
self.table_info.meta.schema.clone()
self.table_info().meta.schema.clone()
}
async fn insert(&self, request: InsertRequest) -> TableResult<usize> {
@@ -65,8 +74,10 @@ impl<R: Region> Table for MitoTable<R> {
let mut put_op = write_request.put_op();
let mut columns_values = request.columns_values;
let key_columns = self.table_info.meta.row_key_column_names();
let value_columns = self.table_info.meta.value_column_names();
let table_info = self.table_info();
let key_columns = table_info.meta.row_key_column_names();
let value_columns = table_info.meta.value_column_names();
//Add row key and columns
for name in key_columns {
@@ -101,7 +112,7 @@ impl<R: Region> Table for MitoTable<R> {
}
fn table_type(&self) -> TableType {
self.table_info.table_type
self.table_info().table_type
}
async fn scan(
@@ -140,6 +151,138 @@ impl<R: Region> Table for MitoTable<R> {
Ok(Box::pin(ChunkStream { schema, stream }))
}
// Alter table changes the schemas of the table. The altering happens by cloning the current
// schema, modifying the clone, and then swapping it in for the old one. Though we could change
// the schema in place, considering how intertwined the schema's inner data representations are,
// it's safer to change it like this and avoid partial inconsistency during the altering. For
// example, the schema's `name_to_index` field must change in sync with `column_schemas`: if we
// added or removed columns in `column_schemas` *and then* updated `name_to_index`, there would
// be a small time window in which the two fields are inconsistent, which might cause
// hard-to-trace concurrency bugs or failures. (Of course, we could introduce guards such as a
// read-write lock to protect the consistency of schema altering, but that would hurt the
// performance of schema reads, and reads are the dominant operation on schemas; altering is
// performed far less frequently.)
async fn alter(&self, req: AlterTableRequest) -> TableResult<()> {
let _lock = self.alter_lock.lock().await;
let table_info = self.table_info();
let table_name = &table_info.name;
let table_meta = &table_info.meta;
let table_schema = match &req.alter_kind {
AlterKind::AddColumn { new_column } => {
build_table_schema_with_new_column(table_name, &table_meta.schema, new_column)?
}
};
let primary_key_indices = &table_meta.primary_key_indices;
let (next_column_id, default_cf) = build_column_family(
INIT_COLUMN_ID,
table_name,
&table_schema,
primary_key_indices,
)?;
let (next_column_id, row_key) = build_row_key_desc(
next_column_id,
table_name,
&table_schema,
primary_key_indices,
)?;
let new_meta = TableMetaBuilder::default()
.schema(table_schema.clone())
.engine(&table_meta.engine)
.next_column_id(next_column_id)
.primary_key_indices(primary_key_indices.clone())
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
let mut new_info = TableInfo::clone(&*table_info);
new_info.ident.version = table_info.ident.version + 1;
new_info.meta = new_meta;
// first alter region
let region = self.region();
let region_descriptor = RegionDescriptorBuilder::default()
.id(region.id())
.name(region.name())
.row_key(row_key)
.default_cf(default_cf)
.build()
.context(error::BuildRegionDescriptorSnafu {
table_name,
region_name: region.name(),
})?;
logging::debug!(
"start altering region {} of table {}, with new region descriptor {:?}",
region.name(),
table_name,
region_descriptor
);
region.alter(region_descriptor).map_err(TableError::new)?;
// then alter table info
logging::debug!(
"start updating the manifest of table {} with new table info {:?}",
table_name,
new_info
);
self.manifest
.update(TableMetaActionList::new(vec![
TableMetaAction::Protocol(ProtocolAction::new()),
TableMetaAction::Change(Box::new(TableChange {
table_info: new_info.clone(),
})),
]))
.await
.context(UpdateTableManifestSnafu {
table_name: &self.table_info().name,
})?;
self.set_table_info(new_info);
// TODO(LFC): Think of a way to properly handle the metadata integrity between region and table.
// Currently there are no "transactions" to alter the metadata of region and table together,
// they are altered in sequence. That means there might be cases where the metadata of region
// is altered while the table's is not. Then the metadata integrity between region and
// table cannot be hold.
Ok(())
}
}
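The clone-update-swap pattern described in the comment above can be shown in isolation. A minimal sketch using arc_swap directly (the `Schema` struct here is a stand-in, not the real type):

use std::sync::Arc;
use arc_swap::ArcSwap;

#[derive(Clone)]
struct Schema {
    columns: Vec<String>,
    version: u32,
}

fn add_column(current: &ArcSwap<Schema>, name: &str) {
    // Clone the whole schema, mutate the clone, then atomically publish it.
    // Readers either see the complete old schema or the complete new one,
    // never a half-updated value.
    let mut new_schema = (*current.load_full()).clone();
    new_schema.columns.push(name.to_string());
    new_schema.version += 1;
    current.store(Arc::new(new_schema));
}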
fn build_table_schema_with_new_column(
table_name: &str,
table_schema: &SchemaRef,
new_column: &ColumnSchema,
) -> Result<SchemaRef> {
if table_schema
.column_schema_by_name(&new_column.name)
.is_some()
{
return error::ColumnExistsSnafu {
column_name: &new_column.name,
table_name,
}
.fail()?;
}
let mut columns = table_schema.column_schemas().to_vec();
columns.push(new_column.clone());
// Right now we do not support adding a column before or after a specific
// column, so we just clone a new schema like this.
// TODO(LFC): support adding column before or after some column
let mut builder = SchemaBuilder::from_columns(columns).version(table_schema.version() + 1);
if let Some(index) = table_schema.timestamp_index() {
builder = builder.timestamp_index(index);
}
for (k, v) in table_schema.arrow_schema().metadata.iter() {
builder = builder.add_metadata(k, v);
}
let new_schema = Arc::new(builder.build().with_context(|_| error::SchemaBuildSnafu {
msg: format!("cannot add new column {:?}", new_column),
})?);
Ok(new_schema)
}
struct ChunkStream {
@@ -169,9 +312,10 @@ fn column_qualified_name(table_name: &str, region_name: &str, column_name: &str)
impl<R: Region> MitoTable<R> {
fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
Self {
table_info,
table_info: ArcSwap::new(Arc::new(table_info)),
region,
manifest,
alter_lock: Mutex::new(()),
}
}
@@ -182,7 +326,9 @@ impl<R: Region> MitoTable<R> {
region: &R,
projection: Option<Vec<usize>>,
) -> Result<Option<Vec<usize>>> {
let table_schema = &self.table_info.meta.schema;
let table_info = self.table_info();
let table_schema = &table_info.meta.schema;
let region_meta = region.in_memory_metadata();
let region_schema = region_meta.schema();
@@ -198,7 +344,7 @@ impl<R: Region> MitoTable<R> {
region_schema.column_index_by_name(name).with_context(|| {
ProjectedColumnNotFoundSnafu {
column_qualified_name: column_qualified_name(
&self.table_info.name,
&table_info.name,
region.name(),
name,
),
@@ -217,7 +363,7 @@ impl<R: Region> MitoTable<R> {
region_schema.column_index_by_name(name).with_context(|| {
ProjectedColumnNotFoundSnafu {
column_qualified_name: column_qualified_name(
&self.table_info.name,
&table_info.name,
region.name(),
name,
),
@@ -311,8 +457,17 @@ impl<R: Region> MitoTable<R> {
}
#[inline]
pub fn table_info(&self) -> &TableInfo {
&self.table_info
pub fn region(&self) -> &R {
&self.region
}
#[inline]
pub fn table_info(&self) -> TableInfoRef {
Arc::clone(&self.table_info.load())
}
pub fn set_table_info(&self, table_info: TableInfo) {
self.table_info.swap(Arc::new(table_info));
}
#[inline]
@@ -328,6 +483,8 @@ impl<R: Region> MitoTable<R> {
#[cfg(test)]
mod tests {
use datatypes::prelude::ConcreteDataType;
use super::*;
#[test]
@@ -335,4 +492,36 @@ mod tests {
assert_eq!("demo/manifest/", table_manifest_dir("demo"));
assert_eq!("numbers/manifest/", table_manifest_dir("numbers"));
}
#[test]
fn test_build_table_schema_with_new_column() {
let table_info = test_util::build_test_table_info();
let table_name = &table_info.name;
let table_meta = &table_info.meta;
let table_schema = &table_meta.schema;
let new_column = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true);
let result = build_table_schema_with_new_column(table_name, table_schema, &new_column);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Column host already exists in table demo"));
let new_column = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_schema =
build_table_schema_with_new_column(table_name, table_schema, &new_column).unwrap();
assert_eq!(new_schema.num_columns(), table_schema.num_columns() + 1);
assert_eq!(
new_schema.column_schemas().split_last().unwrap(),
(&new_column, table_schema.column_schemas())
);
assert_eq!(
new_schema.timestamp_column(),
table_schema.timestamp_column()
);
assert_eq!(new_schema.version(), table_schema.version() + 1);
}
}


@@ -1,23 +1,27 @@
//! A mock storage engine for table test purpose.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use common_error::mock::MockError;
use common_telemetry::logging;
use storage::metadata::{RegionMetaImpl, RegionMetadataRef};
use storage::write_batch::WriteBatch;
use datatypes::prelude::{Value, VectorBuilder, VectorRef};
use datatypes::schema::ColumnSchema;
use storage::metadata::{RegionMetaImpl, RegionMetadata};
use storage::write_batch::{Mutation, WriteBatch};
use store_api::storage::{
Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions,
ReadContext, Region, RegionDescriptor, RegionMeta, ScanRequest, ScanResponse, SchemaRef,
Snapshot, StorageEngine, WriteContext, WriteResponse,
ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest, ScanResponse,
SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
};
pub type Result<T> = std::result::Result<T, MockError>;
pub struct MockChunkReader {
schema: SchemaRef,
memtable: MockMemtable,
}
#[async_trait]
@@ -29,12 +33,27 @@ impl ChunkReader for MockChunkReader {
}
async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
Ok(None)
let columns = self
.schema
.column_schemas()
.iter()
.map(|column_schema| {
let data = self.memtable.get(&column_schema.name).unwrap();
let mut builder =
VectorBuilder::with_capacity(column_schema.data_type.clone(), data.len());
for v in data {
builder.push(v);
}
builder.finish()
})
.collect::<Vec<VectorRef>>();
Ok(Some(Chunk::new(columns)))
}
}
pub struct MockSnapshot {
metadata: RegionMetadataRef,
schema: SchemaRef,
region: Arc<MockRegionInner>,
}
#[async_trait]
@@ -43,7 +62,7 @@ impl Snapshot for MockSnapshot {
type Reader = MockChunkReader;
fn schema(&self) -> &SchemaRef {
self.metadata.user_schema()
&self.schema
}
async fn scan(
@@ -51,10 +70,14 @@ impl Snapshot for MockSnapshot {
_ctx: &ReadContext,
_request: ScanRequest,
) -> Result<ScanResponse<MockChunkReader>> {
let reader = MockChunkReader {
schema: self.metadata.user_schema().clone(),
let memtable = {
let memtable = self.region.memtable.read().unwrap();
memtable.clone()
};
let reader = MockChunkReader {
schema: self.schema().clone(),
memtable,
};
Ok(ScanResponse { reader })
}
@@ -69,10 +92,17 @@ impl Snapshot for MockSnapshot {
pub struct MockRegion {
// FIXME(yingwen): Remove this once name is provided by metadata.
name: String,
// We share the same metadata definition with the storage engine.
metadata: RegionMetadataRef,
pub inner: Arc<MockRegionInner>,
}
#[derive(Debug)]
pub struct MockRegionInner {
pub metadata: ArcSwap<RegionMetadata>,
memtable: Arc<RwLock<MockMemtable>>,
}
type MockMemtable = HashMap<String, Vec<Value>>;
#[async_trait]
impl Region for MockRegion {
type Error = MockError;
@@ -80,27 +110,85 @@ impl Region for MockRegion {
type WriteRequest = WriteBatch;
type Snapshot = MockSnapshot;
fn id(&self) -> RegionId {
self.inner.metadata.load().id()
}
fn name(&self) -> &str {
&self.name
}
fn in_memory_metadata(&self) -> RegionMetaImpl {
RegionMetaImpl::new(self.metadata.clone())
RegionMetaImpl::new(self.inner.metadata.load().clone())
}
async fn write(&self, _ctx: &WriteContext, _request: WriteBatch) -> Result<WriteResponse> {
async fn write(&self, _ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
self.inner.write(request);
Ok(WriteResponse {})
}
fn snapshot(&self, _ctx: &ReadContext) -> Result<MockSnapshot> {
Ok(MockSnapshot {
metadata: self.metadata.clone(),
schema: self.inner.metadata.load().user_schema().clone(),
region: self.inner.clone(),
})
}
fn write_request(&self) -> WriteBatch {
WriteBatch::new(self.in_memory_metadata().schema().clone())
}
fn alter(&self, descriptor: RegionDescriptor) -> Result<()> {
let metadata = descriptor.try_into().unwrap();
self.inner.update_metadata(metadata);
Ok(())
}
}
impl MockRegionInner {
fn new(metadata: RegionMetadata) -> Self {
let mut memtable = HashMap::new();
for column in metadata.user_schema().column_schemas() {
memtable.insert(column.name.clone(), vec![]);
}
Self {
metadata: ArcSwap::new(Arc::new(metadata)),
memtable: Arc::new(RwLock::new(memtable)),
}
}
fn update_metadata(&self, metadata: RegionMetadata) {
{
let mut memtable = self.memtable.write().unwrap();
let rows = memtable.values().last().unwrap().len();
// currently dropping columns is not supported, so we only add columns here
for column in metadata.user_schema().column_schemas() {
let _ = memtable
.entry(column.name.clone())
.or_insert_with(|| vec![Value::Null; rows]);
}
}
self.metadata.swap(Arc::new(metadata));
}
fn write(&self, request: WriteBatch) {
let metadata = self.metadata.load();
let mut memtable = self.memtable.write().unwrap();
for Mutation::Put(put) in request.iter() {
for ColumnSchema { name, .. } in metadata.user_schema().column_schemas() {
let column = memtable.get_mut(name).unwrap();
if let Some(data) = put.column_by_name(name) {
(0..data.len()).for_each(|i| column.push(data.get(i)));
} else {
column.extend_from_slice(&vec![Value::Null; put.num_rows()]);
}
}
}
}
}
type RegionMap = HashMap<String, MockRegion>;
@@ -165,7 +253,7 @@ impl StorageEngine for MockEngine {
let metadata = descriptor.try_into().unwrap();
let region = MockRegion {
name: name.clone(),
metadata: Arc::new(metadata),
inner: Arc::new(MockRegionInner::new(metadata)),
};
regions.opened_regions.insert(name, region.clone());


@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use datatypes::schema::SchemaRef;
@@ -105,6 +106,8 @@ pub struct TableInfo {
pub table_type: TableType,
}
pub type TableInfoRef = Arc<TableInfo>;
impl TableInfoBuilder {
pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
Self {


@@ -2,7 +2,7 @@
use std::collections::HashMap;
use datatypes::prelude::VectorRef;
use datatypes::schema::SchemaRef;
use datatypes::schema::{ColumnSchema, SchemaRef};
use crate::metadata::TableId;
@@ -38,7 +38,17 @@ pub struct OpenTableRequest {
/// Alter table request
#[derive(Debug)]
pub struct AlterTableRequest {}
pub struct AlterTableRequest {
pub catalog_name: Option<String>,
pub schema_name: Option<String>,
pub table_name: String,
pub alter_kind: AlterKind,
}
#[derive(Debug)]
pub enum AlterKind {
AddColumn { new_column: ColumnSchema },
}
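// Example (illustrative values): a request that adds a nullable string column,
// mirroring the engine test elsewhere in this diff:
//
// AlterTableRequest {
//     catalog_name: None,
//     schema_name: None,
//     table_name: "demo".to_string(),
//     alter_kind: AlterKind::AddColumn {
//         new_column: ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true),
//     },
// }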
/// Drop table request
#[derive(Debug)]


@@ -10,7 +10,7 @@ use datatypes::schema::SchemaRef;
use crate::error::Result;
use crate::metadata::{FilterPushDownType, TableType};
use crate::requests::InsertRequest;
use crate::requests::{AlterTableRequest, InsertRequest};
/// Table abstraction.
#[async_trait::async_trait]
@@ -49,6 +49,10 @@ pub trait Table: Send + Sync {
fn supports_filter_pushdown(&self, _filter: &Expr) -> Result<FilterPushDownType> {
Ok(FilterPushDownType::Unsupported)
}
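/// Alters the table, e.g. to add a new column. The default implementation
/// panics (`unimplemented!()`) for tables that do not support altering.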
async fn alter(&self, _request: AlterTableRequest) -> Result<()> {
unimplemented!()
}
}
pub type TableRef = Arc<dyn Table>;
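A caller-side sketch of the new hook, assuming a `TableRef` obtained from a table engine as in the datanode handler above:

use table::requests::AlterTableRequest;
use table::TableRef;

async fn alter(table: TableRef, req: AlterTableRequest) -> table::error::Result<()> {
    // Dispatches to the concrete table's override; the default method above
    // panics for tables that don't support ALTER.
    table.alter(req).await
}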