refactor: system tables in FrontendCatalogManager (#2358)

* rename method names

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove system table, table engine, register/deregister

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add system catalog

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* run nextest

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* some documents

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: fix clippy

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-12 05:45:47 -05:00
parent 46eca5026e
commit 1ad5f6e5d5
20 changed files with 114 additions and 1039 deletions

View File

@@ -21,15 +21,13 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use api::v1::meta::RegionStat;
use common_telemetry::{info, warn};
use common_telemetry::warn;
use futures::future::BoxFuture;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::{TableId, TableType};
use table::requests::CreateTableRequest;
use table::TableRef;
use crate::error::{CreateTableSnafu, Result};
use crate::error::Result;
pub mod error;
pub mod information_schema;
@@ -38,47 +36,11 @@ mod metrics;
pub mod remote;
pub mod system;
pub mod table_source;
pub mod tables;
#[async_trait::async_trait]
pub trait CatalogManager: Send + Sync {
fn as_any(&self) -> &dyn Any;
/// Register a local catalog.
///
/// # Returns
///
/// Whether the catalog is registered.
fn register_local_catalog(&self, name: &str) -> Result<bool>;
/// Register a local schema.
///
/// # Returns
///
/// Whether the schema is registered.
///
/// # Errors
///
/// This method will/should fail if catalog not exist
fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
/// Deregisters a database within given catalog/schema to catalog manager
fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;
/// Registers a local table.
///
/// # Returns
///
/// Whether the table is registered.
///
/// # Errors
///
/// This method will/should fail if catalog or schema not exist
fn register_local_table(&self, request: RegisterTableRequest) -> Result<bool>;
/// Deregisters a table within given catalog/schema to catalog manager
fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()>;
async fn catalog_names(&self) -> Result<Vec<String>>;
async fn schema_names(&self, catalog: &str) -> Result<Vec<String>>;
@@ -164,48 +126,6 @@ pub struct RegisterSchemaRequest {
pub schema: String,
}
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager + ?Sized>(
manager: &'a M,
engine: TableEngineRef,
sys_table_requests: &'a mut Vec<RegisterSystemTableRequest>,
) -> Result<()> {
for req in sys_table_requests.drain(..) {
let catalog_name = &req.create_table_request.catalog_name;
let schema_name = &req.create_table_request.schema_name;
let table_name = &req.create_table_request.table_name;
let table_id = req.create_table_request.id;
let table = manager.table(catalog_name, schema_name, table_name).await?;
let table = if let Some(table) = table {
table
} else {
let table = engine
.create_table(&EngineContext::default(), req.create_table_request.clone())
.await
.with_context(|_| CreateTableSnafu {
table_info: common_catalog::format_full_table_name(
catalog_name,
schema_name,
table_name,
),
})?;
manager.register_local_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})?;
info!("Created and registered system table: {table_name}");
table
};
if let Some(hook) = req.open_hook {
(hook)(table).await?;
}
}
Ok(())
}
/// The stat of regions in the datanode node.
/// The number of regions can be got from len of vec.
///

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod manager;
pub mod memory;
pub use memory::{new_memory_catalog_manager, MemoryCatalogManager};

View File

