feat: add Catalog and Schema Manager (#2037)

* feat: add Range Stream

* feat: add catalog and schema manager

* feat: enhance KeyValueDecoderFn

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-08-02 11:56:29 +08:00
committed by GitHub
parent 346c52eb72
commit 4626c2efe5
8 changed files with 659 additions and 8 deletions

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::Utf8Error;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use serde_json::error::Error as JsonError;
@@ -68,6 +70,22 @@ pub enum Error {
location: Location,
},
#[snafu(display("Catalog already exists, catalog: {}", catalog))]
CatalogAlreadyExists { catalog: String, location: Location },
#[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
SchemaAlreadyExists {
catalog: String,
schema: String,
location: Location,
},
#[snafu(display("Failed to convert raw key to str, source: {}", source))]
ConvertRawKey {
location: Location,
source: Utf8Error,
},
#[snafu(display("Table does not exist, table_name: {}", table_name))]
TableNotExist {
table_name: String,
@@ -113,6 +131,9 @@ pub enum Error {
source: common_catalog::error::Error,
location: Location,
},
#[snafu(display("External error: {}", err_msg))]
External { location: Location, err_msg: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -128,18 +149,22 @@ impl ErrorExt for Error {
| InvalidProtoMsg { .. }
| InvalidTableMetadata { .. }
| MoveRegion { .. }
| Unexpected { .. } => StatusCode::Unexpected,
| Unexpected { .. }
| External { .. } => StatusCode::Unexpected,
SendMessage { .. }
| GetKvCache { .. }
| CacheNotGet { .. }
| TableAlreadyExists { .. }
| CatalogAlreadyExists { .. }
| SchemaAlreadyExists { .. }
| TableNotExist { .. }
| RenameTable { .. } => StatusCode::Internal,
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => {
StatusCode::Unexpected
}
EncodeJson { .. }
| DecodeJson { .. }
| PayloadNotExist { .. }
| ConvertRawKey { .. } => StatusCode::Unexpected,
MetaSrv { source, .. } => source.status_code(),

View File

@@ -26,11 +26,17 @@
//! schemas).
//! - This key is mainly used in constructing the table in Datanode and Frontend.
//!
//! 3. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//! - Indices all catalog names
//!
//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//! - Indices all schema names belong to the {catalog_name}
//!
//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
//! - The value is a [TableNameValue] struct; it contains the table id.
//! - Used in the table name to table id lookup.
//!
//! 4. Table region key: `__table_region/{table_id}`
//! 6. Table region key: `__table_region/{table_id}`
//! - The value is a [TableRegionValue] struct; it contains the region distribution of the
//! table in the Datanodes.
//!
@@ -41,7 +47,9 @@
//! table metadata manager: [TableMetadataManager]. It contains all the managers defined above.
//! It's recommended to just use this manager only.
pub mod catalog_name;
pub mod datanode_table;
pub mod schema_name;
pub mod table_info;
pub mod table_name;
pub mod table_region;
@@ -57,18 +65,22 @@ use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
use table_region::{TableRegionKey, TableRegionManager, TableRegionValue};
use self::catalog_name::CatalogNameValue;
use self::schema_name::SchemaNameValue;
use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu};
pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::KvBackendRef;
pub const REMOVED_PREFIX: &str = "__removed";
const TABLE_NAME_PATTERN: &str = "[a-zA-Z_:][a-zA-Z0-9_:]*";
const NAME_PATTERN: &str = "[a-zA-Z_:-][a-zA-Z0-9_:-]*";
const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
lazy_static! {
static ref DATANODE_TABLE_KEY_PATTERN: Regex =
@@ -77,7 +89,23 @@ lazy_static! {
lazy_static! {
static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
"^{TABLE_NAME_KEY_PREFIX}/({TABLE_NAME_PATTERN})/({TABLE_NAME_PATTERN})/({TABLE_NAME_PATTERN})$"
"^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
))
.unwrap();
}
lazy_static! {
/// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
"^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
))
.unwrap();
}
lazy_static! {
/// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
"^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
))
.unwrap();
}
@@ -167,6 +195,8 @@ macro_rules! impl_table_meta_value {
}
impl_table_meta_value! {
CatalogNameValue,
SchemaNameValue,
TableNameValue,
TableInfoValue,
TableRegionValue,

View File

@@ -0,0 +1,135 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Error, InvalidTableMetadataSnafu, Result};
use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::rpc::KeyValue;
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CatalogNameKey<'a> {
pub catalog: &'a str,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CatalogNameValue;
impl<'a> CatalogNameKey<'a> {
pub fn new(catalog: &'a str) -> Self {
Self { catalog }
}
pub fn range_start_key() -> String {
format!("{}/", CATALOG_NAME_KEY_PREFIX)
}
}
impl Display for CatalogNameKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog)
}
}
impl TableMetaKey for CatalogNameKey<'_> {
fn as_raw_key(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
}
impl<'a> TryFrom<&'a str> for CatalogNameKey<'a> {
type Error = Error;
fn try_from(s: &'a str) -> Result<Self> {
let captures = CATALOG_NAME_KEY_PATTERN
.captures(s)
.context(InvalidTableMetadataSnafu {
err_msg: format!("Illegal CatalogNameKey format: '{s}'"),
})?;
// Safety: pass the regex check above
Ok(Self {
catalog: captures.get(1).unwrap().as_str(),
})
}
}
/// Decoder `KeyValue` to ({catalog},())
pub fn catalog_decoder(kv: KeyValue) -> Result<(String, ())> {
let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
let catalog_name = CatalogNameKey::try_from(str)?;
Ok((catalog_name.catalog.to_string(), ()))
}
pub struct CatalogManager {
kv_backend: KvBackendRef,
}
impl CatalogManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Creates `CatalogNameKey`.
pub async fn create(&self, catalog: CatalogNameKey<'_>) -> Result<()> {
let raw_key = catalog.as_raw_key();
let req = PutRequest::new()
.with_key(raw_key)
.with_value(CatalogNameValue.try_as_raw_value()?);
self.kv_backend.put(req).await?;
Ok(())
}
pub async fn catalog_names(&self) -> BoxStream<'static, Result<String>> {
let start_key = CatalogNameKey::range_start_key();
let req = RangeRequest::new().with_prefix(start_key.as_bytes());
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
Arc::new(catalog_decoder),
);
Box::pin(stream.map(|kv| kv.map(|kv| kv.0)))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialization() {
let key = CatalogNameKey::new("my-catalog");
assert_eq!(key.to_string(), "__catalog_name/my-catalog");
let parsed: CatalogNameKey = "__catalog_name/my-catalog".try_into().unwrap();
assert_eq!(key, parsed);
}
}

