diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 795ad45661..a8ed29da4b 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -5,6 +5,7 @@ wal_dir = '/tmp/greptimedb/wal' rpc_runtime_size = 8 mysql_addr = '127.0.0.1:4406' mysql_runtime_size = 4 +enable_memory_catalog = false [storage] type = 'File' diff --git a/config/standalone.example.toml b/config/standalone.example.toml index dc8b0519ab..ab728b403f 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -2,6 +2,7 @@ node_id = 0 mode = 'standalone' http_addr = '127.0.0.1:4000' wal_dir = '/tmp/greptimedb/wal/' +enable_memory_catalog = false [storage] type = 'File' diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 24ab530f4e..a5467f6f1a 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -109,6 +109,12 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Operation {} not implemented yet", operation))] + Unimplemented { + operation: String, + backtrace: Backtrace, + }, + #[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))] OpenTable { table_info: String, @@ -216,11 +222,12 @@ impl ErrorExt for Error { | Error::ValueDeserialize { .. } | Error::Io { .. } => StatusCode::StorageUnavailable, + Error::RegisterTable { .. } => StatusCode::Internal, + Error::ReadSystemCatalog { source, .. } => source.status_code(), Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } => source.status_code(), - Error::RegisterTable { .. } => StatusCode::Internal, Error::TableExists { .. } => StatusCode::TableAlreadyExists, Error::SchemaExists { .. } => StatusCode::InvalidArguments, @@ -235,6 +242,8 @@ impl ErrorExt for Error { Error::InvalidTableSchema { source, .. } => source.status_code(), Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected, Error::Internal { source, .. } => source.status_code(), + + Error::Unimplemented { .. } => StatusCode::Unsupported, } } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 941d2c2580..9f4f5908ad 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -83,12 +83,17 @@ pub trait CatalogManager: CatalogList { /// Starts a catalog manager. async fn start(&self) -> Result<()>; - /// Registers a table given given catalog/schema to catalog manager, - /// returns table registered. - async fn register_table(&self, request: RegisterTableRequest) -> Result; + /// Registers a table within given catalog/schema to catalog manager, + /// returns whether the table registered. + async fn register_table(&self, request: RegisterTableRequest) -> Result; - /// Register a schema with catalog name and schema name. - async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; + /// Deregisters a table within given catalog/schema to catalog manager, + /// returns whether the table deregistered. + async fn deregister_table(&self, request: DeregisterTableRequest) -> Result; + + /// Register a schema with catalog name and schema name. Retuens whether the + /// schema registered. + async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; /// Register a system table, should be called before starting the manager. async fn register_system_table(&self, request: RegisterSystemTableRequest) @@ -123,6 +128,14 @@ pub struct RegisterTableRequest { pub table: TableRef, } +#[derive(Clone)] +pub struct DeregisterTableRequest { + pub catalog: String, + pub schema: String, + pub table_name: String, + pub table_id: TableId, +} + #[derive(Debug, Clone)] pub struct RegisterSchemaRequest { pub catalog: String, diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index ed6783c68f..34f9d94488 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -36,7 +36,7 @@ use table::TableRef; use crate::error::{ CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, - TableExistsSnafu, TableNotFoundSnafu, + TableExistsSnafu, TableNotFoundSnafu, UnimplementedSnafu, }; use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use crate::system::{ @@ -46,8 +46,8 @@ use crate::system::{ use crate::tables::SystemCatalog; use crate::{ format_full_table_name, handle_system_table_request, CatalogList, CatalogManager, - CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, RegisterSystemTableRequest, - RegisterTableRequest, SchemaProvider, SchemaProviderRef, + CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, + RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, }; /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. @@ -309,7 +309,7 @@ impl CatalogManager for LocalCatalogManager { self.init().await } - async fn register_table(&self, request: RegisterTableRequest) -> Result { + async fn register_table(&self, request: RegisterTableRequest) -> Result { let started = self.init_lock.lock().await; ensure!( @@ -349,10 +349,17 @@ impl CatalogManager for LocalCatalogManager { .await?; schema.register_table(request.table_name, request.table)?; - Ok(1) + Ok(true) } - async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { + async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result { + UnimplementedSnafu { + operation: "deregister table", + } + .fail() + } + + async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { let started = self.init_lock.lock().await; ensure!( *started, @@ -377,7 +384,7 @@ impl CatalogManager for LocalCatalogManager { .register_schema(request.catalog, schema_name.clone()) .await?; catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?; - Ok(1) + Ok(true) } async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index a32b29e204..8018b7ba44 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -27,8 +27,8 @@ use table::TableRef; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; use crate::schema::SchemaProvider; use crate::{ - CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, - RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef, + CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, + RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef, }; /// Simple in-memory list of catalogs @@ -69,7 +69,7 @@ impl CatalogManager for MemoryCatalogManager { Ok(()) } - async fn register_table(&self, request: RegisterTableRequest) -> Result { + async fn register_table(&self, request: RegisterTableRequest) -> Result { let catalogs = self.catalogs.write().unwrap(); let catalog = catalogs .get(&request.catalog) @@ -84,10 +84,28 @@ impl CatalogManager for MemoryCatalogManager { })?; schema .register_table(request.table_name, request.table) - .map(|v| if v.is_some() { 0 } else { 1 }) + .map(|v| v.is_none()) } - async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { + async fn deregister_table(&self, request: DeregisterTableRequest) -> Result { + let catalogs = self.catalogs.write().unwrap(); + let catalog = catalogs + .get(&request.catalog) + .context(CatalogNotFoundSnafu { + catalog_name: &request.catalog, + })? + .clone(); + let schema = catalog + .schema(&request.schema)? + .with_context(|| SchemaNotFoundSnafu { + schema_info: format!("{}.{}", &request.catalog, &request.schema), + })?; + schema + .deregister_table(&request.table_name) + .map(|v| v.is_some()) + } + + async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { let catalogs = self.catalogs.write().unwrap(); let catalog = catalogs .get(&request.catalog) @@ -95,11 +113,12 @@ impl CatalogManager for MemoryCatalogManager { catalog_name: &request.catalog, })?; catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?; - Ok(1) + Ok(true) } async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> { - unimplemented!() + // TODO(ruihang): support register system table request + Ok(()) } fn schema(&self, catalog: &str, schema: &str) -> Result> { @@ -340,4 +359,35 @@ mod tests { .downcast_ref::() .unwrap(); } + + #[tokio::test] + pub async fn test_catalog_deregister_table() { + let catalog = MemoryCatalogManager::default(); + let schema = catalog + .schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) + .unwrap() + .unwrap(); + + let register_table_req = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "numbers".to_string(), + table_id: 2333, + table: Arc::new(NumbersTable::default()), + }; + catalog.register_table(register_table_req).await.unwrap(); + assert!(schema.table_exist("numbers").unwrap()); + + let deregister_table_req = DeregisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "numbers".to_string(), + table_id: 2333, + }; + catalog + .deregister_table(deregister_table_req) + .await + .unwrap(); + assert!(!schema.table_exist("numbers").unwrap()); + } } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 5369f6ce0d..19ffa8e192 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -37,13 +37,13 @@ use tokio::sync::Mutex; use crate::error::{ CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu, - OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, + OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu, }; use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, - RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, - SchemaProviderRef, + DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, + RegisterTableRequest, SchemaProvider, SchemaProviderRef, }; /// Catalog manager based on metasrv. @@ -411,7 +411,7 @@ impl CatalogManager for RemoteCatalogManager { Ok(()) } - async fn register_table(&self, request: RegisterTableRequest) -> Result { + async fn register_table(&self, request: RegisterTableRequest) -> Result { let catalog_name = request.catalog; let schema_name = request.schema; let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu { @@ -430,10 +430,17 @@ impl CatalogManager for RemoteCatalogManager { .fail(); } schema_provider.register_table(request.table_name, request.table)?; - Ok(1) + Ok(true) } - async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { + async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result { + UnimplementedSnafu { + operation: "deregister table", + } + .fail() + } + + async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { let catalog_name = request.catalog; let schema_name = request.schema; let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu { @@ -441,7 +448,7 @@ impl CatalogManager for RemoteCatalogManager { })?; let schema_provider = self.new_schema_provider(&catalog_name, &schema_name); catalog_provider.register_schema(schema_name, schema_provider)?; - Ok(1) + Ok(true) } async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index b43fd09889..e5d8811e71 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -202,7 +202,7 @@ mod tests { table_id, table, }; - assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap()); + assert!(catalog_manager.register_table(reg_req).await.unwrap()); assert_eq!( HashSet::from([table_name, "numbers".to_string()]), default_schema @@ -287,7 +287,7 @@ mod tests { .register_schema(schema_name.clone(), schema.clone()) .expect("Register schema should not fail"); assert!(prev.is_none()); - assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap()); + assert!(catalog_manager.register_table(reg_req).await.unwrap()); assert_eq!( HashSet::from([schema_name.clone()]), diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 39b0b14fa6..917d32dfe0 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -71,6 +71,7 @@ pub struct StandaloneOptions { pub mode: Mode, pub wal_dir: String, pub storage: ObjectStoreConfig, + pub enable_memory_catalog: bool, } impl Default for StandaloneOptions { @@ -86,6 +87,7 @@ impl Default for StandaloneOptions { mode: Mode::Standalone, wal_dir: "/tmp/greptimedb/wal".to_string(), storage: ObjectStoreConfig::default(), + enable_memory_catalog: false, } } } @@ -110,6 +112,7 @@ impl StandaloneOptions { DatanodeOptions { wal_dir: self.wal_dir, storage: self.storage, + enable_memory_catalog: self.enable_memory_catalog, ..Default::default() } } @@ -131,18 +134,22 @@ struct StartCommand { influxdb_enable: bool, #[clap(short, long)] config_file: Option, + #[clap(short = 'm', long = "memory-catalog")] + enable_memory_catalog: bool, } impl StartCommand { async fn run(self) -> Result<()> { + let enable_memory_catalog = self.enable_memory_catalog; let config_file = self.config_file.clone(); let fe_opts = FrontendOptions::try_from(self)?; let dn_opts: DatanodeOptions = { - let opts: StandaloneOptions = if let Some(path) = config_file { + let mut opts: StandaloneOptions = if let Some(path) = config_file { toml_loader::from_file!(&path)? } else { StandaloneOptions::default() }; + opts.enable_memory_catalog = enable_memory_catalog; opts.datanode_options() }; @@ -264,6 +271,7 @@ mod tests { std::env::current_dir().unwrap().as_path().to_str().unwrap() )), influxdb_enable: false, + enable_memory_catalog: false, }; let fe_opts = FrontendOptions::try_from(cmd).unwrap(); diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 89cb34eda2..7e1b935a00 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -47,6 +47,7 @@ pub struct DatanodeOptions { pub meta_client_opts: Option, pub wal_dir: String, pub storage: ObjectStoreConfig, + pub enable_memory_catalog: bool, pub mode: Mode, } @@ -61,6 +62,7 @@ impl Default for DatanodeOptions { meta_client_opts: None, wal_dir: "/tmp/greptimedb/wal".to_string(), storage: ObjectStoreConfig::default(), + enable_memory_catalog: false, mode: Mode::Standalone, } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index dd94c9afc1..c6217ee4c8 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -100,17 +100,29 @@ impl Instance { // create remote catalog manager let (catalog_manager, factory, table_id_provider) = match opts.mode { Mode::Standalone => { - let catalog = Arc::new( - catalog::local::LocalCatalogManager::try_new(table_engine.clone()) - .await - .context(CatalogSnafu)?, - ); - let factory = QueryEngineFactory::new(catalog.clone()); - ( - catalog.clone() as CatalogManagerRef, - factory, - Some(catalog as TableIdProviderRef), - ) + if opts.enable_memory_catalog { + let catalog = Arc::new(catalog::local::MemoryCatalogManager::default()); + let factory = QueryEngineFactory::new(catalog.clone()); + + ( + catalog.clone() as CatalogManagerRef, + factory, + Some(catalog as TableIdProviderRef), + ) + } else { + let catalog = Arc::new( + catalog::local::LocalCatalogManager::try_new(table_engine.clone()) + .await + .context(CatalogSnafu)?, + ); + let factory = QueryEngineFactory::new(catalog.clone()); + + ( + catalog.clone() as CatalogManagerRef, + factory, + Some(catalog as TableIdProviderRef), + ) + } } Mode::Distributed => { diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 2e5d7b64d4..e2fcbd0cc2 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -19,8 +19,9 @@ use std::sync::Arc; use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu}; use catalog::remote::{Kv, KvBackendRef}; use catalog::{ - CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, - RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, + CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, + RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, + SchemaProviderRef, }; use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue}; use futures::StreamExt; @@ -65,17 +66,21 @@ impl CatalogManager for FrontendCatalogManager { Ok(()) } - async fn register_table( + async fn register_table(&self, _request: RegisterTableRequest) -> catalog::error::Result { + unimplemented!() + } + + async fn deregister_table( &self, - _request: RegisterTableRequest, - ) -> catalog::error::Result { + _request: DeregisterTableRequest, + ) -> catalog::error::Result { unimplemented!() } async fn register_schema( &self, _request: RegisterSchemaRequest, - ) -> catalog::error::Result { + ) -> catalog::error::Result { unimplemented!() }