@@ -1,349 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE,
NUMBERS_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME,
};
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::info;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{BinaryVector, UInt8Vector};
use futures_util::lock::Mutex;
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::EngineContext;
use table::requests::OpenTableRequest;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use crate::error::{
CatalogNotFoundSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result, SchemaNotFoundSnafu,
SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu,
TableNotFoundSnafu,
};
use crate::system::{
decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX,
VALUE_INDEX,
};
use crate::tables::SystemCatalog;
use crate::{
handle_system_table_request, CatalogManagerRef, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest,
};
pub struct SystemTableInitializer {
system: Arc<SystemCatalog>,
catalog_manager: CatalogManagerRef,
engine_manager: TableEngineManagerRef,
system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
}
impl SystemTableInitializer {
pub async fn try_new(
engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
) -> Result<Self> {
let engine = engine_manager
.engine(MITO_ENGINE)
.context(TableEngineNotFoundSnafu {
engine_name: MITO_ENGINE,
})?;
let table = SystemCatalogTable::new(engine.clone()).await?;
let system_catalog = Arc::new(SystemCatalog::new(table));
Ok(Self {
system: system_catalog,
catalog_manager,
engine_manager,
system_table_requests: Mutex::new(Vec::default()),
})
}
/// Scan all entries from system catalog table
pub async fn init(&self) -> Result<()> {
self.init_system_catalog().await?;
let system_records = self.system.information_schema.system.records().await?;
let entries = self.collect_system_catalog_entries(system_records).await?;
self.handle_system_catalog_entries(entries).await?;
// Processing system table hooks
let mut sys_table_requests = self.system_table_requests.lock().await;
let engine = self
.engine_manager
.engine(MITO_ENGINE)
.context(TableEngineNotFoundSnafu {
engine_name: MITO_ENGINE,
})?;
handle_system_table_request(
self.catalog_manager.as_ref(),
engine,
&mut sys_table_requests,
)
.await?;
Ok(())
}
async fn init_system_catalog(&self) -> Result<()> {
let catalog_manager = &self.catalog_manager;
catalog_manager.register_local_catalog(SYSTEM_CATALOG_NAME)?;
catalog_manager.register_local_schema(RegisterSchemaRequest {
catalog: SYSTEM_CATALOG_NAME.to_string(),
schema: INFORMATION_SCHEMA_NAME.to_string(),
})?;
let register_table_req = RegisterTableRequest {
catalog: SYSTEM_CATALOG_NAME.to_string(),
schema: INFORMATION_SCHEMA_NAME.to_string(),
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
table_id: SYSTEM_CATALOG_TABLE_ID,
table: self.system.information_schema.system.as_table_ref(),
};
catalog_manager.register_local_table(register_table_req)?;
// Add numbers table for test
let register_number_table_req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: NUMBERS_TABLE_NAME.to_string(),
table_id: NUMBERS_TABLE_ID,
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
catalog_manager.register_local_table(register_number_table_req)?;
Ok(())
}
/// Collect stream of system catalog entries to `Vec<Entry>`
async fn collect_system_catalog_entries(
&self,
stream: SendableRecordBatchStream,
) -> Result<Vec<Entry>> {
let record_batch = common_recordbatch::util::collect(stream)
.await
.context(ReadSystemCatalogSnafu)?;
let rbs = record_batch
.into_iter()
.map(Self::record_batch_to_entry)
.collect::<Result<Vec<_>>>()?;
Ok(rbs.into_iter().flat_map(Vec::into_iter).collect::<_>())
}
/// Convert `RecordBatch` to a vector of `Entry`.
fn record_batch_to_entry(rb: RecordBatch) -> Result<Vec<Entry>> {
ensure!(
rb.num_columns() >= 6,
SystemCatalogSnafu {
msg: format!("Length mismatch: {}", rb.num_columns())
}
);
let entry_type = rb
.column(ENTRY_TYPE_INDEX)
.as_any()
.downcast_ref::<UInt8Vector>()
.with_context(|| SystemCatalogTypeMismatchSnafu {
data_type: rb.column(ENTRY_TYPE_INDEX).data_type(),
})?;
let key = rb
.column(KEY_INDEX)
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| SystemCatalogTypeMismatchSnafu {
data_type: rb.column(KEY_INDEX).data_type(),
})?;
let value = rb
.column(VALUE_INDEX)
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| SystemCatalogTypeMismatchSnafu {
data_type: rb.column(VALUE_INDEX).data_type(),
})?;
let mut res = Vec::with_capacity(rb.num_rows());
for ((t, k), v) in entry_type
.iter_data()
.zip(key.iter_data())
.zip(value.iter_data())
{
let entry = decode_system_catalog(t, k, v)?;
res.push(entry);
}
Ok(res)
}
/// Processes records from system catalog table.
async fn handle_system_catalog_entries(&self, entries: Vec<Entry>) -> Result<()> {
let entries = Self::sort_entries(entries);
for entry in entries {
match entry {
Entry::Catalog(c) => {
self.catalog_manager
.register_local_catalog(&c.catalog_name)?;
info!("Register catalog: {}", c.catalog_name);
}
Entry::Schema(s) => {
let req = RegisterSchemaRequest {
catalog: s.catalog_name.clone(),
schema: s.schema_name.clone(),
};
self.catalog_manager.register_local_schema(req)?;
info!("Registered schema: {:?}", s);
}
Entry::Table(t) => {
if t.is_deleted {
continue;
}
self.open_and_register_table(&t).await?;
info!("Registered table: {:?}", t);
}
}
}
Ok(())
}
/// Sort catalog entries to ensure catalog entries comes first, then schema entries,
/// and table entries is the last.
fn sort_entries(mut entries: Vec<Entry>) -> Vec<Entry> {
entries.sort();
entries
}
async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> {
self.check_catalog_schema_exist(&t.catalog_name, &t.schema_name)
.await?;
let context = EngineContext {};
let open_request = OpenTableRequest {
catalog_name: t.catalog_name.clone(),
schema_name: t.schema_name.clone(),
table_name: t.table_name.clone(),
table_id: t.table_id,
region_numbers: vec![0],
};
let engine = self
.engine_manager
.engine(&t.engine)
.context(TableEngineNotFoundSnafu {
engine_name: &t.engine,
})?;
let table_ref = engine
.open_table(&context, open_request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!(
"{}.{}.{}, id: {}",
&t.catalog_name, &t.schema_name, &t.table_name, t.table_id
),
})?
.with_context(|| TableNotFoundSnafu {
table_info: format!(
"{}.{}.{}, id: {}",
&t.catalog_name, &t.schema_name, &t.table_name, t.table_id
),
})?;
let register_request = RegisterTableRequest {
catalog: t.catalog_name.clone(),
schema: t.schema_name.clone(),
table_name: t.table_name.clone(),
table_id: t.table_id,
table: table_ref,
};
self.catalog_manager
.register_local_table(register_request)?;
Ok(())
}
async fn check_catalog_schema_exist(
&self,
catalog_name: &str,
schema_name: &str,
) -> Result<()> {
if !self.catalog_manager.catalog_exist(catalog_name).await? {
return CatalogNotFoundSnafu { catalog_name }.fail()?;
}
if !self
.catalog_manager
.schema_exist(catalog_name, schema_name)
.await?
{
return SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
}
.fail()?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use mito::engine::MITO_ENGINE;
use super::*;
use crate::system::{CatalogEntry, SchemaEntry};
#[test]
fn test_sort_entry() {
let vec = vec![
Entry::Table(TableEntry {
catalog_name: "C1".to_string(),
schema_name: "S1".to_string(),
table_name: "T1".to_string(),
table_id: 1,
engine: MITO_ENGINE.to_string(),
is_deleted: false,
}),
Entry::Catalog(CatalogEntry {
catalog_name: "C2".to_string(),
}),
Entry::Schema(SchemaEntry {
catalog_name: "C1".to_string(),
schema_name: "S1".to_string(),
}),
Entry::Schema(SchemaEntry {
catalog_name: "C2".to_string(),
schema_name: "S2".to_string(),
}),
Entry::Catalog(CatalogEntry {
catalog_name: "".to_string(),
}),
Entry::Table(TableEntry {
catalog_name: "C1".to_string(),
schema_name: "S1".to_string(),
table_name: "T2".to_string(),
table_id: 2,
engine: MITO_ENGINE.to_string(),
is_deleted: false,
}),
];
let res = SystemTableInitializer::sort_entries(vec);
assert_matches!(res[0], Entry::Catalog(..));
assert_matches!(res[1], Entry::Catalog(..));
assert_matches!(res[2], Entry::Schema(..));
assert_matches!(res[3], Entry::Schema(..));
assert_matches!(res[4], Entry::Table(..));
assert_matches!(res[5], Entry::Table(..));
}
}