View File

@@ -0,0 +1,142 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Error, InvalidTableMetadataSnafu, Result};
use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::rpc::KeyValue;
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SchemaNameKey<'a> {
pub catalog: &'a str,
pub schema: &'a str,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SchemaNameValue;
impl<'a> SchemaNameKey<'a> {
pub fn new(catalog: &'a str, schema: &'a str) -> Self {
Self { catalog, schema }
}
pub fn range_start_key(catalog: &str) -> String {
format!("{}/{}/", SCHEMA_NAME_KEY_PREFIX, catalog)
}
}
impl Display for SchemaNameKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}/{}/{}",
SCHEMA_NAME_KEY_PREFIX, self.catalog, self.schema
)
}
}
impl TableMetaKey for SchemaNameKey<'_> {
fn as_raw_key(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
}
/// Decoder `KeyValue` to ({schema},())
pub fn schema_decoder(kv: KeyValue) -> Result<(String, ())> {
let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
let schema_name = SchemaNameKey::try_from(str)?;
Ok((schema_name.schema.to_string(), ()))
}
impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> {
type Error = Error;
fn try_from(s: &'a str) -> Result<Self> {
let captures = SCHEMA_NAME_KEY_PATTERN
.captures(s)
.context(InvalidTableMetadataSnafu {
err_msg: format!("Illegal SchemaNameKey format: '{s}'"),
})?;
// Safety: pass the regex check above
Ok(Self {
catalog: captures.get(1).unwrap().as_str(),
schema: captures.get(2).unwrap().as_str(),
})
}
}
pub struct SchemaManager {
kv_backend: KvBackendRef,
}
impl SchemaManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Creates `SchemaNameKey`.
pub async fn create(&self, schema: SchemaNameKey<'_>) -> Result<()> {
let raw_key = schema.as_raw_key();
let req = PutRequest::new()
.with_key(raw_key)
.with_value(SchemaNameValue.try_as_raw_value()?);
self.kv_backend.put(req).await?;
Ok(())
}
/// Returns a schema stream, it lists all schemas belong to the target `catalog`.
pub async fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
let start_key = SchemaNameKey::range_start_key(catalog);
let req = RangeRequest::new().with_prefix(start_key.as_bytes());
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
Arc::new(schema_decoder),
);
Box::pin(stream.map(|kv| kv.map(|kv| kv.0)))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialization() {
let key = SchemaNameKey::new("my-catalog", "my-schema");
assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema");
let parsed: SchemaNameKey<'_> = "__schema_name/my-catalog/my-schema".try_into().unwrap();
assert_eq!(key, parsed);
}
}

