feat: add deregister_schema to CatalogManager (#1911)

* feat: add deregister_schema to CatalogManager

* refactor: MemoryCatalogManager::deregister_schema

* fix: typo

* fix: typo
This commit is contained in:
Niwaka
2023-07-10 10:59:14 +09:00
committed by GitHub
parent f20b5695b8
commit 195dfdc5d3
5 changed files with 109 additions and 12 deletions

View File

@@ -59,6 +59,9 @@ pub trait CatalogManager: Send + Sync {
/// This method will/should fail if catalog not exist /// This method will/should fail if catalog not exist
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>; async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
/// Deregisters a database within given catalog/schema to catalog manager
async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;
/// Registers a table within given catalog/schema to catalog manager, /// Registers a table within given catalog/schema to catalog manager,
/// returns whether the table registered. /// returns whether the table registered.
/// ///
@@ -149,6 +152,12 @@ pub struct DeregisterTableRequest {
pub table_name: String, pub table_name: String,
} }
#[derive(Debug, Clone)]
pub struct DeregisterSchemaRequest {
pub catalog: String,
pub schema: String,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RegisterSchemaRequest { pub struct RegisterSchemaRequest {
pub catalog: String, pub catalog: String,

View File

@@ -41,7 +41,7 @@ use crate::error::{
self, CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, self, CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu,
Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu,
SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, TableNotExistSnafu, SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, TableNotExistSnafu,
TableNotFoundSnafu, TableNotFoundSnafu, UnimplementedSnafu,
}; };
use crate::information_schema::InformationSchemaProvider; use crate::information_schema::InformationSchemaProvider;
use crate::local::memory::MemoryCatalogManager; use crate::local::memory::MemoryCatalogManager;
@@ -51,8 +51,9 @@ use crate::system::{
}; };
use crate::tables::SystemCatalog; use crate::tables::SystemCatalog;
use crate::{ use crate::{
handle_system_table_request, CatalogManager, CatalogManagerRef, DeregisterTableRequest, handle_system_table_request, CatalogManager, CatalogManagerRef, DeregisterSchemaRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, RenameTableRequest,
}; };
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
@@ -516,6 +517,13 @@ impl CatalogManager for LocalCatalogManager {
} }
} }
async fn deregister_schema(&self, _request: DeregisterSchemaRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister schema",
}
.fail()
}
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
self.check_state().await?; self.check_state().await?;

View File

@@ -29,8 +29,8 @@ use crate::error::{
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu, CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
}; };
use crate::{ use crate::{
CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterTableRequest, RenameTableRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
}; };
type SchemaEntries = HashMap<String, HashMap<String, TableRef>>; type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
@@ -150,6 +150,34 @@ impl CatalogManager for MemoryCatalogManager {
Ok(registered) Ok(registered)
} }
async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schemas = catalogs
.get_mut(&request.catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?;
let table_count = schemas
.remove(&request.schema)
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?
.len();
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
table_count as f64,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
Ok(true)
}
async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> { async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> {
// TODO(ruihang): support register system table request // TODO(ruihang): support register system table request
Ok(()) Ok(())
@@ -517,4 +545,42 @@ mod tests {
.unwrap() .unwrap()
.is_none()); .is_none());
} }
#[tokio::test]
async fn test_catalog_deregister_schema() {
let catalog = MemoryCatalogManager::default();
// Registers a catalog, a schema, and a table.
let catalog_name = "foo_catalog".to_string();
let schema_name = "foo_schema".to_string();
let table_name = "foo_table".to_string();
let schema = RegisterSchemaRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
};
let table = RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name,
table_id: 0,
table: Arc::new(NumbersTable::default()),
};
catalog
.register_catalog(catalog_name.clone())
.await
.unwrap();
catalog.register_schema(schema).await.unwrap();
catalog.register_table(table).await.unwrap();
let request = DeregisterSchemaRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
};
assert!(catalog.deregister_schema(request).await.unwrap());
assert!(!catalog
.schema_exist(&catalog_name, &schema_name)
.await
.unwrap());
}
} }

View File

@@ -34,7 +34,7 @@ use tokio::sync::Mutex;
use crate::error::{ use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu, ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu,
TableMetadataManagerSnafu, TableMetadataManagerSnafu, UnimplementedSnafu,
}; };
use crate::helper::{ use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, build_catalog_prefix, build_schema_prefix, build_table_global_prefix,
@@ -43,8 +43,8 @@ use crate::helper::{
}; };
use crate::remote::region_alive_keeper::RegionAliveKeepers; use crate::remote::region_alive_keeper::RegionAliveKeepers;
use crate::{ use crate::{
handle_system_table_request, CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, handle_system_table_request, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest,
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
}; };
/// Catalog manager based on metasrv. /// Catalog manager based on metasrv.
@@ -726,6 +726,13 @@ impl CatalogManager for RemoteCatalogManager {
Ok(true) Ok(true)
} }
async fn deregister_schema(&self, _request: DeregisterSchemaRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister schema",
}
.fail()
}
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> { async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let catalog_name = request.catalog.clone(); let catalog_name = request.catalog.clone();
let schema_name = request.schema.clone(); let schema_name = request.schema.clone();

View File

@@ -28,8 +28,8 @@ use catalog::helper::{
use catalog::information_schema::InformationSchemaProvider; use catalog::information_schema::InformationSchemaProvider;
use catalog::remote::KvCacheInvalidatorRef; use catalog::remote::KvCacheInvalidatorRef;
use catalog::{ use catalog::{
CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterTableRequest, RenameTableRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
}; };
use client::client_manager::DatanodeClients; use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME};
@@ -130,7 +130,7 @@ impl CatalogManager for FrontendCatalogManager {
} }
async fn register_catalog(&self, _name: String) -> CatalogResult<bool> { async fn register_catalog(&self, _name: String) -> CatalogResult<bool> {
unimplemented!("FrontendCatalogManager does not support register catalog") unimplemented!("FrontendCatalogManager does not support registering catalog")
} }
// TODO(LFC): Handle the table caching in (de)register_table. // TODO(LFC): Handle the table caching in (de)register_table.
@@ -151,7 +151,14 @@ impl CatalogManager for FrontendCatalogManager {
&self, &self,
_request: RegisterSchemaRequest, _request: RegisterSchemaRequest,
) -> catalog::error::Result<bool> { ) -> catalog::error::Result<bool> {
unimplemented!("FrontendCatalogManager does not support register schema") unimplemented!("FrontendCatalogManager does not support registering schema")
}
async fn deregister_schema(
&self,
_request: DeregisterSchemaRequest,
) -> catalog_err::Result<bool> {
unimplemented!("FrontendCatalogManager does not support deregistering schema")
} }
async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result<bool> { async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result<bool> {