View File

@@ -24,10 +24,7 @@ use table::TableRef;
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::information_schema::InformationSchemaProvider;
use crate::{
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterTableRequest,
};
use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest};
type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
@@ -40,49 +37,6 @@ pub struct MemoryCatalogManager {
#[async_trait::async_trait]
impl CatalogManager for MemoryCatalogManager {
fn register_local_catalog(&self, name: &str) -> Result<bool> {
self.register_catalog(name)
}
fn register_local_table(&self, request: RegisterTableRequest) -> Result<bool> {
self.register_table(request)
}
fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()> {
self.deregister_table_sync(request)
}
fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
self.register_schema_sync(request)
}
fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schemas = catalogs
.get_mut(&request.catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?;
let table_count = schemas
.remove(&request.schema)
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?
.len();
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
table_count as f64,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
Ok(true)
}
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
self.schema_exist_sync(catalog, schema)
}
@@ -174,7 +128,7 @@ impl MemoryCatalogManager {
});
// Safety: default catalog/schema is registered in order so no CatalogNotFound error will occur
manager.register_catalog(DEFAULT_CATALOG_NAME).unwrap();
manager.register_catalog_sync(DEFAULT_CATALOG_NAME).unwrap();
manager
.register_schema_sync(RegisterSchemaRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
@@ -202,7 +156,7 @@ impl MemoryCatalogManager {
}
/// Registers a catalog if it does not exist and returns false if the schema exists.
pub fn register_catalog(&self, name: &str) -> Result<bool> {
pub fn register_catalog_sync(&self, name: &str) -> Result<bool> {
let name = name.to_string();
let mut catalogs = self.catalogs.write().unwrap();
@@ -264,7 +218,7 @@ impl MemoryCatalogManager {
}
/// Registers a schema and returns an error if the catalog or schema does not exist.
pub fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
.get_mut(&request.catalog)
@@ -309,7 +263,7 @@ impl MemoryCatalogManager {
let schema = &table.table_info().schema_name;
if !manager.catalog_exist_sync(catalog).unwrap() {
manager.register_catalog(catalog).unwrap();
manager.register_catalog_sync(catalog).unwrap();
}
if !manager.schema_exist_sync(catalog, schema).unwrap() {
@@ -328,7 +282,7 @@ impl MemoryCatalogManager {
table_id: table.table_info().ident.table_id,
table,
};
let _ = manager.register_table(request).unwrap();
let _ = manager.register_table_sync(request).unwrap();
manager
}
}
@@ -357,7 +311,7 @@ mod tests {
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
catalog_list.register_local_table(register_request).unwrap();
catalog_list.register_table_sync(register_request).unwrap();
let table = catalog_list
.table(
DEFAULT_CATALOG_NAME,
@@ -377,8 +331,8 @@ mod tests {
#[test]
pub fn test_register_catalog_sync() {
let list = MemoryCatalogManager::with_default_setup();
assert!(list.register_catalog("test_catalog").unwrap());
assert!(!list.register_catalog("test_catalog").unwrap());
assert!(list.register_catalog_sync("test_catalog").unwrap());
assert!(!list.register_catalog_sync("test_catalog").unwrap());
}
#[tokio::test]
@@ -393,7 +347,7 @@ mod tests {
table_id: 2333,
table: NumbersTable::table(2333),
};
catalog.register_local_table(register_table_req).unwrap();
catalog.register_table_sync(register_table_req).unwrap();
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
@@ -405,48 +359,11 @@ mod tests {
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
};
catalog
.deregister_local_table(deregister_table_req)
.unwrap();
catalog.deregister_table_sync(deregister_table_req).unwrap();
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap()
.is_none());
}
#[tokio::test]
async fn test_catalog_deregister_schema() {
let catalog = MemoryCatalogManager::with_default_setup();
// Registers a catalog, a schema, and a table.
let catalog_name = "foo_catalog".to_string();
let schema_name = "foo_schema".to_string();
let table_name = "foo_table".to_string();
let schema = RegisterSchemaRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
};
let table = RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name,
table_id: 0,
table: NumbersTable::table(0),
};
catalog.register_local_catalog(&catalog_name).unwrap();
catalog.register_local_schema(schema).unwrap();
catalog.register_local_table(table).unwrap();
let request = DeregisterSchemaRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
};
assert!(catalog.deregister_local_schema(request).unwrap());
assert!(!catalog
.schema_exist(&catalog_name, &schema_name)
.await
.unwrap());
}
}

