diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 322daeb621..48dcd06eec 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -82,6 +82,13 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Table engine not found: {}, source: {}", engine_name, source))] + TableEngineNotFound { + engine_name: String, + #[snafu(backtrace)] + source: table::error::Error, + }, + #[snafu(display("Cannot find catalog by name: {}", catalog_name))] CatalogNotFound { catalog_name: String, @@ -253,7 +260,9 @@ impl ErrorExt for Error { Error::TableExists { .. } => StatusCode::TableAlreadyExists, Error::TableNotExist { .. } => StatusCode::TableNotFound, - Error::SchemaExists { .. } => StatusCode::InvalidArguments, + Error::SchemaExists { .. } | Error::TableEngineNotFound { .. } => { + StatusCode::InvalidArguments + } Error::OpenSystemCatalog { source, .. } | Error::CreateSystemCatalog { source, .. } diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index d18594da60..ed3d528f45 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, - SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME, + MITO_ENGINE, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME, }; use common_catalog::format_full_table_name; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; @@ -27,7 +27,8 @@ use datatypes::prelude::ScalarVector; use datatypes::vectors::{BinaryVector, UInt8Vector}; use futures_util::lock::Mutex; use snafu::{ensure, OptionExt, ResultExt}; -use table::engine::{EngineContext, TableEngineRef}; +use table::engine::manager::TableEngineManagerRef; +use table::engine::EngineContext; use table::metadata::TableId; use table::requests::OpenTableRequest; use table::table::numbers::NumbersTable; @@ -37,7 +38,8 @@ use table::TableRef; use crate::error::{ self, CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, - SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu, + SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, TableNotExistSnafu, + TableNotFoundSnafu, }; use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use crate::system::{ @@ -55,7 +57,7 @@ use crate::{ pub struct LocalCatalogManager { system: Arc, catalogs: Arc, - engine: TableEngineRef, + engine_manager: TableEngineManagerRef, next_table_id: AtomicU32, init_lock: Mutex, register_lock: Mutex<()>, @@ -63,19 +65,20 @@ pub struct LocalCatalogManager { } impl LocalCatalogManager { - /// Create a new [CatalogManager] with given user catalogs and table engine - pub async fn try_new(engine: TableEngineRef) -> Result { + /// Create a new [CatalogManager] with given user catalogs and mito engine + pub async fn try_new(engine_manager: TableEngineManagerRef) -> Result { + let engine = engine_manager + .engine(MITO_ENGINE) + .context(TableEngineNotFoundSnafu { + engine_name: MITO_ENGINE, + })?; let table = SystemCatalogTable::new(engine.clone()).await?; let memory_catalog_list = crate::local::memory::new_memory_catalog_list()?; - let system_catalog = Arc::new(SystemCatalog::new( - table, - memory_catalog_list.clone(), - engine.clone(), - )); + let system_catalog = Arc::new(SystemCatalog::new(table, memory_catalog_list.clone())); Ok(Self { system: system_catalog, catalogs: memory_catalog_list, - engine, + engine_manager, next_table_id: AtomicU32::new(MIN_USER_TABLE_ID), init_lock: Mutex::new(false), register_lock: Mutex::new(()), @@ -100,7 +103,14 @@ impl LocalCatalogManager { // Processing system table hooks let mut sys_table_requests = self.system_table_requests.lock().await; - handle_system_table_request(self, self.engine.clone(), &mut sys_table_requests).await?; + let engine = self + .engine_manager + .engine(MITO_ENGINE) + .context(TableEngineNotFoundSnafu { + engine_name: MITO_ENGINE, + })?; + + handle_system_table_request(self, engine, &mut sys_table_requests).await?; Ok(()) } @@ -253,9 +263,14 @@ impl LocalCatalogManager { table_name: t.table_name.clone(), table_id: t.table_id, }; + let engine = self + .engine_manager + .engine(&t.engine) + .context(TableEngineNotFoundSnafu { + engine_name: &t.engine, + })?; - let option = self - .engine + let option = engine .open_table(&context, request) .await .with_context(|_| OpenTableSnafu { @@ -364,6 +379,7 @@ impl CatalogManager for LocalCatalogManager { // Try to register table with same table id, just ignore. Ok(false) } else { + let engine = request.table.table_info().meta.engine.to_string(); // table does not exist self.system .register_table( @@ -371,6 +387,7 @@ impl CatalogManager for LocalCatalogManager { schema_name.clone(), request.table_name.clone(), request.table_id, + engine, ) .await?; schema.register_table(request.table_name, request.table)?; @@ -404,6 +421,14 @@ impl CatalogManager for LocalCatalogManager { schema: schema_name, })?; + let old_table = schema + .table(&request.table_name) + .await? + .context(TableNotExistSnafu { + table: &request.table_name, + })?; + + let engine = old_table.table_info().meta.engine.to_string(); // rename table in system catalog self.system .register_table( @@ -411,6 +436,7 @@ impl CatalogManager for LocalCatalogManager { schema_name.clone(), request.new_table_name.clone(), request.table_id, + engine, ) .await?; Ok(schema @@ -530,6 +556,8 @@ impl CatalogManager for LocalCatalogManager { mod tests { use std::assert_matches::assert_matches; + use mito::engine::MITO_ENGINE; + use super::*; use crate::system::{CatalogEntry, SchemaEntry}; @@ -541,6 +569,7 @@ mod tests { schema_name: "S1".to_string(), table_name: "T1".to_string(), table_id: 1, + engine: MITO_ENGINE.to_string(), }), Entry::Catalog(CatalogEntry { catalog_name: "C2".to_string(), @@ -561,6 +590,7 @@ mod tests { schema_name: "S1".to_string(), table_name: "T2".to_string(), table_id: 2, + engine: MITO_ENGINE.to_string(), }), ]; let res = LocalCatalogManager::sort_entries(vec); diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 1bf93633a7..5279a66ce3 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -20,14 +20,17 @@ use std::sync::Arc; use arc_swap::ArcSwap; use async_stream::stream; use async_trait::async_trait; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_catalog::consts::{ + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, +}; use common_telemetry::{debug, error, info}; use dashmap::DashMap; use futures::Stream; use futures_util::StreamExt; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt}; -use table::engine::{EngineContext, TableEngineRef}; +use table::engine::manager::TableEngineManagerRef; +use table::engine::EngineContext; use table::metadata::TableId; use table::requests::{CreateTableRequest, OpenTableRequest}; use table::table::numbers::NumbersTable; @@ -36,7 +39,7 @@ use tokio::sync::Mutex; use crate::error::{ CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, Result, - SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu, + SchemaNotFoundSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu, }; use crate::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, @@ -55,14 +58,14 @@ pub struct RemoteCatalogManager { node_id: u64, backend: KvBackendRef, catalogs: Arc>>, - engine: TableEngineRef, + engine_manager: TableEngineManagerRef, system_table_requests: Mutex>, } impl RemoteCatalogManager { - pub fn new(engine: TableEngineRef, node_id: u64, backend: KvBackendRef) -> Self { + pub fn new(engine_manager: TableEngineManagerRef, node_id: u64, backend: KvBackendRef) -> Self { Self { - engine, + engine_manager, node_id, backend, catalogs: Default::default(), @@ -331,8 +334,13 @@ impl RemoteCatalogManager { table_name: table_name.clone(), table_id, }; - match self - .engine + let engine = self + .engine_manager + .engine(&table_info.meta.engine) + .context(TableEngineNotFoundSnafu { + engine_name: &table_info.meta.engine, + })?; + match engine .open_table(&context, request) .await .with_context(|_| OpenTableSnafu { @@ -363,9 +371,10 @@ impl RemoteCatalogManager { primary_key_indices: meta.primary_key_indices.clone(), create_if_not_exists: true, table_options: meta.options.clone(), + engine: engine.name().to_string(), }; - self.engine + engine .create_table(&context, req) .await .context(CreateTableSnafu { @@ -398,7 +407,13 @@ impl CatalogManager for RemoteCatalogManager { info!("Max table id allocated: {}", max_table_id); let mut system_table_requests = self.system_table_requests.lock().await; - handle_system_table_request(self, self.engine.clone(), &mut system_table_requests).await?; + let engine = self + .engine_manager + .engine(MITO_ENGINE) + .context(TableEngineNotFoundSnafu { + engine_name: MITO_ENGINE, + })?; + handle_system_table_request(self, engine, &mut system_table_requests).await?; info!("All system table opened"); self.catalog(DEFAULT_CATALOG_NAME) diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index c49c26f616..7a51c9af53 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -17,8 +17,8 @@ use std::collections::HashMap; use std::sync::Arc; use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, - SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME, + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE, + SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME, }; use common_query::logical_plan::Expr; use common_query::physical_plan::{PhysicalPlanRef, SessionContext}; @@ -112,6 +112,7 @@ impl SystemCatalogTable { primary_key_indices: vec![ENTRY_TYPE_INDEX, KEY_INDEX], create_if_not_exists: true, table_options: TableOptions::default(), + engine: engine.name().to_string(), }; let table = engine @@ -194,12 +195,13 @@ pub fn build_table_insert_request( schema: String, table_name: String, table_id: TableId, + engine: String, ) -> InsertRequest { let entry_key = format_table_entry_key(&catalog, &schema, table_id); build_insert_request( EntryType::Table, entry_key.as_bytes(), - serde_json::to_string(&TableEntryValue { table_name }) + serde_json::to_string(&TableEntryValue { table_name, engine }) .unwrap() .as_bytes(), ) @@ -330,6 +332,7 @@ pub fn decode_system_catalog( schema_name: table_parts[1].to_string(), table_name: table_meta.table_name, table_id, + engine: table_meta.engine, })) } } @@ -385,11 +388,19 @@ pub struct TableEntry { pub schema_name: String, pub table_name: String, pub table_id: TableId, + pub engine: String, } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct TableEntryValue { pub table_name: String, + + #[serde(default = "mito_engine")] + pub engine: String, +} + +fn mito_engine() -> String { + MITO_ENGINE.to_string() } #[cfg(test)] @@ -399,7 +410,7 @@ mod tests { use datatypes::value::Value; use log_store::NoopLogStore; use mito::config::EngineConfig; - use mito::engine::MitoEngine; + use mito::engine::{MitoEngine, MITO_ENGINE}; use object_store::ObjectStore; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; @@ -528,6 +539,7 @@ mod tests { DEFAULT_SCHEMA_NAME.to_string(), "my_table".to_string(), 1, + MITO_ENGINE.to_string(), ); let result = catalog_table.insert(table_insertion).await.unwrap(); assert_eq!(result, 1); @@ -548,6 +560,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "my_table".to_string(), table_id: 1, + engine: MITO_ENGINE.to_string(), }); assert_eq!(entry, expected); diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 71a1d30f33..64e2c65b97 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -33,7 +33,6 @@ use datatypes::value::ValueRef; use datatypes::vectors::VectorRef; use futures::Stream; use snafu::ResultExt; -use table::engine::TableEngineRef; use table::error::TablesRecordBatchSnafu; use table::metadata::{TableId, TableInfoRef}; use table::table::scan::SimpleTableScan; @@ -52,15 +51,13 @@ use crate::{ pub struct Tables { schema: SchemaRef, catalogs: CatalogListRef, - engine_name: String, } impl Tables { - pub fn new(catalogs: CatalogListRef, engine_name: String) -> Self { + pub fn new(catalogs: CatalogListRef) -> Self { Self { schema: Arc::new(build_schema_for_tables()), catalogs, - engine_name, } } } @@ -87,7 +84,6 @@ impl Table for Tables { ) -> table::error::Result { let catalogs = self.catalogs.clone(); let schema_ref = self.schema.clone(); - let engine_name = self.engine_name.clone(); let stream = stream!({ for catalog_name in catalogs @@ -105,32 +101,29 @@ impl Table for Tables { .map_err(BoxedError::new) .context(TablesRecordBatchSnafu)? { - let mut tables_in_schema = Vec::with_capacity( - catalog - .schema_names() - .map_err(BoxedError::new) - .context(TablesRecordBatchSnafu)? - .len(), - ); let schema = catalog .schema(&schema_name) .map_err(BoxedError::new) .context(TablesRecordBatchSnafu)? .unwrap(); - for table_name in schema + let names = schema .table_names() .map_err(BoxedError::new) - .context(TablesRecordBatchSnafu)? - { - tables_in_schema.push(table_name); + .context(TablesRecordBatchSnafu)?; + let mut tables = Vec::with_capacity(names.len()); + + for name in names { + let table = schema + .table(&name) + .await + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)? + .unwrap(); + + tables.push(table); } - let vec = tables_to_record_batch( - &catalog_name, - &schema_name, - tables_in_schema, - &engine_name, - ); + let vec = tables_to_record_batch(&catalog_name, &schema_name, tables); let record_batch_res = RecordBatch::new(schema_ref.clone(), vec); yield record_batch_res; } @@ -149,23 +142,21 @@ impl Table for Tables { fn tables_to_record_batch( catalog_name: &str, schema_name: &str, - table_names: Vec, - engine: &str, + tables: Vec, ) -> Vec { - let mut catalog_vec = - ConcreteDataType::string_datatype().create_mutable_vector(table_names.len()); - let mut schema_vec = - ConcreteDataType::string_datatype().create_mutable_vector(table_names.len()); + let mut catalog_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len()); + let mut schema_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len()); let mut table_name_vec = - ConcreteDataType::string_datatype().create_mutable_vector(table_names.len()); - let mut engine_vec = - ConcreteDataType::string_datatype().create_mutable_vector(table_names.len()); + ConcreteDataType::string_datatype().create_mutable_vector(tables.len()); + let mut engine_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len()); - for table_name in table_names { + for table in tables { + let name = &table.table_info().name; + let engine = &table.table_info().meta.engine; // Safety: All these vectors are string type. catalog_vec.push_value_ref(ValueRef::String(catalog_name)); schema_vec.push_value_ref(ValueRef::String(schema_name)); - table_name_vec.push_value_ref(ValueRef::String(&table_name)); + table_name_vec.push_value_ref(ValueRef::String(name)); engine_vec.push_value_ref(ValueRef::String(engine)); } @@ -251,13 +242,9 @@ pub struct SystemCatalog { } impl SystemCatalog { - pub fn new( - system: SystemCatalogTable, - catalogs: CatalogListRef, - engine: TableEngineRef, - ) -> Self { + pub fn new(system: SystemCatalogTable, catalogs: CatalogListRef) -> Self { let schema = InformationSchema { - tables: Arc::new(Tables::new(catalogs, engine.name().to_string())), + tables: Arc::new(Tables::new(catalogs)), system: Arc::new(system), }; Self { @@ -271,8 +258,9 @@ impl SystemCatalog { schema: String, table_name: String, table_id: TableId, + engine: String, ) -> crate::error::Result { - let request = build_table_insert_request(catalog, schema, table_name, table_id); + let request = build_table_insert_request(catalog, schema, table_name, table_id, engine); self.information_schema .system .insert(request) @@ -383,10 +371,13 @@ mod tests { .unwrap() .unwrap(); schema - .register_table("test_table".to_string(), Arc::new(NumbersTable::default())) + .register_table( + "test_table".to_string(), + Arc::new(NumbersTable::with_name(1, "test_table".to_string())), + ) .unwrap(); - let tables = Tables::new(catalog_list, "test_engine".to_string()); + let tables = Tables::new(catalog_list); let tables_stream = tables.scan(None, &[], None).await.unwrap(); let session_ctx = SessionContext::new(); let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap(); diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index ccf2836c63..ab125f741d 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -21,6 +21,7 @@ mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_telemetry::{error, info}; use mito::config::EngineConfig; + use table::engine::manager::MemoryTableEngineManager; use table::table::numbers::NumbersTable; use table::TableRef; use tokio::sync::Mutex; @@ -33,7 +34,8 @@ mod tests { mito::table::test_util::MockEngine::default(), object_store, )); - let catalog_manager = LocalCatalogManager::try_new(mock_engine).await.unwrap(); + let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone())); + let catalog_manager = LocalCatalogManager::try_new(engine_manager).await.unwrap(); catalog_manager.start().await?; Ok(catalog_manager) } diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 32e8a49e56..8f6edcc37a 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -27,9 +27,10 @@ mod tests { KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider, }; use catalog::{CatalogList, CatalogManager, RegisterTableRequest}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use datatypes::schema::RawSchema; use futures_util::StreamExt; + use table::engine::manager::MemoryTableEngineManager; use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; @@ -80,8 +81,11 @@ mod tests { ) -> (KvBackendRef, TableEngineRef, Arc) { let backend = Arc::new(MockKvBackend::default()) as KvBackendRef; let table_engine = Arc::new(MockTableEngine::default()); - let catalog_manager = - RemoteCatalogManager::new(table_engine.clone(), node_id, backend.clone()); + let engine_manager = Arc::new(MemoryTableEngineManager::alias( + MITO_ENGINE.to_string(), + table_engine.clone(), + )); + let catalog_manager = RemoteCatalogManager::new(engine_manager, node_id, backend.clone()); catalog_manager.start().await.unwrap(); (backend, table_engine, Arc::new(catalog_manager)) } @@ -131,6 +135,7 @@ mod tests { primary_key_indices: vec![], create_if_not_exists: false, table_options: Default::default(), + engine: MITO_ENGINE.to_string(), }, ) .await @@ -191,6 +196,7 @@ mod tests { primary_key_indices: vec![], create_if_not_exists: false, table_options: Default::default(), + engine: MITO_ENGINE.to_string(), }, ) .await @@ -251,6 +257,7 @@ mod tests { primary_key_indices: vec![], create_if_not_exists: false, table_options: Default::default(), + engine: MITO_ENGINE.to_string(), }, ) .await diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index a33b44f938..2a126a0e36 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -25,3 +25,5 @@ pub const MIN_USER_TABLE_ID: u32 = 1024; pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0; /// scripts table id pub const SCRIPTS_TABLE_ID: u32 = 1; + +pub const MITO_ENGINE: &str = "mito"; diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 649009bb95..c6769ce58f 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -14,7 +14,7 @@ use api::v1::alter_expr::Kind; use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use datatypes::schema::{ColumnSchema, RawSchema}; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; @@ -178,6 +178,8 @@ pub fn create_expr_to_request( primary_key_indices, create_if_not_exists: expr.create_if_not_exists, table_options, + // TODO(weny): remove hard-code + engine: MITO_ENGINE.to_string(), }) } diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index a3c2ef61de..a3956fc027 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -535,10 +535,11 @@ fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> b mod test { use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use catalog::{CatalogList, CatalogProvider, RegisterTableRequest}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use datafusion::common::{DFSchema, ToDFSchema}; use datafusion_expr::TableSource; use datatypes::schema::RawSchema; + use table::engine::manager::MemoryTableEngineManager; use table::requests::CreateTableRequest; use table::test_util::{EmptyTable, MockTableEngine}; @@ -549,11 +550,11 @@ mod test { async fn build_mock_catalog_manager() -> CatalogManagerRef { let mock_table_engine = Arc::new(MockTableEngine::new()); - let catalog_manager = Arc::new( - LocalCatalogManager::try_new(mock_table_engine) - .await - .unwrap(), - ); + let engine_manager = Arc::new(MemoryTableEngineManager::alias( + MITO_ENGINE.to_string(), + mock_table_engine.clone(), + )); + let catalog_manager = Arc::new(LocalCatalogManager::try_new(engine_manager).await.unwrap()); let schema_provider = Arc::new(MemorySchemaProvider::new()); let catalog_provider = Arc::new(MemoryCatalogProvider::new()); catalog_provider @@ -579,6 +580,7 @@ mod test { primary_key_indices: vec![], create_if_not_exists: true, table_options: Default::default(), + engine: MITO_ENGINE.to_string(), } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 2d05189b42..730da2422a 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -101,6 +101,13 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Table engine not found: {}, source: {}", engine_name, source))] + TableEngineNotFound { + engine_name: String, + #[snafu(backtrace)] + source: table::error::Error, + }, + #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String, @@ -533,6 +540,7 @@ impl ErrorExt for Error { Insert { source, .. } => source.status_code(), Delete { source, .. } => source.status_code(), + TableEngineNotFound { source, .. } => source.status_code(), TableNotFound { .. } => StatusCode::TableNotFound, ColumnNotFound { .. } => StatusCode::TableColumnNotFound, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index f5bb721162..1f76350b63 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -45,6 +45,7 @@ use storage::config::EngineConfig as StorageEngineConfig; use storage::scheduler::{LocalScheduler, SchedulerConfig}; use storage::EngineImpl; use store_api::logstore::LogStore; +use table::engine::manager::MemoryTableEngineManager; use table::requests::FlushTableRequest; use table::table::numbers::NumbersTable; use table::table::TableIdProviderRef; @@ -117,6 +118,8 @@ impl Instance { object_store, )); + let engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine.clone())); + // create remote catalog manager let (catalog_manager, table_id_provider) = match opts.mode { Mode::Standalone => { @@ -141,7 +144,7 @@ impl Instance { ) } else { let catalog = Arc::new( - catalog::local::LocalCatalogManager::try_new(table_engine.clone()) + catalog::local::LocalCatalogManager::try_new(engine_manager) .await .context(CatalogSnafu)?, ); @@ -155,7 +158,7 @@ impl Instance { Mode::Distributed => { let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new( - table_engine.clone(), + engine_manager, opts.node_id.context(MissingNodeIdSnafu)?, Arc::new(MetaKvBackend { client: meta_client.as_ref().unwrap().clone(), @@ -194,7 +197,7 @@ impl Instance { Ok(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new( - table_engine.clone(), + Arc::new(MemoryTableEngineManager::new(table_engine.clone())), catalog_manager.clone(), table_engine, procedure_manager.clone(), diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 1de85d7ef0..c5f611aa5d 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use catalog::CatalogManagerRef; use common_error::prelude::BoxedError; use common_procedure::ProcedureManagerRef; @@ -21,12 +23,14 @@ use query::sql::{show_databases, show_tables}; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use sql::statements::show::{ShowDatabases, ShowTables}; -use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference}; +use table::engine::manager::TableEngineManagerRef; +use table::engine::{TableEngineProcedureRef, TableEngineRef, TableReference}; use table::requests::*; -use table::TableRef; +use table::{Table, TableRef}; use crate::error::{ - CloseTableEngineSnafu, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu, + self, CloseTableEngineSnafu, ExecuteSqlSnafu, Result, TableEngineNotFoundSnafu, + TableNotFoundSnafu, }; use crate::instance::sql::table_idents_to_full_name; @@ -53,7 +57,7 @@ pub enum SqlRequest { // Handler to execute SQL except query #[derive(Clone)] pub struct SqlHandler { - table_engine: TableEngineRef, + table_engine_manager: TableEngineManagerRef, catalog_manager: CatalogManagerRef, engine_procedure: TableEngineProcedureRef, procedure_manager: Option, @@ -61,13 +65,13 @@ pub struct SqlHandler { impl SqlHandler { pub fn new( - table_engine: TableEngineRef, + table_engine_manager: TableEngineManagerRef, catalog_manager: CatalogManagerRef, engine_procedure: TableEngineProcedureRef, procedure_manager: Option, ) -> Self { Self { - table_engine, + table_engine_manager, catalog_manager, engine_procedure, procedure_manager, @@ -103,23 +107,38 @@ impl SqlHandler { result } - pub(crate) fn get_table(&self, table_ref: &TableReference) -> Result { - self.table_engine - .get_table(&EngineContext::default(), table_ref) - .with_context(|_| GetTableSnafu { - table_name: table_ref.to_string(), - })? + pub async fn get_table(&self, table_ref: &TableReference<'_>) -> Result { + let TableReference { + catalog, + schema, + table, + } = table_ref; + let table = self + .catalog_manager + .table(catalog, schema, table) + .await + .context(error::CatalogSnafu)? .with_context(|| TableNotFoundSnafu { table_name: table_ref.to_string(), - }) + })?; + Ok(table) } - pub fn table_engine(&self) -> TableEngineRef { - self.table_engine.clone() + pub fn table_engine_manager(&self) -> TableEngineManagerRef { + self.table_engine_manager.clone() + } + + pub fn table_engine(&self, table: Arc) -> Result { + let engine_name = &table.table_info().meta.engine; + let engine = self + .table_engine_manager + .engine(engine_name) + .context(TableEngineNotFoundSnafu { engine_name })?; + Ok(engine) } pub async fn close(&self) -> Result<()> { - self.table_engine + self.table_engine_manager .close() .await .map_err(BoxedError::new) diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index 48d1b63b14..f9159f0de1 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -35,20 +35,24 @@ impl SqlHandler { let full_table_name = table_ref.to_string(); + // fetches table via catalog + let table = self.get_table(&table_ref).await?; + // checks the table engine exist + let table_engine = self.table_engine(table)?; ensure!( - self.table_engine.table_exists(&ctx, &table_ref), + table_engine.table_exists(&ctx, &table_ref), error::TableNotFoundSnafu { table_name: &full_table_name, } ); let is_rename = req.is_rename_table(); - let table = - self.table_engine - .alter_table(&ctx, req) - .await - .context(error::AlterTableSnafu { - table_name: full_table_name, - })?; + + let table = table_engine + .alter_table(&ctx, req) + .await + .context(error::AlterTableSnafu { + table_name: full_table_name, + })?; if is_rename { let table_info = &table.table_info(); let rename_table_req = RenameTableRequest { diff --git a/src/datanode/src/sql/copy_table_from.rs b/src/datanode/src/sql/copy_table_from.rs index b661036857..93b1dcf52f 100644 --- a/src/datanode/src/sql/copy_table_from.rs +++ b/src/datanode/src/sql/copy_table_from.rs @@ -40,7 +40,7 @@ impl SqlHandler { schema: &req.schema_name, table: &req.table_name, }; - let table = self.get_table(&table_ref)?; + let table = self.get_table(&table_ref).await?; let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?; diff --git a/src/datanode/src/sql/copy_table_to.rs b/src/datanode/src/sql/copy_table_to.rs index 98abadbce1..8704529a88 100644 --- a/src/datanode/src/sql/copy_table_to.rs +++ b/src/datanode/src/sql/copy_table_to.rs @@ -32,7 +32,7 @@ impl SqlHandler { schema: &req.schema_name, table: &req.table_name, }; - let table = self.get_table(&table_ref)?; + let table = self.get_table(&table_ref).await?; let stream = table .scan(None, &[], None) diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 8882d01610..4c29d3c2e4 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -34,7 +34,7 @@ use crate::error::{ self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu, IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SubmitProcedureSnafu, - UnrecognizedTableOptionSnafu, WaitProcedureSnafu, + TableEngineNotFoundSnafu, UnrecognizedTableOptionSnafu, WaitProcedureSnafu, }; use crate::sql::SqlHandler; @@ -107,8 +107,14 @@ impl SqlHandler { // determine catalog and schema from the very beginning let table_name = req.table_name.clone(); - let table = self - .table_engine + let table_engine = + self.table_engine_manager + .engine(&req.engine) + .context(TableEngineNotFoundSnafu { + engine_name: &req.engine, + })?; + + let table = table_engine .create_table(&ctx, req) .await .with_context(|_| CreateTableSnafu { @@ -138,10 +144,16 @@ impl SqlHandler { req: CreateTableRequest, ) -> Result { let table_name = req.table_name.clone(); + let table_engine = + self.table_engine_manager + .engine(&req.engine) + .context(TableEngineNotFoundSnafu { + engine_name: &req.engine, + })?; let procedure = CreateTableProcedure::new( req, self.catalog_manager.clone(), - self.table_engine.clone(), + table_engine.clone(), self.engine_procedure.clone(), ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -286,6 +298,7 @@ impl SqlHandler { primary_key_indices: primary_keys, create_if_not_exists: stmt.if_not_exists, table_options, + engine: stmt.engine, }; Ok(request) } diff --git a/src/datanode/src/sql/drop_table.rs b/src/datanode/src/sql/drop_table.rs index 8f4c7d8720..5ad684a245 100644 --- a/src/datanode/src/sql/drop_table.rs +++ b/src/datanode/src/sql/drop_table.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use catalog::error::TableNotExistSnafu; use catalog::DeregisterTableRequest; use common_error::prelude::BoxedError; use common_query::Output; use common_telemetry::info; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table::engine::{EngineContext, TableReference}; use table::requests::DropTableRequest; @@ -38,20 +39,36 @@ impl SqlHandler { }; let table_full_name = table_reference.to_string(); + let table = self + .catalog_manager + .table(&req.catalog_name, &req.schema_name, &req.table_name) + .await + .context(error::CatalogSnafu)? + .context(TableNotExistSnafu { + table: &table_full_name, + }) + .map_err(BoxedError::new) + .context(error::DropTableSnafu { + table_name: &table_full_name, + })?; + self.catalog_manager .deregister_table(deregister_table_req) .await .map_err(BoxedError::new) .context(error::DropTableSnafu { - table_name: table_full_name.clone(), + table_name: &table_full_name, })?; let ctx = EngineContext {}; - self.table_engine() + + let engine = self.table_engine(table)?; + + engine .drop_table(&ctx, req) .await .map_err(BoxedError::new) - .context(error::DropTableSnafu { + .with_context(|_| error::DropTableSnafu { table_name: table_full_name.clone(), })?; diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index 508cb230db..37efa75d96 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -46,7 +46,7 @@ impl SqlHandler { table: &req.table_name.to_string(), }; - let table = self.get_table(&table_ref)?; + let table = self.get_table(&table_ref).await?; let affected_rows = table.insert(req).await.with_context(|_| InsertSnafu { table_name: table_ref.to_string(), diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index ed3ff40cba..8477edcb27 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -15,7 +15,9 @@ use std::sync::Arc; use std::time::Duration; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_catalog::consts::{ + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, +}; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; @@ -23,6 +25,7 @@ use mito::config::EngineConfig; use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; use servers::Mode; use snafu::ResultExt; +use table::engine::manager::MemoryTableEngineManager; use table::engine::{EngineContext, TableEngineRef}; use table::requests::{CreateTableRequest, TableOptions}; @@ -122,7 +125,11 @@ pub(crate) async fn create_test_table( ]; let table_name = "demo"; - let table_engine: TableEngineRef = instance.sql_handler().table_engine(); + let table_engine: TableEngineRef = instance + .sql_handler() + .table_engine_manager() + .engine(MITO_ENGINE) + .unwrap(); let table = table_engine .create_table( &EngineContext::default(), @@ -137,6 +144,7 @@ pub(crate) async fn create_test_table( primary_key_indices: vec![0], // "host" is in primary keys table_options: TableOptions::default(), region_numbers: vec![0], + engine: MITO_ENGINE.to_string(), }, ) .await @@ -160,10 +168,11 @@ pub async fn create_mock_sql_handler() -> SqlHandler { MockEngine::default(), object_store, )); + let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone())); let catalog_manager = Arc::new( - catalog::local::LocalCatalogManager::try_new(mock_engine.clone()) + catalog::local::LocalCatalogManager::try_new(engine_manager.clone()) .await .unwrap(), ); - SqlHandler::new(mock_engine.clone(), catalog_manager, mock_engine, None) + SqlHandler::new(engine_manager, catalog_manager, mock_engine, None) } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 11d0ae9f3c..e082e95d8b 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -435,7 +435,7 @@ impl SchemaProvider for FrontendSchemaProvider { #[cfg(test)] mod tests { - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME}; use table::requests::{CreateTableRequest, TableOptions}; @@ -460,6 +460,7 @@ mod tests { primary_key_indices: vec![0, 1], create_if_not_exists: true, table_options: TableOptions::default(), + engine: MITO_ENGINE.to_string(), }; let result = instance diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index ef20fefb3c..937865d226 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use async_trait::async_trait; +pub use common_catalog::consts::MITO_ENGINE; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_procedure::{BoxedProcedure, ProcedureManager}; @@ -55,8 +56,6 @@ use crate::error::{ }; use crate::manifest::TableManifest; use crate::table::MitoTable; - -pub const MITO_ENGINE: &str = "mito"; pub const INIT_COLUMN_ID: ColumnId = 0; const INIT_TABLE_VERSION: TableVersion = 0; diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 417ecf98ee..5ded50169e 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -98,6 +98,7 @@ async fn setup_table_with_column_default_constraint() -> (TempDir, String, Table primary_key_indices: Vec::default(), table_options: TableOptions::default(), region_numbers: vec![0], + engine: MITO_ENGINE.to_string(), }, ) .await @@ -212,6 +213,7 @@ fn test_validate_create_table_request() { primary_key_indices: vec![0, 1], table_options: TableOptions::default(), region_numbers: vec![0], + engine: MITO_ENGINE.to_string(), }; let err = validate_create_table_request(&request).unwrap_err(); @@ -373,6 +375,7 @@ async fn test_create_if_not_exists() { primary_key_indices: Vec::default(), table_options: TableOptions::default(), region_numbers: vec![0], + engine: MITO_ENGINE.to_string(), }; let created_table = table_engine.create_table(&ctx, request).await.unwrap(); @@ -390,6 +393,7 @@ async fn test_create_if_not_exists() { primary_key_indices: Vec::default(), table_options: TableOptions::default(), region_numbers: vec![0], + engine: MITO_ENGINE.to_string(), }; let result = table_engine.create_table(&ctx, request).await; @@ -600,6 +604,7 @@ async fn test_alter_rename_table() { primary_key_indices: vec![0], create_if_not_exists: true, table_options: TableOptions::default(), + engine: MITO_ENGINE.to_string(), }; table_engine .create_table(&ctx, req) @@ -684,6 +689,7 @@ async fn test_drop_table() { primary_key_indices: Vec::default(), table_options: TableOptions::default(), region_numbers: vec![0], + engine: MITO_ENGINE.to_string(), }; let created_table = table_engine @@ -717,6 +723,7 @@ async fn test_drop_table() { primary_key_indices: Vec::default(), table_options: TableOptions::default(), region_numbers: vec![0], + engine: MITO_ENGINE.to_string(), }; table_engine.create_table(&ctx, request).await.unwrap(); assert!(table_engine.table_exists(&engine_ctx, &table_reference)); diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index 0da99944a2..ff908f3f0c 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -117,6 +117,7 @@ pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { create_if_not_exists: true, primary_key_indices: vec![0], table_options: TableOptions::default(), + engine: MITO_ENGINE.to_string(), } } diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index dc8125221b..45cfa120e5 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -117,6 +117,7 @@ mod tests { use mito::config::EngineConfig as TableEngineConfig; use mito::table::test_util::new_test_object_store; use query::QueryEngineFactory; + use table::engine::manager::MemoryTableEngineManager; use super::*; type DefaultEngine = MitoEngine>; @@ -153,9 +154,9 @@ mod tests { ), object_store, )); - + let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone())); let catalog_manager = Arc::new( - catalog::local::LocalCatalogManager::try_new(mock_engine.clone()) + catalog::local::LocalCatalogManager::try_new(engine_manager) .await .unwrap(), ); diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 4a38fbf218..c5371e0731 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -17,7 +17,9 @@ use std::collections::HashMap; use std::sync::Arc; use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID}; +use common_catalog::consts::{ + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE, SCRIPTS_TABLE_ID, +}; use common_catalog::format_full_table_name; use common_query::Output; use common_recordbatch::util as record_util; @@ -65,6 +67,7 @@ impl ScriptsTable { primary_key_indices: vec![0, 1], create_if_not_exists: true, table_options: TableOptions::default(), + engine: MITO_ENGINE.to_string(), }; catalog_manager diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index 5a00cec3e0..62b686b4ff 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -309,6 +309,7 @@ impl CreateTableData { mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; + use mito::engine::MITO_ENGINE; use table::engine::{EngineContext, TableEngine}; use super::*; @@ -345,6 +346,7 @@ mod tests { create_if_not_exists: true, primary_key_indices: vec![0], table_options: Default::default(), + engine: MITO_ENGINE.to_string(), } } diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index c679362cfd..a240869e41 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -22,6 +22,7 @@ use crate::error::Result; use crate::metadata::TableId; use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; use crate::TableRef; +pub mod manager; /// Represents a resolved path to a table of the form “catalog.schema.table” #[derive(Debug, PartialEq)] diff --git a/src/table/src/engine/manager.rs b/src/table/src/engine/manager.rs new file mode 100644 index 0000000000..671186d38a --- /dev/null +++ b/src/table/src/engine/manager.rs @@ -0,0 +1,127 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use async_trait::async_trait; +use common_telemetry::error; +use snafu::{ensure, OptionExt}; + +use crate::engine::TableEngineRef; +use crate::error::{EngineExistSnafu, EngineNotFoundSnafu, Result}; + +#[async_trait::async_trait] +pub trait TableEngineManager: Send + Sync { + /// returns `error::EngineNotFound` if engine not found + fn engine(&self, name: &str) -> Result; + + /// returns `error::EngineExist` if engine exists + fn register_engine(&self, name: &str, engine: TableEngineRef) -> Result<()>; + + /// closes all registered engines + async fn close(&self) -> Result<()>; +} +pub type TableEngineManagerRef = Arc; + +/// Simple in-memory table engine manager +pub struct MemoryTableEngineManager { + pub engines: RwLock>, +} + +impl MemoryTableEngineManager { + pub fn new(engine: TableEngineRef) -> Self { + MemoryTableEngineManager::alias(engine.name().to_string(), engine) + } + + pub fn alias(name: String, engine: TableEngineRef) -> Self { + let mut engines = HashMap::new(); + engines.insert(name, engine); + let engines = RwLock::new(engines); + + MemoryTableEngineManager { engines } + } +} + +#[async_trait] +impl TableEngineManager for MemoryTableEngineManager { + fn engine(&self, name: &str) -> Result { + let engines = self.engines.read().unwrap(); + engines + .get(name) + .cloned() + .context(EngineNotFoundSnafu { engine: name }) + } + + fn register_engine(&self, name: &str, engine: TableEngineRef) -> Result<()> { + let mut engines = self.engines.write().unwrap(); + + ensure!( + !engines.contains_key(name), + EngineExistSnafu { engine: name } + ); + + engines.insert(name.to_string(), engine); + + Ok(()) + } + + async fn close(&self) -> Result<()> { + let engines = { + let engines = self.engines.write().unwrap(); + engines.values().cloned().collect::>() + }; + + if let Err(err) = + futures::future::try_join_all(engines.iter().map(|engine| engine.close())).await + { + error!("Failed to close engine: {}", err); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use super::*; + use crate::engine::TableEngine; + use crate::error; + use crate::test_util::MockTableEngine; + + #[test] + fn test_table_engine_manager() { + let table_engine = MockTableEngine::new(); + let table_engine_ref = Arc::new(table_engine); + let table_engine_manager = MemoryTableEngineManager::new(table_engine_ref.clone()); + + table_engine_manager + .register_engine("yet_another", table_engine_ref.clone()) + .unwrap(); + + let got = table_engine_manager.engine(table_engine_ref.name()); + + assert_eq!(got.unwrap().name(), table_engine_ref.name()); + + let got = table_engine_manager.engine("yet_another"); + + assert_eq!(got.unwrap().name(), table_engine_ref.name()); + + let missing = table_engine_manager.engine("not_exists"); + + assert_matches!(missing.err().unwrap(), error::Error::EngineNotFound { .. }) + } +} diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 0376db64cc..e2fbe0b2c6 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -43,6 +43,18 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Engine not found: {}", engine))] + EngineNotFound { + engine: String, + backtrace: Backtrace, + }, + + #[snafu(display("Engine exist: {}", engine))] + EngineExist { + engine: String, + backtrace: Backtrace, + }, + #[snafu(display("Table projection error, source: {}", source))] TableProjection { source: ArrowError, @@ -133,7 +145,9 @@ impl ErrorExt for Error { Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound, Error::RegionSchemaMismatch { .. } => StatusCode::StorageUnavailable, Error::Unsupported { .. } => StatusCode::Unsupported, - Error::ParseTableOption { .. } => StatusCode::InvalidArguments, + Error::ParseTableOption { .. } + | Error::EngineNotFound { .. } + | Error::EngineExist { .. } => StatusCode::InvalidArguments, } } diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs index eb49686c9c..2e9ceb7168 100644 --- a/src/table/src/lib.rs +++ b/src/table/src/lib.rs @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#![feature(assert_matches)] pub mod engine; pub mod error; diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index e9f4854c7a..ac36c374df 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -47,6 +47,7 @@ pub struct CreateTableRequest { pub primary_key_indices: Vec, pub create_if_not_exists: bool, pub table_options: TableOptions, + pub engine: String, } #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 90b31cc52c..b814601078 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -38,10 +38,16 @@ use crate::table::{Expr, Table}; pub struct NumbersTable { table_id: TableId, schema: SchemaRef, + name: String, + engine: String, } impl NumbersTable { pub fn new(table_id: TableId) -> Self { + NumbersTable::with_name(table_id, "numbers".to_string()) + } + + pub fn with_name(table_id: TableId, name: String) -> Self { let column_schemas = vec![ColumnSchema::new( "number", ConcreteDataType::uint32_datatype(), @@ -49,6 +55,8 @@ impl NumbersTable { )]; Self { table_id, + name, + engine: "test_engine".to_string(), schema: Arc::new( SchemaBuilder::try_from_columns(column_schemas) .unwrap() @@ -79,7 +87,7 @@ impl Table for NumbersTable { Arc::new( TableInfoBuilder::default() .table_id(self.table_id) - .name("numbers") + .name(&self.name) .catalog_name("greptime") .schema_name("public") .table_version(0) @@ -90,6 +98,7 @@ impl Table for NumbersTable { .region_numbers(vec![0]) .primary_key_indices(vec![0]) .next_column_id(1) + .engine(&self.engine) .build() .unwrap(), ) diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 895895e9d9..dab0458111 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -20,7 +20,9 @@ use std::time::Duration; use axum::Router; use catalog::CatalogManagerRef; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_catalog::consts::{ + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, +}; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datanode::datanode::{ @@ -222,7 +224,10 @@ pub async fn create_test_table( ]; let table_name = "demo"; - let table_engine: TableEngineRef = sql_handler.table_engine(); + let table_engine: TableEngineRef = sql_handler + .table_engine_manager() + .engine(MITO_ENGINE) + .unwrap(); let table = table_engine .create_table( &EngineContext::default(), @@ -237,6 +242,7 @@ pub async fn create_test_table( primary_key_indices: vec![0], // "host" is in primary keys table_options: TableOptions::default(), region_numbers: vec![0], + engine: MITO_ENGINE.to_string(), }, ) .await