diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 291507c234..910032efb9 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -20,9 +20,7 @@ use std::sync::Arc; use arc_swap::ArcSwap; use async_stream::stream; use async_trait::async_trait; -use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, -}; +use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; use common_telemetry::{debug, error, info}; use dashmap::DashMap; use futures::Stream; @@ -189,13 +187,6 @@ impl RemoteCatalogManager { let mut res = HashMap::new(); let max_table_id = MIN_USER_TABLE_ID - 1; - // initiate default catalog and schema - let default_catalog = self - .create_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) - .await?; - res.insert(DEFAULT_CATALOG_NAME.to_string(), default_catalog); - info!("Default catalog and schema registered"); - let mut catalogs = self.iter_remote_catalogs().await; while let Some(r) = catalogs.next().await { let CatalogKey { catalog_name, .. } = r?; diff --git a/src/catalog/tests/mock.rs b/src/catalog/tests/mock.rs index d34a284b89..b28094351a 100644 --- a/src/catalog/tests/mock.rs +++ b/src/catalog/tests/mock.rs @@ -20,7 +20,9 @@ use std::sync::Arc; use async_stream::stream; use catalog::error::Error; +use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use catalog::remote::{Kv, KvBackend, ValueIter}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_recordbatch::RecordBatch; use common_telemetry::logging::info; use datatypes::data_type::ConcreteDataType; @@ -34,11 +36,36 @@ use table::test_util::MemTable; use table::TableRef; use tokio::sync::RwLock; -#[derive(Default)] pub struct MockKvBackend { map: RwLock, Vec>>, } +impl Default for MockKvBackend { + fn default() -> Self { + let mut map = BTreeMap::default(); + let catalog_value = CatalogValue {}.as_bytes().unwrap(); + let schema_value = SchemaValue {}.as_bytes().unwrap(); + + let default_catalog_key = CatalogKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + } + .to_string(); + + let default_schema_key = SchemaKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + } + .to_string(); + + // create default catalog and schema + map.insert(default_catalog_key.into(), catalog_value); + map.insert(default_schema_key.into(), schema_value); + + let map = RwLock::new(map); + Self { map } + } +} + impl Display for MockKvBackend { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { futures::executor::block_on(async { diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index dbf6d9b6f9..ec580fd5a1 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -285,6 +285,12 @@ pub enum Error { #[snafu(backtrace)] source: common_procedure::Error, }, + + #[snafu(display("Schema already exists, name: {schema_name}"))] + SchemaAlreadyExists { + schema_name: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -325,6 +331,7 @@ impl ErrorExt for Error { | Error::ExceededRetryLimit { .. } | Error::SendShutdownSignal { .. } | Error::ParseAddr { .. } + | Error::SchemaAlreadyExists { .. } | Error::StartGrpc { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index a03022e624..27dcff476d 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -24,6 +24,7 @@ pub mod handler; pub mod keys; pub mod lease; pub mod lock; +pub mod metadata_service; pub mod metasrv; #[cfg(feature = "mock")] pub mod mocks; diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs new file mode 100644 index 0000000000..32e77635c7 --- /dev/null +++ b/src/meta-srv/src/metadata_service.rs @@ -0,0 +1,175 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::meta::CompareAndPutRequest; +use async_trait::async_trait; +use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; +use common_telemetry::info; +use snafu::{ensure, ResultExt}; + +use crate::error; +use crate::error::Result; +use crate::service::store::kv::KvStoreRef; + +/// This trait defines some methods of metadata +#[async_trait] +pub trait MetadataService: Send + Sync { + // An error occurs if the schema exists and "if_not_exist" == false. + async fn create_schema( + &self, + catalog_name: &str, + schema_name: &str, + if_not_exist: bool, + ) -> Result<()>; + + async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()>; +} + +pub type MetadataServiceRef = Arc; + +#[derive(Clone)] +pub struct DefaultMetadataService { + kv_store: KvStoreRef, +} + +impl DefaultMetadataService { + pub fn new(kv_store: KvStoreRef) -> Self { + Self { kv_store } + } +} + +#[async_trait] +impl MetadataService for DefaultMetadataService { + async fn create_schema( + &self, + catalog_name: &str, + schema_name: &str, + if_not_exist: bool, + ) -> Result<()> { + let kv_store = self.kv_store.clone(); + + let catalog_key = CatalogKey { + catalog_name: catalog_name.to_string(), + } + .to_string(); + + let schema_key = SchemaKey { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + } + .to_string(); + + let req = CompareAndPutRequest { + key: catalog_key.into(), + expect: vec![], + value: CatalogValue {} + .as_bytes() + .context(error::InvalidCatalogValueSnafu)?, + ..Default::default() + }; + + let resp = kv_store.compare_and_put(req).await?; + + if resp.success { + info!("Successfully created a catalog: {}", catalog_name); + } + + let req = CompareAndPutRequest { + key: schema_key.into(), + expect: vec![], + value: SchemaValue {} + .as_bytes() + .context(error::InvalidCatalogValueSnafu)?, + ..Default::default() + }; + let resp = kv_store.compare_and_put(req).await?; + + if resp.success { + info!("Successfully created a schema: {}", schema_name); + } + + ensure!( + resp.success || if_not_exist, + error::SchemaAlreadyExistsSnafu { schema_name } + ); + + Ok(()) + } + + async fn delete_schema(&self, _catalog_name: &str, _schema_name: &str) -> Result<()> { + unimplemented!() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use catalog::helper::{CatalogKey, SchemaKey}; + + use super::{DefaultMetadataService, MetadataService}; + use crate::service::store::ext::KvStoreExt; + use crate::service::store::kv::KvStoreRef; + use crate::service::store::memory::MemStore; + + #[tokio::test] + async fn test_create_schema() { + let kv_store = Arc::new(MemStore::default()); + let service = DefaultMetadataService::new(kv_store.clone()); + + let result = service.create_schema("catalog", "public", false).await; + + assert!(result.is_ok()); + verify_result(kv_store.clone()).await; + + let result = service.create_schema("catalog", "public", false).await; + assert!(result.is_err()); + + let result = service.create_schema("catalog", "public", true).await; + + assert!(result.is_ok()); + verify_result(kv_store.clone()).await; + } + + async fn verify_result(kv_store: KvStoreRef) { + let key: Vec = CatalogKey { + catalog_name: "catalog".to_string(), + } + .to_string() + .into(); + + let result = kv_store.get(key.clone()).await.unwrap(); + + assert!(result.is_some()); + let kv = result.unwrap(); + + assert_eq!(key, kv.key); + + let key: Vec = SchemaKey { + catalog_name: "catalog".to_string(), + schema_name: "public".to_string(), + } + .to_string() + .into(); + + let result = kv_store.get(key.clone()).await.unwrap(); + + assert!(result.is_some()); + let kv = result.unwrap(); + + assert_eq!(key, kv.key); + } +} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d6a095042d..e129941d00 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use api::v1::meta::Peer; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure::ProcedureManagerRef; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; @@ -30,6 +31,7 @@ use crate::election::{Election, LeaderChangeMessage}; use crate::error::{RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; +use crate::metadata_service::MetadataServiceRef; use crate::selector::{Selector, SelectorType}; use crate::sequence::SequenceRef; use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; @@ -110,6 +112,7 @@ pub struct MetaSrv { meta_peer_client: Option, lock: Option, procedure_manager: ProcedureManagerRef, + metadata_service: MetadataServiceRef, } impl MetaSrv { @@ -123,6 +126,8 @@ impl MetaSrv { return Ok(()); } + self.create_default_schema_if_not_exist().await?; + if let Some(election) = self.election() { let procedure_manager = self.procedure_manager.clone(); let mut rx = election.subscribe_leader_change(); @@ -177,6 +182,12 @@ impl MetaSrv { Ok(()) } + async fn create_default_schema_if_not_exist(&self) -> Result<()> { + self.metadata_service + .create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true) + .await + } + pub fn shutdown(&self) { self.started.store(false, Ordering::Relaxed); } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index d8ec581adf..48c7c23233 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -23,6 +23,7 @@ use crate::handler::{ OnLeaderStartHandler, PersistStatsHandler, RegionFailureHandler, ResponseHeaderHandler, }; use crate::lock::DistLockRef; +use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef}; use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ}; use crate::procedure::state_store::MetaStateStore; use crate::selector::lease_based::LeaseBasedSelector; @@ -40,6 +41,7 @@ pub struct MetaSrvBuilder { election: Option, meta_peer_client: Option, lock: Option, + metadata_service: Option, } impl MetaSrvBuilder { @@ -53,6 +55,7 @@ impl MetaSrvBuilder { election: None, options: None, lock: None, + metadata_service: None, } } @@ -96,6 +99,11 @@ impl MetaSrvBuilder { self } + pub fn metadata_service(mut self, metadata_service: MetadataServiceRef) -> Self { + self.metadata_service = Some(metadata_service); + self + } + pub async fn build(self) -> MetaSrv { let started = Arc::new(AtomicBool::new(false)); @@ -108,6 +116,7 @@ impl MetaSrvBuilder { selector, handler_group, lock, + metadata_service, } = self; let options = options.unwrap_or_default(); @@ -146,6 +155,9 @@ impl MetaSrvBuilder { let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); let procedure_manager = Arc::new(LocalManager::new(config, state_store)); + let metadata_service = metadata_service + .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); + MetaSrv { started, options, @@ -158,6 +170,7 @@ impl MetaSrvBuilder { meta_peer_client, lock, procedure_manager, + metadata_service, } } } diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index fcd08c5c6a..0381d29032 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -18,9 +18,11 @@ use std::time::Duration; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use tower::service_fn; +use crate::metadata_service::{DefaultMetadataService, MetadataService}; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::{MetaSrvOptions, SelectorRef}; use crate::service::store::etcd::EtcdStore; @@ -55,6 +57,13 @@ pub async fn mock( ) -> MockInfo { let server_addr = opts.server_addr.clone(); + let metadata_service = DefaultMetadataService::new(kv_store.clone()); + + metadata_service + .create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true) + .await + .unwrap(); + let builder = MetaSrvBuilder::new().options(opts).kv_store(kv_store); let builder = match selector {