View File

@@ -1,65 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The `tables` table in system catalog keeps a record of all tables created by user.
use std::sync::Arc;
use table::metadata::TableId;
use crate::system::SystemCatalogTable;
pub struct InformationSchema {
pub system: Arc<SystemCatalogTable>,
}
pub struct SystemCatalog {
pub information_schema: Arc<InformationSchema>,
}
impl SystemCatalog {
pub(crate) fn new(system: SystemCatalogTable) -> Self {
let schema = InformationSchema {
system: Arc::new(system),
};
Self {
information_schema: Arc::new(schema),
}
}
pub async fn register_table(
&self,
catalog: String,
schema: String,
table_name: String,
table_id: TableId,
engine: String,
) -> crate::error::Result<usize> {
self.information_schema
.system
.register_table(catalog, schema, table_name, table_id, engine)
.await
}
pub async fn register_schema(
&self,
catalog: String,
schema: String,
) -> crate::error::Result<usize> {
self.information_schema
.system
.register_schema(catalog, schema)
.await
}
}

View File

@@ -253,11 +253,11 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let datanode_clients = Arc::new(DatanodeClients::default());
let catalog_list = Arc::new(FrontendCatalogManager::new(
let catalog_list = FrontendCatalogManager::new(
cached_meta_backend.clone(),
cached_meta_backend.clone(),
datanode_clients,
));
);
let plugins: Arc<Plugins> = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,

View File