View File

@@ -23,6 +23,7 @@ pub mod key;
pub mod kv_backend;
pub mod metrics;
pub mod peer;
pub mod range_stream;
pub mod rpc;
pub mod table_name;
pub mod util;

View File

@@ -0,0 +1,293 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::future::BoxFuture;
use futures::{ready, FutureExt, Stream};
use crate::error::Result;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{RangeRequest, RangeResponse};
use crate::rpc::KeyValue;
use crate::util::get_next_prefix_key;
pub type KeyValueDecoderFn<K, V> = dyn Fn(KeyValue) -> Result<(K, V)> + Send + Sync;
enum PaginationStreamState<K, V> {
/// At the start of reading.
Init,
/// Decoding key value pairs.
Decoding(SimpleKeyValueDecoder<K, V>),
/// Retrieving data from backend.
Reading(BoxFuture<'static, Result<(PaginationStreamFactory, Option<RangeResponse>)>>),
/// Error
Error,
}
pub const DEFAULT_PAGE_SIZE: usize = 512;
struct PaginationStreamFactory {
kv: KvBackendRef,
/// key is the first key for the range, If range_end is not given, the
/// request only looks up key.
pub key: Vec<u8>,
/// range_end is the upper bound on the requested range [key, range_end).
/// If range_end is '\0', the range is all keys >= key.
/// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
/// then the range request gets all keys prefixed with key.
/// If both key and range_end are '\0', then the range request returns all
/// keys.
pub range_end: Vec<u8>,
/// page_size is the pagination page size.
pub page_size: usize,
/// keys_only when set returns only the keys and not the values.
pub keys_only: bool,
pub more: bool,
}
impl PaginationStreamFactory {
pub fn new(
kv: &KvBackendRef,
key: Vec<u8>,
range_end: Vec<u8>,
page_size: usize,
keys_only: bool,
more: bool,
) -> Self {
Self {
kv: kv.clone(),
key,
range_end,
page_size,
keys_only,
more,
}
}
}
impl PaginationStreamFactory {
pub async fn read_next(self) -> Result<(Self, Option<RangeResponse>)> {
if self.more {
let resp = self
.kv
.range(RangeRequest {
key: self.key.clone(),
range_end: self.range_end.clone(),
limit: self.page_size as i64,
keys_only: self.keys_only,
})
.await?;
let key = resp
.kvs
.last()
.map(|kv| kv.key.clone())
.unwrap_or_else(Vec::new);
let next_key = get_next_prefix_key(&key);
Ok((
Self {
kv: self.kv,
key: next_key,
range_end: self.range_end,
page_size: self.page_size,
keys_only: self.keys_only,
more: resp.more,
},
Some(resp),
))
} else {
Ok((self, None))
}
}
}
pub struct PaginationStream<K, V> {
state: PaginationStreamState<K, V>,
decoder_fn: Arc<KeyValueDecoderFn<K, V>>,
factory: Option<PaginationStreamFactory>,
}
impl<K, V> PaginationStream<K, V> {
pub fn new(
kv: KvBackendRef,
req: RangeRequest,
page_size: usize,
decoder_fn: Arc<KeyValueDecoderFn<K, V>>,
) -> Self {
Self {
state: PaginationStreamState::Init,
decoder_fn,
factory: Some(PaginationStreamFactory::new(
&kv,
req.key,
req.range_end,
page_size,
req.keys_only,
true,
)),
}
}
}
struct SimpleKeyValueDecoder<K, V> {
kv: VecDeque<KeyValue>,
decoder: Arc<KeyValueDecoderFn<K, V>>,
}
impl<K, V> Iterator for SimpleKeyValueDecoder<K, V> {
type Item = Result<(K, V)>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(kv) = self.kv.pop_front() {
Some((self.decoder)(kv))
} else {
None
}
}
}
impl<K, V> Stream for PaginationStream<K, V> {
type Item = Result<(K, V)>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match &mut self.state {
PaginationStreamState::Decoding(decoder) => match decoder.next() {
Some(Ok(result)) => return Poll::Ready(Some(Ok(result))),
Some(Err(e)) => {
self.state = PaginationStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
None => self.state = PaginationStreamState::Init,
},
PaginationStreamState::Init => {
let factory = self.factory.take().expect("lost factory");
if !factory.more {
return Poll::Ready(None);
}
let fut = factory.read_next().boxed();
self.state = PaginationStreamState::Reading(fut);
}
PaginationStreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
Ok((factory, Some(resp))) => {
self.factory = Some(factory);
let decoder = SimpleKeyValueDecoder {
kv: resp.kvs.into(),
decoder: self.decoder_fn.clone(),
};
self.state = PaginationStreamState::Decoding(decoder);
}
Ok((factory, None)) => {
self.factory = Some(factory);
self.state = PaginationStreamState::Init;
}
Err(e) => {
self.state = PaginationStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
},
PaginationStreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use futures::TryStreamExt;
use super::*;
use crate::error::{Error, Result};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
fn decoder(kv: KeyValue) -> Result<(Vec<u8>, Vec<u8>)> {
Ok((kv.key.clone(), kv.value))
}
#[tokio::test]
async fn test_range_empty() {
let kv_store = Arc::new(MemoryKvBackend::<Error>::new());
let stream = PaginationStream::new(
kv_store.clone(),
RangeRequest {
key: b"a".to_vec(),
..Default::default()
},
DEFAULT_PAGE_SIZE,
Arc::new(decoder),
);
let kv = stream.try_collect::<Vec<_>>().await.unwrap();
assert!(kv.is_empty());
}
#[tokio::test]
async fn test_range() {
let kv_store = Arc::new(MemoryKvBackend::<Error>::new());
let total = 26;
let mut expected = BTreeMap::<Vec<u8>, ()>::new();
for i in 0..total {
let key = vec![97 + i];
assert!(kv_store
.put(PutRequest {
key: key.clone(),
value: key.clone(),
..Default::default()
})
.await
.is_ok());
expected.insert(key, ());
}
let key = b"a".to_vec();
let range_end = b"f".to_vec();
let stream = PaginationStream::new(
kv_store.clone(),
RangeRequest {
key,
range_end,
..Default::default()
},
2,
Arc::new(decoder),
);
let kv = stream
.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.map(|kv| kv.0)
.collect::<Vec<_>>();
assert_eq!(vec![vec![97], vec![98], vec![99], vec![100], vec![101]], kv);
}
}

View File

@@ -612,6 +612,13 @@ impl TryFrom<PbCompareAndPutResponse> for CompareAndPutResponse {
}
impl CompareAndPutResponse {
pub fn handle<R, E, F>(&self, f: F) -> std::result::Result<R, E>
where
F: FnOnce(&Self) -> std::result::Result<R, E>,
{
f(self)
}
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbCompareAndPutResponse {
PbCompareAndPutResponse {
header: Some(header),

View File

@@ -27,10 +27,28 @@ pub fn get_prefix_end_key(key: &[u8]) -> Vec<u8> {
vec![0]
}
/// Get next prefix key of `key`.
#[inline]
pub fn get_next_prefix_key(key: &[u8]) -> Vec<u8> {
let mut next = Vec::with_capacity(key.len() + 1);
next.extend_from_slice(key);
next.push(0);
next
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_next_prefix() {
let key = b"testa";
let mut expected = b"testa".to_vec();
expected.push(0);
assert_eq!(expected, get_next_prefix_key(key));
}
#[test]
fn test_get_prefix() {
let key = b"testa";