refactor: system tables in new region server (#2344)

refactor: inverse the dependency between system tables and catalog manager
This commit is contained in:
LFC
2023-09-12 11:36:21 +08:00
committed by Ruihang Xia
parent 3cab6de391
commit fe954b78a2
22 changed files with 188 additions and 1030 deletions

View File

@@ -22,8 +22,6 @@ use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
use tokio::task::JoinError;
use crate::DeregisterTableRequest;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
@@ -179,20 +177,6 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display(
"Failed to deregister table, request: {:?}, source: {}",
request,
source
))]
DeregisterTable {
request: DeregisterTableRequest,
location: Location,
source: table::error::Error,
},
#[snafu(display("Illegal catalog manager state: {}", msg))]
IllegalManagerState { location: Location, msg: String },
#[snafu(display("Failed to scan system catalog table, source: {}", source))]
SystemCatalogTableScan {
location: Location,
@@ -269,7 +253,6 @@ impl ErrorExt for Error {
Error::InvalidKey { .. }
| Error::SchemaNotFound { .. }
| Error::TableNotFound { .. }
| Error::IllegalManagerState { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. }
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,
@@ -302,7 +285,6 @@ impl ErrorExt for Error {
| Error::InsertCatalogRecord { source, .. }
| Error::OpenTable { source, .. }
| Error::CreateTable { source, .. }
| Error::DeregisterTable { source, .. }
| Error::TableSchemaMismatch { source, .. } => source.status_code(),
Error::MetaSrv { source, .. } => source.status_code(),

View File

@@ -44,36 +44,40 @@ pub mod tables;
pub trait CatalogManager: Send + Sync {
fn as_any(&self) -> &dyn Any;
/// Starts a catalog manager.
async fn start(&self) -> Result<()>;
/// Register a local catalog.
///
/// # Returns
///
/// Whether the catalog is registered.
fn register_local_catalog(&self, name: &str) -> Result<bool>;
/// Registers a catalog to catalog manager, returns whether the catalog exist before.
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool>;
/// Register a schema with catalog name and schema name. Retuens whether the
/// schema registered.
/// Register a local schema.
///
/// # Returns
///
/// Whether the schema is registered.
///
/// # Errors
///
/// This method will/should fail if catalog not exist
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
/// Deregisters a database within given catalog/schema to catalog manager
async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;
fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;
/// Registers a table within given catalog/schema to catalog manager,
/// returns whether the table registered.
/// Registers a local table.
///
/// # Returns
///
/// Whether the table is registered.
///
/// # Errors
///
/// This method will/should fail if catalog or schema not exist
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
fn register_local_table(&self, request: RegisterTableRequest) -> Result<bool>;
/// Deregisters a table within given catalog/schema to catalog manager
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()>;
/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;
fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()>;
async fn catalog_names(&self) -> Result<Vec<String>>;
@@ -160,7 +164,7 @@ pub struct RegisterSchemaRequest {
pub schema: String,
}
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager + ?Sized>(
manager: &'a M,
engine: TableEngineRef,
sys_table_requests: &'a mut Vec<RegisterSystemTableRequest>,
@@ -185,15 +189,13 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
table_name,
),
})?;
let _ = manager
.register_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})
.await?;
manager.register_local_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})?;
info!("Created and registered system table: {table_name}");
table
};

View File

@@ -15,5 +15,4 @@
pub mod manager;
pub mod memory;
pub use manager::LocalCatalogManager;
pub use memory::{new_memory_catalog_manager, MemoryCatalogManager};

View File

