refactor: default catalog and schema are created at Metasrv (#1391)

* refactor: default catalog and schema are created at Metasrv

* fix: unit test

* fix: add license

* simplify the meta mock

* cr
This commit is contained in:
fys
2023-04-20 17:58:37 +08:00
committed by GitHub
parent e8cd2f0e48
commit f2cfd8e608
8 changed files with 245 additions and 11 deletions

View File

@@ -20,9 +20,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use async_stream::stream;
use async_trait::async_trait;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_telemetry::{debug, error, info};
use dashmap::DashMap;
use futures::Stream;
@@ -189,13 +187,6 @@ impl RemoteCatalogManager {
let mut res = HashMap::new();
let max_table_id = MIN_USER_TABLE_ID - 1;
// initiate default catalog and schema
let default_catalog = self
.create_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await?;
res.insert(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
info!("Default catalog and schema registered");
let mut catalogs = self.iter_remote_catalogs().await;
while let Some(r) = catalogs.next().await {
let CatalogKey { catalog_name, .. } = r?;

View File

@@ -20,7 +20,9 @@ use std::sync::Arc;
use async_stream::stream;
use catalog::error::Error;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::{Kv, KvBackend, ValueIter};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::RecordBatch;
use common_telemetry::logging::info;
use datatypes::data_type::ConcreteDataType;
@@ -34,11 +36,36 @@ use table::test_util::MemTable;
use table::TableRef;
use tokio::sync::RwLock;
#[derive(Default)]
pub struct MockKvBackend {
map: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
}
impl Default for MockKvBackend {
fn default() -> Self {
let mut map = BTreeMap::default();
let catalog_value = CatalogValue {}.as_bytes().unwrap();
let schema_value = SchemaValue {}.as_bytes().unwrap();
let default_catalog_key = CatalogKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
}
.to_string();
let default_schema_key = SchemaKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
}
.to_string();
// create default catalog and schema
map.insert(default_catalog_key.into(), catalog_value);
map.insert(default_schema_key.into(), schema_value);
let map = RwLock::new(map);
Self { map }
}
}
impl Display for MockKvBackend {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
futures::executor::block_on(async {

View File

@@ -285,6 +285,12 @@ pub enum Error {
#[snafu(backtrace)]
source: common_procedure::Error,
},
#[snafu(display("Schema already exists, name: {schema_name}"))]
SchemaAlreadyExists {
schema_name: String,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -325,6 +331,7 @@ impl ErrorExt for Error {
| Error::ExceededRetryLimit { .. }
| Error::SendShutdownSignal { .. }
| Error::ParseAddr { .. }
| Error::SchemaAlreadyExists { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }

View File

@@ -24,6 +24,7 @@ pub mod handler;
pub mod keys;
pub mod lease;
pub mod lock;
pub mod metadata_service;
pub mod metasrv;
#[cfg(feature = "mock")]
pub mod mocks;

View File

@@ -0,0 +1,175 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::CompareAndPutRequest;
use async_trait::async_trait;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use common_telemetry::info;
use snafu::{ensure, ResultExt};
use crate::error;
use crate::error::Result;
use crate::service::store::kv::KvStoreRef;
/// This trait defines some methods of metadata
#[async_trait]
pub trait MetadataService: Send + Sync {
// An error occurs if the schema exists and "if_not_exist" == false.
async fn create_schema(
&self,
catalog_name: &str,
schema_name: &str,
if_not_exist: bool,
) -> Result<()>;
async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()>;
}
pub type MetadataServiceRef = Arc<dyn MetadataService>;
#[derive(Clone)]
pub struct DefaultMetadataService {
kv_store: KvStoreRef,
}
impl DefaultMetadataService {
pub fn new(kv_store: KvStoreRef) -> Self {
Self { kv_store }
}
}
#[async_trait]
impl MetadataService for DefaultMetadataService {
async fn create_schema(
&self,
catalog_name: &str,
schema_name: &str,
if_not_exist: bool,
) -> Result<()> {
let kv_store = self.kv_store.clone();
let catalog_key = CatalogKey {
catalog_name: catalog_name.to_string(),
}
.to_string();
let schema_key = SchemaKey {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
}
.to_string();
let req = CompareAndPutRequest {
key: catalog_key.into(),
expect: vec![],
value: CatalogValue {}
.as_bytes()
.context(error::InvalidCatalogValueSnafu)?,
..Default::default()
};
let resp = kv_store.compare_and_put(req).await?;
if resp.success {
info!("Successfully created a catalog: {}", catalog_name);
}
let req = CompareAndPutRequest {
key: schema_key.into(),
expect: vec![],
value: SchemaValue {}
.as_bytes()
.context(error::InvalidCatalogValueSnafu)?,
..Default::default()
};
let resp = kv_store.compare_and_put(req).await?;
if resp.success {
info!("Successfully created a schema: {}", schema_name);
}
ensure!(
resp.success || if_not_exist,
error::SchemaAlreadyExistsSnafu { schema_name }
);
Ok(())
}
async fn delete_schema(&self, _catalog_name: &str, _schema_name: &str) -> Result<()> {
unimplemented!()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use catalog::helper::{CatalogKey, SchemaKey};
use super::{DefaultMetadataService, MetadataService};
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_create_schema() {
let kv_store = Arc::new(MemStore::default());
let service = DefaultMetadataService::new(kv_store.clone());
let result = service.create_schema("catalog", "public", false).await;
assert!(result.is_ok());
verify_result(kv_store.clone()).await;
let result = service.create_schema("catalog", "public", false).await;
assert!(result.is_err());
let result = service.create_schema("catalog", "public", true).await;
assert!(result.is_ok());
verify_result(kv_store.clone()).await;
}
async fn verify_result(kv_store: KvStoreRef) {
let key: Vec<u8> = CatalogKey {
catalog_name: "catalog".to_string(),
}
.to_string()
.into();
let result = kv_store.get(key.clone()).await.unwrap();
assert!(result.is_some());
let kv = result.unwrap();
assert_eq!(key, kv.key);
let key: Vec<u8> = SchemaKey {
catalog_name: "catalog".to_string(),
schema_name: "public".to_string(),
}
.to_string()
.into();
let result = kv_store.get(key.clone()).await.unwrap();
assert!(result.is_some());
let kv = result.unwrap();
assert_eq!(key, kv.key);
}
}

View File

@@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use api::v1::meta::Peer;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::ProcedureManagerRef;
use common_telemetry::{error, info, warn};
use serde::{Deserialize, Serialize};
@@ -30,6 +31,7 @@ use crate::election::{Election, LeaderChangeMessage};
use crate::error::{RecoverProcedureSnafu, Result};
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::metadata_service::MetadataServiceRef;
use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
@@ -110,6 +112,7 @@ pub struct MetaSrv {
meta_peer_client: Option<MetaPeerClient>,
lock: Option<DistLockRef>,
procedure_manager: ProcedureManagerRef,
metadata_service: MetadataServiceRef,
}
impl MetaSrv {
@@ -123,6 +126,8 @@ impl MetaSrv {
return Ok(());
}
self.create_default_schema_if_not_exist().await?;
if let Some(election) = self.election() {
let procedure_manager = self.procedure_manager.clone();
let mut rx = election.subscribe_leader_change();
@@ -177,6 +182,12 @@ impl MetaSrv {
Ok(())
}
async fn create_default_schema_if_not_exist(&self) -> Result<()> {
self.metadata_service
.create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true)
.await
}
pub fn shutdown(&self) {
self.started.store(false, Ordering::Relaxed);
}

View File

@@ -23,6 +23,7 @@ use crate::handler::{
OnLeaderStartHandler, PersistStatsHandler, RegionFailureHandler, ResponseHeaderHandler,
};
use crate::lock::DistLockRef;
use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef};
use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ};
use crate::procedure::state_store::MetaStateStore;
use crate::selector::lease_based::LeaseBasedSelector;
@@ -40,6 +41,7 @@ pub struct MetaSrvBuilder {
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClient>,
lock: Option<DistLockRef>,
metadata_service: Option<MetadataServiceRef>,
}
impl MetaSrvBuilder {
@@ -53,6 +55,7 @@ impl MetaSrvBuilder {
election: None,
options: None,
lock: None,
metadata_service: None,
}
}
@@ -96,6 +99,11 @@ impl MetaSrvBuilder {
self
}
pub fn metadata_service(mut self, metadata_service: MetadataServiceRef) -> Self {
self.metadata_service = Some(metadata_service);
self
}
pub async fn build(self) -> MetaSrv {
let started = Arc::new(AtomicBool::new(false));
@@ -108,6 +116,7 @@ impl MetaSrvBuilder {
selector,
handler_group,
lock,
metadata_service,
} = self;
let options = options.unwrap_or_default();
@@ -146,6 +155,9 @@ impl MetaSrvBuilder {
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let procedure_manager = Arc::new(LocalManager::new(config, state_store));
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
MetaSrv {
started,
options,
@@ -158,6 +170,7 @@ impl MetaSrvBuilder {
meta_peer_client,
lock,
procedure_manager,
metadata_service,
}
}
}

View File

@@ -18,9 +18,11 @@ use std::time::Duration;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use tower::service_fn;
use crate::metadata_service::{DefaultMetadataService, MetadataService};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrvOptions, SelectorRef};
use crate::service::store::etcd::EtcdStore;
@@ -55,6 +57,13 @@ pub async fn mock(
) -> MockInfo {
let server_addr = opts.server_addr.clone();
let metadata_service = DefaultMetadataService::new(kv_store.clone());
metadata_service
.create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true)
.await
.unwrap();
let builder = MetaSrvBuilder::new().options(opts).kv_store(kv_store);
let builder = match selector {