@@ -311,11 +311,11 @@ impl StartCommand {
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();
let catalog_manager = Arc::new(FrontendCatalogManager::new(
let catalog_manager = FrontendCatalogManager::new(
kv_store.clone(),
Arc::new(DummyKvCacheInvalidator),
Arc::new(StandaloneDatanodeManager(region_server.clone())),
));
);
catalog_manager
.table_metadata_manager_ref()

View File

@@ -14,22 +14,16 @@
use std::any::Any;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use catalog::error::{
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult,
TableMetadataManagerSnafu,
};
use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
use catalog::local::MemoryCatalogManager;
use catalog::remote::KvCacheInvalidatorRef;
use catalog::{
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterTableRequest,
};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
};
use catalog::CatalogManager;
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::datanode_manager::DatanodeManagerRef;
@@ -51,34 +45,11 @@ use table::TableRef;
use crate::table::DistTable;
// There are two sources for finding a table: the `local_catalog_manager` and the
// `table_metadata_manager`.
//
// The `local_catalog_manager` is for storing tables that are often transparent, not saving any
// real data. For example, our system tables, the `numbers` table and the "information_schema"
// table.
//
// The `table_metadata_manager`, on the other hand, is for storing tables that are created by users,
// obviously.
//
// For now, separating the two makes the code simpler, at least in the retrieval site. Now we have
// `numbers` and `information_schema` system tables. Both have their special implementations. If we
// put them with other ordinary tables that are created by users, we need to check the table name
// to decide which `TableRef` to return. Like this:
//
// ```rust
// match table_name {
// "numbers" => ... // return NumbersTable impl
// "information_schema" => ... // return InformationSchemaTable impl
// _ => .. // return DistTable impl
// }
// ```
//
// On the other hand, because we use `MemoryCatalogManager` for system tables, we can easily store
// and retrieve the concrete implementation of the system tables by their names, no more "if-else"s.
//
// However, if the system table is designed to have more features in the future, we may revisit
// the implementation here.
/// Access all existing catalog, schema and tables.
///
/// The result comes from two source, all the user tables are presented in
/// a kv-backend which persists the metadata of a table. And system tables
/// comes from [SystemCatalog], which is static and read-only.
#[derive(Clone)]
pub struct FrontendCatalogManager {
// TODO(LFC): Maybe use a real implementation for Standalone mode.
@@ -88,7 +59,8 @@ pub struct FrontendCatalogManager {
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
datanode_manager: DatanodeManagerRef,
local_catalog_manager: Arc<MemoryCatalogManager>,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
}
#[async_trait::async_trait]
@@ -135,14 +107,16 @@ impl FrontendCatalogManager {
backend: KvBackendRef,
backend_cache_invalidator: KvCacheInvalidatorRef,
datanode_manager: DatanodeManagerRef,
) -> Self {
Self {
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
backend_cache_invalidator,
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
datanode_manager,
local_catalog_manager: MemoryCatalogManager::new(),
}
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
},
})
}
pub fn partition_manager(&self) -> PartitionRuleManagerRef {
@@ -166,32 +140,6 @@ impl FrontendCatalogManager {
#[async_trait::async_trait]
impl CatalogManager for FrontendCatalogManager {
fn register_local_catalog(&self, name: &str) -> CatalogResult<bool> {
self.local_catalog_manager.register_catalog(name)
}
fn register_local_table(&self, request: RegisterTableRequest) -> CatalogResult<bool> {
self.local_catalog_manager.register_table(request)
}
fn deregister_local_table(&self, _request: DeregisterTableRequest) -> CatalogResult<()> {
Ok(())
}
fn register_local_schema(
&self,
_request: RegisterSchemaRequest,
) -> catalog::error::Result<bool> {
unimplemented!("FrontendCatalogManager does not support registering schema")
}
fn deregister_local_schema(
&self,
_request: DeregisterSchemaRequest,
) -> catalog_err::Result<bool> {
unimplemented!("FrontendCatalogManager does not support deregistering schema")
}
async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
let stream = self
.table_metadata_manager
@@ -218,11 +166,13 @@ impl CatalogManager for FrontendCatalogManager {
.try_collect::<BTreeSet<_>>()
.await
.map_err(BoxedError::new)
.context(ListSchemasSnafu { catalog })?;
.context(ListSchemasSnafu { catalog })?
.into_iter()
.collect::<Vec<_>>();
keys.insert(INFORMATION_SCHEMA_NAME.to_string());
keys.extend_from_slice(&self.system_catalog.schema_names());
Ok(keys.into_iter().collect::<Vec<_>>())
Ok(keys)
}
async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult<Vec<String>> {
@@ -235,13 +185,7 @@ impl CatalogManager for FrontendCatalogManager {
.into_iter()
.map(|(k, _)| k)
.collect::<Vec<String>>();
if catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME {
tables.push(NUMBERS_TABLE_NAME.to_string());
}
if schema == INFORMATION_SCHEMA_NAME {
tables.push(TABLES.to_string());
tables.push(COLUMNS.to_string());
}
tables.extend_from_slice(&self.system_catalog.table_names(schema));
Ok(tables)
}
@@ -255,9 +199,10 @@ impl CatalogManager for FrontendCatalogManager {
}
async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
if schema == INFORMATION_SCHEMA_NAME {
if self.system_catalog.schema_exist(schema) {
return Ok(true);
}
self.table_metadata_manager
.schema_manager()
.exist(SchemaNameKey::new(catalog, schema))
@@ -266,7 +211,7 @@ impl CatalogManager for FrontendCatalogManager {
}
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
if schema == INFORMATION_SCHEMA_NAME && (table == TABLES || table == COLUMNS) {
if self.system_catalog.table_exist(schema, table) {
return Ok(true);
}
@@ -285,19 +230,8 @@ impl CatalogManager for FrontendCatalogManager {
schema: &str,
table_name: &str,
) -> CatalogResult<Option<TableRef>> {
if catalog == DEFAULT_CATALOG_NAME
&& schema == DEFAULT_SCHEMA_NAME
&& table_name == NUMBERS_TABLE_NAME
{
return Ok(Some(NumbersTable::table(NUMBERS_TABLE_ID)));
}
if schema == INFORMATION_SCHEMA_NAME {
let manager = Arc::new(self.clone()) as _;
let provider =
InformationSchemaProvider::new(catalog.to_string(), Arc::downgrade(&manager));
return Ok(provider.table(table_name));
if let Some(table) = self.system_catalog.table(catalog, schema, table_name) {
return Ok(Some(table));
}
let key = TableNameKey::new(catalog, schema, table_name);
@@ -334,3 +268,57 @@ impl CatalogManager for FrontendCatalogManager {
self
}
}
// TODO: This struct can hold a static map of all system tables when
// the upper layer (e.g., procedure) can inform the catalog manager
// a new catalog is created.
/// Existing system tables:
/// - public.numbers
/// - information_schema.tables
/// - information_schema.columns
#[derive(Clone)]
struct SystemCatalog {
catalog_manager: Weak<FrontendCatalogManager>,
}
impl SystemCatalog {
fn schema_names(&self) -> Vec<String> {
vec![INFORMATION_SCHEMA_NAME.to_string()]
}
fn table_names(&self, schema: &str) -> Vec<String> {
if schema == INFORMATION_SCHEMA_NAME {
vec![TABLES.to_string(), COLUMNS.to_string()]
} else if schema == DEFAULT_SCHEMA_NAME {
vec![NUMBERS_TABLE_NAME.to_string()]
} else {
vec![]
}
}
fn schema_exist(&self, schema: &str) -> bool {
schema == INFORMATION_SCHEMA_NAME
}
fn table_exist(&self, schema: &str, table: &str) -> bool {
if schema == INFORMATION_SCHEMA_NAME {
table == TABLES || table == COLUMNS
} else if schema == DEFAULT_SCHEMA_NAME {
table == NUMBERS_TABLE_NAME
} else {
false
}
}
fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Option<TableRef> {
if schema == INFORMATION_SCHEMA_NAME {
let information_schema_provider =
InformationSchemaProvider::new(catalog.to_string(), self.catalog_manager.clone());
information_schema_provider.table(table_name)
} else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else {
None
}
}
}

View File

@@ -27,7 +27,6 @@ use std::time::Duration;
use api::v1::meta::Role;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::local::manager::SystemTableInitializer;
use catalog::remote::CachedMetaKvBackend;
use catalog::CatalogManagerRef;
use client::client_manager::DatanodeClients;
@@ -78,16 +77,14 @@ use sql::statements::copy::CopyTable;
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::engine::manager::MemoryTableEngineManager;
use self::distributed::DistRegionRequestHandler;
use self::standalone::StandaloneTableMetadataCreator;
use crate::catalog::FrontendCatalogManager;
use crate::delete::{Deleter, DeleterRef};
use crate::error::{
self, CatalogSnafu, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu,
MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result,
SqlExecInterceptedSnafu,
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
};
use crate::frontend::FrontendOptions;
use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
@@ -154,11 +151,11 @@ impl Instance {
) -> Result<Self> {
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let catalog_manager = Arc::new(FrontendCatalogManager::new(
let catalog_manager = FrontendCatalogManager::new(
meta_backend.clone(),
meta_backend.clone(),
datanode_clients.clone(),
));
);
let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone()));
let region_request_handler = DistRegionRequestHandler::arc(
@@ -398,14 +395,6 @@ impl FrontendInstance for Instance {
heartbeat_task.start().await?;
}
let initializer = SystemTableInitializer::try_new(
Arc::new(MemoryTableEngineManager::new_empty()),
self.catalog_manager.clone(),
)
.await
.context(CatalogSnafu)?;
initializer.init().await.context(CatalogSnafu)?;
self.script_executor.start(self).await?;
futures::future::try_join_all(self.servers.values().map(start_server))

View File

@@ -1376,7 +1376,7 @@ mod test {
use std::time::{Duration, UNIX_EPOCH};
use catalog::local::MemoryCatalogManager;
use catalog::{CatalogManager, RegisterTableRequest};
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -1432,7 +1432,7 @@ mod test {
let table = EmptyTable::from_table_info(&table_info);
let catalog_list = MemoryCatalogManager::with_default_setup();
assert!(catalog_list
.register_local_table(RegisterTableRequest {
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name,

View File

@@ -486,7 +486,7 @@ mod tests {
use std::borrow::Cow::Borrowed;
use std::sync::Arc;
use catalog::{CatalogManager, RegisterTableRequest};
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
@@ -515,7 +515,7 @@ mod tests {
table_id: NUMBERS_TABLE_ID,
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
catalog_manager.register_local_table(req).unwrap();
catalog_manager.register_table_sync(req).unwrap();
QueryEngineFactory::new(catalog_manager, None, false).query_engine()
}

View File

@@ -326,7 +326,7 @@ fn have_range_in_exprs(exprs: &Vec<Expr>) -> bool {
mod test {
use catalog::local::MemoryCatalogManager;
use catalog::{CatalogManager, RegisterTableRequest};
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -380,7 +380,7 @@ mod test {
let table = EmptyTable::from_table_info(&table_info);
let catalog_list = MemoryCatalogManager::with_default_setup();
assert!(catalog_list
.register_local_table(RegisterTableRequest {
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name,

View File

@@ -112,7 +112,7 @@ fn catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
table_id: NUMBERS_TABLE_ID,
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
let _ = catalog_manager.register_table(req).unwrap();
let _ = catalog_manager.register_table_sync(req).unwrap();
Ok(catalog_manager)
}

View File

@@ -104,7 +104,7 @@ fn create_test_engine() -> TimeRangeTester {
table_id: table.table_info().ident.table_id,
table: table.clone(),
};
let _ = catalog_manager.register_table(req).unwrap();
let _ = catalog_manager.register_table_sync(req).unwrap();
let engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
TimeRangeTester { engine, filter }

View File

@@ -15,7 +15,7 @@
//! Procedure to create a table.
use async_trait::async_trait;
use catalog::{CatalogManagerRef, RegisterTableRequest};
use catalog::CatalogManagerRef;
use common_procedure::error::SubprocedureFailedSnafu;
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
@@ -240,25 +240,13 @@ impl CreateTableProcedure {
region_numbers: self.data.request.region_numbers.clone(),
};
// Safety: The table is already created.
let table = self
let _ = self
.table_engine
.open_table(&engine_ctx, open_req)
.await
.map_err(Error::from_error_ext)?
.unwrap();
let register_req = RegisterTableRequest {
catalog: self.data.request.catalog_name.clone(),
schema: self.data.request.schema_name.clone(),
table_name: self.data.request.table_name.clone(),
table_id: self.data.request.id,
table,
};
let _ = self
.catalog_manager
.register_local_table(register_req)
.map_err(Error::from_error_ext)?;
Ok(Status::Done)
}
}

View File

@@ -15,7 +15,7 @@
//! Procedure to drop a table.
use async_trait::async_trait;
use catalog::{CatalogManagerRef, DeregisterTableRequest};
use catalog::CatalogManagerRef;
use common_procedure::error::SubprocedureFailedSnafu;
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
@@ -144,29 +144,6 @@ impl DropTableProcedure {
}
async fn on_remove_from_catalog(&mut self) -> Result<Status> {
let request = &self.data.request;
let has_table = self
.catalog_manager
.table(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.await
.context(AccessCatalogSnafu)?
.is_some();
if has_table {
// The table is still in the catalog.
let deregister_table_req = DeregisterTableRequest {
catalog: self.data.request.catalog_name.clone(),
schema: self.data.request.schema_name.clone(),
table_name: self.data.request.table_name.clone(),
};
self.catalog_manager
.deregister_local_table(deregister_table_req)
.context(AccessCatalogSnafu)?;
}
self.data.state = DropTableState::EngineDropTable;
// Assign procedure id to the subprocedure.
self.data.subprocedure_id = Some(ProcedureId::random());
@@ -264,45 +241,10 @@ impl DropTableData {
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use table::engine::TableEngine;
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_drop_table_procedure() {
let env = TestEnv::new("drop");
let table_name = "test_drop";
let table_id = env.create_table(table_name).await;
let request = DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
};
let TestEnv {
dir: _dir,
table_engine,
procedure_manager,
catalog_manager,
} = env;
let procedure =
DropTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
watcher.changed().await.unwrap();
assert!(!catalog_manager
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
let ctx = EngineContext::default();
assert!(!table_engine.table_exists(&ctx, table_id,));
}
#[tokio::test]
async fn test_drop_not_exists_table() {
common_telemetry::init_default_ut_logging();

View File

@@ -20,7 +20,7 @@ use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::{ProcedureManagerRef, ProcedureWithId};
use common_procedure::ProcedureManagerRef;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
@@ -32,11 +32,8 @@ use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use crate::CreateTableProcedure;
pub struct TestEnv {
pub dir: TempDir,
pub table_engine: Arc<MitoEngine<EngineImpl<NoopLogStore>>>,
@@ -92,27 +89,6 @@ impl TestEnv {
catalog_manager,
}
}
pub async fn create_table(&self, table_name: &str) -> TableId {
let request = new_create_request(table_name);
let table_id = request.id;
let procedure = CreateTableProcedure::new(
request,
self.catalog_manager.clone(),
self.table_engine.clone(),
self.table_engine.clone(),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.unwrap();
watcher.changed().await.unwrap();
table_id
}
}
pub fn schema_for_test() -> RawSchema {

View File

@@ -234,45 +234,10 @@ impl TruncateTableData {
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use table::engine::TableEngine;
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_truncate_table_procedure() {
let env = TestEnv::new("truncate");
let table_name = "test_truncate";
let table_id = env.create_table(table_name).await;
let request = TruncateTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
};
let TestEnv {
dir: _dir,
table_engine,
procedure_manager,
catalog_manager,
} = env;
let procedure =
TruncateTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
watcher.changed().await.unwrap();
assert!(catalog_manager
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
let ctx = EngineContext::default();
assert!(table_engine.table_exists(&ctx, table_id));
}
#[tokio::test]
async fn test_truncate_not_exists_table() {
common_telemetry::init_default_ut_logging();

View File

@@ -26,7 +26,6 @@ use crate::requests::{
TruncateTableRequest,
};
use crate::TableRef;
pub mod manager;
/// Represents a resolved path to a table of the form “catalog.schema.table”
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]

View File

@@ -1,194 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_telemetry::error;
use snafu::{ensure, OptionExt};
use crate::engine::{TableEngineProcedureRef, TableEngineRef};
use crate::error::{EngineExistSnafu, EngineNotFoundSnafu, Result};
#[async_trait::async_trait]
pub trait TableEngineManager: Send + Sync {
/// returns [Error::EngineNotFound](crate::error::Error::EngineNotFound) if engine not found
fn engine(&self, name: &str) -> Result<TableEngineRef>;
/// returns [Error::EngineExist](crate::error::Error::EngineExist) if engine exists
fn register_engine(&self, name: &str, engine: TableEngineRef) -> Result<()>;
/// closes all registered engines
async fn close(&self) -> Result<()>;
/// returns [TableEngineProcedureRef] of specific engine `name` or
/// [Error::EngineNotFound](crate::error::Error::EngineNotFound) if engine not found
fn engine_procedure(&self, name: &str) -> Result<TableEngineProcedureRef>;
}
pub type TableEngineManagerRef = Arc<dyn TableEngineManager>;
/// Simple in-memory table engine manager
pub struct MemoryTableEngineManager {
pub engines: RwLock<HashMap<String, TableEngineRef>>,
engine_procedures: RwLock<HashMap<String, TableEngineProcedureRef>>,
}
impl MemoryTableEngineManager {
/// Create a new [MemoryTableEngineManager] with single table `engine`.
pub fn new(engine: TableEngineRef) -> Self {
MemoryTableEngineManager::alias(engine.name().to_string(), engine)
}
// TODO: remove `TableEngineManager`
pub fn new_empty() -> Self {
let engines = RwLock::new(HashMap::new());
let engine_procedures = RwLock::new(HashMap::new());
MemoryTableEngineManager {
engines,
engine_procedures,
}
}
/// Create a new [MemoryTableEngineManager] with single table `engine` and
/// an alias `name` instead of the engine's name.
pub fn alias(name: String, engine: TableEngineRef) -> Self {
let engines = HashMap::from([(name, engine)]);
let engines = RwLock::new(engines);
MemoryTableEngineManager {
engines,
engine_procedures: RwLock::new(HashMap::new()),
}
}
/// Attach the `engine_procedures` to the manager.
pub fn with_engine_procedures(
mut self,
engine_procedures: HashMap<String, TableEngineProcedureRef>,
) -> Self {
self.engine_procedures = RwLock::new(engine_procedures);
self
}
pub fn with(engines: Vec<TableEngineRef>) -> Self {
let engines = engines
.into_iter()
.map(|engine| (engine.name().to_string(), engine))
.collect::<HashMap<_, _>>();
let engines = RwLock::new(engines);
MemoryTableEngineManager {
engines,
engine_procedures: RwLock::new(HashMap::new()),
}
}
}
#[async_trait]
impl TableEngineManager for MemoryTableEngineManager {
fn engine(&self, name: &str) -> Result<TableEngineRef> {
let engines = self.engines.read().unwrap();
engines
.get(name)
.cloned()
.context(EngineNotFoundSnafu { engine: name })
}
fn register_engine(&self, name: &str, engine: TableEngineRef) -> Result<()> {
let mut engines = self.engines.write().unwrap();
ensure!(
!engines.contains_key(name),
EngineExistSnafu { engine: name }
);
let _ = engines.insert(name.to_string(), engine);
Ok(())
}
async fn close(&self) -> Result<()> {
let engines = {
let engines = self.engines.write().unwrap();
engines.values().cloned().collect::<Vec<_>>()
};
if let Err(err) =
futures::future::try_join_all(engines.iter().map(|engine| engine.close())).await
{
error!("Failed to close engine: {}", err);
}
Ok(())
}
fn engine_procedure(&self, name: &str) -> Result<TableEngineProcedureRef> {
let engine_procedures = self.engine_procedures.read().unwrap();
engine_procedures
.get(name)
.cloned()
.context(EngineNotFoundSnafu { engine: name })
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use super::*;
use crate::engine::TableEngine;
use crate::error;
use crate::test_util::MockTableEngine;
#[test]
fn test_table_engine_manager() {
let table_engine = MockTableEngine::new();
let table_engine_ref = Arc::new(table_engine);
let table_engine_manager = MemoryTableEngineManager::new(table_engine_ref.clone());
// Attach engine procedures.
let engine_procedure: TableEngineProcedureRef = table_engine_ref.clone();
let engine_procedures =
HashMap::from([(table_engine_ref.name().to_string(), engine_procedure)]);
let table_engine_manager = table_engine_manager.with_engine_procedures(engine_procedures);
table_engine_manager
.register_engine("yet_another", table_engine_ref.clone())
.unwrap();
let got = table_engine_manager.engine(table_engine_ref.name());
assert_eq!(got.unwrap().name(), table_engine_ref.name());
let got = table_engine_manager.engine("yet_another");
assert_eq!(got.unwrap().name(), table_engine_ref.name());
let missing = table_engine_manager.engine("not_exists");
assert_matches!(missing.err().unwrap(), error::Error::EngineNotFound { .. });
assert!(table_engine_manager
.engine_procedure(table_engine_ref.name())
.is_ok());
assert_matches!(
table_engine_manager
.engine_procedure("unknown")
.err()
.unwrap(),
error::Error::EngineNotFound { .. }
);
}
}