mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
feat: deregister table for MemoryCatalogManager (#620)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -5,6 +5,7 @@ wal_dir = '/tmp/greptimedb/wal'
|
||||
rpc_runtime_size = 8
|
||||
mysql_addr = '127.0.0.1:4406'
|
||||
mysql_runtime_size = 4
|
||||
enable_memory_catalog = false
|
||||
|
||||
[storage]
|
||||
type = 'File'
|
||||
|
||||
@@ -2,6 +2,7 @@ node_id = 0
|
||||
mode = 'standalone'
|
||||
http_addr = '127.0.0.1:4000'
|
||||
wal_dir = '/tmp/greptimedb/wal/'
|
||||
enable_memory_catalog = false
|
||||
|
||||
[storage]
|
||||
type = 'File'
|
||||
|
||||
@@ -109,6 +109,12 @@ pub enum Error {
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Operation {} not implemented yet", operation))]
|
||||
Unimplemented {
|
||||
operation: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))]
|
||||
OpenTable {
|
||||
table_info: String,
|
||||
@@ -216,11 +222,12 @@ impl ErrorExt for Error {
|
||||
| Error::ValueDeserialize { .. }
|
||||
| Error::Io { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
Error::RegisterTable { .. } => StatusCode::Internal,
|
||||
|
||||
Error::ReadSystemCatalog { source, .. } => source.status_code(),
|
||||
Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(),
|
||||
Error::InvalidCatalogValue { source, .. } => source.status_code(),
|
||||
|
||||
Error::RegisterTable { .. } => StatusCode::Internal,
|
||||
Error::TableExists { .. } => StatusCode::TableAlreadyExists,
|
||||
Error::SchemaExists { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
@@ -235,6 +242,8 @@ impl ErrorExt for Error {
|
||||
Error::InvalidTableSchema { source, .. } => source.status_code(),
|
||||
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
|
||||
Error::Internal { source, .. } => source.status_code(),
|
||||
|
||||
Error::Unimplemented { .. } => StatusCode::Unsupported,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -83,12 +83,17 @@ pub trait CatalogManager: CatalogList {
|
||||
/// Starts a catalog manager.
|
||||
async fn start(&self) -> Result<()>;
|
||||
|
||||
/// Registers a table given given catalog/schema to catalog manager,
|
||||
/// returns table registered.
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize>;
|
||||
/// Registers a table within given catalog/schema to catalog manager,
|
||||
/// returns whether the table registered.
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
|
||||
|
||||
/// Register a schema with catalog name and schema name.
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize>;
|
||||
/// Deregisters a table within given catalog/schema to catalog manager,
|
||||
/// returns whether the table deregistered.
|
||||
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool>;
|
||||
|
||||
/// Register a schema with catalog name and schema name. Retuens whether the
|
||||
/// schema registered.
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
|
||||
|
||||
/// Register a system table, should be called before starting the manager.
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest)
|
||||
@@ -123,6 +128,14 @@ pub struct RegisterTableRequest {
|
||||
pub table: TableRef,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeregisterTableRequest {
|
||||
pub catalog: String,
|
||||
pub schema: String,
|
||||
pub table_name: String,
|
||||
pub table_id: TableId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RegisterSchemaRequest {
|
||||
pub catalog: String,
|
||||
|
||||
@@ -36,7 +36,7 @@ use table::TableRef;
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result,
|
||||
SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu,
|
||||
TableExistsSnafu, TableNotFoundSnafu,
|
||||
TableExistsSnafu, TableNotFoundSnafu, UnimplementedSnafu,
|
||||
};
|
||||
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use crate::system::{
|
||||
@@ -46,8 +46,8 @@ use crate::system::{
|
||||
use crate::tables::SystemCatalog;
|
||||
use crate::{
|
||||
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
|
||||
CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
|
||||
@@ -309,7 +309,7 @@ impl CatalogManager for LocalCatalogManager {
|
||||
self.init().await
|
||||
}
|
||||
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
|
||||
let started = self.init_lock.lock().await;
|
||||
|
||||
ensure!(
|
||||
@@ -349,10 +349,17 @@ impl CatalogManager for LocalCatalogManager {
|
||||
.await?;
|
||||
|
||||
schema.register_table(request.table_name, request.table)?;
|
||||
Ok(1)
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
|
||||
async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
|
||||
UnimplementedSnafu {
|
||||
operation: "deregister table",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
|
||||
let started = self.init_lock.lock().await;
|
||||
ensure!(
|
||||
*started,
|
||||
@@ -377,7 +384,7 @@ impl CatalogManager for LocalCatalogManager {
|
||||
.register_schema(request.catalog, schema_name.clone())
|
||||
.await?;
|
||||
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
|
||||
Ok(1)
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
|
||||
|
||||
@@ -27,8 +27,8 @@ use table::TableRef;
|
||||
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
|
||||
use crate::schema::SchemaProvider;
|
||||
use crate::{
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Simple in-memory list of catalogs
|
||||
@@ -69,7 +69,7 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
|
||||
let catalogs = self.catalogs.write().unwrap();
|
||||
let catalog = catalogs
|
||||
.get(&request.catalog)
|
||||
@@ -84,10 +84,28 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
})?;
|
||||
schema
|
||||
.register_table(request.table_name, request.table)
|
||||
.map(|v| if v.is_some() { 0 } else { 1 })
|
||||
.map(|v| v.is_none())
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
|
||||
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
|
||||
let catalogs = self.catalogs.write().unwrap();
|
||||
let catalog = catalogs
|
||||
.get(&request.catalog)
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &request.catalog,
|
||||
})?
|
||||
.clone();
|
||||
let schema = catalog
|
||||
.schema(&request.schema)?
|
||||
.with_context(|| SchemaNotFoundSnafu {
|
||||
schema_info: format!("{}.{}", &request.catalog, &request.schema),
|
||||
})?;
|
||||
schema
|
||||
.deregister_table(&request.table_name)
|
||||
.map(|v| v.is_some())
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
|
||||
let catalogs = self.catalogs.write().unwrap();
|
||||
let catalog = catalogs
|
||||
.get(&request.catalog)
|
||||
@@ -95,11 +113,12 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
catalog_name: &request.catalog,
|
||||
})?;
|
||||
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
|
||||
Ok(1)
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> {
|
||||
unimplemented!()
|
||||
// TODO(ruihang): support register system table request
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
|
||||
@@ -340,4 +359,35 @@ mod tests {
|
||||
.downcast_ref::<MemoryCatalogManager>()
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_catalog_deregister_table() {
|
||||
let catalog = MemoryCatalogManager::default();
|
||||
let schema = catalog
|
||||
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let register_table_req = RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "numbers".to_string(),
|
||||
table_id: 2333,
|
||||
table: Arc::new(NumbersTable::default()),
|
||||
};
|
||||
catalog.register_table(register_table_req).await.unwrap();
|
||||
assert!(schema.table_exist("numbers").unwrap());
|
||||
|
||||
let deregister_table_req = DeregisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "numbers".to_string(),
|
||||
table_id: 2333,
|
||||
};
|
||||
catalog
|
||||
.deregister_table(deregister_table_req)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!schema.table_exist("numbers").unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,13 +37,13 @@ use tokio::sync::Mutex;
|
||||
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu,
|
||||
OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu,
|
||||
OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
|
||||
};
|
||||
use crate::remote::{Kv, KvBackendRef};
|
||||
use crate::{
|
||||
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider,
|
||||
SchemaProviderRef,
|
||||
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Catalog manager based on metasrv.
|
||||
@@ -411,7 +411,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
|
||||
let catalog_name = request.catalog;
|
||||
let schema_name = request.schema;
|
||||
let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu {
|
||||
@@ -430,10 +430,17 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
.fail();
|
||||
}
|
||||
schema_provider.register_table(request.table_name, request.table)?;
|
||||
Ok(1)
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
|
||||
async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
|
||||
UnimplementedSnafu {
|
||||
operation: "deregister table",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
|
||||
let catalog_name = request.catalog;
|
||||
let schema_name = request.schema;
|
||||
let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu {
|
||||
@@ -441,7 +448,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
})?;
|
||||
let schema_provider = self.new_schema_provider(&catalog_name, &schema_name);
|
||||
catalog_provider.register_schema(schema_name, schema_provider)?;
|
||||
Ok(1)
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
|
||||
|
||||
@@ -202,7 +202,7 @@ mod tests {
|
||||
table_id,
|
||||
table,
|
||||
};
|
||||
assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap());
|
||||
assert!(catalog_manager.register_table(reg_req).await.unwrap());
|
||||
assert_eq!(
|
||||
HashSet::from([table_name, "numbers".to_string()]),
|
||||
default_schema
|
||||
@@ -287,7 +287,7 @@ mod tests {
|
||||
.register_schema(schema_name.clone(), schema.clone())
|
||||
.expect("Register schema should not fail");
|
||||
assert!(prev.is_none());
|
||||
assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap());
|
||||
assert!(catalog_manager.register_table(reg_req).await.unwrap());
|
||||
|
||||
assert_eq!(
|
||||
HashSet::from([schema_name.clone()]),
|
||||
|
||||
@@ -71,6 +71,7 @@ pub struct StandaloneOptions {
|
||||
pub mode: Mode,
|
||||
pub wal_dir: String,
|
||||
pub storage: ObjectStoreConfig,
|
||||
pub enable_memory_catalog: bool,
|
||||
}
|
||||
|
||||
impl Default for StandaloneOptions {
|
||||
@@ -86,6 +87,7 @@ impl Default for StandaloneOptions {
|
||||
mode: Mode::Standalone,
|
||||
wal_dir: "/tmp/greptimedb/wal".to_string(),
|
||||
storage: ObjectStoreConfig::default(),
|
||||
enable_memory_catalog: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -110,6 +112,7 @@ impl StandaloneOptions {
|
||||
DatanodeOptions {
|
||||
wal_dir: self.wal_dir,
|
||||
storage: self.storage,
|
||||
enable_memory_catalog: self.enable_memory_catalog,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -131,18 +134,22 @@ struct StartCommand {
|
||||
influxdb_enable: bool,
|
||||
#[clap(short, long)]
|
||||
config_file: Option<String>,
|
||||
#[clap(short = 'm', long = "memory-catalog")]
|
||||
enable_memory_catalog: bool,
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
let enable_memory_catalog = self.enable_memory_catalog;
|
||||
let config_file = self.config_file.clone();
|
||||
let fe_opts = FrontendOptions::try_from(self)?;
|
||||
let dn_opts: DatanodeOptions = {
|
||||
let opts: StandaloneOptions = if let Some(path) = config_file {
|
||||
let mut opts: StandaloneOptions = if let Some(path) = config_file {
|
||||
toml_loader::from_file!(&path)?
|
||||
} else {
|
||||
StandaloneOptions::default()
|
||||
};
|
||||
opts.enable_memory_catalog = enable_memory_catalog;
|
||||
opts.datanode_options()
|
||||
};
|
||||
|
||||
@@ -264,6 +271,7 @@ mod tests {
|
||||
std::env::current_dir().unwrap().as_path().to_str().unwrap()
|
||||
)),
|
||||
influxdb_enable: false,
|
||||
enable_memory_catalog: false,
|
||||
};
|
||||
|
||||
let fe_opts = FrontendOptions::try_from(cmd).unwrap();
|
||||
|
||||
@@ -47,6 +47,7 @@ pub struct DatanodeOptions {
|
||||
pub meta_client_opts: Option<MetaClientOpts>,
|
||||
pub wal_dir: String,
|
||||
pub storage: ObjectStoreConfig,
|
||||
pub enable_memory_catalog: bool,
|
||||
pub mode: Mode,
|
||||
}
|
||||
|
||||
@@ -61,6 +62,7 @@ impl Default for DatanodeOptions {
|
||||
meta_client_opts: None,
|
||||
wal_dir: "/tmp/greptimedb/wal".to_string(),
|
||||
storage: ObjectStoreConfig::default(),
|
||||
enable_memory_catalog: false,
|
||||
mode: Mode::Standalone,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,17 +100,29 @@ impl Instance {
|
||||
// create remote catalog manager
|
||||
let (catalog_manager, factory, table_id_provider) = match opts.mode {
|
||||
Mode::Standalone => {
|
||||
let catalog = Arc::new(
|
||||
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
|
||||
.await
|
||||
.context(CatalogSnafu)?,
|
||||
);
|
||||
let factory = QueryEngineFactory::new(catalog.clone());
|
||||
(
|
||||
catalog.clone() as CatalogManagerRef,
|
||||
factory,
|
||||
Some(catalog as TableIdProviderRef),
|
||||
)
|
||||
if opts.enable_memory_catalog {
|
||||
let catalog = Arc::new(catalog::local::MemoryCatalogManager::default());
|
||||
let factory = QueryEngineFactory::new(catalog.clone());
|
||||
|
||||
(
|
||||
catalog.clone() as CatalogManagerRef,
|
||||
factory,
|
||||
Some(catalog as TableIdProviderRef),
|
||||
)
|
||||
} else {
|
||||
let catalog = Arc::new(
|
||||
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
|
||||
.await
|
||||
.context(CatalogSnafu)?,
|
||||
);
|
||||
let factory = QueryEngineFactory::new(catalog.clone());
|
||||
|
||||
(
|
||||
catalog.clone() as CatalogManagerRef,
|
||||
factory,
|
||||
Some(catalog as TableIdProviderRef),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Mode::Distributed => {
|
||||
|
||||
@@ -19,8 +19,9 @@ use std::sync::Arc;
|
||||
use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu};
|
||||
use catalog::remote::{Kv, KvBackendRef};
|
||||
use catalog::{
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider,
|
||||
SchemaProviderRef,
|
||||
};
|
||||
use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue};
|
||||
use futures::StreamExt;
|
||||
@@ -65,17 +66,21 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn register_table(
|
||||
async fn register_table(&self, _request: RegisterTableRequest) -> catalog::error::Result<bool> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn deregister_table(
|
||||
&self,
|
||||
_request: RegisterTableRequest,
|
||||
) -> catalog::error::Result<usize> {
|
||||
_request: DeregisterTableRequest,
|
||||
) -> catalog::error::Result<bool> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn register_schema(
|
||||
&self,
|
||||
_request: RegisterSchemaRequest,
|
||||
) -> catalog::error::Result<usize> {
|
||||
) -> catalog::error::Result<bool> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user