From 1dd780d85785b261e921896d7fca4d6442217c3e Mon Sep 17 00:00:00 2001 From: "Lei, Huang" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 11 Aug 2022 15:43:59 +0800 Subject: [PATCH] feat: implement catalog manager (#129) Implement catalog manager that provides a vision of all existing tables while instance start. Current implementation is based on local table engine, all catalog info is stored in an system catalog table. --- Cargo.lock | 11 + src/catalog/Cargo.toml | 9 + src/catalog/src/catalog.rs | 44 --- src/catalog/src/consts.rs | 6 + src/catalog/src/error.rs | 208 +++++++++++++- src/catalog/src/lib.rs | 67 ++++- src/catalog/src/manager.rs | 233 +++++++++++++++ src/catalog/src/memory.rs | 128 +++++---- src/catalog/src/system.rs | 303 ++++++++++++++++++++ src/catalog/src/tables.rs | 251 ++++++++++++++++ src/datanode/src/datanode.rs | 10 +- src/datanode/src/error.rs | 6 +- src/datanode/src/instance.rs | 38 ++- src/datanode/src/server/http/handler.rs | 7 +- src/datanode/src/tests/http_test.rs | 3 +- src/query/Cargo.toml | 2 + src/query/src/datafusion.rs | 16 ++ src/query/src/datafusion/catalog_adapter.rs | 84 +++++- src/query/src/datafusion/error.rs | 17 +- src/query/tests/my_sum_udaf_example.rs | 6 +- src/query/tests/query_engine_test.rs | 16 +- src/sql/src/parsers/create_parser.rs | 2 +- src/storage/src/manifest/storage.rs | 1 - src/table-engine/src/engine.rs | 17 +- src/table-engine/src/error.rs | 4 +- src/table/src/engine.rs | 3 + 26 files changed, 1325 insertions(+), 167 deletions(-) delete mode 100644 src/catalog/src/catalog.rs create mode 100644 src/catalog/src/consts.rs create mode 100644 src/catalog/src/manager.rs create mode 100644 src/catalog/src/system.rs create mode 100644 src/catalog/src/tables.rs diff --git a/Cargo.lock b/Cargo.lock index f10e7ed396..2b09c5ae07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,9 +510,18 @@ checksum = "a2698f953def977c68f935bb0dfa959375ad4638570e969e2f1e9f433cbf1af6" name = "catalog" version = "0.1.0" dependencies = [ + "async-stream", + "async-trait", "common-error", + "common-query", + "common-recordbatch", "common-telemetry", "datafusion", + "datatypes", + "futures", + "futures-util", + "serde", + "serde_json", "snafu", "table", "tokio", @@ -3015,6 +3024,8 @@ dependencies = [ "num", "num-traits", "rand 0.8.5", + "serde", + "serde_json", "snafu", "sql", "table", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 649786a08e..09311839ff 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -6,9 +6,18 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1" +async-stream = "0.3" common-error = { path = "../common/error" } +common-query = { path = "../common/query" } +common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datatypes = { path = "../datatypes" } +futures = "0.3" +futures-util = "0.3" +serde = "1.0" +serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } diff --git a/src/catalog/src/catalog.rs b/src/catalog/src/catalog.rs deleted file mode 100644 index c2040eedc1..0000000000 --- a/src/catalog/src/catalog.rs +++ /dev/null @@ -1,44 +0,0 @@ -use std::any::Any; -use std::sync::Arc; - -use crate::schema::SchemaProvider; - -/// Represent a list of named catalogs -pub trait CatalogList: Sync + Send { - /// Returns the catalog list as [`Any`](std::any::Any) - /// so that it can be downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Adds a new catalog to this catalog list - /// If a catalog of the same name existed before, it is replaced in the list and returned. - fn register_catalog( - &self, - name: String, - catalog: Arc, - ) -> Option>; - - /// Retrieves the list of available catalog names - fn catalog_names(&self) -> Vec; - - /// Retrieves a specific catalog by name, provided it exists. - fn catalog(&self, name: &str) -> Option>; -} - -/// Represents a catalog, comprising a number of named schemas. -pub trait CatalogProvider: Sync + Send { - /// Returns the catalog provider as [`Any`](std::any::Any) - /// so that it can be downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Retrieves the list of available schema names in this catalog. - fn schema_names(&self) -> Vec; - - /// Retrieves a specific schema from the catalog by name, provided it exists. - fn schema(&self, name: &str) -> Option>; -} - -pub type CatalogListRef = Arc; -pub type CatalogProviderRef = Arc; - -pub const DEFAULT_CATALOG_NAME: &str = "greptime"; -pub const DEFAULT_SCHEMA_NAME: &str = "public"; diff --git a/src/catalog/src/consts.rs b/src/catalog/src/consts.rs new file mode 100644 index 0000000000..f0307d162f --- /dev/null +++ b/src/catalog/src/consts.rs @@ -0,0 +1,6 @@ +pub const SYSTEM_CATALOG_NAME: &str = "system"; +pub const INFORMATION_SCHEMA_NAME: &str = "information_schema"; +pub const SYSTEM_CATALOG_TABLE_ID: u64 = 0; +pub const SYSTEM_CATALOG_TABLE_NAME: &str = "system_catalog"; +pub const DEFAULT_CATALOG_NAME: &str = "greptime"; +pub const DEFAULT_SCHEMA_NAME: &str = "public"; diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 1253da97d2..f527b3d845 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -1,11 +1,209 @@ -use datafusion::error::DataFusionError; +use std::any::Any; -common_error::define_opaque_error!(Error); +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::prelude::{Snafu, StatusCode}; +use datafusion::error::DataFusionError; +use datatypes::arrow; +use snafu::{Backtrace, ErrorCompat}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to open system catalog table, source: {}", source))] + OpenSystemCatalog { + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Failed to create system catalog table, source: {}", source))] + CreateSystemCatalog { + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("System catalog is not valid: {}", msg))] + SystemCatalog { msg: String, backtrace: Backtrace }, + + #[snafu(display( + "System catalog table type mismatch, expected: binary, found: {:?} source: {}", + data_type, + source + ))] + SystemCatalogTypeMismatch { + data_type: arrow::datatypes::DataType, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Invalid system catalog entry type: {:?}", entry_type))] + InvalidEntryType { entry_type: Option }, + + #[snafu(display("Invalid system catalog key: {:?}", key))] + InvalidKey { key: Option }, + + #[snafu(display("Catalog value is not present"))] + EmptyValue, + + #[snafu(display("Failed to deserialize value, source: {}", source))] + ValueDeserialize { + source: serde_json::error::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Cannot find catalog by name: {}", catalog_name))] + CatalogNotFound { catalog_name: String }, + + #[snafu(display("Cannot find schema, schema info: {}", schema_info))] + SchemaNotFound { schema_info: String }, + + #[snafu(display("Table {} already exists", table))] + TableExists { table: String, backtrace: Backtrace }, + + #[snafu(display("Failed to register table"))] + RegisterTable { + #[snafu(backtrace)] + source: BoxedError, + }, + + #[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))] + OpenTable { + table_info: String, + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Table not found while opening table, table info: {}", table_info))] + TableNotFound { table_info: String }, + + #[snafu(display("Failed to read system catalog table records"))] + ReadSystemCatalog { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to build table schema for system catalog table"))] + SystemCatalogSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, +} pub type Result = std::result::Result; -impl From for DataFusionError { - fn from(e: Error) -> DataFusionError { - DataFusionError::External(Box::new(e)) +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidKey { .. } + | Error::OpenSystemCatalog { .. } + | Error::CreateSystemCatalog { .. } + | Error::SchemaNotFound { .. } + | Error::TableNotFound { .. } + | Error::InvalidEntryType { .. } => StatusCode::Unexpected, + Error::SystemCatalog { .. } + | Error::SystemCatalogTypeMismatch { .. } + | Error::EmptyValue + | Error::ValueDeserialize { .. } + | Error::CatalogNotFound { .. } + | Error::OpenTable { .. } + | Error::ReadSystemCatalog { .. } => StatusCode::StorageUnavailable, + Error::RegisterTable { .. } | Error::SystemCatalogSchema { .. } => StatusCode::Internal, + Error::TableExists { .. } => StatusCode::TableAlreadyExists, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for DataFusionError { + fn from(e: Error) -> Self { + DataFusionError::Internal(e.to_string()) + } +} + +#[cfg(test)] +mod tests { + use common_error::mock::MockError; + use datatypes::arrow::datatypes::DataType; + use snafu::GenerateImplicitData; + + use super::*; + + #[test] + pub fn test_error_status_code() { + assert_eq!( + StatusCode::TableAlreadyExists, + Error::TableExists { + table: "some_table".to_string(), + backtrace: Backtrace::generate(), + } + .status_code() + ); + + assert_eq!( + StatusCode::Unexpected, + Error::InvalidKey { key: None }.status_code() + ); + + assert_eq!( + StatusCode::Unexpected, + Error::OpenSystemCatalog { + source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable)) + } + .status_code() + ); + + assert_eq!( + StatusCode::Unexpected, + Error::CreateSystemCatalog { + source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable)) + } + .status_code() + ); + + assert_eq!( + StatusCode::StorageUnavailable, + Error::SystemCatalog { + msg: "".to_string(), + backtrace: Backtrace::generate(), + } + .status_code() + ); + + assert_eq!( + StatusCode::StorageUnavailable, + Error::SystemCatalogTypeMismatch { + data_type: DataType::Boolean, + source: datatypes::error::Error::UnsupportedArrowType { + arrow_type: DataType::Boolean, + backtrace: Backtrace::generate() + } + } + .status_code() + ); + assert_eq!( + StatusCode::StorageUnavailable, + Error::EmptyValue.status_code() + ); + } + + #[test] + pub fn test_errors_to_datafusion_error() { + let e: DataFusionError = Error::TableExists { + table: "test_table".to_string(), + backtrace: Backtrace::generate(), + } + .into(); + match e { + DataFusionError::Internal(_) => {} + _ => { + panic!("catalog error should be converted to DataFusionError::Internal") + } + } } } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index b15a6a4c40..39581c4f5e 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -1,8 +1,63 @@ -mod catalog; -pub mod error; -pub mod memory; -mod schema; +#![feature(assert_matches)] -pub use crate::catalog::{CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef}; -pub use crate::catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use std::any::Any; +use std::sync::Arc; + +pub use crate::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +pub use crate::manager::LocalCatalogManager; pub use crate::schema::{SchemaProvider, SchemaProviderRef}; +mod consts; +pub mod error; +mod manager; +pub mod memory; +pub mod schema; +mod system; +mod tables; + +/// Represent a list of named catalogs +pub trait CatalogList: Sync + Send { + /// Returns the catalog list as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Adds a new catalog to this catalog list + /// If a catalog of the same name existed before, it is replaced in the list and returned. + fn register_catalog( + &self, + name: String, + catalog: CatalogProviderRef, + ) -> Option; + + /// Retrieves the list of available catalog names + fn catalog_names(&self) -> Vec; + + /// Retrieves a specific catalog by name, provided it exists. + fn catalog(&self, name: &str) -> Option; +} + +/// Represents a catalog, comprising a number of named schemas. +pub trait CatalogProvider: Sync + Send { + /// Returns the catalog provider as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Retrieves the list of available schema names in this catalog. + fn schema_names(&self) -> Vec; + + /// Registers schema to this catalog. + fn register_schema(&self, name: String, schema: SchemaProviderRef) + -> Option; + + /// Retrieves a specific schema from the catalog by name, provided it exists. + fn schema(&self, name: &str) -> Option; +} + +pub type CatalogListRef = Arc; +pub type CatalogProviderRef = Arc; + +#[async_trait::async_trait] +pub trait CatalogManager: CatalogList { + async fn start(&self) -> error::Result<()>; +} + +pub type CatalogManagerRef = Arc; diff --git a/src/catalog/src/manager.rs b/src/catalog/src/manager.rs new file mode 100644 index 0000000000..6d4718dcf0 --- /dev/null +++ b/src/catalog/src/manager.rs @@ -0,0 +1,233 @@ +use std::any::Any; +use std::sync::Arc; + +use common_recordbatch::RecordBatch; +use common_telemetry::info; +use datatypes::prelude::ScalarVector; +use datatypes::vectors::{BinaryVector, UInt8Vector}; +use futures_util::StreamExt; +use snafu::{ensure, OptionExt, ResultExt}; +use table::engine::{EngineContext, TableEngineRef}; +use table::requests::OpenTableRequest; +use table::table::numbers::NumbersTable; + +use super::error::Result; +use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME}; +use crate::error::{ + CatalogNotFoundSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, SchemaNotFoundSnafu, + SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableNotFoundSnafu, +}; +use crate::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use crate::system::{decode_system_catalog, Entry, SystemCatalogTable, TableEntry}; +use crate::tables::SystemCatalog; +use crate::{ + CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, SchemaProvider, + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, +}; + +/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. +pub struct LocalCatalogManager { + system: Arc, + catalogs: Arc, + engine: TableEngineRef, +} + +impl LocalCatalogManager { + /// Create a new [CatalogManager] with given user catalogs and table engine + pub async fn try_new(engine: TableEngineRef) -> Result { + let table = SystemCatalogTable::new(engine.clone()).await?; + let memory_catalog_list = crate::memory::new_memory_catalog_list()?; + let system_catalog = Arc::new(SystemCatalog::new( + table, + memory_catalog_list.clone(), + engine.clone(), + )); + Ok(Self { + system: system_catalog, + catalogs: memory_catalog_list, + engine, + }) + } + + /// Scan all entries from system catalog table + pub async fn init(&self) -> Result<()> { + self.init_system_catalog()?; + let mut system_records = self.system.information_schema.system.records().await?; + while let Some(records) = system_records + .next() + .await + .transpose() + .context(ReadSystemCatalogSnafu)? + { + self.handle_system_catalog_entries(records).await?; + } + Ok(()) + } + + fn init_system_catalog(&self) -> Result<()> { + let system_schema = Arc::new(MemorySchemaProvider::new()); + system_schema.register_table( + SYSTEM_CATALOG_TABLE_NAME.to_string(), + self.system.information_schema.system.clone(), + )?; + let system_catalog = Arc::new(MemoryCatalogProvider::new()); + system_catalog.register_schema(INFORMATION_SCHEMA_NAME.to_string(), system_schema); + self.catalogs + .register_catalog(SYSTEM_CATALOG_NAME.to_string(), system_catalog); + + let default_catalog = Arc::new(MemoryCatalogProvider::new()); + let default_schema = Arc::new(MemorySchemaProvider::new()); + + // Add numbers table for test + // TODO(hl): remove this registration + let table = Arc::new(NumbersTable::default()); + default_schema.register_table("numbers".to_string(), table)?; + + default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema); + self.catalogs + .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog); + Ok(()) + } + + async fn handle_system_catalog_entries(&self, records: RecordBatch) -> Result<()> { + ensure!( + records.df_recordbatch.columns().len() >= 4, + SystemCatalogSnafu { + msg: format!( + "Length mismatch: {}", + records.df_recordbatch.columns().len() + ) + } + ); + + let entry_type = UInt8Vector::try_from_arrow_array(&records.df_recordbatch.columns()[0]) + .with_context(|_| SystemCatalogTypeMismatchSnafu { + data_type: records.df_recordbatch.columns()[0].data_type().clone(), + })?; + + let key = BinaryVector::try_from_arrow_array(&records.df_recordbatch.columns()[1]) + .with_context(|_| SystemCatalogTypeMismatchSnafu { + data_type: records.df_recordbatch.columns()[1].data_type().clone(), + })?; + + let value = BinaryVector::try_from_arrow_array(&records.df_recordbatch.columns()[1]) + .with_context(|_| SystemCatalogTypeMismatchSnafu { + data_type: records.df_recordbatch.columns()[3].data_type().clone(), + })?; + + for ((t, k), v) in entry_type + .iter_data() + .zip(key.iter_data()) + .zip(value.iter_data()) + { + let entry = decode_system_catalog(t, k, v)?; + match entry { + Entry::Catalog(c) => { + self.catalogs.register_catalog_if_absent( + c.catalog_name.clone(), + Arc::new(MemoryCatalogProvider::new()), + ); + info!("Register catalog: {}", c.catalog_name); + } + Entry::Schema(s) => { + let catalog = + self.catalogs + .catalog(&s.catalog_name) + .context(CatalogNotFoundSnafu { + catalog_name: &s.catalog_name, + })?; + catalog.register_schema( + s.schema_name.clone(), + Arc::new(MemorySchemaProvider::new()), + ); + info!("Registered schema: {:?}", s); + } + Entry::Table(t) => { + self.open_and_register_table(&t).await?; + info!("Registered table: {:?}", t) + } + } + } + + Ok(()) + } + + async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> { + let catalog = self + .catalogs + .catalog(&t.catalog_name) + .context(CatalogNotFoundSnafu { + catalog_name: &t.catalog_name, + })?; + let schema = catalog + .schema(&t.schema_name) + .context(SchemaNotFoundSnafu { + schema_info: format!("{}.{}", &t.catalog_name, &t.schema_name), + })?; + + let context = EngineContext {}; + let request = OpenTableRequest { + catalog_name: t.catalog_name.clone(), + schema_name: t.schema_name.clone(), + table_name: t.table_name.clone(), + table_id: t.table_id, + }; + + let option = self + .engine + .open_table(&context, request) + .await + .with_context(|_| OpenTableSnafu { + table_info: format!( + "{}.{}.{}, id: {}", + &t.catalog_name, &t.schema_name, &t.table_name, t.table_id + ), + })? + .with_context(|| TableNotFoundSnafu { + table_info: format!( + "{}.{}.{}, id: {}", + &t.catalog_name, &t.schema_name, &t.table_name, t.table_id + ), + })?; + + schema.register_table(t.table_name.clone(), option)?; + Ok(()) + } +} + +impl CatalogList for LocalCatalogManager { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: CatalogProviderRef, + ) -> Option> { + self.catalogs.register_catalog(name, catalog) + } + + fn catalog_names(&self) -> Vec { + let mut res = self.catalogs.catalog_names(); + res.push(SYSTEM_CATALOG_NAME.to_string()); + res + } + + fn catalog(&self, name: &str) -> Option> { + if name.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) { + Some(self.system.clone()) + } else { + self.catalogs.catalog(name) + } + } +} + +#[async_trait::async_trait] +impl CatalogManager for LocalCatalogManager { + /// Start [MemoryCatalogManager] to load all information from system catalog table. + /// Make sure table engine is initialized before starting [MemoryCatalogManager]. + async fn start(&self) -> Result<()> { + self.init().await + } +} diff --git a/src/catalog/src/memory.rs b/src/catalog/src/memory.rs index fea19ca920..38bad99edd 100644 --- a/src/catalog/src/memory.rs +++ b/src/catalog/src/memory.rs @@ -1,47 +1,14 @@ use std::any::Any; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; -use common_error::prelude::*; -use table::table::numbers::NumbersTable; use table::TableRef; -use crate::catalog::{ - CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, -}; -use crate::error::{Error, Result}; +use crate::error::{Result, TableExistsSnafu}; use crate::schema::SchemaProvider; - -/// Error implementation of memory catalog. -#[derive(Debug, Snafu)] -pub enum InnerError { - #[snafu(display("Table {} already exists", table))] - TableExists { table: String, backtrace: Backtrace }, -} - -impl ErrorExt for InnerError { - fn status_code(&self) -> StatusCode { - match self { - InnerError::TableExists { .. } => StatusCode::TableAlreadyExists, - } - } - - fn backtrace_opt(&self) -> Option<&Backtrace> { - ErrorCompat::backtrace(self) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -impl From for Error { - fn from(err: InnerError) -> Self { - Self::new(err) - } -} +use crate::{CatalogList, CatalogProvider, CatalogProviderRef, SchemaProviderRef}; /// Simple in-memory list of catalogs #[derive(Default)] @@ -50,6 +17,25 @@ pub struct MemoryCatalogList { pub catalogs: RwLock>, } +impl MemoryCatalogList { + /// Registers a catalog and return `None` if no catalog with the same name was already + /// registered, or `Some` with the previously registered catalog. + pub fn register_catalog_if_absent( + &self, + name: String, + catalog: Arc, + ) -> Option { + let mut catalogs = self.catalogs.write().unwrap(); + match catalogs.entry(name) { + Entry::Occupied(v) => Some(v.get().clone()), + Entry::Vacant(v) => { + v.insert(catalog); + None + } + } + } +} + impl CatalogList for MemoryCatalogList { fn as_any(&self) -> &dyn Any { self @@ -93,15 +79,6 @@ impl MemoryCatalogProvider { schemas: RwLock::new(HashMap::new()), } } - - pub fn register_schema( - &self, - name: impl Into, - schema: Arc, - ) -> Option> { - let mut schemas = self.schemas.write().unwrap(); - schemas.insert(name.into(), schema) - } } impl CatalogProvider for MemoryCatalogProvider { @@ -114,6 +91,15 @@ impl CatalogProvider for MemoryCatalogProvider { schemas.keys().cloned().collect() } + fn register_schema( + &self, + name: String, + schema: SchemaProviderRef, + ) -> Option { + let mut schemas = self.schemas.write().unwrap(); + schemas.insert(name, schema) + } + fn schema(&self, name: &str) -> Option> { let schemas = self.schemas.read().unwrap(); schemas.get(name).cloned() @@ -175,36 +161,39 @@ impl SchemaProvider for MemorySchemaProvider { } /// Create a memory catalog list contains a numbers table for test -pub fn new_memory_catalog_list() -> Result { - let schema_provider = Arc::new(MemorySchemaProvider::new()); - let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - let catalog_list = Arc::new(MemoryCatalogList::default()); - - // Add numbers table for test - let table = Arc::new(NumbersTable::default()); - schema_provider.register_table("numbers".to_string(), table)?; - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME, schema_provider); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); - - Ok(catalog_list) +pub fn new_memory_catalog_list() -> Result> { + Ok(Arc::new(MemoryCatalogList::default())) } #[cfg(test)] mod tests { + use common_error::ext::ErrorExt; + use common_error::prelude::StatusCode; + use table::table::numbers::NumbersTable; use super::*; + use crate::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; #[test] fn test_new_memory_catalog_list() { let catalog_list = new_memory_catalog_list().unwrap(); - let catalog_provider = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap(); - let schema_provider = catalog_provider.schema(DEFAULT_SCHEMA_NAME).unwrap(); + assert!(catalog_list.catalog(DEFAULT_CATALOG_NAME).is_none()); + let default_catalog = Arc::new(MemoryCatalogProvider::default()); + catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog.clone()); - let table = schema_provider.table("numbers"); + assert!(default_catalog.schema(DEFAULT_SCHEMA_NAME).is_none()); + let default_schema = Arc::new(MemorySchemaProvider::default()); + default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone()); + + default_schema + .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .unwrap(); + + let table = default_schema.table("numbers"); assert!(table.is_some()); - assert!(schema_provider.table("not_exists").is_none()); + assert!(default_schema.table("not_exists").is_none()); } #[tokio::test] @@ -226,4 +215,21 @@ mod tests { assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } + + #[test] + pub fn test_register_if_absent() { + let list = MemoryCatalogList::default(); + assert!(list + .register_catalog_if_absent( + "test_catalog".to_string(), + Arc::new(MemoryCatalogProvider::new()) + ) + .is_none()); + list.register_catalog_if_absent( + "test_catalog".to_string(), + Arc::new(MemoryCatalogProvider::new()), + ) + .unwrap(); + list.as_any().downcast_ref::().unwrap(); + } } diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs new file mode 100644 index 0000000000..6df6cc6d2a --- /dev/null +++ b/src/catalog/src/system.rs @@ -0,0 +1,303 @@ +use std::any::Any; +use std::sync::Arc; + +use common_query::logical_plan::Expr; +use common_recordbatch::SendableRecordBatchStream; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; +use table::engine::{EngineContext, TableEngineRef}; +use table::requests::{CreateTableRequest, OpenTableRequest}; +use table::{Table, TableRef}; + +use crate::consts::{ + INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, + SYSTEM_CATALOG_TABLE_NAME, +}; +use crate::error::{ + CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, + OpenSystemCatalogSnafu, Result, SystemCatalogSchemaSnafu, ValueDeserializeSnafu, +}; + +pub struct SystemCatalogTable { + schema: SchemaRef, + pub table: TableRef, +} + +#[async_trait::async_trait] +impl Table for SystemCatalogTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + async fn scan( + &self, + _projection: &Option>, + _filters: &[Expr], + _limit: Option, + ) -> table::Result { + panic!("System catalog table does not support scan!") + } +} + +impl SystemCatalogTable { + pub async fn new(engine: TableEngineRef) -> Result { + let request = OpenTableRequest { + catalog_name: SYSTEM_CATALOG_NAME.to_string(), + schema_name: INFORMATION_SCHEMA_NAME.to_string(), + table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), + table_id: SYSTEM_CATALOG_TABLE_ID, + }; + let schema = Arc::new(build_system_catalog_schema()?); + let ctx = EngineContext::default(); + + if let Some(table) = engine + .open_table(&ctx, request) + .await + .context(OpenSystemCatalogSnafu)? + { + Ok(Self { table, schema }) + } else { + // system catalog table is not yet created, try to create + let request = CreateTableRequest { + name: SYSTEM_CATALOG_TABLE_NAME.to_string(), + desc: Some("System catalog table".to_string()), + schema: schema.clone(), + primary_key_indices: vec![0, 1, 2], + create_if_not_exists: true, + }; + + let table = engine + .create_table(&ctx, request) + .await + .context(CreateSystemCatalogSnafu)?; + Ok(Self { table, schema }) + } + } + + /// Create a stream of all entries inside system catalog table + pub async fn records(&self) -> Result { + let full_projection = None; + let stream = self.table.scan(&full_projection, &[], None).await.unwrap(); + Ok(stream) + } +} + +/// Build system catalog table schema. +/// A system catalog table consists of 4 columns, namely +/// - entry_type: type of entry in current row, can be any variant of [EntryType]. +/// - key: a binary encoded key of entry, differs according to different entry type. +/// - timestamp: currently not used. +/// - value: JSON-encoded value of entry's metadata. +fn build_system_catalog_schema() -> Result { + let mut cols = Vec::with_capacity(6); + cols.push(ColumnSchema::new( + "entry_type".to_string(), + ConcreteDataType::uint8_datatype(), + false, + )); + cols.push(ColumnSchema::new( + "key".to_string(), + ConcreteDataType::binary_datatype(), + false, + )); + cols.push(ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::int64_datatype(), + false, + )); + cols.push(ColumnSchema::new( + "value".to_string(), + ConcreteDataType::binary_datatype(), + false, + )); + Schema::with_timestamp_index(cols, 2).context(SystemCatalogSchemaSnafu) +} + +pub fn decode_system_catalog( + entry_type: Option, + key: Option<&[u8]>, + value: Option<&[u8]>, +) -> Result { + let entry_type = entry_type.context(InvalidKeySnafu { key: None })?; + let key = String::from_utf8_lossy(key.context(InvalidKeySnafu { key: None })?); + + match EntryType::try_from(entry_type)? { + EntryType::Catalog => { + // As for catalog entry, the key is a string with format: `` + // and the value is current not used. + let catalog_name = key.to_string(); + Ok(Entry::Catalog(CatalogEntry { catalog_name })) + } + EntryType::Schema => { + // As for schema entry, the key is a string with format: `.` + // and the value is current not used. + let schema_parts = key.split('.').collect::>(); + ensure!( + schema_parts.len() == 2, + InvalidKeySnafu { + key: Some(key.to_string()) + } + ); + Ok(Entry::Schema(SchemaEntry { + catalog_name: schema_parts[0].to_string(), + schema_name: schema_parts[1].to_string(), + })) + } + + EntryType::Table => { + // As for table entry, the key is a string with format: `..` + // and the value is a JSON string with format: `{"table_id": }` + let table_parts = key.split('.').collect::>(); + ensure!( + table_parts.len() >= 3, + InvalidKeySnafu { + key: Some(key.to_string()) + } + ); + let value = value.context(EmptyValueSnafu)?; + let table_meta: TableEntryValue = + serde_json::from_slice(value).context(ValueDeserializeSnafu)?; + Ok(Entry::Table(TableEntry { + catalog_name: table_parts[0].to_string(), + schema_name: table_parts[1].to_string(), + table_name: table_parts[2].to_string(), + table_id: table_meta.table_id, + })) + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum EntryType { + Catalog = 1, + Schema = 2, + Table = 3, +} + +impl TryFrom for EntryType { + type Error = Error; + + fn try_from(value: u8) -> std::result::Result { + match value { + b if b == Self::Catalog as u8 => Ok(Self::Catalog), + b if b == Self::Schema as u8 => Ok(Self::Schema), + b if b == Self::Table as u8 => Ok(Self::Table), + b => InvalidEntryTypeSnafu { + entry_type: Some(b), + } + .fail(), + } + } +} + +#[derive(Debug, PartialEq)] +pub enum Entry { + Catalog(CatalogEntry), + Schema(SchemaEntry), + Table(TableEntry), +} + +#[derive(Debug, PartialEq)] +pub struct CatalogEntry { + pub catalog_name: String, +} + +#[derive(Debug, PartialEq)] +pub struct SchemaEntry { + pub catalog_name: String, + pub schema_name: String, +} + +#[derive(Debug, PartialEq)] +pub struct TableEntry { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub table_id: u64, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub struct TableEntryValue { + pub table_id: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_decode_catalog_enrty() { + let entry = decode_system_catalog( + Some(EntryType::Catalog as u8), + Some("some_catalog".as_bytes()), + None, + ) + .unwrap(); + if let Entry::Catalog(e) = entry { + assert_eq!("some_catalog", e.catalog_name); + } else { + panic!("Unexpected type: {:?}", entry); + } + } + + #[test] + pub fn test_decode_schema_entry() { + let entry = decode_system_catalog( + Some(EntryType::Schema as u8), + Some("some_catalog.some_schema".as_bytes()), + None, + ) + .unwrap(); + + if let Entry::Schema(e) = entry { + assert_eq!("some_catalog", e.catalog_name); + assert_eq!("some_schema", e.schema_name); + } else { + panic!("Unexpected type: {:?}", entry); + } + } + + #[test] + pub fn test_decode_table() { + let entry = decode_system_catalog( + Some(EntryType::Table as u8), + Some("some_catalog.some_schema.some_table".as_bytes()), + Some("{\"table_id\":42}".as_bytes()), + ) + .unwrap(); + + if let Entry::Table(e) = entry { + assert_eq!("some_catalog", e.catalog_name); + assert_eq!("some_schema", e.schema_name); + assert_eq!("some_table", e.table_name); + assert_eq!(42, e.table_id); + } else { + panic!("Unexpected type: {:?}", entry); + } + } + + #[test] + #[should_panic] + pub fn test_decode_mismatch() { + decode_system_catalog( + Some(EntryType::Table as u8), + Some("some_catalog.some_schema.some_table".as_bytes()), + None, + ) + .unwrap(); + } + + #[test] + pub fn test_entry_type() { + assert_eq!(EntryType::Catalog, EntryType::try_from(1).unwrap()); + assert_eq!(EntryType::Schema, EntryType::try_from(2).unwrap()); + assert_eq!(EntryType::Table, EntryType::try_from(3).unwrap()); + assert!(EntryType::try_from(4).is_err()); + } +} diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs new file mode 100644 index 0000000000..c83f6e0b35 --- /dev/null +++ b/src/catalog/src/tables.rs @@ -0,0 +1,251 @@ +// The `tables` table in system catalog keeps a record of all tables created by user. + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use async_stream::stream; +use common_query::logical_plan::Expr; +use common_recordbatch::error::Result as RecordBatchResult; +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use datatypes::prelude::{ConcreteDataType, VectorBuilder}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; +use datatypes::vectors::VectorRef; +use futures::Stream; +use table::engine::TableEngineRef; +use table::{Table, TableRef}; + +use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME}; +use crate::system::SystemCatalogTable; +use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef}; + +/// Tables holds all tables created by user. +pub struct Tables { + schema: SchemaRef, + catalogs: CatalogListRef, + engine: TableEngineRef, +} + +impl Tables { + pub fn new(catalogs: CatalogListRef, engine: TableEngineRef) -> Self { + Self { + schema: Arc::new(build_schema_for_tables()), + catalogs, + engine, + } + } +} + +#[async_trait::async_trait] +impl Table for Tables { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + async fn scan( + &self, + _projection: &Option>, + _filters: &[Expr], + _limit: Option, + ) -> table::error::Result { + let catalogs = self.catalogs.clone(); + let schema_ref = self.schema.clone(); + let engine_name = self.engine.name().to_string(); + + let stream = stream!({ + for catalog_name in catalogs.catalog_names() { + let catalog = catalogs.catalog(&catalog_name).unwrap(); + for schema_name in catalog.schema_names() { + let mut tables_in_schema = Vec::with_capacity(catalog.schema_names().len()); + let schema = catalog.schema(&schema_name).unwrap(); + for table_name in schema.table_names() { + tables_in_schema.push(table_name); + } + + let vec = tables_to_record_batch( + &catalog_name, + &schema_name, + tables_in_schema, + &engine_name, + ); + let record_batch_res = RecordBatch::new(schema_ref.clone(), vec); + yield record_batch_res; + } + } + }); + + Ok(Box::pin(TablesRecordBatchStream { + schema: self.schema.clone(), + stream: Box::pin(stream), + })) + } +} + +/// Convert tables info to `RecordBatch`. +fn tables_to_record_batch( + catalog_name: &str, + schema_name: &str, + table_names: Vec, + engine: &str, +) -> Vec { + let mut catalog_vec = + VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len()); + let mut schema_vec = + VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len()); + let mut table_name_vec = + VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len()); + let mut engine_vec = + VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len()); + + for table_name in table_names { + catalog_vec.push(&Value::String(catalog_name.into())); + schema_vec.push(&Value::String(schema_name.into())); + table_name_vec.push(&Value::String(table_name.into())); + engine_vec.push(&Value::String(engine.into())); + } + + vec![ + catalog_vec.finish(), + schema_vec.finish(), + table_name_vec.finish(), + engine_vec.finish(), + ] +} + +pub struct TablesRecordBatchStream { + schema: SchemaRef, + stream: Pin> + Send>>, +} + +impl Stream for TablesRecordBatchStream { + type Item = RecordBatchResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +impl RecordBatchStream for TablesRecordBatchStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +pub struct InformationSchema { + pub tables: Arc, + pub system: Arc, +} + +impl SchemaProvider for InformationSchema { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + vec!["tables".to_string(), SYSTEM_CATALOG_TABLE_NAME.to_string()] + } + + fn table(&self, name: &str) -> Option { + if name.eq_ignore_ascii_case("tables") { + Some(self.tables.clone()) + } else if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) { + Some(self.system.clone()) + } else { + None + } + } + + fn register_table( + &self, + _name: String, + _table: TableRef, + ) -> crate::error::Result> { + panic!("System catalog & schema does not support register table") + } + + fn deregister_table(&self, _name: &str) -> crate::error::Result> { + panic!("System catalog & schema does not support deregister table") + } + + fn table_exist(&self, name: &str) -> bool { + name.eq_ignore_ascii_case("tables") || name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) + } +} + +pub struct SystemCatalog { + pub information_schema: Arc, +} + +impl SystemCatalog { + pub fn new( + system: SystemCatalogTable, + catalogs: CatalogListRef, + engine: TableEngineRef, + ) -> Self { + let schema = InformationSchema { + tables: Arc::new(Tables::new(catalogs, engine)), + system: Arc::new(system), + }; + Self { + information_schema: Arc::new(schema), + } + } +} + +impl CatalogProvider for SystemCatalog { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + vec![INFORMATION_SCHEMA_NAME.to_string()] + } + + fn register_schema( + &self, + _name: String, + _schema: SchemaProviderRef, + ) -> Option { + panic!("System catalog does not support registering schema!") + } + + fn schema(&self, name: &str) -> Option> { + if name.eq_ignore_ascii_case(INFORMATION_SCHEMA_NAME) { + Some(self.information_schema.clone()) + } else { + None + } + } +} + +fn build_schema_for_tables() -> Schema { + let cols = vec![ + ColumnSchema::new( + "catalog".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + "schema".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + "table_name".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + "engine".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ]; + Schema::new(cols) +} diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 1b790ecbf9..d4762091a9 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -1,9 +1,6 @@ use std::sync::Arc; -use catalog::CatalogListRef; -use snafu::ResultExt; - -use crate::error::{NewCatalogSnafu, Result}; +use crate::error::Result; use crate::instance::{Instance, InstanceRef}; use crate::server::Services; @@ -28,19 +25,16 @@ impl Default for DatanodeOptions { pub struct Datanode { opts: DatanodeOptions, services: Services, - _catalog_list: CatalogListRef, instance: InstanceRef, } impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { - let catalog_list = catalog::memory::new_memory_catalog_list().context(NewCatalogSnafu)?; - let instance = Arc::new(Instance::new(&opts, catalog_list.clone()).await?); + let instance = Arc::new(Instance::new(&opts).await?); Ok(Self { opts, services: Services::new(instance.clone()), - _catalog_list: catalog_list, instance, }) } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 24c6f76280..de5c9da3e4 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -166,9 +166,9 @@ mod tests { } fn throw_catalog_error() -> std::result::Result<(), catalog::error::Error> { - Err(catalog::error::Error::new(MockError::with_backtrace( - StatusCode::Internal, - ))) + Err(catalog::error::Error::RegisterTable { + source: BoxedError::new(MockError::with_backtrace(StatusCode::Internal)), + }) } fn assert_internal_error(err: &Error) { diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 50ff37e69f..33a53e969a 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,7 +1,7 @@ use std::{fs, path, sync::Arc}; use api::v1::InsertExpr; -use catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_telemetry::logging::info; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; @@ -17,7 +17,8 @@ use table_engine::engine::MitoEngine; use crate::datanode::DatanodeOptions; use crate::error::{ - self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, Result, TableNotFoundSnafu, + self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, + TableNotFoundSnafu, }; use crate::server::grpc::insert::insertion_expr_to_request; use crate::sql::SqlHandler; @@ -31,33 +32,38 @@ pub struct Instance { table_engine: DefaultEngine, sql_handler: SqlHandler, // Catalog list - catalog_list: CatalogListRef, + catalog_manager: CatalogManagerRef, } pub type InstanceRef = Arc; impl Instance { - pub async fn new(opts: &DatanodeOptions, catalog_list: CatalogListRef) -> Result { + pub async fn new(opts: &DatanodeOptions) -> Result { let log_store = create_local_file_log_store(opts).await?; - let factory = QueryEngineFactory::new(catalog_list.clone()); - let query_engine = factory.query_engine().clone(); let table_engine = DefaultEngine::new( EngineImpl::new(EngineConfig::default(), Arc::new(log_store)) .await .context(error::OpenStorageEngineSnafu)?, ); + let catalog_manager = Arc::new( + catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone())) + .await + .context(NewCatalogSnafu)?, + ); + let factory = QueryEngineFactory::new(catalog_manager.clone()); + let query_engine = factory.query_engine().clone(); Ok(Self { query_engine, sql_handler: SqlHandler::new(table_engine.clone()), table_engine, - catalog_list, + catalog_manager, }) } pub async fn execute_grpc_insert(&self, insert_expr: InsertExpr) -> Result { let schema_provider = self - .catalog_list + .catalog_manager .catalog(DEFAULT_CATALOG_NAME) .unwrap() .schema(DEFAULT_SCHEMA_NAME) @@ -98,7 +104,7 @@ impl Instance { } Statement::Insert(_) => { let schema_provider = self - .catalog_list + .catalog_manager .catalog(DEFAULT_CATALOG_NAME) .unwrap() .schema(DEFAULT_SCHEMA_NAME) @@ -114,6 +120,10 @@ impl Instance { } pub async fn start(&self) -> Result<()> { + self.catalog_manager + .start() + .await + .context(NewCatalogSnafu)?; // FIXME(dennis): create a demo table for test let column_schemas = vec![ ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), @@ -142,7 +152,7 @@ impl Instance { .context(CreateTableSnafu { table_name })?; let schema_provider = self - .catalog_list + .catalog_manager .catalog(DEFAULT_CATALOG_NAME) .unwrap() .schema(DEFAULT_SCHEMA_NAME) @@ -185,9 +195,9 @@ mod tests { #[tokio::test] async fn test_execute_insert() { - let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); + common_telemetry::init_default_ut_logging(); let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Instance::new(&opts, catalog_list).await.unwrap(); + let instance = Instance::new(&opts).await.unwrap(); instance.start().await.unwrap(); let output = instance @@ -205,9 +215,9 @@ mod tests { #[tokio::test] async fn test_execute_query() { - let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Instance::new(&opts, catalog_list).await.unwrap(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); let output = instance .execute_sql("select sum(number) from numbers limit 20") diff --git a/src/datanode/src/server/http/handler.rs b/src/datanode/src/server/http/handler.rs index 3de9f5014e..0c0d17f9ee 100644 --- a/src/datanode/src/server/http/handler.rs +++ b/src/datanode/src/server/http/handler.rs @@ -59,9 +59,9 @@ mod tests { } async fn create_extension() -> Extension { - let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap()); + let instance = Arc::new(Instance::new(&opts).await.unwrap()); + instance.start().await.unwrap(); Extension(instance) } @@ -82,6 +82,7 @@ mod tests { #[tokio::test] async fn test_sql_output_rows() { + common_telemetry::init_default_ut_logging(); let query = create_params(); let extension = create_extension().await; @@ -89,7 +90,7 @@ mod tests { match json { HttpResponse::Json(json) => { - assert!(json.success); + assert!(json.success, "{:?}", json); assert!(json.error.is_none()); assert!(json.output.is_some()); diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index f74827a36d..eabc4c9adb 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -11,9 +11,8 @@ use crate::server::http::HttpServer; use crate::test_util; async fn make_test_app() -> Router { - let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap()); + let instance = Arc::new(Instance::new(&opts).await.unwrap()); instance.start().await.unwrap(); let http_server = HttpServer::new(instance); http_server.make_app() diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 797dfccc26..127abab0fb 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -23,6 +23,8 @@ datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" metrics = "0.18" +serde = "1.0" +serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } sql = { path = "../sql" } table = { path = "../table" } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 5575e41cff..dda290af72 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -208,15 +208,31 @@ impl QueryExecutor for DatafusionQueryEngine { #[cfg(test)] mod tests { + use std::sync::Arc; + use arrow::array::UInt64Array; + use catalog::memory::{MemoryCatalogProvider, MemorySchemaProvider}; + use catalog::{ + CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, + }; use common_recordbatch::util; use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; + use table::table::numbers::NumbersTable; use crate::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; fn create_test_engine() -> QueryEngineRef { let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); + + let default_schema = Arc::new(MemorySchemaProvider::new()); + default_schema + .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .unwrap(); + let default_catalog = Arc::new(MemoryCatalogProvider::new()); + default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema); + catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog); + let factory = QueryEngineFactory::new(catalog_list); factory.query_engine().clone() } diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index aff837b8d7..fe6b1127a0 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -3,7 +3,9 @@ use std::any::Any; use std::sync::Arc; -use catalog::{CatalogListRef, CatalogProvider, SchemaProvider}; +use catalog::{ + CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef, +}; use datafusion::catalog::{ catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, schema::SchemaProvider as DfSchemaProvider, @@ -44,7 +46,7 @@ impl DfCatalogList for DfCatalogListAdapter { catalog: Arc, ) -> Option> { let catalog_adapter = Arc::new(CatalogProviderAdapter { - df_cataglog_provider: catalog, + df_catalog_provider: catalog, runtime: self.runtime.clone(), }); self.catalog_list @@ -73,7 +75,7 @@ impl DfCatalogList for DfCatalogListAdapter { /// Datafusion's CatalogProvider -> greptime CatalogProvider struct CatalogProviderAdapter { - df_cataglog_provider: Arc, + df_catalog_provider: Arc, runtime: Arc, } @@ -83,11 +85,19 @@ impl CatalogProvider for CatalogProviderAdapter { } fn schema_names(&self) -> Vec { - self.df_cataglog_provider.schema_names() + self.df_catalog_provider.schema_names() + } + + fn register_schema( + &self, + _name: String, + _schema: SchemaProviderRef, + ) -> Option { + todo!("register_schema is not supported in Datafusion catalog provider") } fn schema(&self, name: &str) -> Option> { - self.df_cataglog_provider + self.df_catalog_provider .schema(name) .map(|df_schema_provider| { Arc::new(SchemaProviderAdapter { @@ -100,7 +110,7 @@ impl CatalogProvider for CatalogProviderAdapter { ///Greptime CatalogProvider -> datafusion's CatalogProvider struct DfCatalogProviderAdapter { - catalog_provider: Arc, + catalog_provider: CatalogProviderRef, runtime: Arc, } @@ -234,3 +244,65 @@ impl SchemaProvider for SchemaProviderAdapter { self.df_schema_provider.table_exist(name) } } + +#[cfg(test)] +mod tests { + use catalog::memory::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; + use table::table::numbers::NumbersTable; + + use super::*; + + #[test] + #[should_panic] + pub fn test_register_schema() { + let adapter = CatalogProviderAdapter { + df_catalog_provider: Arc::new( + datafusion::catalog::catalog::MemoryCatalogProvider::new(), + ), + runtime: Arc::new(RuntimeEnv::default()), + }; + + adapter.register_schema( + "whatever".to_string(), + Arc::new(MemorySchemaProvider::new()), + ); + } + + #[test] + pub fn test_register_table() { + let adapter = DfSchemaProviderAdapter { + runtime: Arc::new(RuntimeEnv::default()), + schema_provider: Arc::new(MemorySchemaProvider::new()), + }; + + adapter + .register_table( + "test_table".to_string(), + Arc::new(DfTableProviderAdapter::new(Arc::new( + NumbersTable::default(), + ))), + ) + .unwrap(); + adapter.table("test_table").unwrap(); + } + + #[test] + pub fn test_register_catalog() { + let rt = Arc::new(RuntimeEnv::default()); + let catalog_list = DfCatalogListAdapter { + runtime: rt.clone(), + catalog_list: new_memory_catalog_list().unwrap(), + }; + assert!(catalog_list + .register_catalog( + "test_catalog".to_string(), + Arc::new(DfCatalogProviderAdapter { + catalog_provider: Arc::new(MemoryCatalogProvider::new()), + runtime: rt, + }), + ) + .is_none()); + + catalog_list.catalog("test_catalog").unwrap(); + } +} diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 5ac3aa3684..846ec46675 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -73,7 +73,9 @@ impl ErrorExt for InnerError { impl From for catalog::error::Error { fn from(e: InnerError) -> Self { - catalog::error::Error::new(e) + catalog::error::Error::RegisterTable { + source: BoxedError::new(e), + } } } @@ -85,6 +87,8 @@ impl From for Error { #[cfg(test)] mod tests { + use common_error::mock::MockError; + use super::*; fn throw_df_error() -> Result<(), DataFusionError> { @@ -129,4 +133,15 @@ mod tests { let sql_err = raise_sql_error().err().unwrap(); assert_eq!(sql_err.status_code(), err.status_code()); } + + #[test] + pub fn test_from_inner_error() { + let err = InnerError::TableSchemaMismatch { + source: table::error::Error::new(MockError::new(StatusCode::Unexpected)), + }; + + let catalog_error = catalog::error::Error::from(err); + // [InnerError] to [catalog::error::Error] is considered as Internal error + assert_eq!(StatusCode::Internal, catalog_error.status_code()); + } } diff --git a/src/query/tests/my_sum_udaf_example.rs b/src/query/tests/my_sum_udaf_example.rs index 226d393370..3c530f96b9 100644 --- a/src/query/tests/my_sum_udaf_example.rs +++ b/src/query/tests/my_sum_udaf_example.rs @@ -4,7 +4,9 @@ use std::sync::Arc; use arc_swap::ArcSwapOption; use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use catalog::{ + CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, +}; use common_function::scalars::aggregate::AggregateFunctionMeta; use common_query::error::CreateAccumulatorSnafu; use common_query::error::Result as QueryResult; @@ -262,7 +264,7 @@ pub fn new_query_engine_factory(table_name: String, table: TableRef) -> QueryEng let catalog_list = Arc::new(MemoryCatalogList::default()); schema_provider.register_table(table_name, table).unwrap(); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME, schema_provider); + catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); QueryEngineFactory::new(catalog_list) diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index de3fc0bdfe..c12a60c650 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -4,7 +4,9 @@ use std::sync::Arc; use arrow::array::UInt32Array; use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use catalog::{ + CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, +}; use common_query::prelude::{create_udf, make_scalar_function, Volatility}; use common_recordbatch::error::Result as RecordResult; use common_recordbatch::{util, RecordBatch}; @@ -23,6 +25,7 @@ use query::query_engine::{Output, QueryEngineFactory}; use query::QueryEngine; use rand::Rng; use table::table::adapter::DfTableProviderAdapter; +use table::table::numbers::NumbersTable; use test_util::MemTable; use crate::pow::pow; @@ -87,6 +90,15 @@ async fn test_datafusion_query_engine() -> Result<()> { async fn test_udf() -> Result<()> { common_telemetry::init_default_ut_logging(); let catalog_list = catalog::memory::new_memory_catalog_list()?; + + let default_schema = Arc::new(MemorySchemaProvider::new()); + default_schema + .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .unwrap(); + let default_catalog = Arc::new(MemoryCatalogProvider::new()); + default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema); + catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog); + let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine(); @@ -205,7 +217,7 @@ fn create_query_engine() -> Arc { .register_table("float_numbers".to_string(), float_number_table) .unwrap(); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME, schema_provider); + catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); let factory = QueryEngineFactory::new(catalog_list); diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index bcb0b3106c..3fa42a3ceb 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -144,7 +144,7 @@ impl<'a> ParserContext<'a> { /// Parses the set of valid formats fn parse_table_engine(&mut self) -> Result { if !self.consume_token(ENGINE) { - return Ok(engine::DEFAULT_ENGINE.to_string()); + return Ok(engine::MITO_ENGINE.to_string()); } self.parser diff --git a/src/storage/src/manifest/storage.rs b/src/storage/src/manifest/storage.rs index 38936d3908..4603c11302 100644 --- a/src/storage/src/manifest/storage.rs +++ b/src/storage/src/manifest/storage.rs @@ -131,7 +131,6 @@ impl ManifestLogStorage for ManifestObjectStore { .is_exist() .await .context(ReadObjectSnafu { path: &self.path })?; - if !dir_exists { return Ok(ObjectStoreLogIterator { iter: Box::new(Vec::default().into_iter()), diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 53295a7e45..884a0d84cf 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -23,11 +23,11 @@ use tokio::sync::Mutex; use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, - BuildRowKeyDescriptorSnafu, MissingTimestamIndexSnafu, Result, + BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, }; use crate::table::MitoTable; -pub const DEFAULT_ENGINE: &str = "mito"; +pub const MITO_ENGINE: &str = "mito"; const INIT_COLUMN_ID: ColumnId = 0; /// [TableEngine] implementation. @@ -49,6 +49,10 @@ impl MitoEngine { #[async_trait] impl TableEngine for MitoEngine { + fn name(&self) -> &str { + MITO_ENGINE + } + async fn create_table( &self, ctx: &EngineContext, @@ -127,7 +131,7 @@ fn build_row_key_desc_from_schema( request .schema .timestamp_column() - .context(MissingTimestamIndexSnafu { + .context(MissingTimestampIndexSnafu { table_name: &request.name, })?; let timestamp_index = request.schema.timestamp_index().unwrap(); @@ -191,7 +195,7 @@ fn build_column_family_from_request( let ts_index = request .schema .timestamp_index() - .context(MissingTimestamIndexSnafu { + .context(MissingTimestampIndexSnafu { table_name: &request.name, })?; let column_schemas = request @@ -271,7 +275,7 @@ impl MitoEngineInner { // Use region meta schema instead of request schema let table_meta = TableMetaBuilder::default() .schema(region.in_memory_metadata().schema().clone()) - .engine(DEFAULT_ENGINE) + .engine(MITO_ENGINE) .next_column_id(next_column_id) .primary_key_indices(request.primary_key_indices.clone()) .build() @@ -337,7 +341,7 @@ impl MitoEngineInner { //FIXME(boyan): recover table meta from table manifest let table_meta = TableMetaBuilder::default() .schema(region.in_memory_metadata().schema().clone()) - .engine(DEFAULT_ENGINE) + .engine(MITO_ENGINE) .next_column_id(INIT_COLUMN_ID) .primary_key_indices(Vec::default()) .build() @@ -445,6 +449,7 @@ mod tests { let (engine, table) = { let (engine, table_engine, table) = test_util::setup_mock_engine_and_table().await; + assert_eq!(MITO_ENGINE, table_engine.name()); // Now try to open the table again. let reopened = table_engine .open_table(&ctx, open_req.clone()) diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index aac8575729..740c415b04 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -43,7 +43,7 @@ pub enum Error { }, #[snafu(display("Missing timestamp index for table: {}", table_name))] - MissingTimestamIndex { + MissingTimestampIndex { table_name: String, backtrace: Backtrace, }, @@ -118,7 +118,7 @@ impl ErrorExt for Error { | BuildTableMeta { .. } | BuildTableInfo { .. } | BuildRegionDescriptor { .. } - | MissingTimestamIndex { .. } => StatusCode::InvalidArguments, + | MissingTimestampIndex { .. } => StatusCode::InvalidArguments, } } diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 0f12bc08cb..212a31220c 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -7,6 +7,9 @@ use crate::TableRef; /// Table engine abstraction. #[async_trait::async_trait] pub trait TableEngine: Send + Sync { + /// Return engine name + fn name(&self) -> &str; + /// Create a table by given request. /// /// Return the created table.