From 932b30d2993f4a17945d1035e07afa17c6c2465f Mon Sep 17 00:00:00 2001 From: "Lei, Huang" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 26 Oct 2022 10:50:39 +0800 Subject: [PATCH] refactor: catalog crate (#331) * chore: refactor dir for local catalog manager * refactor: CatalogProvider returns Result * refactor: SchemaProvider returns Result * feat: add kv operations to remote catalog * chore: refactor some code * feat: impl catalog initialization * feat: add register table and register system table function * refactor: add table_info method for Table trait * chore: add some tests * chore: add register schema test * chore: fix build issue after rebase onto develop * refactor: mock to separate file * build: failed to compile * fix: use a container struct to bridge KvBackend and Accessor trait * feat: upgrade opendal to 0.17 * test: add more tests * chore: add catalog name and schema name to table info * chore: add catalog name and schema name to table info * chore: rebase onto develop * refactor: common-catalog crate * refactor: remove remote catalog related files * fix: compilation * feat: add table version to TableKey * feat: add node id to TableValue * fix: some CR comments * chore: change async fn create_expr_to_request to sync * fix: add backtrace to errors * fix: code style * fix: CatalogManager::table also requires both catalog_name and schema_name * chore: merge develop --- Cargo.lock | 30 +++ Cargo.toml | 1 + src/catalog/Cargo.toml | 8 + src/catalog/src/error.rs | 19 +- src/catalog/src/lib.rs | 86 ++++-- src/catalog/src/local.rs | 7 + src/catalog/src/{ => local}/manager.rs | 133 +++------ src/catalog/src/{ => local}/memory.rs | 68 +++-- src/catalog/src/schema.rs | 6 +- src/catalog/src/system.rs | 8 +- src/catalog/src/tables.rs | 88 ++++-- src/client/Cargo.toml | 8 +- src/common/catalog/Cargo.toml | 22 ++ src/{ => common}/catalog/src/consts.rs | 4 + src/common/catalog/src/error.rs | 45 ++++ src/common/catalog/src/helper.rs | 283 ++++++++++++++++++++ src/common/catalog/src/lib.rs | 8 + src/common/substrait/Cargo.toml | 1 + src/common/substrait/src/df_logical.rs | 21 +- src/datanode/Cargo.toml | 1 + src/datanode/src/error.rs | 19 +- src/datanode/src/instance.rs | 4 +- src/datanode/src/instance/grpc.rs | 11 +- src/datanode/src/instance/sql.rs | 10 +- src/datanode/src/server/grpc/ddl.rs | 5 +- src/datanode/src/sql.rs | 26 +- src/datanode/src/sql/create.rs | 7 +- src/datanode/src/sql/insert.rs | 5 +- src/datanode/src/sql/show.rs | 13 +- src/datanode/src/tests/test_util.rs | 6 +- src/query/Cargo.toml | 1 + src/query/src/datafusion.rs | 17 +- src/query/src/datafusion/catalog_adapter.rs | 62 +++-- src/query/src/query_engine.rs | 6 +- src/query/src/query_engine/state.rs | 6 +- src/query/tests/function.rs | 15 +- src/query/tests/my_sum_udaf_example.rs | 15 +- src/query/tests/percentile_test.rs | 15 +- src/query/tests/query_engine_test.rs | 27 +- src/script/Cargo.toml | 1 + src/script/src/manager.rs | 2 +- src/script/src/python/engine.rs | 17 +- src/script/src/table.rs | 6 +- src/servers/Cargo.toml | 1 + src/servers/tests/mod.rs | 15 +- src/sql/Cargo.toml | 1 + src/sql/src/statements.rs | 2 +- src/table-engine/src/engine.rs | 4 +- src/table-engine/src/table/test_util.rs | 2 + src/table/src/engine.rs | 1 + src/table/src/error.rs | 7 + src/table/src/test_util/memtable.rs | 57 +++- 52 files changed, 919 insertions(+), 314 deletions(-) create mode 100644 src/catalog/src/local.rs rename src/catalog/src/{ => local}/manager.rs (73%) rename src/catalog/src/{ => local}/memory.rs (76%) create 
mode 100644 src/common/catalog/Cargo.toml rename src/{ => common}/catalog/src/consts.rs (78%) create mode 100644 src/common/catalog/src/error.rs create mode 100644 src/common/catalog/src/helper.rs create mode 100644 src/common/catalog/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index a1ca0da6a1..80be665815 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -635,18 +635,23 @@ version = "0.1.0" dependencies = [ "async-stream", "async-trait", + "chrono", + "common-catalog", "common-error", "common-query", "common-recordbatch", + "common-runtime", "common-telemetry", "common-time", "datafusion", "datatypes", "futures", "futures-util", + "lazy_static", "log-store", "object-store", "opendal", + "regex", "serde", "serde_json", "snafu", @@ -876,6 +881,25 @@ dependencies = [ "snafu", ] +[[package]] +name = "common-catalog" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "common-error", + "common-telemetry", + "datatypes", + "lazy_static", + "regex", + "serde", + "serde_json", + "snafu", + "table", + "tempdir", + "tokio", +] + [[package]] name = "common-error" version = "0.1.0" @@ -1397,6 +1421,7 @@ dependencies = [ "catalog", "client", "common-base", + "common-catalog", "common-error", "common-grpc", "common-query", @@ -3743,6 +3768,7 @@ dependencies = [ "arrow2", "async-trait", "catalog", + "common-catalog", "common-error", "common-function", "common-function-macro", @@ -4468,6 +4494,7 @@ version = "0.1.0" dependencies = [ "async-trait", "catalog", + "common-catalog", "common-error", "common-function", "common-query", @@ -4608,6 +4635,7 @@ dependencies = [ "bytes", "catalog", "common-base", + "common-catalog", "common-error", "common-grpc", "common-query", @@ -4816,6 +4844,7 @@ name = "sql" version = "0.1.0" dependencies = [ "catalog", + "common-catalog", "common-error", "common-time", "datatypes", @@ -5047,6 +5076,7 @@ version = "0.1.0" dependencies = [ "bytes", "catalog", + "common-catalog", "common-error", "datafusion", "datatypes", diff --git a/Cargo.toml b/Cargo.toml index 379350be29..1c039e17d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "src/client", "src/cmd", "src/common/base", + "src/common/catalog", "src/common/error", "src/common/function", "src/common/function-macro", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index cf2b5b01f3..29935319c9 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -7,9 +7,11 @@ edition = "2021" [dependencies] async-stream = "0.3" async-trait = "0.1" +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } +common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ @@ -18,12 +20,18 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" +lazy_static = "1.4" +opendal = "0.17" +regex = "1.6" serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } +storage = { path = "../storage" } table = { path = "../table" } +tokio = { version = "1.18", features = ["full"] } [dev-dependencies] +chrono = "0.4" log-store = { path = "../log-store" } object-store = { path = "../object-store" } opendal = "0.17" diff --git a/src/catalog/src/error.rs 
b/src/catalog/src/error.rs index 11c10fa1ce..0943aa7525 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -115,6 +115,17 @@ pub enum Error { #[snafu(backtrace)] source: common_query::error::Error, }, + #[snafu(display("Cannot parse catalog value, source: {}", source))] + InvalidCatalogValue { + #[snafu(backtrace)] + source: common_catalog::error::Error, + }, + + #[snafu(display("IO error occurred while fetching catalog info, source: {}", source))] + Io { + backtrace: Backtrace, + source: std::io::Error, + }, } pub type Result = std::result::Result; @@ -129,12 +140,14 @@ impl ErrorExt for Error { | Error::CatalogNotFound { .. } | Error::InvalidEntryType { .. } => StatusCode::Unexpected, - Error::SystemCatalog { .. } | Error::EmptyValue | Error::ValueDeserialize { .. } => { - StatusCode::StorageUnavailable - } + Error::SystemCatalog { .. } + | Error::EmptyValue + | Error::ValueDeserialize { .. } + | Error::Io { .. } => StatusCode::StorageUnavailable, Error::ReadSystemCatalog { source, .. } => source.status_code(), Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(), + Error::InvalidCatalogValue { source, .. } => source.status_code(), Error::RegisterTable { .. } => StatusCode::Internal, Error::TableExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index dd5e9d9f27..355dbfda78 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -3,18 +3,18 @@ use std::any::Any; use std::sync::Arc; +use common_telemetry::info; +use snafu::ResultExt; +use table::engine::{EngineContext, TableEngineRef}; use table::metadata::TableId; use table::requests::CreateTableRequest; use table::TableRef; -pub use crate::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; -pub use crate::manager::LocalCatalogManager; +use crate::error::{CreateTableSnafu, Result}; pub use crate::schema::{SchemaProvider, SchemaProviderRef}; -pub mod consts; pub mod error; -mod manager; -pub mod memory; +pub mod local; pub mod schema; pub mod system; pub mod tables; @@ -31,13 +31,13 @@ pub trait CatalogList: Sync + Send { &self, name: String, catalog: CatalogProviderRef, - ) -> Option; + ) -> Result>; /// Retrieves the list of available catalog names - fn catalog_names(&self) -> Vec; + fn catalog_names(&self) -> Result>; /// Retrieves a specific catalog by name, provided it exists. - fn catalog(&self, name: &str) -> Option; + fn catalog(&self, name: &str) -> Result>; } /// Represents a catalog, comprising a number of named schemas. @@ -47,14 +47,17 @@ pub trait CatalogProvider: Sync + Send { fn as_any(&self) -> &dyn Any; /// Retrieves the list of available schema names in this catalog. - fn schema_names(&self) -> Vec; + fn schema_names(&self) -> Result>; /// Registers schema to this catalog. - fn register_schema(&self, name: String, schema: SchemaProviderRef) - -> Option; + fn register_schema( + &self, + name: String, + schema: SchemaProviderRef, + ) -> Result>; /// Retrieves a specific schema from the catalog by name, provided it exists. - fn schema(&self, name: &str) -> Option; + fn schema(&self, name: &str) -> Result>; } pub type CatalogListRef = Arc; @@ -79,8 +82,8 @@ pub trait CatalogManager: CatalogList { /// Returns the table by catalog, schema and table name. 
fn table( &self, - catalog: Option<&str>, - schema: Option<&str>, + catalog: &str, + schema: &str, table_name: &str, ) -> error::Result>; } @@ -99,9 +102,10 @@ pub struct RegisterSystemTableRequest { pub open_hook: Option, } +#[derive(Clone)] pub struct RegisterTableRequest { - pub catalog: Option, - pub schema: Option, + pub catalog: String, + pub schema: String, pub table_name: String, pub table_id: TableId, pub table: TableRef, @@ -111,3 +115,53 @@ pub struct RegisterTableRequest { pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String { format!("{}.{}.{}", catalog, schema, table) } + +pub trait CatalogProviderFactory { + fn create(&self, catalog_name: String) -> CatalogProviderRef; +} + +pub trait SchemaProviderFactory { + fn create(&self, catalog_name: String, schema_name: String) -> SchemaProviderRef; +} + +pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>( + manager: &'a M, + engine: TableEngineRef, + sys_table_requests: &'a mut Vec, +) -> Result<()> { + for req in sys_table_requests.drain(..) { + let catalog_name = &req.create_table_request.catalog_name; + let schema_name = &req.create_table_request.schema_name; + let table_name = &req.create_table_request.table_name; + let table_id = req.create_table_request.id; + + let table = if let Some(table) = manager.table(catalog_name, schema_name, table_name)? { + table + } else { + let table = engine + .create_table(&EngineContext::default(), req.create_table_request.clone()) + .await + .with_context(|_| CreateTableSnafu { + table_info: format!( + "{}.{}.{}, id: {}", + catalog_name, schema_name, table_name, table_id, + ), + })?; + manager + .register_table(RegisterTableRequest { + catalog: catalog_name.clone(), + schema: schema_name.clone(), + table_name: table_name.clone(), + table_id, + table: table.clone(), + }) + .await?; + info!("Created and registered system table: {}", table_name); + table + }; + if let Some(hook) = req.open_hook { + (hook)(table)?; + } + } + Ok(()) +} diff --git a/src/catalog/src/local.rs b/src/catalog/src/local.rs new file mode 100644 index 0000000000..087d86f01b --- /dev/null +++ b/src/catalog/src/local.rs @@ -0,0 +1,7 @@ +pub mod manager; +pub mod memory; + +pub use manager::LocalCatalogManager; +pub use memory::{ + new_memory_catalog_list, MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider, +}; diff --git a/src/catalog/src/manager.rs b/src/catalog/src/local/manager.rs similarity index 73% rename from src/catalog/src/manager.rs rename to src/catalog/src/local/manager.rs index 28e336ddee..2e76e3677a 100644 --- a/src/catalog/src/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -2,6 +2,10 @@ use std::any::Any; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use common_catalog::consts::{ + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, + SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME, +}; use common_recordbatch::RecordBatch; use common_telemetry::{debug, info}; use datatypes::prelude::ScalarVector; @@ -15,25 +19,22 @@ use table::requests::OpenTableRequest; use table::table::numbers::NumbersTable; use table::TableRef; -use super::error::Result; -use crate::consts::{ - INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME, -}; +use crate::error::Result; use crate::error::{ - CatalogNotFoundSnafu, CreateTableSnafu, IllegalManagerStateSnafu, OpenTableSnafu, - ReadSystemCatalogSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, - SystemCatalogTypeMismatchSnafu, 
TableExistsSnafu, TableNotFoundSnafu, + CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, + SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu, + TableNotFoundSnafu, }; -use crate::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use crate::local::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; use crate::system::{ decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX, VALUE_INDEX, }; use crate::tables::SystemCatalog; use crate::{ - format_full_table_name, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, - RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, + format_full_table_name, handle_system_table_request, CatalogList, CatalogManager, + CatalogProvider, CatalogProviderRef, RegisterSystemTableRequest, RegisterTableRequest, + SchemaProvider, }; /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. @@ -50,7 +51,7 @@ impl LocalCatalogManager { /// Create a new [CatalogManager] with given user catalogs and table engine pub async fn try_new(engine: TableEngineRef) -> Result { let table = SystemCatalogTable::new(engine.clone()).await?; - let memory_catalog_list = crate::memory::new_memory_catalog_list()?; + let memory_catalog_list = crate::local::memory::new_memory_catalog_list()?; let system_catalog = Arc::new(SystemCatalog::new( table, memory_catalog_list.clone(), @@ -90,46 +91,7 @@ impl LocalCatalogManager { // Processing system table hooks let mut sys_table_requests = self.system_table_requests.lock().await; - for req in sys_table_requests.drain(..) { - let catalog_name = &req.create_table_request.catalog_name; - let schema_name = &req.create_table_request.schema_name; - let table_name = &req.create_table_request.table_name; - let table_id = req.create_table_request.id; - - let table = if let Some(table) = - self.table(Some(catalog_name), Some(schema_name), table_name)? 
- { - table - } else { - let table = self - .engine - .create_table(&EngineContext::default(), req.create_table_request.clone()) - .await - .with_context(|_| CreateTableSnafu { - table_info: format!( - "{}.{}.{}, id: {}", - catalog_name, schema_name, table_name, table_id, - ), - })?; - self.register_table(RegisterTableRequest { - catalog: Some(catalog_name.clone()), - schema: Some(schema_name.clone()), - table_name: table_name.clone(), - table_id, - table: table.clone(), - }) - .await?; - - info!("Created and registered system table: {}", table_name); - - table - }; - - if let Some(hook) = req.open_hook { - (hook)(table)?; - } - } - + handle_system_table_request(self, self.engine.clone(), &mut sys_table_requests).await?; Ok(()) } @@ -140,9 +102,9 @@ impl LocalCatalogManager { self.system.information_schema.system.clone(), )?; let system_catalog = Arc::new(MemoryCatalogProvider::new()); - system_catalog.register_schema(INFORMATION_SCHEMA_NAME.to_string(), system_schema); + system_catalog.register_schema(INFORMATION_SCHEMA_NAME.to_string(), system_schema)?; self.catalogs - .register_catalog(SYSTEM_CATALOG_NAME.to_string(), system_catalog); + .register_catalog(SYSTEM_CATALOG_NAME.to_string(), system_catalog)?; let default_catalog = Arc::new(MemoryCatalogProvider::new()); let default_schema = Arc::new(MemorySchemaProvider::new()); @@ -152,9 +114,9 @@ impl LocalCatalogManager { let table = Arc::new(NumbersTable::default()); default_schema.register_table("numbers".to_string(), table)?; - default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema); + default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?; self.catalogs - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog); + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)?; Ok(()) } @@ -210,14 +172,14 @@ impl LocalCatalogManager { Entry::Schema(s) => { let catalog = self.catalogs - .catalog(&s.catalog_name) + .catalog(&s.catalog_name)? .context(CatalogNotFoundSnafu { catalog_name: &s.catalog_name, })?; catalog.register_schema( s.schema_name.clone(), Arc::new(MemorySchemaProvider::new()), - ); + )?; info!("Registered schema: {:?}", s); } Entry::Table(t) => { @@ -234,12 +196,12 @@ impl LocalCatalogManager { async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> { let catalog = self .catalogs - .catalog(&t.catalog_name) + .catalog(&t.catalog_name)? .context(CatalogNotFoundSnafu { catalog_name: &t.catalog_name, })?; let schema = catalog - .schema(&t.schema_name) + .schema(&t.schema_name)? 
.context(SchemaNotFoundSnafu { schema_info: format!("{}.{}", &t.catalog_name, &t.schema_name), })?; @@ -283,19 +245,19 @@ impl CatalogList for LocalCatalogManager { &self, name: String, catalog: CatalogProviderRef, - ) -> Option> { + ) -> Result> { self.catalogs.register_catalog(name, catalog) } - fn catalog_names(&self) -> Vec { - let mut res = self.catalogs.catalog_names(); + fn catalog_names(&self) -> Result> { + let mut res = self.catalogs.catalog_names()?; res.push(SYSTEM_CATALOG_NAME.to_string()); - res + Ok(res) } - fn catalog(&self, name: &str) -> Option> { + fn catalog(&self, name: &str) -> Result> { if name.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) { - Some(self.system.clone()) + Ok(Some(self.system.clone())) } else { self.catalogs.catalog(name) } @@ -304,7 +266,7 @@ impl CatalogList for LocalCatalogManager { #[async_trait::async_trait] impl CatalogManager for LocalCatalogManager { - /// Start [MemoryCatalogManager] to load all information from system catalog table. + /// Start [LocalCatalogManager] to load all information from system catalog table. /// Make sure table engine is initialized before starting [MemoryCatalogManager]. async fn start(&self) -> Result<()> { self.init().await @@ -325,36 +287,30 @@ impl CatalogManager for LocalCatalogManager { } ); - let catalog_name = request - .catalog - .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()); - let schema_name = request - .schema - .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); + let catalog_name = &request.catalog; + let schema_name = &request.schema; let catalog = self .catalogs - .catalog(&catalog_name) - .context(CatalogNotFoundSnafu { - catalog_name: &catalog_name, - })?; + .catalog(catalog_name)? + .context(CatalogNotFoundSnafu { catalog_name })?; let schema = catalog - .schema(&schema_name) + .schema(schema_name)? .with_context(|| SchemaNotFoundSnafu { schema_info: format!("{}.{}", catalog_name, schema_name), })?; - if schema.table_exist(&request.table_name) { + if schema.table_exist(&request.table_name)? { return TableExistsSnafu { - table: format_full_table_name(&catalog_name, &schema_name, &request.table_name), + table: format_full_table_name(catalog_name, schema_name, &request.table_name), } .fail(); } self.system .register_table( - catalog_name, - schema_name, + catalog_name.clone(), + schema_name.clone(), request.table_name.clone(), request.table_id, ) @@ -380,22 +336,19 @@ impl CatalogManager for LocalCatalogManager { fn table( &self, - catalog: Option<&str>, - schema: Option<&str>, + catalog_name: &str, + schema_name: &str, table_name: &str, ) -> Result> { - let catalog_name = catalog.unwrap_or(DEFAULT_CATALOG_NAME); - let schema_name = schema.unwrap_or(DEFAULT_SCHEMA_NAME); - let catalog = self .catalogs - .catalog(catalog_name) + .catalog(catalog_name)? .context(CatalogNotFoundSnafu { catalog_name })?; let schema = catalog - .schema(schema_name) + .schema(schema_name)? 
.with_context(|| SchemaNotFoundSnafu { schema_info: format!("{}.{}", catalog_name, schema_name), })?; - Ok(schema.table(table_name)) + schema.table(table_name) } } diff --git a/src/catalog/src/memory.rs b/src/catalog/src/local/memory.rs similarity index 76% rename from src/catalog/src/memory.rs rename to src/catalog/src/local/memory.rs index 6250efd4f2..f195b07718 100644 --- a/src/catalog/src/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -23,7 +23,7 @@ impl MemoryCatalogList { pub fn register_catalog_if_absent( &self, name: String, - catalog: Arc, + catalog: CatalogProviderRef, ) -> Option { let mut catalogs = self.catalogs.write().unwrap(); let entry = catalogs.entry(name); @@ -46,19 +46,19 @@ impl CatalogList for MemoryCatalogList { &self, name: String, catalog: CatalogProviderRef, - ) -> Option { + ) -> Result> { let mut catalogs = self.catalogs.write().unwrap(); - catalogs.insert(name, catalog) + Ok(catalogs.insert(name, catalog)) } - fn catalog_names(&self) -> Vec { + fn catalog_names(&self) -> Result> { let catalogs = self.catalogs.read().unwrap(); - catalogs.keys().map(|s| s.to_string()).collect() + Ok(catalogs.keys().map(|s| s.to_string()).collect()) } - fn catalog(&self, name: &str) -> Option { + fn catalog(&self, name: &str) -> Result> { let catalogs = self.catalogs.read().unwrap(); - catalogs.get(name).cloned() + Ok(catalogs.get(name).cloned()) } } @@ -87,23 +87,23 @@ impl CatalogProvider for MemoryCatalogProvider { self } - fn schema_names(&self) -> Vec { + fn schema_names(&self) -> Result> { let schemas = self.schemas.read().unwrap(); - schemas.keys().cloned().collect() + Ok(schemas.keys().cloned().collect()) } fn register_schema( &self, name: String, schema: SchemaProviderRef, - ) -> Option { + ) -> Result> { let mut schemas = self.schemas.write().unwrap(); - schemas.insert(name, schema) + Ok(schemas.insert(name, schema)) } - fn schema(&self, name: &str) -> Option> { + fn schema(&self, name: &str) -> Result>> { let schemas = self.schemas.read().unwrap(); - schemas.get(name).cloned() + Ok(schemas.get(name).cloned()) } } @@ -132,18 +132,18 @@ impl SchemaProvider for MemorySchemaProvider { self } - fn table_names(&self) -> Vec { + fn table_names(&self) -> Result> { let tables = self.tables.read().unwrap(); - tables.keys().cloned().collect() + Ok(tables.keys().cloned().collect()) } - fn table(&self, name: &str) -> Option { + fn table(&self, name: &str) -> Result> { let tables = self.tables.read().unwrap(); - tables.get(name).cloned() + Ok(tables.get(name).cloned()) } fn register_table(&self, name: String, table: TableRef) -> Result> { - if self.table_exist(name.as_str()) { + if self.table_exist(name.as_str())? 
{ return TableExistsSnafu { table: name }.fail()?; } let mut tables = self.tables.write().unwrap(); @@ -155,9 +155,9 @@ impl SchemaProvider for MemorySchemaProvider { Ok(tables.remove(name)) } - fn table_exist(&self, name: &str) -> bool { + fn table_exist(&self, name: &str) -> Result { let tables = self.tables.read().unwrap(); - tables.contains_key(name) + Ok(tables.contains_key(name)) } } @@ -168,40 +168,50 @@ pub fn new_memory_catalog_list() -> Result> { #[cfg(test)] mod tests { + use common_catalog::consts::*; use common_error::ext::ErrorExt; use common_error::prelude::StatusCode; use table::table::numbers::NumbersTable; use super::*; - use crate::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; #[test] fn test_new_memory_catalog_list() { let catalog_list = new_memory_catalog_list().unwrap(); - assert!(catalog_list.catalog(DEFAULT_CATALOG_NAME).is_none()); + assert!(catalog_list + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .is_none()); let default_catalog = Arc::new(MemoryCatalogProvider::default()); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog.clone()); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog.clone()) + .unwrap(); - assert!(default_catalog.schema(DEFAULT_SCHEMA_NAME).is_none()); + assert!(default_catalog + .schema(DEFAULT_SCHEMA_NAME) + .unwrap() + .is_none()); let default_schema = Arc::new(MemorySchemaProvider::default()); - default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone()); + default_catalog + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone()) + .unwrap(); default_schema .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) .unwrap(); - let table = default_schema.table("numbers"); + let table = default_schema.table("numbers").unwrap(); assert!(table.is_some()); - assert!(default_schema.table("not_exists").is_none()); + assert!(default_schema.table("not_exists").unwrap().is_none()); } #[tokio::test] async fn test_mem_provider() { let provider = MemorySchemaProvider::new(); let table_name = "numbers"; - assert!(!provider.table_exist(table_name)); + assert!(!provider.table_exist(table_name).unwrap()); assert!(provider.deregister_table(table_name).unwrap().is_none()); let test_table = NumbersTable::default(); // register table successfully @@ -209,7 +219,7 @@ mod tests { .register_table(table_name.to_string(), Arc::new(test_table)) .unwrap() .is_none()); - assert!(provider.table_exist(table_name)); + assert!(provider.table_exist(table_name).unwrap()); let other_table = NumbersTable::default(); let result = provider.register_table(table_name.to_string(), Arc::new(other_table)); let err = result.err().unwrap(); diff --git a/src/catalog/src/schema.rs b/src/catalog/src/schema.rs index 5b59939668..739c9cc835 100644 --- a/src/catalog/src/schema.rs +++ b/src/catalog/src/schema.rs @@ -12,10 +12,10 @@ pub trait SchemaProvider: Sync + Send { fn as_any(&self) -> &dyn Any; /// Retrieves the list of available table names in this schema. - fn table_names(&self) -> Vec; + fn table_names(&self) -> Result>; /// Retrieves a specific table from the schema by name, provided it exists. - fn table(&self, name: &str) -> Option; + fn table(&self, name: &str) -> Result>; /// If supported by the implementation, adds a new table to this schema. /// If a table of the same name existed before, it returns "Table already exists" error. 
@@ -28,7 +28,7 @@ pub trait SchemaProvider: Sync + Send { /// If supported by the implementation, checks the table exist in the schema provider or not. /// If no matched table in the schema provider, return false. /// Otherwise, return true. - fn table_exist(&self, name: &str) -> bool; + fn table_exist(&self, name: &str) -> Result; } pub type SchemaProviderRef = Arc; diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index b6540361ca..c0c359550a 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -2,6 +2,10 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; +use common_catalog::consts::{ + INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, + SYSTEM_CATALOG_TABLE_NAME, +}; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use common_query::physical_plan::RuntimeEnv; @@ -19,10 +23,6 @@ use table::metadata::{TableId, TableInfoRef}; use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest}; use table::{Table, TableRef}; -use crate::consts::{ - INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, - SYSTEM_CATALOG_TABLE_NAME, -}; use crate::error::{ self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu, diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index a22347a018..9ae5943b6a 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -6,6 +6,8 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_stream::stream; +use common_catalog::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME}; +use common_error::ext::BoxedError; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::Result as RecordBatchResult; @@ -17,12 +19,12 @@ use datatypes::vectors::VectorRef; use futures::Stream; use snafu::ResultExt; use table::engine::TableEngineRef; +use table::error::TablesRecordBatchSnafu; use table::metadata::{TableId, TableInfoRef}; use table::table::scan::SimpleTableScan; use table::{Table, TableRef}; -use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME}; -use crate::error::InsertTableRecordSnafu; +use crate::error::{Error, InsertTableRecordSnafu}; use crate::system::{build_table_insert_request, SystemCatalogTable}; use crate::{ format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef, @@ -70,12 +72,38 @@ impl Table for Tables { let engine_name = self.engine_name.clone(); let stream = stream!({ - for catalog_name in catalogs.catalog_names() { - let catalog = catalogs.catalog(&catalog_name).unwrap(); - for schema_name in catalog.schema_names() { - let mut tables_in_schema = Vec::with_capacity(catalog.schema_names().len()); - let schema = catalog.schema(&schema_name).unwrap(); - for table_name in schema.table_names() { + for catalog_name in catalogs + .catalog_names() + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)? + { + let catalog = catalogs + .catalog(&catalog_name) + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)? + .unwrap(); + for schema_name in catalog + .schema_names() + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)? + { + let mut tables_in_schema = Vec::with_capacity( + catalog + .schema_names() + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)? 
+ .len(), + ); + let schema = catalog + .schema(&schema_name) + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)? + .unwrap(); + for table_name in schema + .table_names() + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)? + { tables_in_schema.push(table_name); } @@ -159,17 +187,20 @@ impl SchemaProvider for InformationSchema { self } - fn table_names(&self) -> Vec { - vec!["tables".to_string(), SYSTEM_CATALOG_TABLE_NAME.to_string()] + fn table_names(&self) -> Result, Error> { + Ok(vec![ + "tables".to_string(), + SYSTEM_CATALOG_TABLE_NAME.to_string(), + ]) } - fn table(&self, name: &str) -> Option { + fn table(&self, name: &str) -> Result, Error> { if name.eq_ignore_ascii_case("tables") { - Some(self.tables.clone()) + Ok(Some(self.tables.clone())) } else if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) { - Some(self.system.clone()) + Ok(Some(self.system.clone())) } else { - None + Ok(None) } } @@ -185,8 +216,9 @@ impl SchemaProvider for InformationSchema { panic!("System catalog & schema does not support deregister table") } - fn table_exist(&self, name: &str) -> bool { - name.eq_ignore_ascii_case("tables") || name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) + fn table_exist(&self, name: &str) -> Result { + Ok(name.eq_ignore_ascii_case("tables") + || name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME)) } } @@ -231,23 +263,23 @@ impl CatalogProvider for SystemCatalog { self } - fn schema_names(&self) -> Vec { - vec![INFORMATION_SCHEMA_NAME.to_string()] + fn schema_names(&self) -> Result, Error> { + Ok(vec![INFORMATION_SCHEMA_NAME.to_string()]) } fn register_schema( &self, _name: String, _schema: SchemaProviderRef, - ) -> Option { + ) -> Result, Error> { panic!("System catalog does not support registering schema!") } - fn schema(&self, name: &str) -> Option> { + fn schema(&self, name: &str) -> Result>, Error> { if name.eq_ignore_ascii_case(INFORMATION_SCHEMA_NAME) { - Some(self.information_schema.clone()) + Ok(Some(self.information_schema.clone())) } else { - None + Ok(None) } } } @@ -287,7 +319,9 @@ mod tests { use table::table::numbers::NumbersTable; use super::*; - use crate::memory::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; + use crate::local::memory::{ + new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider, + }; use crate::CatalogList; #[tokio::test] @@ -298,8 +332,12 @@ mod tests { schema .register_table("test_table".to_string(), Arc::new(NumbersTable::default())) .unwrap(); - catalog_provider.register_schema("test_schema".to_string(), schema); - catalog_list.register_catalog("test_catalog".to_string(), catalog_provider); + catalog_provider + .register_schema("test_schema".to_string(), schema) + .unwrap(); + catalog_list + .register_catalog("test_catalog".to_string(), catalog_provider) + .unwrap(); let tables = Tables::new(catalog_list, "test_engine".to_string()); let tables_stream = tables.scan(&None, &[], None).await.unwrap(); diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 620037e4a8..d54cddba42 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -27,11 +27,11 @@ tokio = { version = "1.0", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -[dev-dependencies.substrait_proto] -package = "substrait" -version = "0.2" - # TODO(ruihang): upgrade to 0.11 once substrait-rs supports it. 
[dev-dependencies.prost_09] package = "prost" version = "0.9" + +[dev-dependencies.substrait_proto] +package = "substrait" +version = "0.2" diff --git a/src/common/catalog/Cargo.toml b/src/common/catalog/Cargo.toml new file mode 100644 index 0000000000..1623bf6038 --- /dev/null +++ b/src/common/catalog/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "common-catalog" +version = "0.1.0" +edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +common-error = { path = "../error" } +common-telemetry = { path = "../telemetry" } +datatypes = { path = "../../datatypes" } +lazy_static = "1.4" +regex = "1.6" +serde = "1.0" +serde_json = "1.0" +snafu = { version = "0.7", features = ["backtraces"] } +table = { path = "../../table" } + +[dev-dependencies] +chrono = "0.4" +tempdir = "0.3" +tokio = { version = "1.0", features = ["full"] } diff --git a/src/catalog/src/consts.rs b/src/common/catalog/src/consts.rs similarity index 78% rename from src/catalog/src/consts.rs rename to src/common/catalog/src/consts.rs index 68a82a4170..db6b41af3a 100644 --- a/src/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -11,3 +11,7 @@ pub const MIN_USER_TABLE_ID: u32 = 1024; pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0; /// scripts table id pub const SCRIPTS_TABLE_ID: u32 = 1; + +pub(crate) const CATALOG_KEY_PREFIX: &str = "__c"; +pub(crate) const SCHEMA_KEY_PREFIX: &str = "__s"; +pub(crate) const TABLE_KEY_PREFIX: &str = "__t"; diff --git a/src/common/catalog/src/error.rs b/src/common/catalog/src/error.rs new file mode 100644 index 0000000000..36682b0ae6 --- /dev/null +++ b/src/common/catalog/src/error.rs @@ -0,0 +1,45 @@ +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::prelude::{Snafu, StatusCode}; +use snafu::{Backtrace, ErrorCompat}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Invalid catalog info: {}", key))] + InvalidCatalog { key: String, backtrace: Backtrace }, + + #[snafu(display("Failed to deserialize catalog entry value: {}", raw))] + DeserializeCatalogEntryValue { + raw: String, + backtrace: Backtrace, + source: serde_json::error::Error, + }, + + #[snafu(display("Failed to serialize catalog entry value"))] + SerializeCatalogEntryValue { + backtrace: Backtrace, + source: serde_json::error::Error, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidCatalog { .. } + | Error::DeserializeCatalogEntryValue { .. } + | Error::SerializeCatalogEntryValue { .. } => StatusCode::Unexpected, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result = std::result::Result; diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs new file mode 100644 index 0000000000..02dc88e85d --- /dev/null +++ b/src/common/catalog/src/helper.rs @@ -0,0 +1,283 @@ +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize, Serializer}; +use snafu::{ensure, OptionExt, ResultExt}; +use table::metadata::{TableId, TableMeta, TableVersion}; + +use crate::consts::{CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_KEY_PREFIX}; +use crate::error::{ + DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu, +}; + +lazy_static! 
{ + static ref CATALOG_KEY_PATTERN: Regex = Regex::new(&format!( + "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)$", + CATALOG_KEY_PREFIX + )) + .unwrap(); +} + +lazy_static! { + static ref SCHEMA_KEY_PATTERN: Regex = Regex::new(&format!( + "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)$", + SCHEMA_KEY_PREFIX + )) + .unwrap(); +} + +lazy_static! { + static ref TABLE_KEY_PATTERN: Regex = Regex::new(&format!( + "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)-([a-zA-Z_]+)$", + TABLE_KEY_PREFIX + )) + .unwrap(); +} + +pub fn build_catalog_prefix() -> String { + format!("{}-", CATALOG_KEY_PREFIX) +} + +pub fn build_schema_prefix(catalog_name: impl AsRef) -> String { + format!("{}-{}-", SCHEMA_KEY_PREFIX, catalog_name.as_ref()) +} + +pub fn build_table_prefix(catalog_name: impl AsRef, schema_name: impl AsRef) -> String { + format!( + "{}-{}-{}-", + TABLE_KEY_PREFIX, + catalog_name.as_ref(), + schema_name.as_ref() + ) +} + +pub struct TableKey { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub version: TableVersion, + pub node_id: String, +} + +impl Display for TableKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(TABLE_KEY_PREFIX)?; + f.write_str("-")?; + f.write_str(&self.catalog_name)?; + f.write_str("-")?; + f.write_str(&self.schema_name)?; + f.write_str("-")?; + f.write_str(&self.table_name)?; + f.write_str("-")?; + f.serialize_u64(self.version)?; + f.write_str("-")?; + f.write_str(&self.node_id) + } +} + +impl TableKey { + pub fn parse>(s: S) -> Result { + let key = s.as_ref(); + let captures = TABLE_KEY_PATTERN + .captures(key) + .context(InvalidCatalogSnafu { key })?; + ensure!(captures.len() == 6, InvalidCatalogSnafu { key }); + + let version = + u64::from_str(&captures[4]).map_err(|_| InvalidCatalogSnafu { key }.build())?; + Ok(Self { + catalog_name: captures[1].to_string(), + schema_name: captures[2].to_string(), + table_name: captures[3].to_string(), + version, + node_id: captures[5].to_string(), + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TableValue { + pub id: TableId, + pub node_id: String, + pub meta: TableMeta, +} + +impl TableValue { + pub fn parse(s: impl AsRef) -> Result { + serde_json::from_str(s.as_ref()) + .context(DeserializeCatalogEntryValueSnafu { raw: s.as_ref() }) + } + + pub fn as_bytes(&self) -> Result, Error> { + Ok(serde_json::to_string(self) + .context(SerializeCatalogEntryValueSnafu)? + .into_bytes()) + } +} + +pub struct CatalogKey { + pub catalog_name: String, + pub node_id: String, +} + +impl Display for CatalogKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(CATALOG_KEY_PREFIX)?; + f.write_str("-")?; + f.write_str(&self.catalog_name)?; + f.write_str("-")?; + f.write_str(&self.node_id) + } +} + +impl CatalogKey { + pub fn parse(s: impl AsRef) -> Result { + let key = s.as_ref(); + let captures = CATALOG_KEY_PATTERN + .captures(key) + .context(InvalidCatalogSnafu { key })?; + ensure!(captures.len() == 3, InvalidCatalogSnafu { key }); + Ok(Self { + catalog_name: captures[1].to_string(), + node_id: captures[2].to_string(), + }) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CatalogValue; + +impl CatalogValue { + pub fn to_bytes(&self) -> Result, Error> { + Ok(serde_json::to_string(self) + .context(SerializeCatalogEntryValueSnafu)? 
+ .into_bytes()) + } +} + +pub struct SchemaKey { + pub catalog_name: String, + pub schema_name: String, + pub node_id: String, +} + +impl Display for SchemaKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(SCHEMA_KEY_PREFIX)?; + f.write_str("-")?; + f.write_str(&self.catalog_name)?; + f.write_str("-")?; + f.write_str(&self.schema_name)?; + f.write_str("-")?; + f.write_str(&self.node_id) + } +} + +impl SchemaKey { + pub fn parse(s: impl AsRef) -> Result { + let key = s.as_ref(); + let captures = SCHEMA_KEY_PATTERN + .captures(key) + .context(InvalidCatalogSnafu { key })?; + ensure!(captures.len() == 4, InvalidCatalogSnafu { key }); + + Ok(Self { + catalog_name: captures[1].to_string(), + schema_name: captures[2].to_string(), + node_id: captures[3].to_string(), + }) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SchemaValue; + +impl SchemaValue { + pub fn to_bytes(&self) -> Result, Error> { + Ok(serde_json::to_string(self) + .context(SerializeCatalogEntryValueSnafu)? + .into_bytes()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + + use super::*; + + #[test] + fn test_parse_catalog_key() { + let key = "__c-C-N"; + let catalog_key = CatalogKey::parse(key).unwrap(); + assert_eq!("C", catalog_key.catalog_name); + assert_eq!("N", catalog_key.node_id); + assert_eq!(key, catalog_key.to_string()); + } + + #[test] + fn test_parse_schema_key() { + let key = "__s-C-S-N"; + let schema_key = SchemaKey::parse(key).unwrap(); + assert_eq!("C", schema_key.catalog_name); + assert_eq!("S", schema_key.schema_name); + assert_eq!("N", schema_key.node_id); + assert_eq!(key, schema_key.to_string()); + } + + #[test] + fn test_parse_table_key() { + let key = "__t-C-S-T-42-N"; + let entry = TableKey::parse(key).unwrap(); + assert_eq!("C", entry.catalog_name); + assert_eq!("S", entry.schema_name); + assert_eq!("T", entry.table_name); + assert_eq!("N", entry.node_id); + assert_eq!(42, entry.version); + assert_eq!(key, &entry.to_string()); + } + + #[test] + fn test_build_prefix() { + assert_eq!("__c-", build_catalog_prefix()); + assert_eq!("__s-CATALOG-", build_schema_prefix("CATALOG")); + assert_eq!( + "__t-CATALOG-SCHEMA-", + build_table_prefix("CATALOG", "SCHEMA") + ); + } + + #[test] + fn test_serialize_schema() { + let schema_ref = Arc::new(Schema::new(vec![ColumnSchema::new( + "name", + ConcreteDataType::string_datatype(), + true, + )])); + + let meta = TableMeta { + schema: schema_ref, + engine: "mito".to_string(), + created_on: chrono::DateTime::default(), + primary_key_indices: vec![0, 1], + next_column_id: 3, + engine_options: Default::default(), + value_indices: vec![2, 3], + options: Default::default(), + }; + + let value = TableValue { + id: 42, + node_id: "localhost".to_string(), + meta, + }; + let serialized = serde_json::to_string(&value).unwrap(); + let deserialized = TableValue::parse(&serialized).unwrap(); + assert_eq!(value, deserialized); + } +} diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs new file mode 100644 index 0000000000..3f40c38d46 --- /dev/null +++ b/src/common/catalog/src/lib.rs @@ -0,0 +1,8 @@ +pub mod consts; +pub mod error; +mod helper; + +pub use helper::{ + build_catalog_prefix, build_schema_prefix, build_table_prefix, CatalogKey, CatalogValue, + SchemaKey, SchemaValue, TableKey, TableValue, +}; diff --git a/src/common/substrait/Cargo.toml b/src/common/substrait/Cargo.toml index 960a21e541..db563275b4 
100644 --- a/src/common/substrait/Cargo.toml +++ b/src/common/substrait/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] bytes = "1.1" catalog = { path = "../../catalog" } +common-catalog = { path = "../catalog" } common-error = { path = "../error" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ "simd", diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index 92ba98a923..a8f2426340 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -151,7 +151,7 @@ impl DFLogicalSubstraitConvertor { // Get table handle from catalog manager let table_ref = self .catalog_manager - .table(Some(&catalog_name), Some(&schema_name), &table_name) + .table(&catalog_name, &schema_name, &table_name) .map_err(BoxedError::new) .context(InternalSnafu)? .context(TableNotFoundSnafu { @@ -279,11 +279,12 @@ impl DFLogicalSubstraitConvertor { #[cfg(test)] mod test { + use catalog::local::LocalCatalogManager; use catalog::{ - memory::{MemoryCatalogProvider, MemorySchemaProvider}, - CatalogList, CatalogProvider, LocalCatalogManager, RegisterTableRequest, - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, + local::{MemoryCatalogProvider, MemorySchemaProvider}, + CatalogList, CatalogProvider, RegisterTableRequest, }; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::schema::Schema; use table::{requests::CreateTableRequest, test_util::EmptyTable, test_util::MockTableEngine}; @@ -300,8 +301,12 @@ mod test { ); let schema_provider = Arc::new(MemorySchemaProvider::new()); let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); - catalog_manager.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + catalog_provider + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .unwrap(); + catalog_manager + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .unwrap(); catalog_manager.init().await.unwrap(); catalog_manager @@ -338,8 +343,8 @@ mod test { ))); catalog_manager .register_table(RegisterTableRequest { - catalog: Some(DEFAULT_CATALOG_NAME.to_string()), - schema: Some(DEFAULT_SCHEMA_NAME.to_string()), + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: DEFAULT_TABLE_NAME.to_string(), table_id: 1, table: table_ref.clone(), diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index e6ae94049a..031e78a424 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -14,6 +14,7 @@ axum = "0.6.0-rc.2" axum-macros = "0.3.0-rc.1" catalog = { path = "../catalog" } common-base = { path = "../common/base" } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a0b7be3784..408a1cbd21 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -267,6 +267,18 @@ pub enum Error { duplicated: String, backtrace: Backtrace, }, + + #[snafu(display("Failed to access catalog, source: {}", source))] + Catalog { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display("Failed to find table {} from catalog, source: {}", table_name, source))] + FindTable { + table_name: String, + source: catalog::error::Error, + }, } pub type 
Result = std::result::Result; @@ -278,7 +290,7 @@ impl ErrorExt for Error { Error::DecodeLogicalPlan { source } => source.status_code(), Error::ExecutePhysicalPlan { source } => source.status_code(), Error::NewCatalog { source } => source.status_code(), - + Error::FindTable { source, .. } => source.status_code(), Error::CreateTable { source, .. } | Error::GetTable { source, .. } | Error::AlterTable { source, .. } => source.status_code(), @@ -321,7 +333,8 @@ impl ErrorExt for Error { | Error::Conversion { .. } | Error::IntoPhysicalPlan { .. } | Error::UnsupportedExpr { .. } - | Error::ColumnDataType { .. } => StatusCode::Internal, + | Error::ColumnDataType { .. } + | Error::Catalog { .. } => StatusCode::Internal, Error::InitBackend { .. } => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), @@ -366,7 +379,7 @@ mod tests { ))) } - fn throw_catalog_error() -> std::result::Result<(), catalog::error::Error> { + fn throw_catalog_error() -> catalog::error::Result<()> { Err(catalog::error::Error::RegisterTable { source: BoxedError::new(MockError::with_backtrace(StatusCode::Internal)), }) diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 914edff089..4c88fdb5ed 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -47,7 +47,7 @@ impl Instance { object_store, )); let catalog_manager = Arc::new( - catalog::LocalCatalogManager::try_new(table_engine.clone()) + catalog::local::LocalCatalogManager::try_new(table_engine.clone()) .await .context(NewCatalogSnafu)?, ); @@ -96,7 +96,7 @@ impl Instance { )); let catalog_manager = Arc::new( - catalog::LocalCatalogManager::try_new(mock_engine.clone()) + catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone()) .await .unwrap(), ); diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 5b736f7345..f0e93dd4d3 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -3,7 +3,7 @@ use api::v1::{ ObjectExpr, ObjectResult, SelectExpr, }; use async_trait::async_trait; -use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::status_code::StatusCode; use common_query::Output; use common_telemetry::logging::{debug, info}; @@ -14,8 +14,8 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::requests::AddColumnRequest; use crate::error::{ - self, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, Result, TableNotFoundSnafu, - UnsupportedExprSnafu, + self, CatalogSnafu, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, Result, + TableNotFoundSnafu, UnsupportedExprSnafu, }; use crate::instance::Instance; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; @@ -99,13 +99,15 @@ impl Instance { .catalog_manager .catalog(catalog_name) .unwrap() + .expect("default catalog must exist") .schema(schema_name) + .expect("default schema must exist") .unwrap(); let insert_batches = insert::insert_batches(values.values)?; ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu); - let table = if let Some(table) = schema_provider.table(table_name) { + let table = if let Some(table) = schema_provider.table(table_name).context(CatalogSnafu)? { let schema = table.schema(); if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? 
{ self.add_new_columns_to_table(table_name, add_columns) @@ -124,6 +126,7 @@ impl Instance { schema_provider .table(table_name) + .context(CatalogSnafu)? .context(TableNotFoundSnafu { table_name })? }; diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index e204a04621..004d247bdd 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::BoxedError; use common_query::Output; use common_telemetry::{ @@ -38,8 +38,10 @@ impl Instance { let schema_provider = self .catalog_manager .catalog(DEFAULT_CATALOG_NAME) + .expect("datafusion does not accept fallible catalog access") .unwrap() .schema(DEFAULT_SCHEMA_NAME) + .expect("datafusion does not accept fallible catalog access") .unwrap(); let request = self.sql_handler.insert_to_request(schema_provider, *i)?; @@ -52,9 +54,9 @@ impl Instance { // TODO(hl): Select table engine by engine_name let request = self.sql_handler.create_to_request(table_id, c)?; - let catalog_name = request.catalog_name.clone(); - let schema_name = request.schema_name.clone(); - let table_name = request.table_name.clone(); + let catalog_name = &request.catalog_name; + let schema_name = &request.schema_name; + let table_name = &request.table_name; let table_id = request.id; info!( "Creating table, catalog: {:?}, schema: {:?}, table name: {:?}, table id: {}", diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index dde2b56202..bb57008f92 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr}; -use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::{ErrorExt, StatusCode}; use common_query::Output; use datatypes::schema::ColumnDefaultConstraint; @@ -173,7 +173,6 @@ fn create_column_schema(column_def: &ColumnDef) -> Result { mod tests { use std::collections::HashMap; - use catalog::MIN_USER_TABLE_ID; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; @@ -188,7 +187,7 @@ mod tests { let expr = testing_create_expr(); let request = instance.create_expr_to_request(expr).unwrap(); - assert_eq!(request.id, MIN_USER_TABLE_ID); + assert_eq!(request.id, common_catalog::consts::MIN_USER_TABLE_ID); assert_eq!(request.catalog_name, "greptime".to_string()); assert_eq!(request.schema_name, "public".to_string()); assert_eq!(request.table_name, "my-metrics"); diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index f9f1b3009d..2de3806811 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -1,9 +1,7 @@ //! 
sql handler -use catalog::{ - schema::SchemaProviderRef, CatalogManagerRef, CatalogProviderRef, DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, -}; +use catalog::{schema::SchemaProviderRef, CatalogManagerRef, CatalogProviderRef}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use snafu::{OptionExt, ResultExt}; use sql::statements::show::{ShowDatabases, ShowTables}; @@ -12,7 +10,8 @@ use table::requests::*; use table::TableRef; use crate::error::{ - CatalogNotFoundSnafu, GetTableSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, + CatalogNotFoundSnafu, CatalogSnafu, GetTableSnafu, Result, SchemaNotFoundSnafu, + TableNotFoundSnafu, }; mod alter; @@ -63,6 +62,7 @@ impl SqlHandler { pub(crate) fn get_default_catalog(&self) -> Result { self.catalog_manager .catalog(DEFAULT_CATALOG_NAME) + .context(CatalogSnafu)? .context(CatalogNotFoundSnafu { name: DEFAULT_CATALOG_NAME, }) @@ -71,10 +71,12 @@ impl SqlHandler { pub(crate) fn get_default_schema(&self) -> Result { self.catalog_manager .catalog(DEFAULT_CATALOG_NAME) + .context(CatalogSnafu)? .context(CatalogNotFoundSnafu { name: DEFAULT_CATALOG_NAME, })? .schema(DEFAULT_SCHEMA_NAME) + .context(CatalogSnafu)? .context(SchemaNotFoundSnafu { name: DEFAULT_SCHEMA_NAME, }) @@ -158,13 +160,13 @@ mod tests { self } - fn table_names(&self) -> Vec { - vec!["demo".to_string()] + fn table_names(&self) -> catalog::error::Result> { + Ok(vec!["demo".to_string()]) } - fn table(&self, name: &str) -> Option { + fn table(&self, name: &str) -> catalog::error::Result> { assert_eq!(name, "demo"); - Some(Arc::new(DemoTable {})) + Ok(Some(Arc::new(DemoTable {}))) } fn register_table( @@ -177,8 +179,8 @@ mod tests { fn deregister_table(&self, _name: &str) -> catalog::error::Result> { unimplemented!(); } - fn table_exist(&self, name: &str) -> bool { - name == "demo" + fn table_exist(&self, name: &str) -> catalog::error::Result { + Ok(name == "demo") } } @@ -205,7 +207,7 @@ mod tests { )); let catalog_list = Arc::new( - catalog::LocalCatalogManager::try_new(table_engine.clone()) + catalog::local::LocalCatalogManager::try_new(table_engine.clone()) .await .unwrap(), ); diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index f096a8aaff..6887da8a8c 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -23,8 +23,7 @@ use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn create(&self, req: CreateTableRequest) -> Result { let ctx = EngineContext {}; - let catalog_name = req.catalog_name.clone(); - let schema_name = req.schema_name.clone(); + // determine catalog and schema from the very beginning let table_name = req.table_name.clone(); let table_id = req.id; @@ -37,8 +36,8 @@ impl SqlHandler { })?; let register_req = RegisterTableRequest { - catalog: Some(catalog_name.to_string()), - schema: Some(schema_name.to_string()), + catalog: table.table_info().catalog_name.clone(), + schema: table.table_info().schema_name.clone(), table_name: table_name.clone(), table_id, table, diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index e4a0d52db7..b7fbfe39bc 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -10,8 +10,8 @@ use sql::statements::{self, insert::Insert}; use table::requests::*; use crate::error::{ - ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlValueSnafu, Result, - TableNotFoundSnafu, + CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, 
+ ParseSqlValueSnafu, Result, TableNotFoundSnafu, }; use crate::sql::{SqlHandler, SqlRequest}; @@ -41,6 +41,7 @@ impl SqlHandler { let table = schema_provider .table(&table_name) + .context(CatalogSnafu)? .context(TableNotFoundSnafu { table_name: &table_name, })?; diff --git a/src/datanode/src/sql/show.rs b/src/datanode/src/sql/show.rs index f8554e23cf..d5c0212cd8 100644 --- a/src/datanode/src/sql/show.rs +++ b/src/datanode/src/sql/show.rs @@ -11,8 +11,8 @@ use snafu::{ensure, OptionExt, ResultExt}; use sql::statements::show::{ShowDatabases, ShowKind, ShowTables}; use crate::error::{ - ArrowComputationSnafu, CastVectorSnafu, NewRecordBatchSnafu, NewRecordBatchesSnafu, Result, - SchemaNotFoundSnafu, UnsupportedExprSnafu, + ArrowComputationSnafu, CastVectorSnafu, CatalogSnafu, NewRecordBatchSnafu, + NewRecordBatchesSnafu, Result, SchemaNotFoundSnafu, UnsupportedExprSnafu, }; use crate::sql::SqlHandler; @@ -43,7 +43,7 @@ impl SqlHandler { let catalog = self.get_default_catalog()?; // TODO(dennis): return an iterator or stream would be better. - let schemas = catalog.schema_names(); + let schemas = catalog.schema_names().context(CatalogSnafu)?; let column_schemas = vec![ColumnSchema::new( SCHEMAS_COLUMN, @@ -77,11 +77,14 @@ impl SqlHandler { let schema = if let Some(name) = &stmt.database { let catalog = self.get_default_catalog()?; - catalog.schema(name).context(SchemaNotFoundSnafu { name })? + catalog + .schema(name) + .context(CatalogSnafu)? + .context(SchemaNotFoundSnafu { name })? } else { self.get_default_schema()? }; - let tables = schema.table_names(); + let tables = schema.table_names().context(CatalogSnafu)?; let column_schemas = vec![ColumnSchema::new( TABLES_COLUMN, diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 9fff98da2c..19996908c6 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use snafu::ResultExt; @@ -81,7 +81,9 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> { .catalog_manager() .catalog(DEFAULT_CATALOG_NAME) .unwrap() + .unwrap() .schema(DEFAULT_SCHEMA_NAME) + .unwrap() .unwrap(); schema_provider .register_table(table_name.to_string(), table) @@ -97,7 +99,7 @@ pub async fn create_mock_sql_handler() -> SqlHandler { object_store, )); let catalog_manager = Arc::new( - catalog::LocalCatalogManager::try_new(mock_engine.clone()) + catalog::local::LocalCatalogManager::try_new(mock_engine.clone()) .await .unwrap(), ); diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 831b43e908..ad1c5a9003 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" arc-swap = "1.0" async-trait = "0.1" catalog = { path = "../catalog" } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-function = { path = "../common/function" } common-query = { path = "../common/query" } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 074ed76efb..bba9ff86e1 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -233,10 +233,9 @@ mod tests { use std::sync::Arc; use arrow::array::UInt64Array; - use 
catalog::memory::{MemoryCatalogProvider, MemorySchemaProvider}; - use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, - }; + use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; + use catalog::{CatalogList, CatalogProvider, SchemaProvider}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::util; use datafusion::field_util::FieldExt; @@ -246,15 +245,19 @@ mod tests { use crate::query_engine::{QueryEngineFactory, QueryEngineRef}; fn create_test_engine() -> QueryEngineRef { - let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); + let catalog_list = catalog::local::new_memory_catalog_list().unwrap(); let default_schema = Arc::new(MemorySchemaProvider::new()); default_schema .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) .unwrap(); let default_catalog = Arc::new(MemoryCatalogProvider::new()); - default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog); + default_catalog + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema) + .unwrap(); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog) + .unwrap(); let factory = QueryEngineFactory::new(catalog_list); factory.query_engine().clone() diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index fe704c9aaf..ab4cfdcedd 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -3,6 +3,7 @@ use std::any::Any; use std::sync::Arc; +use catalog::error::Error; use catalog::{ CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef, }; @@ -45,16 +46,21 @@ impl DfCatalogList for DfCatalogListAdapter { }); self.catalog_list .register_catalog(name, catalog_adapter) + .expect("datafusion does not accept fallible catalog access") // TODO(hl): datafusion register catalog does not handle errors .map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _) } fn catalog_names(&self) -> Vec { - self.catalog_list.catalog_names() + // TODO(hl): datafusion register catalog does not handle errors + self.catalog_list + .catalog_names() + .expect("datafusion does not accept fallible catalog access") } fn catalog(&self, name: &str) -> Option> { self.catalog_list .catalog(name) + .expect("datafusion does not accept fallible catalog access") // TODO(hl): datafusion register catalog does not handle errors .map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _) } } @@ -69,22 +75,23 @@ impl CatalogProvider for CatalogProviderAdapter { self } - fn schema_names(&self) -> Vec { - self.df_catalog_provider.schema_names() + fn schema_names(&self) -> catalog::error::Result> { + Ok(self.df_catalog_provider.schema_names()) } fn register_schema( &self, _name: String, _schema: SchemaProviderRef, - ) -> Option { + ) -> catalog::error::Result> { todo!("register_schema is not supported in Datafusion catalog provider") } - fn schema(&self, name: &str) -> Option> { - self.df_catalog_provider + fn schema(&self, name: &str) -> catalog::error::Result>> { + Ok(self + .df_catalog_provider .schema(name) - .map(|df_schema_provider| Arc::new(SchemaProviderAdapter { df_schema_provider }) as _) + .map(|df_schema_provider| Arc::new(SchemaProviderAdapter { df_schema_provider }) as _)) 
} } @@ -99,12 +106,15 @@ impl DfCatalogProvider for DfCatalogProviderAdapter { } fn schema_names(&self) -> Vec { - self.catalog_provider.schema_names() + self.catalog_provider + .schema_names() + .expect("datafusion does not accept fallible catalog access") } fn schema(&self, name: &str) -> Option> { self.catalog_provider .schema(name) + .expect("datafusion does not accept fallible catalog access") .map(|schema_provider| Arc::new(DfSchemaProviderAdapter { schema_provider }) as _) } } @@ -120,12 +130,15 @@ impl DfSchemaProvider for DfSchemaProviderAdapter { } fn table_names(&self) -> Vec { - self.schema_provider.table_names() + self.schema_provider + .table_names() + .expect("datafusion does not accept fallible catalog access") } fn table(&self, name: &str) -> Option> { self.schema_provider .table(name) + .expect("datafusion does not accept fallible catalog access") .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) } @@ -149,7 +162,9 @@ impl DfSchemaProvider for DfSchemaProviderAdapter { } fn table_exist(&self, name: &str) -> bool { - self.schema_provider.table_exist(name) + self.schema_provider + .table_exist(name) + .expect("datafusion does not accept fallible catalog access") } } @@ -164,12 +179,12 @@ impl SchemaProvider for SchemaProviderAdapter { } /// Retrieves the list of available table names in this schema. - fn table_names(&self) -> Vec { - self.df_schema_provider.table_names() + fn table_names(&self) -> Result, Error> { + Ok(self.df_schema_provider.table_names()) } - fn table(&self, name: &str) -> Option { - self.df_schema_provider.table(name).map(|table_provider| { + fn table(&self, name: &str) -> Result, Error> { + let table = self.df_schema_provider.table(name).map(|table_provider| { match table_provider .as_any() .downcast_ref::() @@ -182,7 +197,8 @@ impl SchemaProvider for SchemaProviderAdapter { Arc::new(adapter) as _ } } - }) + }); + Ok(table) } fn register_table( @@ -213,14 +229,14 @@ impl SchemaProvider for SchemaProviderAdapter { .transpose() } - fn table_exist(&self, name: &str) -> bool { - self.df_schema_provider.table_exist(name) + fn table_exist(&self, name: &str) -> Result { + Ok(self.df_schema_provider.table_exist(name)) } } #[cfg(test)] mod tests { - use catalog::memory::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; + use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; use table::table::numbers::NumbersTable; use super::*; @@ -234,10 +250,12 @@ mod tests { ), }; - adapter.register_schema( - "whatever".to_string(), - Arc::new(MemorySchemaProvider::new()), - ); + adapter + .register_schema( + "whatever".to_string(), + Arc::new(MemorySchemaProvider::new()), + ) + .unwrap(); } #[test] diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 621a9a2ecc..97e09959a4 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -3,7 +3,7 @@ mod state; use std::sync::Arc; -use catalog::CatalogList; +use catalog::CatalogListRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY}; use common_query::physical_plan::PhysicalPlan; @@ -43,7 +43,7 @@ pub struct QueryEngineFactory { } impl QueryEngineFactory { - pub fn new(catalog_list: Arc) -> Self { + pub fn new(catalog_list: CatalogListRef) -> Self { let query_engine = Arc::new(DatafusionQueryEngine::new(catalog_list)); for func in FUNCTION_REGISTRY.functions() { @@ -72,7 +72,7 @@ mod tests { #[test] fn 
test_query_engine_factory() { - let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); + let catalog_list = catalog::local::new_memory_catalog_list().unwrap(); let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine(); diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 2902177f99..a1d50a7cf8 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -3,6 +3,7 @@ use std::fmt; use std::sync::{Arc, RwLock}; use catalog::CatalogListRef; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::physical_plan::RuntimeEnv; use common_query::prelude::ScalarUdf; @@ -39,10 +40,7 @@ impl fmt::Debug for QueryEngineState { impl QueryEngineState { pub(crate) fn new(catalog_list: CatalogListRef) -> Self { let config = ExecutionConfig::new() - .with_default_catalog_and_schema( - catalog::DEFAULT_CATALOG_NAME, - catalog::DEFAULT_SCHEMA_NAME, - ) + .with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) .with_optimizer_rules(vec![ // TODO(hl): SimplifyExpressions is not exported. Arc::new(TypeConversionRule {}), diff --git a/src/query/tests/function.rs b/src/query/tests/function.rs index 3d51c7cd51..b55568f7b9 100644 --- a/src/query/tests/function.rs +++ b/src/query/tests/function.rs @@ -1,9 +1,8 @@ use std::sync::Arc; -use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; +use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{CatalogList, CatalogProvider, SchemaProvider}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::{util, RecordBatch}; use datatypes::for_all_primitive_types; @@ -47,8 +46,12 @@ pub fn create_query_engine() -> Arc { .register_table(number_table.table_name().to_string(), number_table) .unwrap(); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + catalog_provider + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .unwrap(); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .unwrap(); let factory = QueryEngineFactory::new(catalog_list); factory.query_engine().clone() diff --git a/src/query/tests/my_sum_udaf_example.rs b/src/query/tests/my_sum_udaf_example.rs index a2fd37ded6..40170b5d89 100644 --- a/src/query/tests/my_sum_udaf_example.rs +++ b/src/query/tests/my_sum_udaf_example.rs @@ -2,10 +2,9 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; -use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; +use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{CatalogList, CatalogProvider, SchemaProvider}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_function::scalars::aggregate::AggregateFunctionMeta; use common_function_macro::{as_aggr_func_creator, AggrFuncTypeStore}; use common_query::error::CreateAccumulatorSnafu; @@ -246,8 +245,12 @@ fn 
new_query_engine_factory(table: MemTable) -> QueryEngineFactory { let catalog_list = Arc::new(MemoryCatalogList::default()); schema_provider.register_table(table_name, table).unwrap(); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + catalog_provider + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .unwrap(); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .unwrap(); QueryEngineFactory::new(catalog_list) } diff --git a/src/query/tests/percentile_test.rs b/src/query/tests/percentile_test.rs index 37511fde6f..3737f5d3df 100644 --- a/src/query/tests/percentile_test.rs +++ b/src/query/tests/percentile_test.rs @@ -1,9 +1,8 @@ use std::sync::Arc; mod function; -use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; +use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{CatalogList, CatalogProvider, SchemaProvider}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::error::Result as RecordResult; use common_recordbatch::{util, RecordBatch}; @@ -137,8 +136,12 @@ fn create_correctness_engine() -> Arc { .register_table(number_table.table_name().to_string(), number_table) .unwrap(); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + catalog_provider + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .unwrap(); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .unwrap(); let factory = QueryEngineFactory::new(catalog_list); factory.query_engine().clone() diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index 48e5045900..b763ab7485 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -3,10 +3,9 @@ mod pow; use std::sync::Arc; use arrow::array::UInt32Array; -use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; +use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{CatalogList, CatalogProvider, SchemaProvider}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::prelude::{create_udf, make_scalar_function, Volatility}; use common_query::Output; use common_recordbatch::error::Result as RecordResult; @@ -34,7 +33,7 @@ use crate::pow::pow; #[tokio::test] async fn test_datafusion_query_engine() -> Result<()> { common_telemetry::init_default_ut_logging(); - let catalog_list = catalog::memory::new_memory_catalog_list()?; + let catalog_list = catalog::local::new_memory_catalog_list()?; let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine(); @@ -90,15 +89,19 @@ async fn test_datafusion_query_engine() -> Result<()> { #[tokio::test] async fn test_udf() -> Result<()> { common_telemetry::init_default_ut_logging(); - let catalog_list = catalog::memory::new_memory_catalog_list()?; + let catalog_list = catalog::local::new_memory_catalog_list()?; let 
default_schema = Arc::new(MemorySchemaProvider::new()); default_schema .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) .unwrap(); let default_catalog = Arc::new(MemoryCatalogProvider::new()); - default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog); + default_catalog + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema) + .unwrap(); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog) + .unwrap(); let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine(); @@ -206,8 +209,12 @@ fn create_query_engine() -> Arc { .register_table(odd_number_table.table_name().to_string(), odd_number_table) .unwrap(); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + catalog_provider + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .unwrap(); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .unwrap(); let factory = QueryEngineFactory::new(catalog_list); factory.query_engine().clone() diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index 469a289548..009ed069e8 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -21,6 +21,7 @@ python = [ [dependencies] async-trait = "0.1" catalog = { path = "../catalog" } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-function = { path = "../common/function" } common-query = { path = "../common/query" } diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 4c5d8e4f29..86baa4be2d 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -118,7 +118,7 @@ mod tests { )); let catalog_manager = Arc::new( - catalog::LocalCatalogManager::try_new(mock_engine.clone()) + catalog::local::LocalCatalogManager::try_new(mock_engine.clone()) .await .unwrap(), ); diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 7eea7602db..acdee232ad 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -133,10 +133,9 @@ impl ScriptEngine for PyEngine { #[cfg(test)] mod tests { - use catalog::memory::{MemoryCatalogProvider, MemorySchemaProvider}; - use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, - }; + use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; + use catalog::{CatalogList, CatalogProvider, SchemaProvider}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_recordbatch::util; use datafusion_common::field_util::FieldExt; use datafusion_common::field_util::SchemaExt; @@ -149,15 +148,19 @@ mod tests { #[tokio::test] async fn test_compile_execute() { - let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); + let catalog_list = catalog::local::new_memory_catalog_list().unwrap(); let default_schema = Arc::new(MemorySchemaProvider::new()); default_schema .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) .unwrap(); let default_catalog = Arc::new(MemoryCatalogProvider::new()); - default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog); + default_catalog + 
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema) + .unwrap(); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog) + .unwrap(); let factory = QueryEngineFactory::new(catalog_list); let query_engine = factory.query_engine(); diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 8abd93d6a6..073601db73 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; -use catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID}; use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID}; use common_query::Output; use common_recordbatch::util as record_util; use common_telemetry::logging; @@ -107,8 +107,8 @@ impl ScriptsTable { let table = self .catalog_manager .table( - Some(DEFAULT_CATALOG_NAME), - Some(DEFAULT_SCHEMA_NAME), + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_NAME, ) .context(FindScriptsTableSnafu)? diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index e2e12fa6a5..c642c43d43 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -10,6 +10,7 @@ axum = "0.6.0-rc.2" axum-macros = "0.3.0-rc.1" bytes = "1.2" common-base = { path = "../common/base" } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 28726853c0..25726a773a 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -2,10 +2,9 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use async_trait::async_trait; -use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; +use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{CatalogList, CatalogProvider, SchemaProvider}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use query::{QueryEngineFactory, QueryEngineRef}; use servers::error::Result; @@ -73,8 +72,12 @@ fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef { let catalog_provider = Arc::new(MemoryCatalogProvider::new()); let catalog_list = Arc::new(MemoryCatalogList::default()); schema_provider.register_table(table_name, table).unwrap(); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + catalog_provider + .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .unwrap(); + catalog_list + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .unwrap(); let factory = QueryEngineFactory::new(catalog_list); let query_engine = factory.query_engine().clone(); diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 3fe9b2649f..5ec1e2912e 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] catalog = { path = "../catalog" } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs 
index 0f5a89c8a5..8ea13b47d7 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -7,7 +7,7 @@ pub mod statement; use std::str::FromStr; -use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 5185ee3eb8..d429b3be43 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -238,8 +238,8 @@ impl MitoEngineInner { _ctx: &EngineContext, request: CreateTableRequest, ) -> Result { - let catalog_name = request.catalog_name; - let schema_name = request.schema_name; + let catalog_name = &request.catalog_name; + let schema_name = &request.schema_name; let table_name = &request.table_name; if let Some(table) = self.get_table(table_name) { diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index a580183aa6..2d57422cf0 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -58,6 +58,8 @@ pub fn build_test_table_info() -> TableInfo { .ident(0) .table_version(0u64) .table_type(TableType::Base) + .catalog_name("greptime".to_string()) + .schema_name("public".to_string()) .build() .unwrap() } diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 212a31220c..db418f2529 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -40,6 +40,7 @@ pub trait TableEngine: Send + Sync { fn get_table(&self, ctx: &EngineContext, name: &str) -> Result>; /// Returns true when the given table is exists. + /// TODO(hl): support catalog and schema fn table_exists(&self, ctx: &EngineContext, name: &str) -> bool; /// Drops the given table. diff --git a/src/table/src/error.rs b/src/table/src/error.rs index e1dba45c6e..2c64664ee3 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -45,6 +45,12 @@ pub enum InnerError { source: ArrowError, backtrace: Backtrace, }, + + #[snafu(display("Failed to create record batch for Tables, source: {}", source))] + TablesRecordBatch { + #[snafu(backtrace)] + source: BoxedError, + }, } impl ErrorExt for InnerError { @@ -55,6 +61,7 @@ impl ErrorExt for InnerError { | InnerError::SchemaConversion { .. } | InnerError::TableProjection { .. } => StatusCode::EngineExecuteQuery, InnerError::MissingColumn { .. } => StatusCode::InvalidArguments, + InnerError::TablesRecordBatch { .. 
} => StatusCode::Unexpected, } } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index 08a7634d1c..c7e90e0197 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -15,26 +15,69 @@ use futures::Stream; use snafu::prelude::*; use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu}; -use crate::metadata::TableInfoRef; +use crate::metadata::{ + TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, TableVersion, +}; use crate::table::scan::SimpleTableScan; use crate::Table; #[derive(Debug, Clone)] pub struct MemTable { - table_name: String, + info: TableInfoRef, recordbatch: RecordBatch, } impl MemTable { pub fn new(table_name: impl Into, recordbatch: RecordBatch) -> Self { - Self { - table_name: table_name.into(), + Self::new_with_catalog( + table_name, recordbatch, - } + 0, + "greptime".to_string(), + "public".to_string(), + ) + } + + pub fn new_with_catalog( + table_name: impl Into, + recordbatch: RecordBatch, + table_id: TableId, + catalog_name: String, + schema_name: String, + ) -> Self { + let schema = recordbatch.schema.clone(); + + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![]) + .value_indices(vec![]) + .engine("mock".to_string()) + .next_column_id(0) + .engine_options(Default::default()) + .options(Default::default()) + .created_on(Default::default()) + .build() + .unwrap(); + + let info = Arc::new( + TableInfoBuilder::default() + .table_id(table_id) + .table_version(0 as TableVersion) + .name(table_name.into()) + .schema_name(schema_name) + .catalog_name(catalog_name) + .desc(None) + .table_type(TableType::Base) + .meta(meta) + .build() + .unwrap(), + ); + + Self { info, recordbatch } } pub fn table_name(&self) -> &str { - &self.table_name + &self.info.name } /// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and @@ -65,7 +108,7 @@ impl Table for MemTable { } fn table_info(&self) -> TableInfoRef { - unimplemented!() + self.info.clone() } async fn scan(