diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 6157fb839e..c7c45c758f 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -25,7 +25,7 @@ use tokio::sync::Mutex; use crate::config::EngineConfig; use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, - BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, + BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, TableExistsSnafu }; use crate::table::MitoTable; @@ -238,7 +238,11 @@ impl MitoEngineInner { let table_name = &request.name; if let Some(table) = self.get_table(table_name) { - return Ok(table); + if request.create_if_not_exists { + return Ok(table); + } else { + return TableExistsSnafu { table_name }.fail(); + } } let (next_column_id, default_cf) = @@ -262,7 +266,11 @@ impl MitoEngineInner { let _lock = self.table_mutex.lock().await; // Checks again, read lock should be enough since we are guarded by the mutex. if let Some(table) = self.get_table(table_name) { - return Ok(table); + if request.create_if_not_exists { + return Ok(table); + } else { + return TableExistsSnafu { table_name }.fail(); + } } let opts = CreateOptions { @@ -389,10 +397,26 @@ mod tests { use datafusion_common::field_util::FieldExt; use datafusion_common::field_util::SchemaExt; use datatypes::vectors::*; + use store_api::manifest::Manifest; use table::requests::InsertRequest; use super::*; use crate::table::test_util; + use crate::table::test_util::MockRegion; + + #[test] + fn test_region_name() { + assert_eq!("0000000000", region_name(0)); + assert_eq!("0000000001", region_name(1)); + assert_eq!("0000000100", region_name(100)); + assert_eq!("0000009999", region_name(9999)); + } + + #[test] + fn test_table_dir() { + assert_eq!("test_table/", table_dir("test_table")); + assert_eq!("demo/", table_dir("demo")); + } #[tokio::test] async fn test_create_table_insert_scan() { @@ -444,6 +468,53 @@ mod tests { assert_eq!(memories.to_arrow_array(), columns[3]); } + #[tokio::test] + async fn test_create_if_not_exists() { + common_telemetry::init_default_ut_logging(); + let ctx = EngineContext::default(); + + let (_engine, table_engine, table, _object_store, _dir) = + test_util::setup_mock_engine_and_table().await; + + let table = table + .as_any() + .downcast_ref::>() + .unwrap(); + let table_info = table.table_info(); + + let request = CreateTableRequest { + name: table_info.name.to_string(), + schema: table_info.meta.schema.clone(), + create_if_not_exists: true, + desc: None, + primary_key_indices: Vec::default(), + }; + + let created_table = table_engine.create_table(&ctx, request).await.unwrap(); + assert_eq!( + table_info, + created_table + .as_any() + .downcast_ref::>() + .unwrap() + .table_info() + ); + + // test create_if_not_exists=false + let request = CreateTableRequest { + name: table_info.name.to_string(), + schema: table_info.meta.schema.clone(), + create_if_not_exists: false, + desc: None, + primary_key_indices: Vec::default(), + }; + + let result = table_engine.create_table(&ctx, request).await; + + assert!(result.is_err()); + assert!(matches!(result, Err(e) if format!("{:?}", e).contains("Table already exists"))); + } + #[tokio::test] async fn test_open_table() { common_telemetry::init_default_ut_logging(); @@ -457,8 +528,8 @@ mod tests { table_id: 0, }; - let (engine, table, object_store) = { - let (engine, table_engine, table, object_store) = + let (engine, table, object_store, _dir) = { + let (engine, table_engine, table, object_store, dir) = test_util::setup_mock_engine_and_table().await; assert_eq!(MITO_ENGINE, table_engine.name()); // Now try to open the table again. @@ -469,7 +540,7 @@ mod tests { .unwrap(); assert_eq!(table.schema(), reopened.schema()); - (engine, table, object_store) + (engine, table, object_store, dir) }; // Construct a new table engine, and try to open the table. @@ -480,5 +551,17 @@ mod tests { .unwrap() .unwrap(); assert_eq!(table.schema(), reopened.schema()); + + let table = table + .as_any() + .downcast_ref::>() + .unwrap(); + let reopened = reopened + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!(table.table_info(), reopened.table_info()); + assert_eq!(reopened.manifest().last_version(), 1); } } diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index d15fbcdfff..62e87d8312 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -123,6 +123,12 @@ pub enum Error { backtrace: Backtrace, table_name: String, }, + + #[snafu(display("Table already exists: {}", table_name))] + TableExists { + backtrace: Backtrace, + table_name: String, + }, } impl From for table::error::Error { @@ -146,6 +152,7 @@ impl ErrorExt for Error { | BuildTableMeta { .. } | BuildTableInfo { .. } | BuildRegionDescriptor { .. } + | TableExists { .. } | MissingTimestampIndex { .. } => StatusCode::InvalidArguments, UpdateTableManifest { .. } => StatusCode::StorageUnavailable, diff --git a/src/table-engine/src/manifest.rs b/src/table-engine/src/manifest.rs index 76d8c79f65..47a4581604 100644 --- a/src/table-engine/src/manifest.rs +++ b/src/table-engine/src/manifest.rs @@ -8,4 +8,77 @@ use crate::manifest::action::*; pub type TableManifest = ManifestImpl; #[cfg(test)] -mod tests {} +mod tests { + use storage::manifest::MetaActionIteratorImpl; + use store_api::manifest::action::ProtocolAction; + use store_api::manifest::{Manifest, MetaActionIterator}; + use table::metadata::TableInfo; + + use super::*; + use crate::table::test_util; + type TableManifestActionIter = MetaActionIteratorImpl; + + async fn assert_actions( + iter: &mut TableManifestActionIter, + protocol: &ProtocolAction, + table_info: &TableInfo, + ) { + match iter.next_action().await.unwrap() { + Some((v, action_list)) => { + assert_eq!(v, 0); + assert_eq!(2, action_list.actions.len()); + assert!( + matches!(&action_list.actions[0], TableMetaAction::Protocol(p) if *p == *protocol) + ); + assert!( + matches!(&action_list.actions[1], TableMetaAction::Change(c) if c.table_info == *table_info) + ); + } + _ => unreachable!(), + } + } + + #[tokio::test] + async fn test_table_manifest() { + let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await; + + let manifest = TableManifest::new("manifest/", object_store); + + let mut iter = manifest.scan(0, 100).await.unwrap(); + assert!(iter.next_action().await.unwrap().is_none()); + + let protocol = ProtocolAction::new(); + let table_info = test_util::build_test_table_info(); + let action_list = TableMetaActionList::new(vec![ + TableMetaAction::Protocol(protocol.clone()), + TableMetaAction::Change(Box::new(TableChange { + table_info: table_info.clone(), + })), + ]); + + assert_eq!(0, manifest.update(action_list).await.unwrap()); + + let mut iter = manifest.scan(0, 100).await.unwrap(); + assert_actions(&mut iter, &protocol, &table_info).await; + assert!(iter.next_action().await.unwrap().is_none()); + + // update another action + let action_list = TableMetaActionList::new(vec![TableMetaAction::Remove(TableRemove { + table_name: table_info.name.clone(), + table_ident: table_info.ident.clone(), + })]); + assert_eq!(1, manifest.update(action_list).await.unwrap()); + let mut iter = manifest.scan(0, 100).await.unwrap(); + assert_actions(&mut iter, &protocol, &table_info).await; + + match iter.next_action().await.unwrap() { + Some((v, action_list)) => { + assert_eq!(v, 1); + assert_eq!(1, action_list.actions.len()); + assert!(matches!(&action_list.actions[0], + TableMetaAction::Remove(r) if r.table_name == table_info.name && r.table_ident == table_info.ident)); + } + _ => unreachable!(), + } + } +} diff --git a/src/table-engine/src/manifest/action.rs b/src/table-engine/src/manifest/action.rs index 6434c99e1a..6b9cae8646 100644 --- a/src/table-engine/src/manifest/action.rs +++ b/src/table-engine/src/manifest/action.rs @@ -113,4 +113,42 @@ impl MetaAction for TableMetaActionList { } #[cfg(test)] -mod tests {} +mod tests { + use common_telemetry::logging; + + use super::*; + use crate::table::test_util; + + #[test] + fn test_encode_decode_action_list() { + common_telemetry::init_default_ut_logging(); + let mut protocol = ProtocolAction::new(); + protocol.min_reader_version = 1; + + let table_info = test_util::build_test_table_info(); + + let mut action_list = TableMetaActionList::new(vec![ + TableMetaAction::Protocol(protocol.clone()), + TableMetaAction::Change(Box::new(TableChange { table_info })), + ]); + action_list.set_prev_version(3); + + let bs = action_list.encode().unwrap(); + + logging::debug!( + "Encoded action list: \r\n{}", + String::from_utf8(bs.clone()).unwrap() + ); + + let e = TableMetaActionList::decode(&bs, 0); + assert!(e.is_err()); + assert_eq!( + "Manifest protocol forbid to read, min_version: 1, supported_version: 0", + format!("{}", e.err().unwrap()) + ); + + let (decode_list, p) = TableMetaActionList::decode(&bs, 1).unwrap(); + assert_eq!(decode_list, action_list); + assert_eq!(p.unwrap(), protocol); + } +} diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index a44d9f1d18..e1cfbc4f2b 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -39,7 +39,7 @@ fn table_manifest_dir(table_name: &str) -> String { /// [Table] implementation. pub struct MitoTable { - _manifest: TableManifest, + manifest: TableManifest, table_info: TableInfo, // TODO(dennis): a table contains multi regions region: R, @@ -159,7 +159,7 @@ impl MitoTable { Self { table_info, region, - _manifest: manifest, + manifest, } } @@ -249,8 +249,24 @@ impl MitoTable { &self.table_info } + #[inline] + pub fn manifest(&self) -> &TableManifest { + &self.manifest + } + fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) { // TODO(dennis): use manifest version in catalog ? (manifest::MIN_VERSION, manifest::MAX_VERSION) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_table_manifest_dir() { + assert_eq!("demo/manifest/", table_manifest_dir("demo")); + assert_eq!("numbers/manifest/", table_manifest_dir("numbers")); + } +} diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index dff33eb88f..4ce70625c2 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -11,17 +11,20 @@ use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::engine::EngineContext; use table::engine::TableEngine; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::CreateTableRequest; use table::TableRef; use tempdir::TempDir; use crate::config::EngineConfig; use crate::engine::MitoEngine; -use crate::table::test_util::mock_engine::MockEngine; +use crate::engine::DEFAULT_ENGINE; +pub use crate::table::test_util::mock_engine::MockEngine; +pub use crate::table::test_util::mock_engine::MockRegion; pub const TABLE_NAME: &str = "demo"; -fn schema_for_test() -> Schema { +pub fn schema_for_test() -> Schema { let column_schemas = vec![ ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), @@ -34,6 +37,23 @@ fn schema_for_test() -> Schema { pub type MockMitoEngine = MitoEngine; +pub fn build_test_table_info() -> TableInfo { + let table_meta = TableMetaBuilder::default() + .schema(Arc::new(schema_for_test())) + .engine(DEFAULT_ENGINE) + .next_column_id(1) + .primary_key_indices(vec![0, 1]) + .build() + .unwrap(); + + TableInfoBuilder::new(TABLE_NAME.to_string(), table_meta) + .ident(0) + .table_version(0u64) + .table_type(TableType::Base) + .build() + .unwrap() +} + pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) { let dir = TempDir::new(prefix).unwrap(); let store_dir = dir.path().to_string_lossy(); @@ -78,9 +98,10 @@ pub async fn setup_test_engine_and_table() -> ( (table_engine, table, schema, dir) } -pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef, ObjectStore) { +pub async fn setup_mock_engine_and_table( +) -> (MockEngine, MockMitoEngine, TableRef, ObjectStore, TempDir) { let mock_engine = MockEngine::default(); - let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; + let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; let table_engine = MitoEngine::new( EngineConfig::default(), mock_engine.clone(), @@ -102,5 +123,5 @@ pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, Table .await .unwrap(); - (mock_engine, table_engine, table, object_store) + (mock_engine, table_engine, table, object_store, dir) }