@@ -12,77 +12,61 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID,
MITO_ENGINE, NUMBERS_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID,
SYSTEM_CATALOG_TABLE_NAME,
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE,
NUMBERS_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME,
};
use common_catalog::format_full_table_name;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{error, info};
use common_telemetry::info;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{BinaryVector, UInt8Vector};
use futures_util::lock::Mutex;
use metrics::increment_gauge;
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::EngineContext;
use table::metadata::TableId;
use table::requests::OpenTableRequest;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table::TableIdProvider;
use table::TableRef;
use crate::error::{
self, CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu,
Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu,
SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, TableNotExistSnafu,
TableNotFoundSnafu, UnimplementedSnafu,
CatalogNotFoundSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result, SchemaNotFoundSnafu,
SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu,
TableNotFoundSnafu,
};
use crate::local::memory::MemoryCatalogManager;
use crate::system::{
decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX,
VALUE_INDEX,
};
use crate::tables::SystemCatalog;
use crate::{
handle_system_table_request, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
handle_system_table_request, CatalogManagerRef, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest,
};
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
pub struct LocalCatalogManager {
pub struct SystemTableInitializer {
system: Arc<SystemCatalog>,
catalogs: Arc<MemoryCatalogManager>,
catalog_manager: CatalogManagerRef,
engine_manager: TableEngineManagerRef,
next_table_id: AtomicU32,
init_lock: Mutex<bool>,
register_lock: Mutex<()>,
system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
}
impl LocalCatalogManager {
/// Create a new [CatalogManager] with given user catalogs and mito engine
pub async fn try_new(engine_manager: TableEngineManagerRef) -> Result<Self> {
impl SystemTableInitializer {
pub async fn try_new(
engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
) -> Result<Self> {
let engine = engine_manager
.engine(MITO_ENGINE)
.context(TableEngineNotFoundSnafu {
engine_name: MITO_ENGINE,
})?;
let table = SystemCatalogTable::new(engine.clone()).await?;
let memory_catalog_manager = crate::local::memory::new_memory_catalog_manager()?;
let system_catalog = Arc::new(SystemCatalog::new(table));
Ok(Self {
system: system_catalog,
catalogs: memory_catalog_manager,
catalog_manager,
engine_manager,
next_table_id: AtomicU32::new(MIN_USER_TABLE_ID),
init_lock: Mutex::new(false),
register_lock: Mutex::new(()),
system_table_requests: Mutex::new(Vec::default()),
})
}
@@ -92,15 +76,7 @@ impl LocalCatalogManager {
self.init_system_catalog().await?;
let system_records = self.system.information_schema.system.records().await?;
let entries = self.collect_system_catalog_entries(system_records).await?;
let max_table_id = self.handle_system_catalog_entries(entries).await?;
info!(
"All system catalog entries processed, max table id: {}",
max_table_id
);
self.next_table_id
.store((max_table_id + 1).max(MIN_USER_TABLE_ID), Ordering::Relaxed);
*self.init_lock.lock().await = true;
self.handle_system_catalog_entries(entries).await?;
// Processing system table hooks
let mut sys_table_requests = self.system_table_requests.lock().await;
@@ -111,26 +87,24 @@ impl LocalCatalogManager {
engine_name: MITO_ENGINE,
})?;
handle_system_table_request(self, engine, &mut sys_table_requests).await?;
handle_system_table_request(
self.catalog_manager.as_ref(),
engine,
&mut sys_table_requests,
)
.await?;
Ok(())
}
async fn init_system_catalog(&self) -> Result<()> {
// register default catalog and default schema
self.catalogs
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string())?;
self.catalogs.register_schema_sync(RegisterSchemaRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
})?;
let catalog_manager = &self.catalog_manager;
catalog_manager.register_local_catalog(SYSTEM_CATALOG_NAME)?;
// register SystemCatalogTable
self.catalogs
.register_catalog_sync(SYSTEM_CATALOG_NAME.to_string())?;
self.catalogs.register_schema_sync(RegisterSchemaRequest {
catalog_manager.register_local_schema(RegisterSchemaRequest {
catalog: SYSTEM_CATALOG_NAME.to_string(),
schema: INFORMATION_SCHEMA_NAME.to_string(),
})?;
let register_table_req = RegisterTableRequest {
catalog: SYSTEM_CATALOG_NAME.to_string(),
schema: INFORMATION_SCHEMA_NAME.to_string(),
@@ -138,7 +112,7 @@ impl LocalCatalogManager {
table_id: SYSTEM_CATALOG_TABLE_ID,
table: self.system.information_schema.system.as_table_ref(),
};
self.catalogs.register_table(register_table_req).await?;
catalog_manager.register_local_table(register_table_req)?;
// Add numbers table for test
let register_number_table_req = RegisterTableRequest {
@@ -149,9 +123,7 @@ impl LocalCatalogManager {
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
self.catalogs
.register_table(register_number_table_req)
.await?;
catalog_manager.register_local_table(register_number_table_req)?;
Ok(())
}
@@ -216,16 +188,14 @@ impl LocalCatalogManager {
Ok(res)
}
/// Processes records from system catalog table and returns the max table id persisted
/// in system catalog table.
async fn handle_system_catalog_entries(&self, entries: Vec<Entry>) -> Result<TableId> {
/// Processes records from system catalog table.
async fn handle_system_catalog_entries(&self, entries: Vec<Entry>) -> Result<()> {
let entries = Self::sort_entries(entries);
let mut max_table_id = 0;
for entry in entries {
match entry {
Entry::Catalog(c) => {
self.catalogs
.register_catalog_sync(c.catalog_name.clone())?;
self.catalog_manager
.register_local_catalog(&c.catalog_name)?;
info!("Register catalog: {}", c.catalog_name);
}
Entry::Schema(s) => {
@@ -233,11 +203,10 @@ impl LocalCatalogManager {
catalog: s.catalog_name.clone(),
schema: s.schema_name.clone(),
};
let _ = self.catalogs.register_schema_sync(req)?;
self.catalog_manager.register_local_schema(req)?;
info!("Registered schema: {:?}", s);
}
Entry::Table(t) => {
max_table_id = max_table_id.max(t.table_id);
if t.is_deleted {
continue;
}
@@ -246,7 +215,7 @@ impl LocalCatalogManager {
}
}
}
Ok(max_table_id)
Ok(())
}
/// Sort catalog entries to ensure catalog entries comes first, then schema entries,
@@ -298,19 +267,8 @@ impl LocalCatalogManager {
table_id: t.table_id,
table: table_ref,
};
let _ = self.catalogs.register_table(register_request).await?;
Ok(())
}
async fn check_state(&self) -> Result<()> {
let started = self.init_lock.lock().await;
ensure!(
*started,
IllegalManagerStateSnafu {
msg: "Catalog manager not started",
}
);
self.catalog_manager
.register_local_table(register_request)?;
Ok(())
}
@@ -319,11 +277,11 @@ impl LocalCatalogManager {
catalog_name: &str,
schema_name: &str,
) -> Result<()> {
if !self.catalogs.catalog_exist(catalog_name).await? {
if !self.catalog_manager.catalog_exist(catalog_name).await? {
return CatalogNotFoundSnafu { catalog_name }.fail()?;
}
if !self
.catalogs
.catalog_manager
.schema_exist(catalog_name, schema_name)
.await?
{
@@ -337,234 +295,6 @@ impl LocalCatalogManager {
}
}
#[async_trait::async_trait]
impl TableIdProvider for LocalCatalogManager {
async fn next_table_id(&self) -> table::Result<TableId> {
Ok(self.next_table_id.fetch_add(1, Ordering::Relaxed))
}
}
#[async_trait::async_trait]
impl CatalogManager for LocalCatalogManager {
/// Start [LocalCatalogManager] to load all information from system catalog table.
/// Make sure table engine is initialized before starting [MemoryCatalogManager].
async fn start(&self) -> Result<()> {
self.init().await
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
self.check_state().await?;
let catalog_name = request.catalog.clone();
let schema_name = request.schema.clone();
self.check_catalog_schema_exist(&catalog_name, &schema_name)
.await?;
{
let _lock = self.register_lock.lock().await;
if let Some(existing) = self
.catalogs
.table(&request.catalog, &request.schema, &request.table_name)
.await?
{
if existing.table_info().ident.table_id != request.table_id {
error!(
"Unexpected table register request: {:?}, existing: {:?}",
request,
existing.table_info()
);
return TableExistsSnafu {
table: format_full_table_name(
&catalog_name,
&schema_name,
&request.table_name,
),
}
.fail();
}
// Try to register table with same table id, just ignore.
Ok(false)
} else {
// table does not exist
let engine = request.table.table_info().meta.engine.to_string();
let table_name = request.table_name.clone();
let table_id = request.table_id;
let _ = self.catalogs.register_table(request).await?;
let _ = self
.system
.register_table(
catalog_name.clone(),
schema_name.clone(),
table_name,
table_id,
engine,
)
.await?;
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&catalog_name, &schema_name)],
);
Ok(true)
}
}
}
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
self.check_state().await?;
let catalog_name = &request.catalog;
let schema_name = &request.schema;
self.check_catalog_schema_exist(catalog_name, schema_name)
.await?;
ensure!(
self.catalogs
.table(catalog_name, schema_name, &request.new_table_name)
.await?
.is_none(),
TableExistsSnafu {
table: &request.new_table_name
}
);
let _lock = self.register_lock.lock().await;
let old_table = self
.catalogs
.table(catalog_name, schema_name, &request.table_name)
.await?
.context(TableNotExistSnafu {
table: &request.table_name,
})?;
let engine = old_table.table_info().meta.engine.to_string();
// rename table in system catalog
let _ = self
.system
.register_table(
catalog_name.clone(),
schema_name.clone(),
request.new_table_name.clone(),
request.table_id,
engine,
)
.await?;
self.catalogs.rename_table(request).await
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {
self.check_state().await?;
{
let _ = self.register_lock.lock().await;
let DeregisterTableRequest {
catalog,
schema,
table_name,
} = &request;
let table_id = self
.catalogs
.table(catalog, schema, table_name)
.await?
.with_context(|| error::TableNotExistSnafu {
table: format_full_table_name(catalog, schema, table_name),
})?
.table_info()
.ident
.table_id;
self.system.deregister_table(&request, table_id).await?;
self.catalogs.deregister_table(request).await
}
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
self.check_state().await?;
let catalog_name = &request.catalog;
let schema_name = &request.schema;
if !self.catalogs.catalog_exist(catalog_name).await? {
return CatalogNotFoundSnafu { catalog_name }.fail()?;
}
{
let _lock = self.register_lock.lock().await;
ensure!(
!self
.catalogs
.schema_exist(catalog_name, schema_name)
.await?,
SchemaExistsSnafu {
schema: schema_name,
}
);
let _ = self
.system
.register_schema(request.catalog.clone(), schema_name.clone())
.await?;
self.catalogs.register_schema_sync(request)
}
}
async fn deregister_schema(&self, _request: DeregisterSchemaRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister schema",
}
.fail()
}
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalogs.schema_exist(catalog, schema).await
}
async fn table(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> Result<Option<TableRef>> {
self.catalogs
.table(catalog_name, schema_name, table_name)
.await
}
async fn catalog_exist(&self, catalog: &str) -> Result<bool> {
if catalog.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) {
Ok(true)
} else {
self.catalogs.catalog_exist(catalog).await
}
}
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
self.catalogs.table_exist(catalog, schema, table).await
}
async fn catalog_names(&self) -> Result<Vec<String>> {
self.catalogs.catalog_names().await
}
async fn schema_names(&self, catalog_name: &str) -> Result<Vec<String>> {
self.catalogs.schema_names(catalog_name).await
}
async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result<Vec<String>> {
self.catalogs.table_names(catalog_name, schema_name).await
}
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool> {
self.catalogs.clone().register_catalog(name).await
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
@@ -608,7 +338,7 @@ mod tests {
is_deleted: false,
}),
];
let res = LocalCatalogManager::sort_entries(vec);
let res = SystemTableInitializer::sort_entries(vec);
assert_matches!(res[0], Entry::Catalog(..));
assert_matches!(res[1], Entry::Catalog(..));
assert_matches!(res[2], Entry::Schema(..));

View File

@@ -15,96 +15,47 @@
use std::any::Any;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock, Weak};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME};
use metrics::{decrement_gauge, increment_gauge};
use snafu::OptionExt;
use table::metadata::TableId;
use table::table::TableIdProvider;
use table::TableRef;
use crate::error::{
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::information_schema::InformationSchemaProvider;
use crate::{
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterTableRequest, RenameTableRequest,
RegisterTableRequest,
};
type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
/// Simple in-memory list of catalogs
#[derive(Clone)]
pub struct MemoryCatalogManager {
/// Collection of catalogs containing schemas and ultimately Tables
pub catalogs: RwLock<HashMap<String, SchemaEntries>>,
pub table_id: AtomicU32,
}
#[async_trait::async_trait]
impl TableIdProvider for MemoryCatalogManager {
async fn next_table_id(&self) -> table::error::Result<TableId> {
Ok(self.table_id.fetch_add(1, Ordering::Relaxed))
}
catalogs: Arc<RwLock<HashMap<String, SchemaEntries>>>,
}
#[async_trait::async_trait]
impl CatalogManager for MemoryCatalogManager {
async fn start(&self) -> Result<()> {
self.table_id.store(MIN_USER_TABLE_ID, Ordering::Relaxed);
Ok(())
fn register_local_catalog(&self, name: &str) -> Result<bool> {
self.register_catalog(name)
}
fn register_local_table(&self, request: RegisterTableRequest) -> Result<bool> {
self.register_table(request)
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
self.register_table_sync(request)
}
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
.get_mut(&request.catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.get_mut(&request.schema)
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
// check old and new table names
if !schema.contains_key(&request.table_name) {
return TableNotFoundSnafu {
table_info: request.table_name.to_string(),
}
.fail()?;
}
if schema.contains_key(&request.new_table_name) {
return TableExistsSnafu {
table: &request.new_table_name,
}
.fail();
}
let table = schema.remove(&request.table_name).unwrap();
let _ = schema.insert(request.new_table_name, table);
Ok(true)
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {
fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()> {
self.deregister_table_sync(request)
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
self.register_schema_sync(request)
}
async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result<bool> {
fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schemas = catalogs
.get_mut(&request.catalog)
@@ -203,28 +154,27 @@ impl CatalogManager for MemoryCatalogManager {
.collect())
}
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool> {
self.register_catalog_sync(name)
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl MemoryCatalogManager {
pub fn new() -> Arc<Self> {
Arc::new(Self {
catalogs: Default::default(),
})
}
/// Creates a manager with some default setups
/// (e.g. default catalog/schema and information schema)
pub fn with_default_setup() -> Arc<Self> {
let manager = Arc::new(Self {
table_id: AtomicU32::new(MIN_USER_TABLE_ID),
catalogs: Default::default(),
});
// Safety: default catalog/schema is registered in order so no CatalogNotFound error will occur
manager
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string())
.unwrap();
manager.register_catalog(DEFAULT_CATALOG_NAME).unwrap();
manager
.register_schema_sync(RegisterSchemaRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
@@ -252,12 +202,15 @@ impl MemoryCatalogManager {
}
/// Registers a catalog if it does not exist and returns false if the schema exists.
pub fn register_catalog_sync(self: &Arc<Self>, name: String) -> Result<bool> {
pub fn register_catalog(&self, name: &str) -> Result<bool> {
let name = name.to_string();
let mut catalogs = self.catalogs.write().unwrap();
match catalogs.entry(name.clone()) {
Entry::Vacant(e) => {
let catalog = self.create_catalog_entry(name);
let arc_self = Arc::new(self.clone());
let catalog = arc_self.create_catalog_entry(name);
e.insert(catalog);
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
Ok(true)
@@ -311,7 +264,7 @@ impl MemoryCatalogManager {
}
/// Registers a schema and returns an error if the catalog or schema does not exist.
pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
pub fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
.get_mut(&request.catalog)
@@ -356,7 +309,7 @@ impl MemoryCatalogManager {
let schema = &table.table_info().schema_name;
if !manager.catalog_exist_sync(catalog).unwrap() {
manager.register_catalog_sync(catalog.to_string()).unwrap();
manager.register_catalog(catalog).unwrap();
}
if !manager.schema_exist_sync(catalog, schema).unwrap() {
@@ -375,7 +328,7 @@ impl MemoryCatalogManager {
table_id: table.table_info().ident.table_id,
table,
};
let _ = manager.register_table_sync(request).unwrap();
let _ = manager.register_table(request).unwrap();
manager
}
}
@@ -388,8 +341,6 @@ pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
#[cfg(test)]
mod tests {
use common_catalog::consts::*;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
@@ -406,7 +357,7 @@ mod tests {
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
let _ = catalog_list.register_table(register_request).await.unwrap();
catalog_list.register_local_table(register_request).unwrap();
let table = catalog_list
.table(
DEFAULT_CATALOG_NAME,
@@ -423,130 +374,11 @@ mod tests {
.is_none());
}
#[tokio::test]
async fn test_mem_manager_rename_table() {
let catalog = MemoryCatalogManager::with_default_setup();
let table_name = "test_table";
assert!(!catalog
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
// register test table
let table_id = 2333;
let register_request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table: NumbersTable::table(table_id),
};
assert!(catalog.register_table(register_request).await.unwrap());
assert!(catalog
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
// rename test table
let new_table_name = "test_table_renamed";
let rename_request = RenameTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
new_table_name: new_table_name.to_string(),
table_id,
};
let _ = catalog.rename_table(rename_request).await.unwrap();
// test old table name not exist
assert!(!catalog
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
// test new table name exists
assert!(catalog
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap());
let registered_table = catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap()
.unwrap();
assert_eq!(registered_table.table_info().ident.table_id, table_id);
let dup_register_request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: new_table_name.to_string(),
table_id: table_id + 1,
table: NumbersTable::table(table_id + 1),
};
let result = catalog.register_table(dup_register_request).await;
let err = result.err().unwrap();
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}
#[tokio::test]
async fn test_catalog_rename_table() {
let catalog = MemoryCatalogManager::with_default_setup();
let table_name = "num";
let table_id = 2333;
let table = NumbersTable::table(table_id);
// register table
let register_table_req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table,
};
assert!(catalog.register_table(register_table_req).await.unwrap());
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap()
.is_some());
// rename table
let new_table_name = "numbers_new";
let rename_table_req = RenameTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
new_table_name: new_table_name.to_string(),
table_id,
};
assert!(catalog.rename_table(rename_table_req).await.unwrap());
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap()
.is_none());
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap()
.is_some());
let registered_table = catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap()
.unwrap();
assert_eq!(registered_table.table_info().ident.table_id, table_id);
}
#[test]
pub fn test_register_catalog_sync() {
let list = MemoryCatalogManager::with_default_setup();
assert!(list
.register_catalog_sync("test_catalog".to_string())
.unwrap());
assert!(!list
.register_catalog_sync("test_catalog".to_string())
.unwrap());
assert!(list.register_catalog("test_catalog").unwrap());
assert!(!list.register_catalog("test_catalog").unwrap());
}
#[tokio::test]
@@ -561,7 +393,7 @@ mod tests {
table_id: 2333,
table: NumbersTable::table(2333),
};
let _ = catalog.register_table(register_table_req).await.unwrap();
catalog.register_local_table(register_table_req).unwrap();
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
@@ -574,8 +406,7 @@ mod tests {
table_name: table_name.to_string(),
};
catalog
.deregister_table(deregister_table_req)
.await
.deregister_local_table(deregister_table_req)
.unwrap();
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
@@ -603,20 +434,16 @@ mod tests {
table_id: 0,
table: NumbersTable::table(0),
};
catalog
.clone()
.register_catalog(catalog_name.clone())
.await
.unwrap();
catalog.register_schema(schema).await.unwrap();
catalog.register_table(table).await.unwrap();
catalog.register_local_catalog(&catalog_name).unwrap();
catalog.register_local_schema(schema).unwrap();
catalog.register_local_table(table).unwrap();
let request = DeregisterSchemaRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
};
assert!(catalog.deregister_schema(request).await.unwrap());
assert!(catalog.deregister_local_schema(request).unwrap());
assert!(!catalog
.schema_exist(&catalog_name, &schema_name)
.await

View File

@@ -20,7 +20,7 @@ use common_catalog::consts::{
SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME,
};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, warn};
use common_telemetry::debug;
use common_time::util;
use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef};
use datatypes::schema::{ColumnSchema, RawSchema};
@@ -34,11 +34,9 @@ use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest, Table
use table::TableRef;
use crate::error::{
self, CreateSystemCatalogSnafu, DeregisterTableSnafu, EmptyValueSnafu, Error,
InsertCatalogRecordSnafu, InvalidEntryTypeSnafu, InvalidKeySnafu, OpenSystemCatalogSnafu,
Result, ValueDeserializeSnafu,
self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InsertCatalogRecordSnafu,
InvalidEntryTypeSnafu, InvalidKeySnafu, OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu,
};
use crate::DeregisterTableRequest;
pub const ENTRY_TYPE_INDEX: usize = 0;
pub const KEY_INDEX: usize = 1;
@@ -104,30 +102,6 @@ impl SystemCatalogTable {
.context(InsertCatalogRecordSnafu)
}
pub(crate) async fn deregister_table(
&self,
request: &DeregisterTableRequest,
table_id: TableId,
) -> Result<()> {
let deletion_request = build_table_deletion_request(request, table_id);
self.0
.insert(deletion_request)
.await
.map(|x| {
if x != 1 {
let table = common_catalog::format_full_table_name(
&request.catalog,
&request.schema,
&request.table_name
);
warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}");
}
})
.with_context(|_| DeregisterTableSnafu {
request: request.clone(),
})
}
pub async fn register_schema(&self, catalog: String, schema: String) -> Result<usize> {
let insert_request = build_schema_insert_request(catalog, schema);
self.0
@@ -232,24 +206,6 @@ pub fn build_table_insert_request(
)
}
pub(crate) fn build_table_deletion_request(
request: &DeregisterTableRequest,
table_id: TableId,
) -> InsertRequest {
let entry_key = format_table_entry_key(&request.catalog, &request.schema, table_id);
build_insert_request(
EntryType::Table,
entry_key.as_bytes(),
serde_json::to_string(&TableEntryValue {
table_name: "".to_string(),
engine: "".to_string(),
is_deleted: true,
})
.unwrap()
.as_bytes(),
)
}
fn build_primary_key_columns(entry_type: EntryType, key: &[u8]) -> HashMap<String, VectorRef> {
HashMap::from([
(
@@ -614,21 +570,5 @@ mod tests {
is_deleted: false,
});
assert_eq!(entry, expected);
catalog_table
.deregister_table(
&DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "my_table".to_string(),
},
1,
)
.await
.unwrap();
let records = catalog_table.records().await.unwrap();
let batches = RecordBatches::try_collect(records).await.unwrap().take();
assert_eq!(batches.len(), 1);
}
}

View File

@@ -19,7 +19,6 @@ use std::sync::Arc;
use table::metadata::TableId;
use crate::system::SystemCatalogTable;
use crate::DeregisterTableRequest;
pub struct InformationSchema {
pub system: Arc<SystemCatalogTable>,
@@ -53,17 +52,6 @@ impl SystemCatalog {
.await
}
pub(crate) async fn deregister_table(
&self,
request: &DeregisterTableRequest,
table_id: TableId,
) -> crate::error::Result<()> {
self.information_schema
.system
.deregister_table(request, table_id)
.await
}
pub async fn register_schema(
&self,
catalog: String,

View File

@@ -1,175 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use catalog::local::LocalCatalogManager;
use catalog::{CatalogManager, RegisterTableRequest, RenameTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::{error, info};
use common_test_util::temp_dir::TempDir;
use mito::config::EngineConfig;
use table::engine::manager::MemoryTableEngineManager;
use table::table::numbers::NumbersTable;
use tokio::sync::Mutex;
async fn create_local_catalog_manager(
) -> Result<(TempDir, LocalCatalogManager), catalog::error::Error> {
let (dir, object_store) =
mito::table::test_util::new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(mito::table::test_util::MockMitoEngine::new(
EngineConfig::default(),
mito::table::test_util::MockEngine::default(),
object_store,
));
let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone()));
let catalog_manager = LocalCatalogManager::try_new(engine_manager).await.unwrap();
catalog_manager.start().await?;
Ok((dir, catalog_manager))
}
#[tokio::test]
async fn test_rename_table() {
common_telemetry::init_default_ut_logging();
let (_dir, catalog_manager) = create_local_catalog_manager().await.unwrap();
// register table
let table_name = "test_table";
let table_id = 42;
let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table: NumbersTable::table(table_id),
};
assert!(catalog_manager.register_table(request).await.unwrap());
// rename table
let new_table_name = "table_t";
let rename_table_req = RenameTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
new_table_name: new_table_name.to_string(),
table_id,
};
assert!(catalog_manager
.rename_table(rename_table_req)
.await
.unwrap());
let registered_table = catalog_manager
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap()
.unwrap();
assert_eq!(registered_table.table_info().ident.table_id, table_id);
}
#[tokio::test]
async fn test_duplicate_register() {
let (_dir, catalog_manager) = create_local_catalog_manager().await.unwrap();
let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "test_table".to_string(),
table_id: 42,
table: NumbersTable::table(42),
};
assert!(catalog_manager
.register_table(request.clone())
.await
.unwrap());
// register table with same table id will succeed with 0 as return val.
assert!(!catalog_manager.register_table(request).await.unwrap());
let err = catalog_manager
.register_table(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "test_table".to_string(),
table_id: 43,
table: NumbersTable::table(43),
})
.await
.unwrap_err();
assert!(
err.to_string()
.contains("Table `greptime.public.test_table` already exists"),
"Actual error message: {err}",
);
}
#[test]
fn test_concurrent_register() {
common_telemetry::init_default_ut_logging();
let rt = Arc::new(tokio::runtime::Builder::new_multi_thread().build().unwrap());
let (_dir, catalog_manager) =
rt.block_on(async { create_local_catalog_manager().await.unwrap() });
let catalog_manager = Arc::new(catalog_manager);
let succeed = Arc::new(Mutex::new(None));
let mut handles = Vec::with_capacity(8);
for i in 0..8 {
let catalog = catalog_manager.clone();
let succeed = succeed.clone();
let handle = rt.spawn(async move {
let table_id = 42 + i;
let table = NumbersTable::table(table_id);
let table_info = table.table_info();
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "test_table".to_string(),
table_id,
table,
};
match catalog.register_table(req).await {
Ok(res) => {
if res {
let mut succeed = succeed.lock().await;
info!("Successfully registered table: {}", table_id);
*succeed = Some(table_info);
}
}
Err(_) => {
error!("Failed to register table {}", table_id);
}
}
});
handles.push(handle);
}
rt.block_on(async move {
for handle in handles {
handle.await.unwrap();
}
let guard = succeed.lock().await;
let table_info = guard.as_ref().unwrap();
let table_registered = catalog_manager
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "test_table")
.await
.unwrap()
.unwrap();
assert_eq!(
table_registered.table_info().ident.table_id,
table_info.ident.table_id
);
});
}
}

View File

@@ -25,7 +25,7 @@ use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use crate::error::{self, IllegalAuthConfigSnafu, Result, StartCatalogManagerSnafu};
use crate::error::{self, IllegalAuthConfigSnafu, Result};
use crate::options::{Options, TopLevelOptions};
pub struct Instance {
@@ -34,12 +34,6 @@ pub struct Instance {
impl Instance {
pub async fn start(&mut self) -> Result<()> {
self.frontend
.catalog_manager()
.start()
.await
.context(StartCatalogManagerSnafu)?;
self.frontend
.start()
.await

View File

@@ -126,12 +126,6 @@ pub enum Error {
#[snafu(display("Incorrect internal state: {}", state))]
IncorrectInternalState { state: String, location: Location },
#[snafu(display("Failed to create catalog list, source: {}", source))]
NewCatalog {
location: Location,
source: catalog::error::Error,
},
#[snafu(display("Catalog not found: {}", name))]
CatalogNotFound { name: String, location: Location },
@@ -583,7 +577,7 @@ impl ErrorExt for Error {
HandleHeartbeatResponse { source, .. } => source.status_code(),
DecodeLogicalPlan { source, .. } => source.status_code(),
NewCatalog { source, .. } | RegisterSchema { source, .. } => source.status_code(),
RegisterSchema { source, .. } => source.status_code(),
CreateTable { source, .. } => source.status_code(),
DropTable { source, .. } => source.status_code(),
FlushTable { source, .. } => source.status_code(),

View File

@@ -21,10 +21,11 @@ use catalog::error::{
TableMetadataManagerSnafu,
};
use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
use catalog::local::MemoryCatalogManager;
use catalog::remote::KvCacheInvalidatorRef;
use catalog::{
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterTableRequest, RenameTableRequest,
RegisterTableRequest,
};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
@@ -50,13 +51,44 @@ use table::TableRef;
use crate::table::DistTable;
// There are two sources for finding a table: the `local_catalog_manager` and the
// `table_metadata_manager`.
//
// The `local_catalog_manager` is for storing tables that are often transparent, not saving any
// real data. For example, our system tables, the `numbers` table and the "information_schema"
// table.
//
// The `table_metadata_manager`, on the other hand, is for storing tables that are created by users,
// obviously.
//
// For now, separating the two makes the code simpler, at least in the retrieval site. Now we have
// `numbers` and `information_schema` system tables. Both have their special implementations. If we
// put them with other ordinary tables that are created by users, we need to check the table name
// to decide which `TableRef` to return. Like this:
//
// ```rust
// match table_name {
// "numbers" => ... // return NumbersTable impl
// "information_schema" => ... // return InformationSchemaTable impl
// _ => .. // return DistTable impl
// }
// ```
//
// On the other hand, because we use `MemoryCatalogManager` for system tables, we can easily store
// and retrieve the concrete implementation of the system tables by their names, no more "if-else"s.
//
// However, if the system table is designed to have more features in the future, we may revisit
// the implementation here.
#[derive(Clone)]
pub struct FrontendCatalogManager {
backend: KvBackendRef,
// TODO(LFC): Maybe use a real implementation for Standalone mode.
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
// is implemented by RaftEngine. Maybe we need a cache for it?
backend_cache_invalidator: KvCacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
datanode_manager: DatanodeManagerRef,
local_catalog_manager: Arc<MemoryCatalogManager>,
}
#[async_trait::async_trait]
@@ -105,18 +137,14 @@ impl FrontendCatalogManager {
datanode_manager: DatanodeManagerRef,
) -> Self {
Self {
backend: backend.clone(),
backend_cache_invalidator,
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
datanode_manager,
local_catalog_manager: MemoryCatalogManager::new(),
}
}
pub fn backend(&self) -> KvBackendRef {
self.backend.clone()
}
pub fn partition_manager(&self) -> PartitionRuleManagerRef {
self.partition_manager.clone()
}
@@ -136,45 +164,34 @@ impl FrontendCatalogManager {
}
}
// FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting
// as soon as it's stable: https://github.com/rust-lang/rust/issues/65991
#[async_trait::async_trait]
impl CatalogManager for FrontendCatalogManager {
async fn start(&self) -> catalog::error::Result<()> {
fn register_local_catalog(&self, name: &str) -> CatalogResult<bool> {
self.local_catalog_manager.register_catalog(name)
}
fn register_local_table(&self, request: RegisterTableRequest) -> CatalogResult<bool> {
self.local_catalog_manager.register_table(request)
}
fn deregister_local_table(&self, _request: DeregisterTableRequest) -> CatalogResult<()> {
Ok(())
}
async fn register_catalog(self: Arc<Self>, _name: String) -> CatalogResult<bool> {
unimplemented!("FrontendCatalogManager does not support registering catalog")
}
// TODO(LFC): Handle the table caching in (de)register_table.
async fn register_table(&self, _request: RegisterTableRequest) -> CatalogResult<bool> {
Ok(true)
}
async fn deregister_table(&self, _request: DeregisterTableRequest) -> CatalogResult<()> {
Ok(())
}
async fn register_schema(
fn register_local_schema(
&self,
_request: RegisterSchemaRequest,
) -> catalog::error::Result<bool> {
unimplemented!("FrontendCatalogManager does not support registering schema")
}
async fn deregister_schema(
fn deregister_local_schema(
&self,
_request: DeregisterSchemaRequest,
) -> catalog_err::Result<bool> {
unimplemented!("FrontendCatalogManager does not support deregistering schema")
}
async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result<bool> {
unimplemented!()
}
async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
let stream = self
.table_metadata_manager

View File

@@ -28,6 +28,7 @@ use api::v1::meta::Role;
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::local::manager::SystemTableInitializer;
use catalog::remote::CachedMetaKvBackend;
use catalog::CatalogManagerRef;
use client::client_manager::DatanodeClients;
@@ -78,14 +79,16 @@ use sql::statements::copy::CopyTable;
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::engine::manager::MemoryTableEngineManager;
use self::distributed::DistRegionRequestHandler;
use self::standalone::{StandaloneRegionRequestHandler, StandaloneTableMetadataCreator};
use crate::catalog::FrontendCatalogManager;
use crate::delete::Deleter;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
self, CatalogSnafu, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu,
MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result,
SqlExecInterceptedSnafu,
};
use crate::expr_factory::CreateExprFactory;
use crate::frontend::FrontendOptions;
@@ -159,11 +162,11 @@ impl Instance {
datanode_clients.clone(),
));
let dist_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(dist_request_handler),
Some(region_request_handler.clone()),
true,
plugins.clone(),
)
@@ -421,6 +424,14 @@ impl FrontendInstance for Instance {
heartbeat_task.start().await?;
}
let initializer = SystemTableInitializer::try_new(
Arc::new(MemoryTableEngineManager::new_empty()),
self.catalog_manager.clone(),
)
.await
.context(CatalogSnafu)?;
initializer.init().await.context(CatalogSnafu)?;
self.script_executor.start(self).await?;
futures::future::try_join_all(self.servers.values().map(start_server))

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{column_def, AlterExpr, CreateTableExpr, TruncateTableExpr};
use catalog::{CatalogManagerRef, DeregisterTableRequest, RegisterTableRequest};
use catalog::CatalogManagerRef;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
@@ -34,7 +34,7 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use partition::partition::{PartitionBound, PartitionDef};
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, Partitions};
@@ -46,7 +46,7 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistSnafu,
DeserializePartitionSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
};
use crate::table::DistTable;
@@ -121,23 +121,6 @@ impl StatementExecutor {
let table = DistTable::table(table_info);
let request = RegisterTableRequest {
catalog: table_name.catalog_name.clone(),
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
table_id,
table: table.clone(),
};
ensure!(
self.catalog_manager
.register_table(request)
.await
.context(CatalogSnafu)?,
TableAlreadyExistSnafu {
table: table_name.to_string()
}
);
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate_table(
@@ -173,16 +156,6 @@ impl StatementExecutor {
let engine = table.table_info().meta.engine.to_string();
self.drop_table_procedure(&table_name, table_id).await?;
let request = DeregisterTableRequest {
catalog: table_name.catalog_name.clone(),
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
self.catalog_manager
.deregister_table(request)
.await
.context(CatalogSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate_table(

View File

@@ -1432,14 +1432,13 @@ mod test {
let table = EmptyTable::from_table_info(&table_info);
let catalog_list = MemoryCatalogManager::with_default_setup();
assert!(catalog_list
.register_table(RegisterTableRequest {
.register_local_table(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
table_id: 1024,
table,
})
.await
.is_ok());
DfTableSourceProvider::new(catalog_list, false, QueryContext::arc().as_ref())
}

View File

@@ -515,7 +515,7 @@ mod tests {
table_id: NUMBERS_TABLE_ID,
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
let _ = catalog_manager.register_table(req).await.unwrap();
catalog_manager.register_local_table(req).unwrap();
QueryEngineFactory::new(catalog_manager, None, false).query_engine()
}

View File

@@ -380,14 +380,13 @@ mod test {
let table = EmptyTable::from_table_info(&table_info);
let catalog_list = MemoryCatalogManager::with_default_setup();
assert!(catalog_list
.register_table(RegisterTableRequest {
.register_local_table(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
table_id: 1024,
table,
})
.await
.is_ok());
QueryEngineFactory::new(catalog_list, None, false).query_engine()
}

View File

@@ -112,7 +112,7 @@ fn catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
table_id: NUMBERS_TABLE_ID,
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
let _ = catalog_manager.register_table_sync(req).unwrap();
let _ = catalog_manager.register_table(req).unwrap();
Ok(catalog_manager)
}

View File

@@ -104,7 +104,7 @@ fn create_test_engine() -> TimeRangeTester {
table_id: table.table_info().ident.table_id,
table: table.clone(),
};
let _ = catalog_manager.register_table_sync(req).unwrap();
let _ = catalog_manager.register_table(req).unwrap();
let engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
TimeRangeTester { engine, filter }

View File

@@ -160,59 +160,22 @@ impl ScriptManager {
#[cfg(test)]
mod tests {
use catalog::CatalogManager;
use common_config::WalConfig;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use mito::table::test_util::new_test_object_store;
use catalog::local::MemoryCatalogManager;
use query::QueryEngineFactory;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::manager::MemoryTableEngineManager;
use super::*;
type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;
#[ignore = "script engine is temporary disabled"]
#[tokio::test]
async fn test_insert_find_compile_script() {
let wal_dir = create_temp_dir("test_insert_find_compile_script_wal");
let wal_dir_str = wal_dir.path().to_string_lossy().to_string();
common_telemetry::init_default_ut_logging();
let (_dir, object_store) = new_test_object_store("test_insert_find_compile_script").await;
let log_store = RaftEngineLogStore::try_new(wal_dir_str, WalConfig::default())
.await
.unwrap();
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let mock_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(log_store),
object_store.clone(),
compaction_scheduler,
)
.unwrap(),
object_store,
));
let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone()));
let catalog_manager = Arc::new(
catalog::local::LocalCatalogManager::try_new(engine_manager)
.await
.unwrap(),
);
let catalog_manager = MemoryCatalogManager::new();
let factory = QueryEngineFactory::new(catalog_manager.clone(), None, false);
let query_engine = factory.query_engine();
let mgr = ScriptManager::new(catalog_manager.clone(), query_engine)
.await
.unwrap();
catalog_manager.start().await.unwrap();
let schema = "schema";
let name = "test";

View File

@@ -15,7 +15,7 @@
//! Procedure to alter a table.
use async_trait::async_trait;
use catalog::{CatalogManagerRef, RenameTableRequest};
use catalog::CatalogManagerRef;
use common_procedure::error::SubprocedureFailedSnafu;
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
@@ -50,7 +50,9 @@ impl Procedure for AlterTableProcedure {
match self.data.state {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::EngineAlterTable => self.on_engine_alter_table(ctx).await,
AlterTableState::RenameInCatalog => self.on_rename_in_catalog().await,
// No more need to "rename table in catalog", because the table metadata is now stored
// in kv backend, and updated by the unified DDL procedure soon. For ordinary tables,
// catalog manager will be a readonly proxy.
}
}
@@ -214,15 +216,7 @@ impl AlterTableProcedure {
self.data.request.table_name,
sub_id
);
// The sub procedure is done, we can execute next step.
if self.data.request.is_rename_table() {
// We also need to rename the table in the catalog.
self.data.state = AlterTableState::RenameInCatalog;
Ok(Status::executing(true))
} else {
// If this isn't a rename operation, we are done.
Ok(Status::Done)
}
Ok(Status::Done)
}
ProcedureState::Failed { error } => {
// Return error if the subprocedure is failed.
@@ -232,28 +226,6 @@ impl AlterTableProcedure {
}
}
}
async fn on_rename_in_catalog(&mut self) -> Result<Status> {
// Safety: table id is available in this state.
let table_id = self.data.table_id.unwrap();
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let rename_req = RenameTableRequest {
catalog: self.data.request.catalog_name.clone(),
schema: self.data.request.schema_name.clone(),
table_name: self.data.request.table_name.clone(),
new_table_name: new_table_name.clone(),
table_id,
};
let _ = self
.catalog_manager
.rename_table(rename_req)
.await
.map_err(Error::from_error_ext)?;
}
Ok(Status::Done)
}
}
/// Represents each step while altering a table in the datanode.
@@ -263,8 +235,6 @@ enum AlterTableState {
Prepare,
/// Alter table in the table engine.
EngineAlterTable,
/// Rename the table in the catalog (optional).
RenameInCatalog,
}
/// Serializable data of [AlterTableProcedure].
@@ -294,56 +264,3 @@ impl AlterTableData {
}
}
}
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_alter_table_procedure_rename() {
let env = TestEnv::new("rename");
let table_name = "test_old";
let table_id = env.create_table(table_name).await;
let new_table_name = "test_new";
let request = AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),
},
table_version: None,
};
let TestEnv {
dir: _dir,
table_engine,
procedure_manager,
catalog_manager,
} = env;
let procedure =
AlterTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
watcher.changed().await.unwrap();
let table = catalog_manager
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap()
.unwrap();
let table_info = table.table_info();
assert_eq!(new_table_name, table_info.name);
assert!(!catalog_manager
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
}
}

View File

@@ -256,8 +256,7 @@ impl CreateTableProcedure {
};
let _ = self
.catalog_manager
.register_table(register_req)
.await
.register_local_table(register_req)
.map_err(Error::from_error_ext)?;
Ok(Status::Done)

View File

@@ -163,8 +163,7 @@ impl DropTableProcedure {
table_name: self.data.request.table_name.clone(),
};
self.catalog_manager
.deregister_table(deregister_table_req)
.await
.deregister_local_table(deregister_table_req)
.context(AccessCatalogSnafu)?;
}