From 5b012a1f67ae3e85de8461bd737a400521740072 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 14 Aug 2023 11:08:43 +0800 Subject: [PATCH] feat!: switch to new catalog/schema key (#2140) * feat!: switch to new catalog/schema key * chore: apply suggestions from CR --- src/catalog/src/error.rs | 17 ++ src/catalog/src/remote/manager.rs | 39 ++--- src/catalog/tests/remote_catalog_tests.rs | 71 ++------ src/cmd/src/cli.rs | 2 + src/common/meta/src/helper.rs | 2 + src/common/meta/src/key.rs | 18 +- src/common/meta/src/key/catalog_name.rs | 31 ++++ src/common/meta/src/key/schema_name.rs | 30 ++++ src/common/meta/src/lib.rs | 2 + src/datanode/src/instance.rs | 1 - src/frontend/src/catalog.rs | 108 +++++------- src/frontend/src/instance/distributed.rs | 47 +++--- .../src/instance/distributed/inserter.rs | 33 ++-- src/meta-srv/src/error.rs | 17 ++ src/meta-srv/src/metadata_service.rs | 100 +++++------ src/meta-srv/src/metasrv/builder.rs | 5 +- src/meta-srv/src/mocks.rs | 9 +- src/meta-srv/src/service/admin.rs | 4 +- src/meta-srv/src/service/admin/meta.rs | 158 ++++-------------- tests-integration/src/cluster.rs | 9 +- 20 files changed, 311 insertions(+), 392 deletions(-) diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 46cd6d82ed..c947a9d0f7 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -27,6 +27,19 @@ use crate::DeregisterTableRequest; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to list catalogs, source: {}", source))] + ListCatalogs { + location: Location, + source: BoxedError, + }, + + #[snafu(display("Failed to list {}'s schemas, source: {}", catalog, source))] + ListSchemas { + location: Location, + catalog: String, + source: BoxedError, + }, + #[snafu(display( "Failed to re-compile script due to internal error, source: {}", source @@ -284,6 +297,10 @@ impl ErrorExt for Error { StatusCode::InvalidArguments } + Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => { + source.status_code() + } + Error::OpenSystemCatalog { source, .. } | Error::CreateSystemCatalog { source, .. } | Error::InsertCatalogRecord { source, .. } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 0bfae4ea31..53e97c5ca6 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -17,11 +17,11 @@ use std::sync::Arc; use async_trait::async_trait; use common_catalog::consts::MITO_ENGINE; -use common_meta::helper::{CatalogKey, SchemaKey}; use common_meta::ident::TableIdent; +use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::datanode_table::DatanodeTableValue; +use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::TableMetadataManagerRef; -use common_meta::kv_backend::KvBackendRef; use common_telemetry::{error, info, warn}; use metrics::increment_gauge; use snafu::{ensure, OptionExt, ResultExt}; @@ -45,7 +45,6 @@ use crate::{ /// Catalog manager based on metasrv. pub struct RemoteCatalogManager { node_id: u64, - backend: KvBackendRef, engine_manager: TableEngineManagerRef, system_table_requests: Mutex>, region_alive_keepers: Arc, @@ -57,14 +56,12 @@ impl RemoteCatalogManager { pub fn new( engine_manager: TableEngineManagerRef, node_id: u64, - backend: KvBackendRef, region_alive_keepers: Arc, table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { engine_manager, node_id, - backend, system_table_requests: Default::default(), region_alive_keepers, memory_catalog_manager: MemoryCatalogManager::with_default_setup(), @@ -110,13 +107,6 @@ impl RemoteCatalogManager { .context(ParallelOpenTableSnafu)?; Ok(()) } - - fn build_schema_key(&self, catalog_name: String, schema_name: String) -> SchemaKey { - SchemaKey { - catalog_name, - schema_name, - } - } } async fn open_and_register_table( @@ -323,16 +313,12 @@ impl CatalogManager for RemoteCatalogManager { return Ok(true); } - let key = self - .build_schema_key(catalog.to_string(), schema.to_string()) - .to_string(); let remote_schema_exists = self - .backend - .get(key.as_bytes()) + .table_metadata_manager + .schema_manager() + .exist(SchemaNameKey::new(catalog, schema)) .await - .context(TableMetadataManagerSnafu)? - .is_some(); - + .context(TableMetadataManagerSnafu)?; // Create schema locally if remote schema exists. Since local schema is managed by memory // catalog manager, creating a local schema is relatively cheap (just a HashMap). // Besides, if this method ("schema_exist) is called, it's very likely that someone wants to @@ -368,16 +354,13 @@ impl CatalogManager for RemoteCatalogManager { return Ok(true); } - let key = CatalogKey { - catalog_name: catalog.to_string(), - }; - + let key = CatalogNameKey::new(catalog); let remote_catalog_exists = self - .backend - .get(key.to_string().as_bytes()) + .table_metadata_manager + .catalog_manager() + .exist(key) .await - .context(TableMetadataManagerSnafu)? - .is_some(); + .context(TableMetadataManagerSnafu)?; // Create catalog locally if remote catalog exists. Since local catalog is managed by memory // catalog manager, creating a local catalog is relatively cheap (just a HashMap). diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 11b352d513..bc48b3c9a6 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -21,7 +21,6 @@ mod tests { use std::sync::Arc; use std::time::Duration; - use catalog::error::Error; use catalog::remote::mock::MockTableEngine; use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; @@ -29,12 +28,13 @@ mod tests { use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE, }; - use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; + use common_meta::helper::CatalogValue; use common_meta::ident::TableIdent; + use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackend; - use common_meta::rpc::store::{CompareAndPutRequest, PutRequest, RangeRequest}; + use common_meta::rpc::store::{CompareAndPutRequest, PutRequest}; use datatypes::schema::RawSchema; use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; use table::engine::{EngineContext, TableEngineRef}; @@ -54,79 +54,43 @@ mod tests { } } - #[tokio::test] - async fn test_backend() { - let backend = MemoryKvBackend::::default(); - - let default_catalog_key = CatalogKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - } - .to_string(); - let req = PutRequest::new() - .with_key(default_catalog_key.as_bytes()) - .with_value(CatalogValue.as_bytes().unwrap()); - backend.put(req).await.unwrap(); - - let schema_key = SchemaKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - } - .to_string(); - let req = PutRequest::new() - .with_key(schema_key.as_bytes()) - .with_value(SchemaValue.as_bytes().unwrap()); - backend.put(req).await.unwrap(); - - let req = RangeRequest::new().with_prefix(b"__c-".to_vec()); - let res = backend - .range(req) - .await - .unwrap() - .kvs - .into_iter() - .map(|kv| String::from_utf8_lossy(kv.key()).to_string()); - assert_eq!( - vec!["__c-greptime".to_string()], - res.into_iter().collect::>() - ); - } - #[tokio::test] async fn test_cached_backend() { let backend = CachedMetaKvBackend::wrap(Arc::new(MemoryKvBackend::default())); - let default_catalog_key = CatalogKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - } - .to_string(); + let default_catalog_key = CatalogNameKey::new(DEFAULT_CATALOG_NAME).to_string(); + let req = PutRequest::new() .with_key(default_catalog_key.as_bytes()) .with_value(CatalogValue.as_bytes().unwrap()); backend.put(req).await.unwrap(); - let ret = backend.get(b"__c-greptime").await.unwrap(); + let ret = backend.get(b"__catalog_name/greptime").await.unwrap(); let _ = ret.unwrap(); let req = CompareAndPutRequest::new() - .with_key(b"__c-greptime".to_vec()) + .with_key(b"__catalog_name/greptime".to_vec()) .with_expect(CatalogValue.as_bytes().unwrap()) .with_value(b"123".to_vec()); let _ = backend.compare_and_put(req).await.unwrap(); - let ret = backend.get(b"__c-greptime").await.unwrap(); + let ret = backend.get(b"__catalog_name/greptime").await.unwrap(); assert_eq!(b"123", ret.as_ref().unwrap().value.as_slice()); let req = PutRequest::new() - .with_key(b"__c-greptime".to_vec()) + .with_key(b"__catalog_name/greptime".to_vec()) .with_value(b"1234".to_vec()); let _ = backend.put(req).await; - let ret = backend.get(b"__c-greptime").await.unwrap(); + let ret = backend.get(b"__catalog_name/greptime").await.unwrap(); assert_eq!(b"1234", ret.unwrap().value.as_slice()); - backend.delete(b"__c-greptime", false).await.unwrap(); + backend + .delete(b"__catalog_name/greptime", false) + .await + .unwrap(); - let ret = backend.get(b"__c-greptime").await.unwrap(); + let ret = backend.get(b"__catalog_name/greptime").await.unwrap(); assert!(ret.is_none()); } @@ -134,12 +98,12 @@ mod tests { let backend = Arc::new(MemoryKvBackend::default()); let req = PutRequest::new() - .with_key(b"__c-greptime".to_vec()) + .with_key(b"__catalog_name/greptime".to_vec()) .with_value(b"".to_vec()); backend.put(req).await.unwrap(); let req = PutRequest::new() - .with_key(b"__s-greptime-public".to_vec()) + .with_key(b"__schema_name/greptime-public".to_vec()) .with_value(b"".to_vec()); backend.put(req).await.unwrap(); @@ -156,7 +120,6 @@ mod tests { let catalog_manager = RemoteCatalogManager::new( engine_manager.clone(), node_id, - cached_backend.clone(), region_alive_keepers.clone(), Arc::new(TableMetadataManager::new(cached_backend)), ); diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 09356d0020..6649ec1d78 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -16,6 +16,8 @@ mod bench; mod cmd; mod helper; mod repl; +// TODO(weny): Removes it +#[allow(deprecated)] mod upgrade; use async_trait::async_trait; diff --git a/src/common/meta/src/helper.rs b/src/common/meta/src/helper.rs index a2dbad814d..52b8a3b1a9 100644 --- a/src/common/meta/src/helper.rs +++ b/src/common/meta/src/helper.rs @@ -67,6 +67,7 @@ impl TableGlobalValue { } } +#[deprecated(since = "0.4.0", note = "Please use the CatalogNameKey instead")] pub struct CatalogKey { pub catalog_name: String, } @@ -95,6 +96,7 @@ impl CatalogKey { #[derive(Debug, Serialize, Deserialize)] pub struct CatalogValue; +#[deprecated(since = "0.4.0", note = "Please use the SchemaNameKey instead")] pub struct SchemaKey { pub catalog_name: String, pub schema_name: String, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 6326ced7a6..c0ff12b263 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -65,8 +65,8 @@ use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; use table_region::{TableRegionKey, TableRegionManager, TableRegionValue}; -use self::catalog_name::CatalogNameValue; -use self::schema_name::SchemaNameValue; +use self::catalog_name::{CatalogManager, CatalogNameValue}; +use self::schema_name::{SchemaManager, SchemaNameValue}; use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX}; use crate::kv_backend::KvBackendRef; @@ -125,6 +125,8 @@ pub struct TableMetadataManager { table_info_manager: TableInfoManager, table_region_manager: TableRegionManager, datanode_table_manager: DatanodeTableManager, + catalog_manager: CatalogManager, + schema_manager: SchemaManager, } impl TableMetadataManager { @@ -133,7 +135,9 @@ impl TableMetadataManager { table_name_manager: TableNameManager::new(kv_backend.clone()), table_info_manager: TableInfoManager::new(kv_backend.clone()), table_region_manager: TableRegionManager::new(kv_backend.clone()), - datanode_table_manager: DatanodeTableManager::new(kv_backend), + datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()), + catalog_manager: CatalogManager::new(kv_backend.clone()), + schema_manager: SchemaManager::new(kv_backend), } } @@ -152,6 +156,14 @@ impl TableMetadataManager { pub fn datanode_table_manager(&self) -> &DatanodeTableManager { &self.datanode_table_manager } + + pub fn catalog_manager(&self) -> &CatalogManager { + &self.catalog_manager + } + + pub fn schema_manager(&self) -> &SchemaManager { + &self.schema_manager + } } macro_rules! impl_table_meta_key { diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index f5d8a5194c..c6c727c0af 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -15,6 +15,7 @@ use std::fmt::Display; use std::sync::Arc; +use common_catalog::consts::DEFAULT_CATALOG_NAME; use futures::stream::BoxStream; use futures::StreamExt; use serde::{Deserialize, Serialize}; @@ -32,6 +33,14 @@ pub struct CatalogNameKey<'a> { pub catalog: &'a str, } +impl<'a> Default for CatalogNameKey<'a> { + fn default() -> Self { + Self { + catalog: DEFAULT_CATALOG_NAME, + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct CatalogNameValue; @@ -103,6 +112,12 @@ impl CatalogManager { Ok(()) } + pub async fn exist(&self, catalog: CatalogNameKey<'_>) -> Result { + let raw_key = catalog.as_raw_key(); + + Ok(self.kv_backend.get(&raw_key).await?.is_some()) + } + pub async fn catalog_names(&self) -> BoxStream<'static, Result> { let start_key = CatalogNameKey::range_start_key(); let req = RangeRequest::new().with_prefix(start_key.as_bytes()); @@ -121,6 +136,7 @@ impl CatalogManager { #[cfg(test)] mod tests { use super::*; + use crate::kv_backend::memory::MemoryKvBackend; #[test] fn test_serialization() { @@ -132,4 +148,19 @@ mod tests { assert_eq!(key, parsed); } + + #[tokio::test] + async fn test_key_exist() { + let manager = CatalogManager::new(Arc::new(MemoryKvBackend::default())); + + let catalog_key = CatalogNameKey::new("my-catalog"); + + manager.create(catalog_key).await.unwrap(); + + assert!(manager.exist(catalog_key).await.unwrap()); + + let wrong_catalog_key = CatalogNameKey::new("my-wrong"); + + assert!(!manager.exist(wrong_catalog_key).await.unwrap()); + } } diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 2317e73e38..2b0508920f 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -15,6 +15,7 @@ use std::fmt::Display; use std::sync::Arc; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use futures::stream::BoxStream; use futures::StreamExt; use serde::{Deserialize, Serialize}; @@ -33,6 +34,15 @@ pub struct SchemaNameKey<'a> { pub schema: &'a str, } +impl<'a> Default for SchemaNameKey<'a> { + fn default() -> Self { + Self { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_SCHEMA_NAME, + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct SchemaNameValue; @@ -109,6 +119,12 @@ impl SchemaManager { Ok(()) } + pub async fn exist(&self, schema: SchemaNameKey<'_>) -> Result { + let raw_key = schema.as_raw_key(); + + Ok(self.kv_backend.get(&raw_key).await?.is_some()) + } + /// Returns a schema stream, it lists all schemas belong to the target `catalog`. pub async fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result> { let start_key = SchemaNameKey::range_start_key(catalog); @@ -128,6 +144,7 @@ impl SchemaManager { #[cfg(test)] mod tests { use super::*; + use crate::kv_backend::memory::MemoryKvBackend; #[test] fn test_serialization() { @@ -139,4 +156,17 @@ mod tests { assert_eq!(key, parsed); } + + #[tokio::test] + async fn test_key_exist() { + let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default())); + let schema_key = SchemaNameKey::new("my-catalog", "my-schema"); + manager.create(schema_key).await.unwrap(); + + assert!(manager.exist(schema_key).await.unwrap()); + + let wrong_schema_key = SchemaNameKey::new("my-catalog", "my-wrong"); + + assert!(!manager.exist(wrong_schema_key).await.unwrap()); + } } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index e393ec5145..e052afeb2a 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -16,6 +16,8 @@ pub mod error; pub mod heartbeat; +// TODO(weny): Removes it +#[allow(deprecated)] pub mod helper; pub mod ident; pub mod instruction; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 589ee18519..44a0f077e2 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -260,7 +260,6 @@ impl Instance { let catalog_manager = Arc::new(RemoteCatalogManager::new( engine_manager.clone(), opts.node_id.context(MissingNodeIdSnafu)?, - kv_backend.clone(), region_alive_keepers.clone(), Arc::new(TableMetadataManager::new(kv_backend)), )); diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 9937f3716c..53fa9eee6b 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -13,13 +13,13 @@ // limitations under the License. use std::any::Any; -use std::collections::HashSet; +use std::collections::BTreeSet; use std::sync::Arc; use api::v1::CreateTableExpr; use catalog::error::{ - self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu, - Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu, + self as catalog_err, InternalSnafu, InvalidSystemTableDefSnafu, ListCatalogsSnafu, + ListSchemasSnafu, Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu, }; use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; use catalog::remote::KvCacheInvalidatorRef; @@ -30,16 +30,16 @@ use catalog::{ use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME}; use common_error::ext::BoxedError; -use common_meta::helper::{build_catalog_prefix, build_schema_prefix, CatalogKey, SchemaKey}; +use common_meta::key::catalog_name::CatalogNameKey; +use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::TableInfoKey; use common_meta::key::table_name::TableNameKey; use common_meta::key::table_region::TableRegionKey; use common_meta::key::{TableMetaKey, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; -use common_meta::rpc::store::RangeRequest; -use common_meta::rpc::KeyValue; use common_meta::table_name::TableName; -use common_telemetry::{debug, warn}; +use common_telemetry::debug; +use futures_util::TryStreamExt; use partition::manager::PartitionRuleManagerRef; use snafu::prelude::*; use table::metadata::TableId; @@ -95,20 +95,18 @@ impl FrontendCatalogManager { self.partition_manager.clone() } + pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + pub fn datanode_clients(&self) -> Arc { self.datanode_clients.clone() } pub async fn invalidate_schema(&self, catalog: &str, schema: &str) { - let schema_key = SchemaKey { - catalog_name: catalog.into(), - schema_name: schema.into(), - } - .to_string(); + let key = SchemaNameKey::new(catalog, schema).as_raw_key(); - let key = schema_key.as_bytes(); - - self.backend_cache_invalidator.invalidate_key(key).await; + self.backend_cache_invalidator.invalidate_key(&key).await; } pub async fn invalidate_table( @@ -284,47 +282,36 @@ impl CatalogManager for FrontendCatalogManager { } async fn catalog_names(&self) -> CatalogResult> { - let key = build_catalog_prefix(); - let req = RangeRequest::new().with_prefix(key.as_bytes()); + let stream = self + .table_metadata_manager + .catalog_manager() + .catalog_names() + .await; - let kvs = self - .backend - .range(req) + let keys = stream + .try_collect::>() .await - .context(TableMetadataManagerSnafu)? - .kvs; + .map_err(BoxedError::new) + .context(ListCatalogsSnafu)?; - let mut res = HashSet::new(); - for KeyValue { key: k, value: _ } in kvs { - let catalog_key = String::from_utf8_lossy(&k); - if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) { - let _ = res.insert(key.catalog_name); - } else { - warn!("invalid catalog key: {:?}", catalog_key); - } - } - Ok(res.into_iter().collect()) + Ok(keys) } async fn schema_names(&self, catalog: &str) -> CatalogResult> { - let key = build_schema_prefix(catalog); - let req = RangeRequest::new().with_prefix(key.as_bytes()); - - let kvs = self - .backend - .range(req) + let stream = self + .table_metadata_manager + .schema_manager() + .schema_names(catalog) + .await; + let mut keys = stream + .try_collect::>() .await - .context(TableMetadataManagerSnafu)? - .kvs; + .map_err(BoxedError::new) + .context(ListSchemasSnafu { catalog })?; - let mut res = HashSet::new(); - res.insert(INFORMATION_SCHEMA_NAME.to_string()); - for KeyValue { key: k, value: _ } in kvs { - let key = - SchemaKey::parse(String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?; - let _ = res.insert(key.schema_name); - } - Ok(res.into_iter().collect()) + keys.insert(INFORMATION_SCHEMA_NAME.to_string()); + + Ok(keys.into_iter().collect::>()) } async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult> { @@ -349,33 +336,22 @@ impl CatalogManager for FrontendCatalogManager { } async fn catalog_exist(&self, catalog: &str) -> CatalogResult { - let key = CatalogKey { - catalog_name: catalog.to_string(), - } - .to_string(); - self.backend - .get(key.as_bytes()) + self.table_metadata_manager + .catalog_manager() + .exist(CatalogNameKey::new(catalog)) .await .context(TableMetadataManagerSnafu) - .map(|x| x.is_some()) } async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult { if schema == INFORMATION_SCHEMA_NAME { return Ok(true); } - - let schema_key = SchemaKey { - catalog_name: catalog.to_string(), - schema_name: schema.to_string(), - } - .to_string(); - Ok(self - .backend() - .get(schema_key.as_bytes()) + self.table_metadata_manager + .schema_manager() + .exist(SchemaNameKey::new(catalog, schema)) .await - .context(TableMetadataManagerSnafu)? - .is_some()) + .context(TableMetadataManagerSnafu) } async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 198c00a0a6..004d923415 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -32,11 +32,10 @@ use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; -use common_meta::helper::{SchemaKey, SchemaValue}; +use common_meta::key::schema_name::SchemaNameKey; use common_meta::peer::Peer; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest}; -use common_meta::rpc::store::CompareAndPutRequest; use common_meta::table_name::TableName; use common_query::Output; use common_telemetry::{debug, info}; @@ -64,11 +63,10 @@ use table::TableRef; use crate::catalog::FrontendCatalogManager; use crate::error::{ - self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ColumnDataTypeSnafu, - ColumnNotFoundSnafu, DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, - RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, StartMetaClientSnafu, - TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, - UnrecognizedTableOptionSnafu, + self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, + DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, RequestDatanodeSnafu, + RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu, + TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu, }; use crate::expr_factory; use crate::instance::distributed::inserter::DistInserter; @@ -463,32 +461,29 @@ impl DistInstance { }; } - let key = SchemaKey { - catalog_name: catalog.to_owned(), - schema_name: expr.database_name.clone(), - }; - let value = SchemaValue {}; - let client = self - .meta_client - .store_client() - .context(StartMetaClientSnafu)?; - - let request = CompareAndPutRequest::new() - .with_key(key.to_string()) - .with_value(value.as_bytes().context(CatalogEntrySerdeSnafu)?); - - let response = client - .compare_and_put(request.into()) + let schema = SchemaNameKey::new(catalog, &expr.database_name); + let exist = self + .catalog_manager + .table_metadata_manager_ref() + .schema_manager() + .exist(schema) .await - .context(RequestMetaSnafu)?; + .context(error::TableMetadataManagerSnafu)?; ensure!( - response.success, + !exist, SchemaExistsSnafu { - name: key.schema_name + name: schema.to_string(), } ); + self.catalog_manager + .table_metadata_manager_ref() + .schema_manager() + .create(schema) + .await + .context(error::TableMetadataManagerSnafu)?; + // Since the database created on meta does not go through KvBackend, so we manually // invalidate the cache here. // diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index da2f54276f..dbbaf4b514 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -179,13 +179,13 @@ mod tests { use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest, SemanticType}; use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; + use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey}; + use common_meta::key::schema_name::{SchemaManager, SchemaNameKey}; use common_meta::key::table_name::TableNameKey; use common_meta::key::table_region::RegionDistribution; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::kv_backend::{KvBackend, KvBackendRef}; - use common_meta::rpc::store::PutRequest; + use common_meta::kv_backend::KvBackendRef; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; @@ -198,24 +198,17 @@ mod tests { async fn prepare_mocked_backend() -> KvBackendRef { let backend = Arc::new(MemoryKvBackend::default()); - let default_catalog = CatalogKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - } - .to_string(); - let req = PutRequest::new() - .with_key(default_catalog.as_bytes()) - .with_value(CatalogValue.as_bytes().unwrap()); - backend.put(req).await.unwrap(); + let catalog_manager = CatalogManager::new(backend.clone()); + let schema_manager = SchemaManager::new(backend.clone()); - let default_schema = SchemaKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - } - .to_string(); - let req = PutRequest::new() - .with_key(default_schema.as_bytes()) - .with_value(SchemaValue.as_bytes().unwrap()); - backend.put(req).await.unwrap(); + catalog_manager + .create(CatalogNameKey::default()) + .await + .unwrap(); + schema_manager + .create(SchemaNameKey::default()) + .await + .unwrap(); backend } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index fb0107987e..e0c7a17530 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,6 +26,19 @@ use crate::pubsub::Message; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to list catalogs: {}", source))] + ListCatalogs { + location: Location, + source: BoxedError, + }, + + #[snafu(display("Failed to list {}'s schemas: {}", catalog, source))] + ListSchemas { + location: Location, + catalog: String, + source: BoxedError, + }, + #[snafu(display("Failed to join a future: {}", source))] Join { location: Location, @@ -563,6 +576,10 @@ impl ErrorExt for Error { source.status_code() } + Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => { + source.status_code() + } + Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted, Error::RegisterProcedureLoader { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs index 137b6942d9..d17d7d99f4 100644 --- a/src/meta-srv/src/metadata_service.rs +++ b/src/meta-srv/src/metadata_service.rs @@ -15,15 +15,15 @@ use std::sync::Arc; use async_trait::async_trait; -use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; -use common_meta::rpc::store::CompareAndPutRequest; +use common_meta::key::catalog_name::CatalogNameKey; +use common_meta::key::schema_name::SchemaNameKey; +use common_meta::key::TableMetadataManagerRef; use common_telemetry::{info, timer}; use metrics::increment_counter; use snafu::{ensure, ResultExt}; use crate::error; use crate::error::Result; -use crate::service::store::kv::KvStoreRef; /// This trait defines some methods of metadata #[async_trait] @@ -43,12 +43,14 @@ pub type MetadataServiceRef = Arc; #[derive(Clone)] pub struct DefaultMetadataService { - kv_store: KvStoreRef, + table_metadata_manager: TableMetadataManagerRef, } impl DefaultMetadataService { - pub fn new(kv_store: KvStoreRef) -> Self { - Self { kv_store } + pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { + Self { + table_metadata_manager, + } } } @@ -61,52 +63,40 @@ impl MetadataService for DefaultMetadataService { if_not_exist: bool, ) -> Result<()> { let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA); - let kv_store = self.kv_store.clone(); - let catalog_key = CatalogKey { - catalog_name: catalog_name.to_string(), - } - .to_string(); + self.table_metadata_manager + .catalog_manager() + .create(CatalogNameKey::new(catalog_name)) + .await + .context(error::TableMetadataManagerSnafu)?; - let schema_key = SchemaKey { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - } - .to_string(); + increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); + info!("Successfully created a catalog: {}", catalog_name); - let req = CompareAndPutRequest { - key: catalog_key.into(), - expect: vec![], - value: CatalogValue {} - .as_bytes() - .context(error::InvalidCatalogValueSnafu)?, - }; + let schema = SchemaNameKey::new(catalog_name, schema_name); - let resp = kv_store.compare_and_put(req).await?; - - if resp.success { - increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); - info!("Successfully created a catalog: {}", catalog_name); - } - - let req = CompareAndPutRequest { - key: schema_key.into(), - expect: vec![], - value: SchemaValue {} - .as_bytes() - .context(error::InvalidCatalogValueSnafu)?, - }; - let resp = kv_store.compare_and_put(req).await?; - - if resp.success { - info!("Successfully created a schema: {}", schema_name); - } + let exist = self + .table_metadata_manager + .schema_manager() + .exist(schema) + .await + .context(error::TableMetadataManagerSnafu)?; ensure!( - resp.success || if_not_exist, + !exist || if_not_exist, error::SchemaAlreadyExistsSnafu { schema_name } ); + if !exist { + self.table_metadata_manager + .schema_manager() + .create(schema) + .await + .context(error::TableMetadataManagerSnafu)?; + + info!("Successfully created a schema: {}", schema_name); + } + Ok(()) } @@ -119,16 +109,21 @@ impl MetadataService for DefaultMetadataService { mod tests { use std::sync::Arc; - use common_meta::helper::{CatalogKey, SchemaKey}; + use common_meta::key::catalog_name::CatalogNameKey; + use common_meta::key::schema_name::SchemaNameKey; + use common_meta::key::{TableMetaKey, TableMetadataManager}; use super::{DefaultMetadataService, MetadataService}; - use crate::service::store::kv::KvStoreRef; + use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; use crate::service::store::memory::MemStore; #[tokio::test] async fn test_create_schema() { let kv_store = Arc::new(MemStore::default()); - let service = DefaultMetadataService::new(kv_store.clone()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))); + let service = DefaultMetadataService::new(table_metadata_manager); service .create_schema("catalog", "public", false) @@ -147,22 +142,13 @@ mod tests { } async fn verify_result(kv_store: KvStoreRef) { - let key: Vec = CatalogKey { - catalog_name: "catalog".to_string(), - } - .to_string() - .into(); + let key = CatalogNameKey::new("catalog").as_raw_key(); let result = kv_store.get(&key).await.unwrap(); let kv = result.unwrap(); assert_eq!(key, kv.key()); - let key: Vec = SchemaKey { - catalog_name: "catalog".to_string(), - schema_name: "public".to_string(), - } - .to_string() - .into(); + let key = SchemaNameKey::new("catalog", "public").as_raw_key(); let result = kv_store.get(&key).await.unwrap(); let kv = result.unwrap(); diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 8b76a537e3..ef29bfa609 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -166,8 +166,11 @@ impl MetaSrvBuilder { let mailbox = build_mailbox(&kv_store, &pushers); let procedure_manager = build_procedure_manager(&options, &kv_store); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))); let metadata_service = metadata_service - .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); + .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(table_metadata_manager))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let table_metadata_manager = build_table_metadata_manager(&kv_store); let ddl_manager = build_ddl_manager( diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index df901c5ea4..d7393ba048 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -22,13 +22,14 @@ use api::v1::meta::store_server::StoreServer; use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::key::TableMetadataManager; use tower::service_fn; use crate::metadata_service::{DefaultMetadataService, MetadataService}; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use crate::service::store::etcd::EtcdStore; -use crate::service::store::kv::KvStoreRef; +use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; use crate::service::store::memory::MemStore; #[derive(Clone)] @@ -60,8 +61,10 @@ pub async fn mock( datanode_clients: Option>, ) -> MockInfo { let server_addr = opts.server_addr.clone(); - - let metadata_service = DefaultMetadataService::new(kv_store.clone()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))); + let metadata_service = DefaultMetadataService::new(table_metadata_manager); metadata_service .create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true) diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 0aae00b260..e0366fa19d 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -50,14 +50,14 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let router = router.route( "/catalogs", meta::CatalogsHandler { - kv_store: meta_srv.kv_store().clone(), + table_metadata_manager: meta_srv.table_metadata_manager().clone(), }, ); let router = router.route( "/schemas", meta::SchemasHandler { - kv_store: meta_srv.kv_store().clone(), + table_metadata_manager: meta_srv.table_metadata_manager().clone(), }, ); diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs index 522cbefc14..fbcc918ae4 100644 --- a/src/meta-srv/src/service/admin/meta.rs +++ b/src/meta-srv/src/service/admin/meta.rs @@ -14,25 +14,23 @@ use std::collections::HashMap; -use common_meta::helper::{build_catalog_prefix, build_schema_prefix}; +use common_error::ext::BoxedError; use common_meta::key::table_name::TableNameKey; use common_meta::key::TableMetadataManagerRef; -use common_meta::rpc::store::{RangeRequest, RangeResponse}; -use common_meta::util; +use futures::TryStreamExt; use snafu::{OptionExt, ResultExt}; use tonic::codegen::http; use crate::error; use crate::error::{Result, TableMetadataManagerSnafu}; use crate::service::admin::HttpHandler; -use crate::service::store::kv::KvStoreRef; pub struct CatalogsHandler { - pub kv_store: KvStoreRef, + pub table_metadata_manager: TableMetadataManagerRef, } pub struct SchemasHandler { - pub kv_store: KvStoreRef, + pub table_metadata_manager: TableMetadataManagerRef, } pub struct TablesHandler { @@ -46,7 +44,19 @@ pub struct TableHandler { #[async_trait::async_trait] impl HttpHandler for CatalogsHandler { async fn handle(&self, _: &str, _: &HashMap) -> Result> { - get_http_response_by_prefix(build_catalog_prefix(), &self.kv_store).await + let stream = self + .table_metadata_manager + .catalog_manager() + .catalog_names() + .await; + + let keys = stream + .try_collect::>() + .await + .map_err(BoxedError::new) + .context(error::ListCatalogsSnafu)?; + + to_http_response(keys) } } @@ -62,7 +72,19 @@ impl HttpHandler for SchemasHandler { .context(error::MissingRequiredParameterSnafu { param: "catalog_name", })?; - get_http_response_by_prefix(build_schema_prefix(catalog), &self.kv_store).await + let stream = self + .table_metadata_manager + .schema_manager() + .schema_names(catalog) + .await; + + let keys = stream + .try_collect::>() + .await + .map_err(BoxedError::new) + .context(error::ListSchemasSnafu { catalog })?; + + to_http_response(keys) } } @@ -160,15 +182,6 @@ impl HttpHandler for TableHandler { } } -/// Get kv_store's key list with http response format by prefix key -async fn get_http_response_by_prefix( - key_prefix: String, - kv_store: &KvStoreRef, -) -> Result> { - let keys = get_keys_by_prefix(key_prefix, kv_store).await?; - to_http_response(keys) -} - fn to_http_response(keys: Vec) -> Result> { let body = serde_json::to_string(&keys).context(error::SerializeToJsonSnafu { input: format!("{keys:?}"), @@ -179,114 +192,3 @@ fn to_http_response(keys: Vec) -> Result> { .body(body) .context(error::InvalidHttpBodySnafu) } - -/// Get kv_store's key list by prefix key -async fn get_keys_by_prefix(key_prefix: String, kv_store: &KvStoreRef) -> Result> { - let key_prefix_u8 = key_prefix.clone().into_bytes(); - let range_end = util::get_prefix_end_key(&key_prefix_u8); - let req = RangeRequest { - key: key_prefix_u8, - range_end, - keys_only: true, - ..Default::default() - }; - - let response: RangeResponse = kv_store.range(req).await?; - - let kvs = response.kvs; - let mut values = Vec::with_capacity(kvs.len()); - for kv in kvs { - let value = String::from_utf8(kv.key).context(error::InvalidUtf8ValueSnafu)?; - let split_list = value.split(&key_prefix).collect::>(); - if let Some(v) = split_list.get(1) { - values.push(v.to_string()); - } - } - Ok(values) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use common_meta::helper::{build_catalog_prefix, build_schema_prefix, CatalogKey, SchemaKey}; - use common_meta::key::table_name::TableNameKey; - use common_meta::key::TableMetaKey; - use common_meta::rpc::store::PutRequest; - - use crate::service::admin::meta::get_keys_by_prefix; - use crate::service::store::kv::KvStoreRef; - use crate::service::store::memory::MemStore; - - #[tokio::test] - async fn test_get_list_by_prefix() { - let in_mem = Arc::new(MemStore::new()) as KvStoreRef; - let catalog_name = "test_catalog"; - let schema_name = "test_schema"; - let table_name = "test_table"; - let catalog = CatalogKey { - catalog_name: catalog_name.to_string(), - }; - assert!(in_mem - .put(PutRequest { - key: catalog.to_string().as_bytes().to_vec(), - value: "".as_bytes().to_vec(), - prev_kv: false, - }) - .await - .is_ok()); - - let schema = SchemaKey { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - }; - assert!(in_mem - .put(PutRequest { - key: schema.to_string().as_bytes().to_vec(), - value: "".as_bytes().to_vec(), - prev_kv: false, - }) - .await - .is_ok()); - - let table1 = TableNameKey::new(catalog_name, schema_name, table_name); - let table2 = TableNameKey::new(catalog_name, schema_name, "test_table1"); - assert!(in_mem - .put(PutRequest { - key: table1.as_raw_key(), - value: "".as_bytes().to_vec(), - prev_kv: false, - }) - .await - .is_ok()); - assert!(in_mem - .put(PutRequest { - key: table2.as_raw_key(), - value: "".as_bytes().to_vec(), - prev_kv: false, - }) - .await - .is_ok()); - - let catalog_key = get_keys_by_prefix(build_catalog_prefix(), &in_mem) - .await - .unwrap(); - let schema_key = get_keys_by_prefix(build_schema_prefix(schema.catalog_name), &in_mem) - .await - .unwrap(); - let table_key = get_keys_by_prefix( - format!( - "{}/", - TableNameKey::prefix_to_table(table1.catalog, table1.schema) - ), - &in_mem, - ) - .await - .unwrap(); - - assert_eq!(catalog_name, catalog_key[0]); - assert_eq!(schema_name, schema_key[0]); - assert_eq!(table_name, table_key[0]); - assert_eq!("test_table1", table_key[1]); - } -} diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 8b5cff7663..b792e6b59c 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -21,6 +21,7 @@ use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::DatanodeId; use common_runtime::Builder as RuntimeBuilder; @@ -35,7 +36,7 @@ use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metadata_service::{DefaultMetadataService, MetadataService}; use meta_srv::metasrv::{MetaSrv, MetaSrvOptions}; use meta_srv::mocks::MockInfo; -use meta_srv::service::store::kv::KvStoreRef; +use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef}; use meta_srv::service::store::memory::MemStore; use servers::grpc::GrpcServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; @@ -131,8 +132,10 @@ impl GreptimeDbClusterBuilder { let mock = meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await; - - let metadata_service = DefaultMetadataService::new(mock.meta_srv.kv_store().clone()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + mock.meta_srv.kv_store().clone(), + ))); + let metadata_service = DefaultMetadataService::new(table_metadata_manager); metadata_service .create_schema("another_catalog", "another_schema", true) .await