diff --git a/Cargo.lock b/Cargo.lock
index 16ff64bdc5..c1cadcceb8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1356,6 +1356,7 @@ dependencies = [
  "common-telemetry",
  "common-time",
  "datafusion",
+ "datafusion-common",
  "datatypes",
  "futures",
  "hyper",
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index 872387c03a..0637eba966 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -58,6 +58,7 @@ axum-test-helper = "0.1"
 client = { path = "../client" }
 common-query = { path = "../common/query" }
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
 tempdir = "0.3"

 [dev-dependencies.arrow]
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 1838885d59..a4c22290a3 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -1,7 +1,6 @@
 use std::any::Any;

 use api::serde::DecodeError;
-use common_error::ext::BoxedError;
 use common_error::prelude::*;
 use datatypes::prelude::ConcreteDataType;
 use storage::error::Error as StorageError;
@@ -40,7 +39,14 @@ pub enum Error {
     GetTable {
         table_name: String,
         #[snafu(backtrace)]
-        source: BoxedError,
+        source: TableError,
     },
+
+    #[snafu(display("Failed to alter table {}, source: {}", table_name, source))]
+    AlterTable {
+        table_name: String,
+        #[snafu(backtrace)]
+        source: TableError,
+    },

     #[snafu(display("Table not found: {}", table_name))]
@@ -135,8 +141,8 @@ pub enum Error {
         source: common_runtime::error::Error,
     },

-    #[snafu(display("Invalid CREATE TABLE sql statement, cause: {}", msg))]
-    InvalidCreateTableSql { msg: String, backtrace: Backtrace },
+    #[snafu(display("Invalid SQL, error: {}", msg))]
+    InvalidSql { msg: String, backtrace: Backtrace },

     #[snafu(display("Failed to create schema when creating table, source: {}", source))]
     CreateSchema {
@@ -192,8 +198,9 @@ impl ErrorExt for Error {
             Error::ExecuteSql { source } => source.status_code(),
             Error::ExecutePhysicalPlan { source } => source.status_code(),
             Error::NewCatalog { source } => source.status_code(),
-            Error::CreateTable { source, .. } => source.status_code(),
-            Error::GetTable { source, .. } => source.status_code(),
+            Error::CreateTable { source, .. }
+            | Error::GetTable { source, .. }
+            | Error::AlterTable { source, .. } => source.status_code(),
             Error::Insert { source, .. } => source.status_code(),
             Error::ConvertSchema { source, .. } => source.status_code(),
             Error::TableNotFound { .. } => StatusCode::TableNotFound,
@@ -203,7 +210,7 @@ impl ErrorExt for Error {
             | Error::ColumnTypeMismatch { .. }
             | Error::IllegalInsertData { .. }
             | Error::DecodeInsert { .. }
-            | Error::InvalidCreateTableSql { .. }
+            | Error::InvalidSql { .. }
             | Error::SqlTypeNotSupported { .. }
             | Error::CreateSchema { .. }
             | Error::KeyColumnNotFound { .. }
@@ -243,6 +250,7 @@ impl From<Error> for tonic::Status {
 #[cfg(test)]
 mod tests {
+    use common_error::ext::BoxedError;
     use common_error::mock::MockError;

     use super::*;
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 2485a30602..0acf68ce4f 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -39,7 +39,7 @@ type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
 pub struct Instance {
     // Query service
     query_engine: QueryEngineRef,
-    sql_handler: SqlHandler<DefaultEngine>,
+    sql_handler: SqlHandler,
     // Catalog list
     catalog_manager: CatalogManagerRef,
     physical_planner: PhysicalPlanner,
@@ -62,8 +62,10 @@ impl Instance {
             ),
             object_store,
         );
+        let table_engine = Arc::new(table_engine);
+
         let catalog_manager = Arc::new(
-            catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone()))
+            catalog::LocalCatalogManager::try_new(table_engine.clone())
                 .await
                 .context(NewCatalogSnafu)?,
         );
@@ -150,7 +152,10 @@ impl Instance {
                 self.sql_handler.execute(SqlRequest::Create(request)).await
             }
-
+            Statement::Alter(alter_table) => {
+                let req = self.sql_handler.alter_to_request(alter_table)?;
+                self.sql_handler.execute(SqlRequest::Alter(req)).await
+            }
             _ => unimplemented!(),
         }
     }
@@ -198,7 +203,7 @@ impl Instance {
         }
     }

-    pub fn sql_handler(&self) -> &SqlHandler<DefaultEngine> {
+    pub fn sql_handler(&self) -> &SqlHandler {
         &self.sql_handler
     }

@@ -207,6 +212,43 @@ impl Instance {
     }
 }

+#[cfg(test)]
+impl Instance {
+    pub async fn new_mock() -> Result<Self> {
+        use table_engine::table::test_util::new_test_object_store;
+        use table_engine::table::test_util::MockEngine;
+        use table_engine::table::test_util::MockMitoEngine;
+
+        let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
+        let mock_engine = MockMitoEngine::new(
+            TableEngineConfig::default(),
+            MockEngine::default(),
+            object_store,
+        );
+        let mock_engine = Arc::new(mock_engine);
+
+        let catalog_manager = Arc::new(
+            catalog::LocalCatalogManager::try_new(mock_engine.clone())
+                .await
+                .unwrap(),
+        );
+
+        let factory = QueryEngineFactory::new(catalog_manager.clone());
+        let query_engine = factory.query_engine().clone();
+
+        let sql_handler = SqlHandler::new(mock_engine, catalog_manager.clone());
+        let physical_planner = PhysicalPlanner::new(query_engine.clone());
+        let script_executor = ScriptExecutor::new(query_engine.clone());
+        Ok(Self {
+            query_engine,
+            sql_handler,
+            catalog_manager,
+            physical_planner,
+            script_executor,
+        })
+    }
+}
+
 async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
     // TODO(dennis): supports other backend
     let data_dir = util::normalize_dir(match store_config {
diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs
index 248c36a5db..cb5f0cf039 100644
--- a/src/datanode/src/sql.rs
+++ b/src/datanode/src/sql.rs
@@ -1,17 +1,19 @@
 //! sql handler
-use std::sync::Arc;
-
 use catalog::CatalogManagerRef;
-use common_error::ext::BoxedError;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::schema::ColumnSchema;
+use datatypes::types::DateTimeType;
 use query::query_engine::Output;
 use snafu::{OptionExt, ResultExt};
-use table::engine::{EngineContext, TableEngine};
+use sql::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName};
+use table::engine::{EngineContext, TableEngineRef};
 use table::requests::*;
 use table::TableRef;

-use crate::error::{GetTableSnafu, Result, TableNotFoundSnafu};
+use crate::error::{self, GetTableSnafu, Result, TableNotFoundSnafu};

+mod alter;
 mod create;
 mod insert;

@@ -19,18 +21,19 @@
 pub enum SqlRequest {
     Insert(InsertRequest),
     Create(CreateTableRequest),
+    Alter(AlterTableRequest),
 }

 // Handler to execute SQL except query
-pub struct SqlHandler<Engine: TableEngine> {
-    table_engine: Arc<Engine>,
+pub struct SqlHandler {
+    table_engine: TableEngineRef,
     catalog_manager: CatalogManagerRef,
 }

-impl<Engine: TableEngine> SqlHandler<Engine> {
-    pub fn new(table_engine: Engine, catalog_manager: CatalogManagerRef) -> Self {
+impl SqlHandler {
+    pub fn new(table_engine: TableEngineRef, catalog_manager: CatalogManagerRef) -> Self {
         Self {
-            table_engine: Arc::new(table_engine),
+            table_engine,
             catalog_manager,
         }
     }
@@ -39,22 +42,92 @@ impl SqlHandler {
         match request {
             SqlRequest::Insert(req) => self.insert(req).await,
             SqlRequest::Create(req) => self.create(req).await,
+            SqlRequest::Alter(req) => self.alter(req).await,
         }
     }

     pub(crate) fn get_table(&self, table_name: &str) -> Result<TableRef> {
         self.table_engine
             .get_table(&EngineContext::default(), table_name)
-            .map_err(BoxedError::new)
             .context(GetTableSnafu { table_name })?
             .context(TableNotFoundSnafu { table_name })
     }

-    pub fn table_engine(&self) -> Arc<Engine> {
+    pub fn table_engine(&self) -> TableEngineRef {
         self.table_engine.clone()
     }
 }

+/// Converts a possibly fully-qualified table name (`<catalog>.<schema>.<table>`, or `<table>` when
+/// catalog and schema are default) to a tuple.
+fn table_idents_to_full_name(
+    obj_name: &ObjectName,
+) -> Result<(Option<String>, Option<String>, String)> {
+    match &obj_name.0[..] {
+        [table] => Ok((None, None, table.value.clone())),
+        [catalog, schema, table] => Ok((
+            Some(catalog.value.clone()),
+            Some(schema.value.clone()),
+            table.value.clone(),
+        )),
+        _ => error::InvalidSqlSnafu {
+            msg: format!(
+                "expect table name to be <catalog>.<schema>.<table> or <table>, actual: {}",
, actual: {}", + obj_name + ), + } + .fail(), + } +} + +fn column_def_to_schema(column_def: &ColumnDef) -> Result { + let is_nullable = column_def + .options + .iter() + .any(|o| matches!(o.option, ColumnOption::Null)); + Ok(ColumnSchema { + name: column_def.name.value.clone(), + data_type: sql_data_type_to_concrete_data_type(&column_def.data_type)?, + is_nullable, + }) +} + +fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result { + match data_type { + SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()), + SqlDataType::Int(_) => Ok(ConcreteDataType::int32_datatype()), + SqlDataType::SmallInt(_) => Ok(ConcreteDataType::int16_datatype()), + SqlDataType::Char(_) + | SqlDataType::Varchar(_) + | SqlDataType::Text + | SqlDataType::String => Ok(ConcreteDataType::string_datatype()), + SqlDataType::Float(_) => Ok(ConcreteDataType::float32_datatype()), + SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()), + SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()), + SqlDataType::Date => Ok(ConcreteDataType::date_datatype()), + SqlDataType::Custom(obj_name) => match &obj_name.0[..] { + [type_name] => { + if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) { + Ok(ConcreteDataType::datetime_datatype()) + } else { + error::SqlTypeNotSupportedSnafu { + t: data_type.clone(), + } + .fail() + } + } + _ => error::SqlTypeNotSupportedSnafu { + t: data_type.clone(), + } + .fail(), + }, + _ => error::SqlTypeNotSupportedSnafu { + t: data_type.clone(), + } + .fail(), + } +} + #[cfg(test)] mod tests { use std::any::Any; @@ -165,9 +238,10 @@ mod tests { ), object_store, ); + let table_engine = Arc::new(table_engine); let catalog_list = Arc::new( - catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone())) + catalog::LocalCatalogManager::try_new(table_engine.clone()) .await .unwrap(), ); diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs new file mode 100644 index 0000000000..ccdec5bd36 --- /dev/null +++ b/src/datanode/src/sql/alter.rs @@ -0,0 +1,91 @@ +use query::query_engine::Output; +use snafu::prelude::*; +use sql::statements::alter::{AlterTable, AlterTableOperation}; +use table::engine::EngineContext; +use table::requests::{AlterKind, AlterTableRequest}; + +use crate::error::{self, Result}; +use crate::sql::{column_def_to_schema, table_idents_to_full_name, SqlHandler}; + +impl SqlHandler { + pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result { + let ctx = EngineContext {}; + let table_name = &req.table_name.clone(); + if !self.table_engine.table_exists(&ctx, table_name) { + return error::TableNotFoundSnafu { table_name }.fail(); + } + self.table_engine + .alter_table(&ctx, req) + .await + .context(error::AlterTableSnafu { table_name })?; + // Tried in MySQL, it really prints "Affected Rows: 0". 
+        Ok(Output::AffectedRows(0))
+    }
+
+    pub(crate) fn alter_to_request(&self, alter_table: AlterTable) -> Result<AlterTableRequest> {
+        let (catalog_name, schema_name, table_name) =
+            table_idents_to_full_name(alter_table.table_name())?;
+
+        let alter_kind = match alter_table.alter_operation() {
+            AlterTableOperation::AddConstraint(table_constraint) => {
+                return error::InvalidSqlSnafu {
+                    msg: format!("unsupported table constraint {}", table_constraint),
+                }
+                .fail()
+            }
+            AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumn {
+                new_column: column_def_to_schema(column_def)?,
+            },
+        };
+        Ok(AlterTableRequest {
+            catalog_name,
+            schema_name,
+            table_name,
+            alter_kind,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::assert_matches::assert_matches;
+
+    use datatypes::prelude::ConcreteDataType;
+    use sql::dialect::GenericDialect;
+    use sql::parser::ParserContext;
+    use sql::statements::statement::Statement;
+
+    use super::*;
+    use crate::tests::test_util::create_mock_sql_handler;
+
+    fn parse_sql(sql: &str) -> AlterTable {
+        let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
+        assert_eq!(1, stmt.len());
+        let stmt = stmt.remove(0);
+        assert_matches!(stmt, Statement::Alter(_));
+        match stmt {
+            Statement::Alter(alter_table) => alter_table,
+            _ => unreachable!(),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_to_request_with_adding_column() {
+        let handler = create_mock_sql_handler().await;
+        let alter_table = parse_sql("ALTER TABLE my_metric_1 ADD tagk_i STRING Null;");
+        let req = handler.alter_to_request(alter_table).unwrap();
+        assert_eq!(req.catalog_name, None);
+        assert_eq!(req.schema_name, None);
+        assert_eq!(req.table_name, "my_metric_1");
+
+        let alter_kind = req.alter_kind;
+        assert_matches!(alter_kind, AlterKind::AddColumn { .. });
+        match alter_kind {
+            AlterKind::AddColumn { new_column } => {
+                assert_eq!(new_column.name, "tagk_i");
+                assert!(new_column.is_nullable);
+                assert_eq!(new_column.data_type, ConcreteDataType::string_datatype());
+            }
+        }
+    }
+}
diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs
index 5f8924762a..6e9ad77ac4 100644
--- a/src/datanode/src/sql/create.rs
+++ b/src/datanode/src/sql/create.rs
@@ -3,25 +3,23 @@ use std::sync::Arc;

 use catalog::RegisterTableRequest;
 use common_telemetry::tracing::info;
-use datatypes::prelude::ConcreteDataType;
-use datatypes::schema::{ColumnSchema, SchemaBuilder};
-use datatypes::types::DateTimeType;
+use datatypes::schema::SchemaBuilder;
 use query::query_engine::Output;
 use snafu::{OptionExt, ResultExt};
-use sql::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName, TableConstraint};
+use sql::ast::TableConstraint;
 use sql::statements::create_table::CreateTable;
 use store_api::storage::consts::TIME_INDEX_NAME;
-use table::engine::{EngineContext, TableEngine};
+use table::engine::EngineContext;
 use table::metadata::TableId;
 use table::requests::*;

 use crate::error::{
-    ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu, InsertSystemCatalogSnafu,
-    InvalidCreateTableSqlSnafu, KeyColumnNotFoundSnafu, Result, SqlTypeNotSupportedSnafu,
+    self, ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu,
+    InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, Result,
 };
-use crate::sql::SqlHandler;
+use crate::sql::{column_def_to_schema, table_idents_to_full_name, SqlHandler};

-impl<Engine: TableEngine> SqlHandler<Engine> {
+impl SqlHandler {
     pub(crate) async fn create(&self, req: CreateTableRequest) -> Result<Output> {
         let ctx = EngineContext {};
         let catalog_name = req.catalog_name.clone();
@@ -63,7 +61,7 @@
         let mut ts_index = usize::MAX;
         let mut primary_keys = vec![];

-        let (catalog_name, schema_name, table_name) = table_idents_to_full_name(stmt.name)?;
+        let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&stmt.name)?;

         let col_map = stmt
             .columns
@@ -88,7 +86,7 @@
                     },
                 )?;
             } else {
-                return InvalidCreateTableSqlSnafu {
+                return error::InvalidSqlSnafu {
                     msg: format!("Cannot recognize named UNIQUE constraint: {}", name),
                 }
                 .fail();
@@ -102,7 +100,7 @@
                     )?);
                 }
             } else {
-                return InvalidCreateTableSqlSnafu {
+                return error::InvalidSqlSnafu {
                     msg: format!(
                         "Unrecognized non-primary unnamed UNIQUE constraint: {:?}",
                         name
@@ -156,93 +154,21 @@
     }
 }

-/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>` or `<table>` when catalog and schema are default)
-/// to tuples
-fn table_idents_to_full_name(
-    obj_name: ObjectName,
-) -> Result<(Option<String>, Option<String>, String)> {
-    match &obj_name.0[..] {
-        [table] => Ok((None, None, table.value.clone())),
-        [catalog, schema, table] => Ok((
-            Some(catalog.value.clone()),
-            Some(schema.value.clone()),
-            table.value.clone(),
-        )),
-        _ => InvalidCreateTableSqlSnafu {
-            msg: format!(
-                "table name can only be <catalog>.<schema>.<table> or <table>, but found: {}",
, but found: {}", - obj_name - ), - } - .fail(), - } -} - -fn column_def_to_schema(def: &ColumnDef) -> Result { - let is_nullable = def - .options - .iter() - .any(|o| matches!(o.option, ColumnOption::Null)); - Ok(ColumnSchema { - name: def.name.value.clone(), - data_type: sql_data_type_to_concrete_data_type(&def.data_type)?, - is_nullable, - }) -} - -fn sql_data_type_to_concrete_data_type(t: &SqlDataType) -> Result { - match t { - SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()), - SqlDataType::Int(_) => Ok(ConcreteDataType::int32_datatype()), - SqlDataType::SmallInt(_) => Ok(ConcreteDataType::int16_datatype()), - SqlDataType::Char(_) - | SqlDataType::Varchar(_) - | SqlDataType::Text - | SqlDataType::String => Ok(ConcreteDataType::string_datatype()), - SqlDataType::Float(_) => Ok(ConcreteDataType::float32_datatype()), - SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()), - SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()), - SqlDataType::Date => Ok(ConcreteDataType::date_datatype()), - SqlDataType::Custom(obj_name) => match &obj_name.0[..] { - [type_name] => { - if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) { - Ok(ConcreteDataType::datetime_datatype()) - } else { - SqlTypeNotSupportedSnafu { t: t.clone() }.fail() - } - } - _ => SqlTypeNotSupportedSnafu { t: t.clone() }.fail(), - }, - _ => SqlTypeNotSupportedSnafu { t: t.clone() }.fail(), - } -} - #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use datatypes::prelude::ConcreteDataType; use sql::ast::Ident; + use sql::ast::{DataType as SqlDataType, ObjectName}; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::statement::Statement; - use table_engine::config::EngineConfig; - use table_engine::engine::MitoEngine; - use table_engine::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; use super::*; use crate::error::Error; - - async fn create_mock_sql_handler() -> SqlHandler> { - let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; - let mock_engine = - MockMitoEngine::new(EngineConfig::default(), MockEngine::default(), object_store); - let catalog_manager = Arc::new( - catalog::LocalCatalogManager::try_new(Arc::new(mock_engine.clone())) - .await - .unwrap(), - ); - SqlHandler::new(mock_engine, catalog_manager) - } + use crate::sql::sql_data_type_to_concrete_data_type; + use crate::tests::test_util::create_mock_sql_handler; fn sql_to_statement(sql: &str) -> CreateTable { let mut res = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index 949a0b19d3..40bc9b55c0 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -10,7 +10,6 @@ use snafu::OptionExt; use snafu::ResultExt; use sql::ast::Value as SqlValue; use sql::statements::insert::Insert; -use table::engine::TableEngine; use table::requests::*; use crate::error::{ @@ -19,7 +18,7 @@ use crate::error::{ }; use crate::sql::{SqlHandler, SqlRequest}; -impl SqlHandler { +impl SqlHandler { pub(crate) async fn insert(&self, req: InsertRequest) -> Result { let table_name = &req.table_name.to_string(); let table = self.get_table(table_name)?; diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 2a86829db9..d62fcaddc1 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -1,5 +1,7 @@ use arrow::array::UInt64Array; 
 use common_recordbatch::util;
+use datafusion::arrow_print;
+use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
 use query::Output;

 use crate::instance::Instance;
@@ -61,8 +63,6 @@ pub async fn test_execute_create() {
     let instance = Instance::new(&opts).await.unwrap();
     instance.start().await.unwrap();

-    test_util::create_test_table(&instance).await.unwrap();
-
     let output = instance
         .execute_sql(
             r#"create table test_table(
@@ -78,3 +78,63 @@ pub async fn test_execute_create() {
         .unwrap();
     assert!(matches!(output, Output::AffectedRows(1)));
 }
+
+#[tokio::test]
+async fn test_alter_table() {
+    common_telemetry::init_default_ut_logging();
+
+    // TODO(LFC) Use real Mito engine when we can alter its region schema,
+    // and delete the `new_mock` method.
+    let instance = Instance::new_mock().await.unwrap();
+    instance.start().await.unwrap();
+
+    test_util::create_test_table(&instance).await.unwrap();
+    // make sure table insertion is ok before altering table
+    instance
+        .execute_sql("insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)")
+        .await
+        .unwrap();
+
+    let output = instance
+        .execute_sql("alter table demo add my_tag string null")
+        .await
+        .unwrap();
+    assert!(matches!(output, Output::AffectedRows(0)));
+
+    let output = instance
+        .execute_sql(
+            "insert into demo(host, cpu, memory, ts, my_tag) values ('host2', 2.2, 200, 2000, 'hello')",
+        )
+        .await
+        .unwrap();
+    assert!(matches!(output, Output::AffectedRows(1)));
+    let output = instance
+        .execute_sql("insert into demo(host, cpu, memory, ts) values ('host3', 3.3, 300, 3000)")
+        .await
+        .unwrap();
+    assert!(matches!(output, Output::AffectedRows(1)));
+
+    let output = instance.execute_sql("select * from demo").await.unwrap();
+    match output {
+        Output::RecordBatch(stream) => {
+            let recordbatches = util::collect(stream).await.unwrap();
+            let recordbatch = recordbatches
+                .into_iter()
+                .map(|r| r.df_recordbatch)
+                .collect::<Vec<DfRecordBatch>>();
+            let pretty_print = arrow_print::write(&recordbatch);
+            let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
+            let expected = vec![
+                "+-------+------+-----+--------+--------+",
+                "| host  | ts   | cpu | memory | my_tag |",
+                "+-------+------+-----+--------+--------+",
+                "| host1 | 1000 | 1.1 | 100    |        |",
+                "| host2 | 2000 | 2.2 | 200    | hello  |",
+                "| host3 | 3000 | 3.3 | 300    |        |",
+                "+-------+------+-----+--------+--------+",
+            ];
+            assert_eq!(pretty_print, expected);
+        }
+        _ => unreachable!(),
+    }
+}
diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs
index 110bd1e4c3..58a7b95230 100644
--- a/src/datanode/src/tests/test_util.rs
+++ b/src/datanode/src/tests/test_util.rs
@@ -8,11 +8,14 @@ use snafu::ResultExt;
 use table::engine::EngineContext;
 use table::engine::TableEngineRef;
 use table::requests::CreateTableRequest;
+use table_engine::config::EngineConfig;
+use table_engine::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
 use tempdir::TempDir;

 use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
 use crate::error::{CreateTableSnafu, Result};
 use crate::instance::Instance;
+use crate::sql::SqlHandler;

 /// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
 /// Only for test.
@@ -66,7 +69,7 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
                 .expect("ts is expected to be timestamp column"),
         ),
         create_if_not_exists: true,
-        primary_key_indices: Vec::default(),
+        primary_key_indices: vec![3, 0], // "host" and "ts" are primary keys
         table_options: HashMap::new(),
     },
 )
@@ -84,3 +87,18 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
     .unwrap();
     Ok(())
 }
+
+pub async fn create_mock_sql_handler() -> SqlHandler {
+    let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
+    let mock_engine = Arc::new(MockMitoEngine::new(
+        EngineConfig::default(),
+        MockEngine::default(),
+        object_store,
+    ));
+    let catalog_manager = Arc::new(
+        catalog::LocalCatalogManager::try_new(mock_engine.clone())
+            .await
+            .unwrap(),
+    );
+    SqlHandler::new(mock_engine, catalog_manager)
+}
diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs
index 165233a2a4..436e9649e8 100644
--- a/src/query/src/datafusion/planner.rs
+++ b/src/query/src/datafusion/planner.rs
@@ -49,7 +49,7 @@ where
                 todo!("Currently not supported")
             }
             Statement::Query(qb) => self.query_to_plan(qb),
-            Statement::Create(_) | Statement::Insert(_) => unreachable!(),
+            Statement::Create(_) | Statement::Alter(_) | Statement::Insert(_) => unreachable!(),
         }
     }
 }
diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs
index da5c607809..c2ecb0a431 100644
--- a/src/sql/src/parser.rs
+++ b/src/sql/src/parser.rs
@@ -77,6 +77,8 @@ impl<'a> ParserContext<'a> {
             Keyword::SELECT | Keyword::WITH | Keyword::VALUES => self.parse_query(),

+            Keyword::ALTER => self.parse_alter(),
+
             // todo(hl) support more statements.
             _ => self.unsupported(self.peek_token_as_string()),
         }
diff --git a/src/sql/src/parsers.rs b/src/sql/src/parsers.rs
index 71117e627e..e32ee1e6c3 100644
--- a/src/sql/src/parsers.rs
+++ b/src/sql/src/parsers.rs
@@ -1,3 +1,4 @@
+mod alter_parser;
 pub(crate) mod create_parser;
 pub(crate) mod insert_parser;
 pub(crate) mod query_parser;
diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs
new file mode 100644
index 0000000000..ec23c20388
--- /dev/null
+++ b/src/sql/src/parsers/alter_parser.rs
@@ -0,0 +1,79 @@
+use snafu::ResultExt;
+use sqlparser::keywords::Keyword;
+use sqlparser::parser::ParserError;
+
+use crate::error;
+use crate::parser::ParserContext;
+use crate::parser::Result;
+use crate::statements::alter::{AlterTable, AlterTableOperation};
+use crate::statements::statement::Statement;
+
+impl<'a> ParserContext<'a> {
+    pub(crate) fn parse_alter(&mut self) -> Result<Statement> {
+        let alter_table = self.parse().context(error::SyntaxSnafu { sql: self.sql })?;
+        Ok(Statement::Alter(alter_table))
+    }
+
+    fn parse(&mut self) -> std::result::Result<AlterTable, ParserError> {
+        let parser = &mut self.parser;
+        parser.expect_keywords(&[Keyword::ALTER, Keyword::TABLE])?;
+
+        let table_name = parser.parse_object_name()?;
+
+        let alter_operation = if parser.parse_keyword(Keyword::ADD) {
+            if let Some(constraint) = parser.parse_optional_table_constraint()? {
+                AlterTableOperation::AddConstraint(constraint)
+            } else {
+                let _ = parser.parse_keyword(Keyword::COLUMN);
+                let column_def = parser.parse_column_def()?;
+                AlterTableOperation::AddColumn { column_def }
+            }
+        } else {
+            return Err(ParserError::ParserError(format!(
+                "expect ADD or DROP after ALTER TABLE, found {}",
+                parser.peek_token()
+            )));
+        };
+        Ok(AlterTable::new(table_name, alter_operation))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::assert_matches::assert_matches;
+
+    use sqlparser::ast::{ColumnOption, DataType};
+    use sqlparser::dialect::GenericDialect;
+
+    use super::*;
+
+    #[test]
+    fn test_parse_alter_add_column() {
+        let sql = "ALTER TABLE my_metric_1 ADD tagk_i STRING Null;";
+        let mut result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
+        assert_eq!(1, result.len());
+
+        let statement = result.remove(0);
+        assert_matches!(statement, Statement::Alter { .. });
+        match statement {
+            Statement::Alter(alter_table) => {
+                assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
+
+                let alter_operation = alter_table.alter_operation();
+                assert_matches!(alter_operation, AlterTableOperation::AddColumn { .. });
+                match alter_operation {
+                    AlterTableOperation::AddColumn { column_def } => {
+                        assert_eq!("tagk_i", column_def.name.value);
+                        assert_eq!(DataType::String, column_def.data_type);
+                        assert!(column_def
+                            .options
+                            .iter()
+                            .any(|o| matches!(o.option, ColumnOption::Null)));
+                    }
+                    _ => unreachable!(),
+                }
+            }
+            _ => unreachable!(),
+        }
+    }
+}
diff --git a/src/sql/src/parsers/query_parser.rs b/src/sql/src/parsers/query_parser.rs
index 603328fd71..9f055fc51b 100644
--- a/src/sql/src/parsers/query_parser.rs
+++ b/src/sql/src/parsers/query_parser.rs
@@ -22,7 +22,6 @@ impl<'a> ParserContext<'a> {
 mod tests {
     use sqlparser::dialect::GenericDialect;

-    use super::*;
     use crate::parser::ParserContext;

     #[test]
@@ -36,19 +35,13 @@ mod tests {
     }

     #[test]
-    pub fn test_parser_invalid_query() {
-        // sql without selection
-        let sql = "SELECT FROM table_1";
-
-        let parser = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
-        match &parser[0] {
-            Statement::ShowDatabases(_) => {
-                panic!("Not expected to be a show database statement")
-            }
-            Statement::Insert(_) => {
-                panic!("Not expected to be a show database statement")
-            }
-            Statement::Create(_) | Statement::Query(_) => {}
-        }
+    pub fn test_parse_invalid_query() {
+        let sql = "SELECT * FROM table_1 WHERE";
+        let result = ParserContext::create_with_dialect(sql, &GenericDialect {});
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Expected an expression"));
     }
 }
diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs
index 20230ce292..bdd8b1b317 100644
--- a/src/sql/src/statements.rs
+++ b/src/sql/src/statements.rs
@@ -1,3 +1,4 @@
+pub mod alter;
 pub mod create_table;
 pub mod insert;
 pub mod query;
diff --git a/src/sql/src/statements/alter.rs b/src/sql/src/statements/alter.rs
new file mode 100644
index 0000000000..cdd2ea7ea8
--- /dev/null
+++ b/src/sql/src/statements/alter.rs
@@ -0,0 +1,32 @@
+use sqlparser::ast::{ColumnDef, ObjectName, TableConstraint};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct AlterTable {
+    table_name: ObjectName,
+    alter_operation: AlterTableOperation,
+}
+
+impl AlterTable {
+    pub(crate) fn new(table_name: ObjectName, alter_operation: AlterTableOperation) -> Self {
+        Self {
+            table_name,
+            alter_operation,
+        }
+    }
+
+    pub fn table_name(&self) -> &ObjectName {
+        &self.table_name
+    }
+
+    pub fn alter_operation(&self) -> &AlterTableOperation {
+        &self.alter_operation
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum AlterTableOperation {
+    /// `ADD <table_constraint>`
+    AddConstraint(TableConstraint),
+    /// `ADD [ COLUMN ] <column_def>`
+    AddColumn { column_def: ColumnDef },
+}
diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs
index da185e0c16..a5ede43c39 100644
--- a/src/sql/src/statements/statement.rs
+++ b/src/sql/src/statements/statement.rs
@@ -1,6 +1,7 @@
 use sqlparser::ast::Statement as SpStatement;
 use sqlparser::parser::ParserError;

+use crate::statements::alter::AlterTable;
 use crate::statements::create_table::CreateTable;
 use crate::statements::insert::Insert;
 use crate::statements::query::Query;
@@ -20,6 +21,9 @@ pub enum Statement {

     /// CREATE TABLE
     Create(CreateTable),
+
+    /// ALTER TABLE
+    Alter(AlterTable),
 }

 /// Converts Statement to sqlparser statement
@@ -33,7 +37,7 @@ impl TryFrom<Statement> for SpStatement {
         )),
         Statement::Query(s) => Ok(SpStatement::Query(Box::new(s.inner))),
         Statement::Insert(i) => Ok(i.inner),
-        Statement::Create(_) => unimplemented!(),
+        Statement::Create(_) | Statement::Alter(_) => unimplemented!(),
     }
 }
diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs
index 2dce68781f..4126961fdc 100644
--- a/src/storage/src/lib.rs
+++ b/src/storage/src/lib.rs
@@ -12,7 +12,7 @@ pub mod memtable;
 pub mod metadata;
 pub mod proto;
 mod read;
-mod region;
+pub mod region;
 pub mod schema;
 mod snapshot;
 mod sst;
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs
index 8ddbe00c07..c693d821ba 100644
--- a/src/storage/src/region.rs
+++ b/src/storage/src/region.rs
@@ -52,6 +52,10 @@ impl Region for RegionImpl {
     type WriteRequest = WriteBatch;
     type Snapshot = SnapshotImpl;

+    fn id(&self) -> RegionId {
+        self.inner.shared.id
+    }
+
     fn name(&self) -> &str {
         &self.inner.shared.name
     }
diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs
index 5559269d9b..b7373a5603 100644
--- a/src/store-api/src/storage/region.rs
+++ b/src/store-api/src/storage/region.rs
@@ -26,6 +26,7 @@ use crate::storage::metadata::RegionMeta;
 use crate::storage::requests::WriteRequest;
 use crate::storage::responses::WriteResponse;
 use crate::storage::snapshot::{ReadContext, Snapshot};
+use crate::storage::{RegionDescriptor, RegionId};

 /// Chunks of rows in storage engine.
 #[async_trait]
@@ -35,6 +36,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
     type WriteRequest: WriteRequest;
     type Snapshot: Snapshot;

+    fn id(&self) -> RegionId;
+
     /// Returns name of the region.
     fn name(&self) -> &str;

@@ -53,6 +56,10 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
     /// Create write request
     fn write_request(&self) -> Self::WriteRequest;
+
+    fn alter(&self, _descriptor: RegionDescriptor) -> Result<(), Self::Error> {
+        unimplemented!()
+    }
 }

 /// Context for write operations.
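A note on the `Region::alter` hook just added above: it ships with an `unimplemented!()` default body, so existing `Region` implementations keep compiling and only engines that actually support schema changes opt in. Below is a minimal sketch of what an override can look like, modeled on the `MockRegion` later in this diff; the `self.inner.metadata` field (an `ArcSwap<RegionMetadata>`) and the `RegionDescriptor -> RegionMetadata` conversion are assumptions borrowed from that mock, not part of the trait contract:

```rust
// Hypothetical Region implementation overriding the new `alter` hook.
// A real engine would also have to migrate memtables/SSTs, which the
// mock engine in this diff deliberately sidesteps.
fn alter(&self, descriptor: RegionDescriptor) -> Result<(), Self::Error> {
    // Rebuild the in-memory region metadata from the new descriptor...
    let metadata: RegionMetadata = descriptor.try_into().unwrap();
    // ...and swap it in atomically, so concurrent readers never observe
    // a half-updated schema.
    self.inner.metadata.swap(Arc::new(metadata));
    Ok(())
}
```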
diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs
index 7c5540773b..42372300ad 100644
--- a/src/table-engine/src/engine.rs
+++ b/src/table-engine/src/engine.rs
@@ -5,12 +5,13 @@ use std::sync::RwLock;

 use async_trait::async_trait;
 use common_error::ext::BoxedError;
 use common_telemetry::logging;
+use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::{
-    self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
-    CreateOptions, OpenOptions, RegionDescriptorBuilder, RegionId, RowKeyDescriptor,
-    RowKeyDescriptorBuilder, StorageEngine,
+    ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
+    CreateOptions, EngineContext as StorageEngineContext, OpenOptions, RegionDescriptorBuilder,
+    RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
 };
 use table::engine::{EngineContext, TableEngine};
 use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
@@ -29,7 +30,7 @@ use crate::error::{
 use crate::table::MitoTable;

 pub const MITO_ENGINE: &str = "mito";
-const INIT_COLUMN_ID: ColumnId = 0;
+pub const INIT_COLUMN_ID: ColumnId = 0;
 const INIT_TABLE_VERSION: TableVersion = 0;

 /// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}"
@@ -89,18 +90,18 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {

     async fn alter_table(
         &self,
-        _ctx: &EngineContext,
-        _request: AlterTableRequest,
+        ctx: &EngineContext,
+        req: AlterTableRequest,
     ) -> TableResult<TableRef> {
-        unimplemented!();
+        Ok(self.inner.alter_table(ctx, req).await?)
     }

     fn get_table(&self, _ctx: &EngineContext, name: &str) -> TableResult<Option<TableRef>> {
         Ok(self.inner.get_table(name))
     }

-    fn table_exists(&self, _ctx: &EngineContext, _name: &str) -> bool {
-        unimplemented!();
+    fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool {
+        self.inner.get_table(name).is_some()
     }

     async fn drop_table(
@@ -125,18 +126,17 @@ struct MitoEngineInner<S: StorageEngine> {
     table_mutex: Mutex<()>,
 }

-fn build_row_key_desc_from_schema(
+pub(crate) fn build_row_key_desc(
     mut column_id: ColumnId,
-    request: &CreateTableRequest,
+    table_name: &str,
+    table_schema: &SchemaRef,
+    primary_key_indices: &Vec<usize>,
 ) -> Result<(ColumnId, RowKeyDescriptor)> {
-    let ts_column_schema =
-        request
-            .schema
-            .timestamp_column()
-            .context(MissingTimestampIndexSnafu {
-                table_name: &request.table_name,
-            })?;
-    let timestamp_index = request.schema.timestamp_index().unwrap();
+    let ts_column_schema = table_schema
+        .timestamp_column()
+        .context(MissingTimestampIndexSnafu { table_name })?;
+    // `unwrap` is safe because we've checked the `timestamp_column` above
+    let timestamp_index = table_schema.timestamp_index().unwrap();

     let ts_column = ColumnDescriptorBuilder::new(
         column_id,
         &ts_column_schema.name,
         ts_column_schema.data_type.clone(),
     )
     .build()
     .context(BuildColumnDescriptorSnafu {
         column_name: &ts_column_schema.name,
-        table_name: &request.table_name,
+        table_name,
     })?;
     column_id += 1;

-    let column_schemas = &request.schema.column_schemas();
+    let column_schemas = &table_schema.column_schemas();

     //TODO(boyan): enable version column by table option?
let mut builder = RowKeyDescriptorBuilder::new(ts_column); - for index in &request.primary_key_indices { + for index in primary_key_indices { if *index == timestamp_index { continue; } @@ -172,7 +172,7 @@ fn build_row_key_desc_from_schema( .build() .context(BuildColumnDescriptorSnafu { column_name: &column_schema.name, - table_name: &request.table_name, + table_name, })?; builder = builder.push_column(column); @@ -181,27 +181,24 @@ fn build_row_key_desc_from_schema( Ok(( column_id, - builder.build().context(BuildRowKeyDescriptorSnafu { - table_name: &request.table_name, - })?, + builder + .build() + .context(BuildRowKeyDescriptorSnafu { table_name })?, )) } -fn build_column_family_from_request( +pub(crate) fn build_column_family( mut column_id: ColumnId, - request: &CreateTableRequest, + table_name: &str, + table_schema: &SchemaRef, + primary_key_indices: &[usize], ) -> Result<(ColumnId, ColumnFamilyDescriptor)> { let mut builder = ColumnFamilyDescriptorBuilder::default(); - let primary_key_indices = &request.primary_key_indices; - let ts_index = request - .schema + let ts_index = table_schema .timestamp_index() - .context(MissingTimestampIndexSnafu { - table_name: &request.table_name, - })?; - let column_schemas = request - .schema + .context(MissingTimestampIndexSnafu { table_name })?; + let column_schemas = table_schema .column_schemas() .iter() .enumerate() @@ -217,7 +214,7 @@ fn build_column_family_from_request( .build() .context(BuildColumnDescriptorSnafu { column_name: &column_schema.name, - table_name: &request.table_name, + table_name, })?; builder = builder.push_column(column); @@ -226,9 +223,9 @@ fn build_column_family_from_request( Ok(( column_id, - builder.build().context(BuildColumnFamilyDescriptorSnafu { - table_name: &request.table_name, - })?, + builder + .build() + .context(BuildColumnFamilyDescriptorSnafu { table_name })?, )) } @@ -248,9 +245,20 @@ impl MitoEngineInner { } } - let (next_column_id, default_cf) = - build_column_family_from_request(INIT_COLUMN_ID, &request)?; - let (next_column_id, row_key) = build_row_key_desc_from_schema(next_column_id, &request)?; + let table_schema = &request.schema; + let primary_key_indices = &request.primary_key_indices; + let (next_column_id, default_cf) = build_column_family( + INIT_COLUMN_ID, + table_name, + table_schema, + primary_key_indices, + )?; + let (next_column_id, row_key) = build_row_key_desc( + next_column_id, + table_name, + table_schema, + primary_key_indices, + )?; let table_id = request.id; // TODO(dennis): supports multi regions; @@ -285,7 +293,7 @@ impl MitoEngineInner { let region = self .storage_engine - .create_region(&storage::EngineContext::default(), region_descriptor, &opts) + .create_region(&StorageEngineContext::default(), region_descriptor, &opts) .await .map_err(BoxedError::new) .context(error::CreateRegionSnafu)?; @@ -340,7 +348,7 @@ impl MitoEngineInner { return Ok(Some(table)); } - let engine_ctx = storage::EngineContext::default(); + let engine_ctx = StorageEngineContext::default(); let opts = OpenOptions { parent_dir: table_dir(table_name), }; @@ -379,6 +387,20 @@ impl MitoEngineInner { fn get_table(&self, name: &str) -> Option { self.tables.read().unwrap().get(name).cloned() } + + async fn alter_table(&self, _ctx: &EngineContext, req: AlterTableRequest) -> Result { + let table_name = &req.table_name.clone(); + let table = self + .get_table(table_name) + .context(error::TableNotFoundSnafu { table_name })?; + + logging::info!("start altering table {} with request {:?}", table_name, req); + table + 
+            .alter(req)
+            .await
+            .context(error::AlterTableSnafu { table_name })?;
+        Ok(table)
+    }
 }

 impl<S: StorageEngine> MitoEngineInner<S> {
@@ -397,13 +419,15 @@ mod tests {
     use common_recordbatch::util;
     use datafusion_common::field_util::FieldExt;
     use datafusion_common::field_util::SchemaExt;
+    use datatypes::prelude::ConcreteDataType;
+    use datatypes::schema::ColumnSchema;
     use datatypes::vectors::*;
     use store_api::manifest::Manifest;
-    use table::requests::InsertRequest;
+    use table::requests::{AlterKind, InsertRequest};

     use super::*;
     use crate::table::test_util;
-    use crate::table::test_util::MockRegion;
+    use crate::table::test_util::{MockRegion, TABLE_NAME};

     #[test]
     fn test_region_name() {
@@ -617,4 +641,50 @@
     assert_eq!(8589934602, region_id(2, 10));
     assert_eq!(18446744069414584330, region_id(u32::MAX, 10));
 }
+
+    #[tokio::test]
+    async fn test_alter_table_add_column() {
+        let (_engine, table_engine, table, _object_store, _dir) =
+            test_util::setup_mock_engine_and_table().await;
+
+        let table = table
+            .as_any()
+            .downcast_ref::<MitoTable<MockRegion>>()
+            .unwrap();
+        let table_info = table.table_info();
+        let old_info = (*table_info).clone();
+        let old_meta = &old_info.meta;
+        let old_schema = &old_meta.schema;
+
+        let new_column = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
+        let req = AlterTableRequest {
+            catalog_name: None,
+            schema_name: None,
+            table_name: TABLE_NAME.to_string(),
+            alter_kind: AlterKind::AddColumn {
+                new_column: new_column.clone(),
+            },
+        };
+        let table = table_engine
+            .alter_table(&EngineContext::default(), req)
+            .await
+            .unwrap();
+
+        let table = table
+            .as_any()
+            .downcast_ref::<MitoTable<MockRegion>>()
+            .unwrap();
+        let new_info = table.table_info();
+        let new_meta = &new_info.meta;
+        let new_schema = &new_meta.schema;
+
+        assert_eq!(new_schema.num_columns(), old_schema.num_columns() + 1);
+        assert_eq!(
+            new_schema.column_schemas().split_last().unwrap(),
+            (&new_column, old_schema.column_schemas())
+        );
+        assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column());
+        assert_eq!(new_schema.version(), old_schema.version() + 1);
+        assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 1);
+    }
 }
diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs
index e9da912887..59affca5b1 100644
--- a/src/table-engine/src/error.rs
+++ b/src/table-engine/src/error.rs
@@ -130,6 +130,33 @@ pub enum Error {
         table_name: String,
     },

+    #[snafu(display("Table not found: {}", table_name))]
+    TableNotFound {
+        backtrace: Backtrace,
+        table_name: String,
+    },
+
+    #[snafu(display("Column {} already exists in table {}", column_name, table_name))]
+    ColumnExists {
+        backtrace: Backtrace,
+        column_name: String,
+        table_name: String,
+    },
+
+    #[snafu(display("Failed to build schema, msg: {}, source: {}", msg, source))]
+    SchemaBuild {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+        msg: String,
+    },
+
+    #[snafu(display("Failed to alter table {}, source: {}", table_name, source))]
+    AlterTable {
+        table_name: String,
+        #[snafu(backtrace)]
+        source: table::error::Error,
+    },
+
     #[snafu(display(
         "Projected column not found in region, column: {}",
         column_qualified_name
@@ -155,6 +182,10 @@ impl ErrorExt for Error {
         match self {
             CreateRegion { source, .. } | OpenRegion { source, .. } => source.status_code(),

+            AlterTable { source, .. } => source.status_code(),
+
+            SchemaBuild { source, .. } => source.status_code(),
+
             BuildRowKeyDescriptor { .. }
             | BuildColumnDescriptor { .. }
             | BuildColumnFamilyDescriptor { .. }
             | BuildTableInfo { .. }
             | BuildRegionDescriptor { .. }
             | TableExists { .. }
+            | ColumnExists { .. }
             | ProjectedColumnNotFound { .. }
-            | MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
+            | MissingTimestampIndex { .. }
+            | TableNotFound { .. } => StatusCode::InvalidArguments,

             TableInfoNotFound { .. } => StatusCode::Unexpected,
diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs
index e32a0a5839..d7257e8688 100644
--- a/src/table-engine/src/table.rs
+++ b/src/table-engine/src/table.rs
@@ -3,31 +3,38 @@ pub mod test_util;

 use std::any::Any;
 use std::pin::Pin;
+use std::sync::Arc;

+use arc_swap::ArcSwap;
 use async_trait::async_trait;
 use common_query::logical_plan::Expr;
 use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
 use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use common_telemetry::logging;
+use datatypes::schema::{ColumnSchema, SchemaBuilder};
 use futures::task::{Context, Poll};
 use futures::Stream;
 use object_store::ObjectStore;
 use snafu::{OptionExt, ResultExt};
 use store_api::manifest::action::ProtocolAction;
 use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
+use store_api::storage::RegionDescriptorBuilder;
 use store_api::storage::{
     ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
     WriteContext, WriteRequest,
 };
 use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
-use table::requests::InsertRequest;
+use table::metadata::{TableInfoRef, TableMetaBuilder};
+use table::requests::{AlterKind, AlterTableRequest, InsertRequest};
 use table::{
     metadata::{TableInfo, TableType},
     table::Table,
 };
+use tokio::sync::Mutex;

+use crate::engine::{build_column_family, build_row_key_desc, INIT_COLUMN_ID};
 use crate::error::{
-    ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
+    self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
     UpdateTableManifestSnafu,
 };
 use crate::manifest::action::*;
@@ -41,9 +48,11 @@ fn table_manifest_dir(table_name: &str) -> String {
 /// [Table] implementation.
 pub struct MitoTable<R: Region> {
     manifest: TableManifest,
-    table_info: TableInfo,
+    // guarded by `self.alter_lock`
+    table_info: ArcSwap<TableInfo>,
     // TODO(dennis): a table contains multi regions
     region: R,
+    alter_lock: Mutex<()>,
 }

 #[async_trait]
 impl<R: Region> Table for MitoTable<R> {
@@ -53,7 +62,7 @@
     }

     fn schema(&self) -> SchemaRef {
-        self.table_info.meta.schema.clone()
+        self.table_info().meta.schema.clone()
     }

     async fn insert(&self, request: InsertRequest) -> TableResult<usize> {
@@ -65,8 +74,10 @@
         let mut put_op = write_request.put_op();
         let mut columns_values = request.columns_values;
-        let key_columns = self.table_info.meta.row_key_column_names();
-        let value_columns = self.table_info.meta.value_column_names();
+
+        let table_info = self.table_info();
+        let key_columns = table_info.meta.row_key_column_names();
+        let value_columns = table_info.meta.value_column_names();

         //Add row key and columns
         for name in key_columns {
@@ -101,7 +112,7 @@
     }

     fn table_type(&self) -> TableType {
-        self.table_info.table_type
+        self.table_info().table_type
     }

     async fn scan(
@@ -140,6 +151,138 @@
         Ok(Box::pin(ChunkStream { schema, stream }))
     }
+
+    // Alter table changes the schemas of the table. The alteration works by cloning the current
+    // schema, modifying the clone, and swapping the old one out. Though we could change the schema
+    // in place, considering how intertwined the schema's inner data representation is, it's safer
+    // to change it this way and avoid partial inconsistencies during the alteration. For example,
+    // the schema's `name_to_index` field must be changed in sync with `column_schemas`. If we add
+    // or remove columns from `column_schemas` *and then* update `name_to_index`, there's a small
+    // time window in which the two fields are inconsistent, which might cause hard-to-trace
+    // concurrency-related bugs or failures. (Of course, we could introduce a guard like a
+    // read-write lock to protect the consistency of schema altering, but that would hurt the
+    // performance of schema reads, and reads are the dominant operation on a schema; altering
+    // happens far less frequently.)
+    async fn alter(&self, req: AlterTableRequest) -> TableResult<()> {
+        let _lock = self.alter_lock.lock().await;
+
+        let table_info = self.table_info();
+        let table_name = &table_info.name;
+        let table_meta = &table_info.meta;
+        let table_schema = match &req.alter_kind {
+            AlterKind::AddColumn { new_column } => {
+                build_table_schema_with_new_column(table_name, &table_meta.schema, new_column)?
+            }
+        };
+
+        let primary_key_indices = &table_meta.primary_key_indices;
+        let (next_column_id, default_cf) = build_column_family(
+            INIT_COLUMN_ID,
+            table_name,
+            &table_schema,
+            primary_key_indices,
+        )?;
+        let (next_column_id, row_key) = build_row_key_desc(
+            next_column_id,
+            table_name,
+            &table_schema,
+            primary_key_indices,
+        )?;
+
+        let new_meta = TableMetaBuilder::default()
+            .schema(table_schema.clone())
+            .engine(&table_meta.engine)
+            .next_column_id(next_column_id)
+            .primary_key_indices(primary_key_indices.clone())
+            .build()
+            .context(error::BuildTableMetaSnafu { table_name })?;
+
+        let mut new_info = TableInfo::clone(&*table_info);
+        new_info.ident.version = table_info.ident.version + 1;
+        new_info.meta = new_meta;
+
+        // first alter region
+        let region = self.region();
+        let region_descriptor = RegionDescriptorBuilder::default()
+            .id(region.id())
+            .name(region.name())
+            .row_key(row_key)
+            .default_cf(default_cf)
+            .build()
+            .context(error::BuildRegionDescriptorSnafu {
+                table_name,
+                region_name: region.name(),
+            })?;
+        logging::debug!(
+            "start altering region {} of table {}, with new region descriptor {:?}",
+            region.name(),
+            table_name,
+            region_descriptor
+        );
+        region.alter(region_descriptor).map_err(TableError::new)?;
+
+        // then alter table info
+        logging::debug!(
+            "start updating the manifest of table {} with new table info {:?}",
+            table_name,
+            new_info
+        );
+        self.manifest
+            .update(TableMetaActionList::new(vec![
+                TableMetaAction::Protocol(ProtocolAction::new()),
+                TableMetaAction::Change(Box::new(TableChange {
+                    table_info: new_info.clone(),
+                })),
+            ]))
+            .await
+            .context(UpdateTableManifestSnafu {
+                table_name: &self.table_info().name,
+            })?;
+        self.set_table_info(new_info);
+
+        // TODO(LFC): Think of a way to properly handle the metadata integrity between region and table.
+        // Currently there are no "transactions" to alter the metadata of region and table together;
+        // they are altered in sequence. That means there might be cases where the metadata of the
+        // region is altered while the table's is not, so the metadata integrity between region and
+        // table cannot be held.
+        Ok(())
+    }
+}
+
+fn build_table_schema_with_new_column(
+    table_name: &str,
+    table_schema: &SchemaRef,
+    new_column: &ColumnSchema,
+) -> Result<SchemaRef> {
+    if table_schema
+        .column_schema_by_name(&new_column.name)
+        .is_some()
+    {
+        return error::ColumnExistsSnafu {
+            column_name: &new_column.name,
+            table_name,
+        }
+        .fail()?;
+    }
+
+    let mut columns = table_schema.column_schemas().to_vec();
+    columns.push(new_column.clone());
+
+    // Right now we do not support adding a column before or after another column,
+    // so just clone a new schema like this.
+    // TODO(LFC): support adding a column before or after some column
+    let mut builder = SchemaBuilder::from_columns(columns).version(table_schema.version() + 1);
+
+    if let Some(index) = table_schema.timestamp_index() {
+        builder = builder.timestamp_index(index);
+    }
+    for (k, v) in table_schema.arrow_schema().metadata.iter() {
+        builder = builder.add_metadata(k, v);
+    }
+    let new_schema = Arc::new(builder.build().with_context(|_| error::SchemaBuildSnafu {
+        msg: format!("cannot add new column {:?}", new_column),
+    })?);
+    Ok(new_schema)
 }

 struct ChunkStream {
@@ -169,9 +312,10 @@ fn column_qualified_name(table_name: &str, region_name: &str, column_name: &str)
 impl<R: Region> MitoTable<R> {
     fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
         Self {
-            table_info,
+            table_info: ArcSwap::new(Arc::new(table_info)),
             region,
             manifest,
+            alter_lock: Mutex::new(()),
         }
     }

@@ -182,7 +326,9 @@
         region: &R,
         projection: Option<Vec<usize>>,
     ) -> Result<Option<Vec<usize>>> {
-        let table_schema = &self.table_info.meta.schema;
+        let table_info = self.table_info();
+        let table_schema = &table_info.meta.schema;
+
         let region_meta = region.in_memory_metadata();
         let region_schema = region_meta.schema();

@@ -198,7 +344,7 @@
             region_schema.column_index_by_name(name).with_context(|| {
                 ProjectedColumnNotFoundSnafu {
                     column_qualified_name: column_qualified_name(
-                        &self.table_info.name,
+                        &table_info.name,
                         region.name(),
                         name,
                     ),
@@ -217,7 +363,7 @@
             region_schema.column_index_by_name(name).with_context(|| {
                 ProjectedColumnNotFoundSnafu {
                     column_qualified_name: column_qualified_name(
-                        &self.table_info.name,
+                        &table_info.name,
                         region.name(),
                         name,
                     ),
@@ -311,8 +457,17 @@
     }

     #[inline]
-    pub fn table_info(&self) -> &TableInfo {
-        &self.table_info
+    pub fn region(&self) -> &R {
+        &self.region
+    }
+
+    #[inline]
+    pub fn table_info(&self) -> TableInfoRef {
+        Arc::clone(&self.table_info.load())
+    }
+
+    pub fn set_table_info(&self, table_info: TableInfo) {
+        self.table_info.swap(Arc::new(table_info));
     }

     #[inline]
@@ -328,6 +483,8 @@
 #[cfg(test)]
 mod tests {
+    use datatypes::prelude::ConcreteDataType;
+
     use super::*;

     #[test]
@@
         assert_eq!("demo/manifest/", table_manifest_dir("demo"));
         assert_eq!("numbers/manifest/", table_manifest_dir("numbers"));
     }
+
+    #[test]
+    fn test_build_table_schema_with_new_column() {
+        let table_info = test_util::build_test_table_info();
+        let table_name = &table_info.name;
+        let table_meta = &table_info.meta;
+        let table_schema = &table_meta.schema;
+
+        let new_column = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true);
+        let result = build_table_schema_with_new_column(table_name, table_schema, &new_column);
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Column host already exists in table demo"));
+
+        let new_column = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
+        let new_schema =
+            build_table_schema_with_new_column(table_name, table_schema, &new_column).unwrap();
+
+        assert_eq!(new_schema.num_columns(), table_schema.num_columns() + 1);
+        assert_eq!(
+            new_schema.column_schemas().split_last().unwrap(),
+            (&new_column, table_schema.column_schemas())
+        );
+
+        assert_eq!(
+            new_schema.timestamp_column(),
+            table_schema.timestamp_column()
+        );
+        assert_eq!(new_schema.version(), table_schema.version() + 1);
+    }
 }
diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs
index da2dc1fea8..b07ecf3eb0 100644
--- a/src/table-engine/src/table/test_util/mock_engine.rs
+++ b/src/table-engine/src/table/test_util/mock_engine.rs
@@ -1,23 +1,27 @@
 //! A mock storage engine for table test purpose.

 use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, RwLock};

+use arc_swap::ArcSwap;
 use async_trait::async_trait;
 use common_error::mock::MockError;
 use common_telemetry::logging;
-use storage::metadata::{RegionMetaImpl, RegionMetadataRef};
-use storage::write_batch::WriteBatch;
+use datatypes::prelude::{Value, VectorBuilder, VectorRef};
+use datatypes::schema::ColumnSchema;
+use storage::metadata::{RegionMetaImpl, RegionMetadata};
+use storage::write_batch::{Mutation, WriteBatch};
 use store_api::storage::{
     Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions,
-    ReadContext, Region, RegionDescriptor, RegionMeta, ScanRequest, ScanResponse, SchemaRef,
-    Snapshot, StorageEngine, WriteContext, WriteResponse,
+    ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest, ScanResponse,
+    SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
 };

 pub type Result<T> = std::result::Result<T, MockError>;

 pub struct MockChunkReader {
     schema: SchemaRef,
+    memtable: MockMemtable,
 }

 #[async_trait]
 impl ChunkReader for MockChunkReader {
     async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
-        Ok(None)
+        let columns = self
+            .schema
+            .column_schemas()
+            .iter()
+            .map(|column_schema| {
+                let data = self.memtable.get(&column_schema.name).unwrap();
+                let mut builder =
+                    VectorBuilder::with_capacity(column_schema.data_type.clone(), data.len());
+                for v in data {
+                    builder.push(v);
+                }
+                builder.finish()
+            })
+            .collect::<Vec<VectorRef>>();
+        Ok(Some(Chunk::new(columns)))
     }
 }

 pub struct MockSnapshot {
-    metadata: RegionMetadataRef,
+    schema: SchemaRef,
+    region: Arc<MockRegionInner>,
 }

 #[async_trait]
 impl Snapshot for MockSnapshot {
     type Reader = MockChunkReader;

     fn schema(&self) -> &SchemaRef {
-        self.metadata.user_schema()
+        &self.schema
     }

     async fn scan(
         &self,
         _ctx: &ReadContext,
         _request: ScanRequest,
     ) -> Result<ScanResponse<MockChunkReader>> {
-        let reader = MockChunkReader {
-            schema: self.metadata.user_schema().clone(),
+        let memtable = {
+            let memtable = self.region.memtable.read().unwrap();
+            memtable.clone()
+        };
+        let reader = MockChunkReader {
+            schema: self.schema().clone(),
+            memtable,
         };
         Ok(ScanResponse { reader })
     }

 pub struct MockRegion {
     // FIXME(yingwen): Remove this once name is provided by metadata.
     name: String,
-    // We share the same metadata definition with the storage engine.
-    metadata: RegionMetadataRef,
+    pub inner: Arc<MockRegionInner>,
 }

+#[derive(Debug)]
+pub struct MockRegionInner {
+    pub metadata: ArcSwap<RegionMetadata>,
+    memtable: Arc<RwLock<MockMemtable>>,
+}
+
+type MockMemtable = HashMap<String, Vec<Value>>;
+
 #[async_trait]
 impl Region for MockRegion {
     type Error = MockError;
     type WriteRequest = WriteBatch;
     type Snapshot = MockSnapshot;

+    fn id(&self) -> RegionId {
+        self.inner.metadata.load().id()
+    }
+
     fn name(&self) -> &str {
         &self.name
     }

     fn in_memory_metadata(&self) -> RegionMetaImpl {
-        RegionMetaImpl::new(self.metadata.clone())
+        RegionMetaImpl::new(self.inner.metadata.load().clone())
     }

-    async fn write(&self, _ctx: &WriteContext, _request: WriteBatch) -> Result<WriteResponse> {
+    async fn write(&self, _ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
+        self.inner.write(request);
         Ok(WriteResponse {})
     }

     fn snapshot(&self, _ctx: &ReadContext) -> Result<MockSnapshot> {
         Ok(MockSnapshot {
-            metadata: self.metadata.clone(),
+            schema: self.inner.metadata.load().user_schema().clone(),
+            region: self.inner.clone(),
         })
     }

     fn write_request(&self) -> WriteBatch {
         WriteBatch::new(self.in_memory_metadata().schema().clone())
     }
+
+    fn alter(&self, descriptor: RegionDescriptor) -> Result<()> {
+        let metadata = descriptor.try_into().unwrap();
+        self.inner.update_metadata(metadata);
+        Ok(())
+    }
+}
+
+impl MockRegionInner {
+    fn new(metadata: RegionMetadata) -> Self {
+        let mut memtable = HashMap::new();
+        for column in metadata.user_schema().column_schemas() {
+            memtable.insert(column.name.clone(), vec![]);
+        }
+        Self {
+            metadata: ArcSwap::new(Arc::new(metadata)),
+            memtable: Arc::new(RwLock::new(memtable)),
+        }
+    }
+
+    fn update_metadata(&self, metadata: RegionMetadata) {
+        {
+            let mut memtable = self.memtable.write().unwrap();
+
+            let rows = memtable.values().last().unwrap().len();
+
+            // currently dropping columns is not supported, so we only add columns here
+            for column in metadata.user_schema().column_schemas() {
+                let _ = memtable
+                    .entry(column.name.clone())
+                    .or_insert_with(|| vec![Value::Null; rows]);
+            }
+        }
+        self.metadata.swap(Arc::new(metadata));
+    }
+
+    fn write(&self, request: WriteBatch) {
+        let metadata = self.metadata.load();
+
+        let mut memtable = self.memtable.write().unwrap();
+
+        for Mutation::Put(put) in request.iter() {
+            for ColumnSchema { name, .. } in metadata.user_schema().column_schemas() {
+                let column = memtable.get_mut(name).unwrap();
+                if let Some(data) = put.column_by_name(name) {
+                    (0..data.len()).for_each(|i| column.push(data.get(i)));
+                } else {
+                    column.extend_from_slice(&vec![Value::Null; put.num_rows()]);
+                }
+            }
+        }
+    }
 }

 type RegionMap = HashMap<String, MockRegion>;
@@ -165,7 +253,7 @@ impl StorageEngine for MockEngine {
         let metadata = descriptor.try_into().unwrap();
         let region = MockRegion {
             name: name.clone(),
-            metadata: Arc::new(metadata),
+            inner: Arc::new(MockRegionInner::new(metadata)),
         };
         regions.opened_regions.insert(name, region.clone());
diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs
index 96b28b8554..01ec76fc19 100644
--- a/src/table/src/metadata.rs
+++ b/src/table/src/metadata.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::sync::Arc;

 use chrono::{DateTime, Utc};
 use datatypes::schema::SchemaRef;
@@ -105,6 +106,8 @@ pub struct TableInfo {
     pub table_type: TableType,
 }

+pub type TableInfoRef = Arc<TableInfo>;
+
 impl TableInfoBuilder {
     pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
         Self {
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index b7e5a7c9c5..1fca70fdfc 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -2,7 +2,7 @@
 use std::collections::HashMap;

 use datatypes::prelude::VectorRef;
-use datatypes::schema::SchemaRef;
+use datatypes::schema::{ColumnSchema, SchemaRef};

 use crate::metadata::TableId;

@@ -38,7 +38,17 @@ pub struct OpenTableRequest {

 /// Alter table request
 #[derive(Debug)]
-pub struct AlterTableRequest {}
+pub struct AlterTableRequest {
+    pub catalog_name: Option<String>,
+    pub schema_name: Option<String>,
+    pub table_name: String,
+    pub alter_kind: AlterKind,
+}
+
+#[derive(Debug)]
+pub enum AlterKind {
+    AddColumn { new_column: ColumnSchema },
+}

 /// Drop table request
 #[derive(Debug)]
diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index a1d7c4487d..e7f648ba69 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -10,7 +10,7 @@ use datatypes::schema::SchemaRef;

 use crate::error::Result;
 use crate::metadata::{FilterPushDownType, TableType};
-use crate::requests::InsertRequest;
+use crate::requests::{AlterTableRequest, InsertRequest};

 /// Table abstraction.
 #[async_trait::async_trait]
@@ -49,6 +49,10 @@ pub trait Table: Send + Sync {
     fn supports_filter_pushdown(&self, _filter: &Expr) -> Result<FilterPushDownType> {
         Ok(FilterPushDownType::Unsupported)
     }
+
+    async fn alter(&self, _request: AlterTableRequest) -> Result<()> {
+        unimplemented!()
+    }
 }

 pub type TableRef = Arc<dyn Table>;
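Taken together, the pieces above wire ALTER TABLE through every layer: the parser produces a `Statement::Alter(AlterTable)`, the datanode converts it into an `AlterTableRequest`, the Mito engine rebuilds the table schema and region descriptor, and the region swaps in the new metadata. A test-style sketch of that end-to-end flow, using only items introduced in this diff (`create_mock_sql_handler` is the datanode test utility added above, and the `demo` table is assumed to exist already, as set up by `create_test_table`):

```rust
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;

// Parse the SQL into the new AlterTable statement...
let mut stmts = ParserContext::create_with_dialect(
    "ALTER TABLE demo ADD COLUMN my_tag STRING NULL",
    &GenericDialect {},
)
.unwrap();

let handler = create_mock_sql_handler().await;
match stmts.remove(0) {
    Statement::Alter(alter_table) => {
        // ...convert it into an engine-level AlterTableRequest...
        let req = handler.alter_to_request(alter_table).unwrap();
        // ...and execute it; like MySQL, a successful ALTER reports 0 affected rows.
        let output = handler.execute(SqlRequest::Alter(req)).await.unwrap();
        assert!(matches!(output, Output::AffectedRows(0)));
    }
    _ => unreachable!(),
}
```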