From 4626c2efe533e9f96f91995c1430f81380d49f2d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 2 Aug 2023 11:56:29 +0800 Subject: [PATCH] feat: add Catalog and Schema Manager (#2037) * feat: add Range Stream * feat: add catalog and schema manager * feat: enhance KeyValueDecoderFn * chore: apply suggestions from CR * chore: apply suggestions from CR --- src/common/meta/src/error.rs | 33 ++- src/common/meta/src/key.rs | 38 ++- src/common/meta/src/key/catalog_name.rs | 135 +++++++++++ src/common/meta/src/key/schema_name.rs | 142 ++++++++++++ src/common/meta/src/lib.rs | 1 + src/common/meta/src/range_stream.rs | 293 ++++++++++++++++++++++++ src/common/meta/src/rpc/store.rs | 7 + src/common/meta/src/util.rs | 18 ++ 8 files changed, 659 insertions(+), 8 deletions(-) create mode 100644 src/common/meta/src/key/catalog_name.rs create mode 100644 src/common/meta/src/key/schema_name.rs create mode 100644 src/common/meta/src/range_stream.rs diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 6caed7775e..7492c0d6ef 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::str::Utf8Error; + use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use serde_json::error::Error as JsonError; @@ -68,6 +70,22 @@ pub enum Error { location: Location, }, + #[snafu(display("Catalog already exists, catalog: {}", catalog))] + CatalogAlreadyExists { catalog: String, location: Location }, + + #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))] + SchemaAlreadyExists { + catalog: String, + schema: String, + location: Location, + }, + + #[snafu(display("Failed to convert raw key to str, source: {}", source))] + ConvertRawKey { + location: Location, + source: Utf8Error, + }, + #[snafu(display("Table does not exist, table_name: {}", table_name))] TableNotExist { table_name: String, @@ -113,6 +131,9 @@ pub enum Error { source: common_catalog::error::Error, location: Location, }, + + #[snafu(display("External error: {}", err_msg))] + External { location: Location, err_msg: String }, } pub type Result = std::result::Result; @@ -128,18 +149,22 @@ impl ErrorExt for Error { | InvalidProtoMsg { .. } | InvalidTableMetadata { .. } | MoveRegion { .. } - | Unexpected { .. } => StatusCode::Unexpected, + | Unexpected { .. } + | External { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } | TableAlreadyExists { .. } + | CatalogAlreadyExists { .. } + | SchemaAlreadyExists { .. } | TableNotExist { .. } | RenameTable { .. } => StatusCode::Internal, - EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => { - StatusCode::Unexpected - } + EncodeJson { .. } + | DecodeJson { .. } + | PayloadNotExist { .. } + | ConvertRawKey { .. } => StatusCode::Unexpected, MetaSrv { source, .. } => source.status_code(), diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index ae29fe41aa..6326ced7a6 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -26,11 +26,17 @@ //! schemas). //! - This key is mainly used in constructing the table in Datanode and Frontend. //! -//! 3. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}` +//! 3. Catalog name key: `__catalog_name/{catalog_name}` +//! - Indices all catalog names +//! +//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}` +//! - Indices all schema names belong to the {catalog_name} +//! +//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}` //! - The value is a [TableNameValue] struct; it contains the table id. //! - Used in the table name to table id lookup. //! -//! 4. Table region key: `__table_region/{table_id}` +//! 6. Table region key: `__table_region/{table_id}` //! - The value is a [TableRegionValue] struct; it contains the region distribution of the //! table in the Datanodes. //! @@ -41,7 +47,9 @@ //! table metadata manager: [TableMetadataManager]. It contains all the managers defined above. //! It's recommended to just use this manager only. +pub mod catalog_name; pub mod datanode_table; +pub mod schema_name; pub mod table_info; pub mod table_name; pub mod table_region; @@ -57,18 +65,22 @@ use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; use table_region::{TableRegionKey, TableRegionManager, TableRegionValue}; +use self::catalog_name::CatalogNameValue; +use self::schema_name::SchemaNameValue; use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX}; use crate::kv_backend::KvBackendRef; pub const REMOVED_PREFIX: &str = "__removed"; -const TABLE_NAME_PATTERN: &str = "[a-zA-Z_:][a-zA-Z0-9_:]*"; +const NAME_PATTERN: &str = "[a-zA-Z_:-][a-zA-Z0-9_:-]*"; const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; const TABLE_NAME_KEY_PREFIX: &str = "__table_name"; const TABLE_REGION_KEY_PREFIX: &str = "__table_region"; +const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name"; +const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name"; lazy_static! { static ref DATANODE_TABLE_KEY_PATTERN: Regex = @@ -77,7 +89,23 @@ lazy_static! { lazy_static! { static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!( - "^{TABLE_NAME_KEY_PREFIX}/({TABLE_NAME_PATTERN})/({TABLE_NAME_PATTERN})/({TABLE_NAME_PATTERN})$" + "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$" + )) + .unwrap(); +} + +lazy_static! { + /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name} + static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!( + "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$" + )) + .unwrap(); +} + +lazy_static! { + /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name} + static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!( + "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$" )) .unwrap(); } @@ -167,6 +195,8 @@ macro_rules! impl_table_meta_value { } impl_table_meta_value! { + CatalogNameValue, + SchemaNameValue, TableNameValue, TableInfoValue, TableRegionValue, diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs new file mode 100644 index 0000000000..f5d8a5194c --- /dev/null +++ b/src/common/meta/src/key/catalog_name.rs @@ -0,0 +1,135 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::sync::Arc; + +use futures::stream::BoxStream; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{self, Error, InvalidTableMetadataSnafu, Result}; +use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX}; +use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::rpc::store::{PutRequest, RangeRequest}; +use crate::rpc::KeyValue; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct CatalogNameKey<'a> { + pub catalog: &'a str, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CatalogNameValue; + +impl<'a> CatalogNameKey<'a> { + pub fn new(catalog: &'a str) -> Self { + Self { catalog } + } + + pub fn range_start_key() -> String { + format!("{}/", CATALOG_NAME_KEY_PREFIX) + } +} + +impl Display for CatalogNameKey<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog) + } +} + +impl TableMetaKey for CatalogNameKey<'_> { + fn as_raw_key(&self) -> Vec { + self.to_string().into_bytes() + } +} + +impl<'a> TryFrom<&'a str> for CatalogNameKey<'a> { + type Error = Error; + + fn try_from(s: &'a str) -> Result { + let captures = CATALOG_NAME_KEY_PATTERN + .captures(s) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Illegal CatalogNameKey format: '{s}'"), + })?; + + // Safety: pass the regex check above + Ok(Self { + catalog: captures.get(1).unwrap().as_str(), + }) + } +} + +/// Decoder `KeyValue` to ({catalog},()) +pub fn catalog_decoder(kv: KeyValue) -> Result<(String, ())> { + let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?; + let catalog_name = CatalogNameKey::try_from(str)?; + + Ok((catalog_name.catalog.to_string(), ())) +} + +pub struct CatalogManager { + kv_backend: KvBackendRef, +} + +impl CatalogManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Creates `CatalogNameKey`. + pub async fn create(&self, catalog: CatalogNameKey<'_>) -> Result<()> { + let raw_key = catalog.as_raw_key(); + + let req = PutRequest::new() + .with_key(raw_key) + .with_value(CatalogNameValue.try_as_raw_value()?); + self.kv_backend.put(req).await?; + + Ok(()) + } + + pub async fn catalog_names(&self) -> BoxStream<'static, Result> { + let start_key = CatalogNameKey::range_start_key(); + let req = RangeRequest::new().with_prefix(start_key.as_bytes()); + + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + Arc::new(catalog_decoder), + ); + + Box::pin(stream.map(|kv| kv.map(|kv| kv.0))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_serialization() { + let key = CatalogNameKey::new("my-catalog"); + + assert_eq!(key.to_string(), "__catalog_name/my-catalog"); + + let parsed: CatalogNameKey = "__catalog_name/my-catalog".try_into().unwrap(); + + assert_eq!(key, parsed); + } +} diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs new file mode 100644 index 0000000000..2317e73e38 --- /dev/null +++ b/src/common/meta/src/key/schema_name.rs @@ -0,0 +1,142 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::sync::Arc; + +use futures::stream::BoxStream; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{self, Error, InvalidTableMetadataSnafu, Result}; +use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; +use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::rpc::store::{PutRequest, RangeRequest}; +use crate::rpc::KeyValue; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct SchemaNameKey<'a> { + pub catalog: &'a str, + pub schema: &'a str, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SchemaNameValue; + +impl<'a> SchemaNameKey<'a> { + pub fn new(catalog: &'a str, schema: &'a str) -> Self { + Self { catalog, schema } + } + + pub fn range_start_key(catalog: &str) -> String { + format!("{}/{}/", SCHEMA_NAME_KEY_PREFIX, catalog) + } +} + +impl Display for SchemaNameKey<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}/{}/{}", + SCHEMA_NAME_KEY_PREFIX, self.catalog, self.schema + ) + } +} + +impl TableMetaKey for SchemaNameKey<'_> { + fn as_raw_key(&self) -> Vec { + self.to_string().into_bytes() + } +} + +/// Decoder `KeyValue` to ({schema},()) +pub fn schema_decoder(kv: KeyValue) -> Result<(String, ())> { + let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?; + let schema_name = SchemaNameKey::try_from(str)?; + + Ok((schema_name.schema.to_string(), ())) +} + +impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> { + type Error = Error; + + fn try_from(s: &'a str) -> Result { + let captures = SCHEMA_NAME_KEY_PATTERN + .captures(s) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Illegal SchemaNameKey format: '{s}'"), + })?; + + // Safety: pass the regex check above + Ok(Self { + catalog: captures.get(1).unwrap().as_str(), + schema: captures.get(2).unwrap().as_str(), + }) + } +} + +pub struct SchemaManager { + kv_backend: KvBackendRef, +} + +impl SchemaManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Creates `SchemaNameKey`. + pub async fn create(&self, schema: SchemaNameKey<'_>) -> Result<()> { + let raw_key = schema.as_raw_key(); + let req = PutRequest::new() + .with_key(raw_key) + .with_value(SchemaNameValue.try_as_raw_value()?); + + self.kv_backend.put(req).await?; + + Ok(()) + } + + /// Returns a schema stream, it lists all schemas belong to the target `catalog`. + pub async fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result> { + let start_key = SchemaNameKey::range_start_key(catalog); + let req = RangeRequest::new().with_prefix(start_key.as_bytes()); + + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + Arc::new(schema_decoder), + ); + + Box::pin(stream.map(|kv| kv.map(|kv| kv.0))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_serialization() { + let key = SchemaNameKey::new("my-catalog", "my-schema"); + + assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema"); + + let parsed: SchemaNameKey<'_> = "__schema_name/my-catalog/my-schema".try_into().unwrap(); + + assert_eq!(key, parsed); + } +} diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 5906708c36..70fe566b5c 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -23,6 +23,7 @@ pub mod key; pub mod kv_backend; pub mod metrics; pub mod peer; +pub mod range_stream; pub mod rpc; pub mod table_name; pub mod util; diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs new file mode 100644 index 0000000000..76b13928f1 --- /dev/null +++ b/src/common/meta/src/range_stream.rs @@ -0,0 +1,293 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use futures::future::BoxFuture; +use futures::{ready, FutureExt, Stream}; + +use crate::error::Result; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{RangeRequest, RangeResponse}; +use crate::rpc::KeyValue; +use crate::util::get_next_prefix_key; + +pub type KeyValueDecoderFn = dyn Fn(KeyValue) -> Result<(K, V)> + Send + Sync; + +enum PaginationStreamState { + /// At the start of reading. + Init, + /// Decoding key value pairs. + Decoding(SimpleKeyValueDecoder), + /// Retrieving data from backend. + Reading(BoxFuture<'static, Result<(PaginationStreamFactory, Option)>>), + /// Error + Error, +} + +pub const DEFAULT_PAGE_SIZE: usize = 512; + +struct PaginationStreamFactory { + kv: KvBackendRef, + /// key is the first key for the range, If range_end is not given, the + /// request only looks up key. + pub key: Vec, + /// range_end is the upper bound on the requested range [key, range_end). + /// If range_end is '\0', the range is all keys >= key. + /// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), + /// then the range request gets all keys prefixed with key. + /// If both key and range_end are '\0', then the range request returns all + /// keys. + pub range_end: Vec, + + /// page_size is the pagination page size. + pub page_size: usize, + /// keys_only when set returns only the keys and not the values. + pub keys_only: bool, + + pub more: bool, +} + +impl PaginationStreamFactory { + pub fn new( + kv: &KvBackendRef, + key: Vec, + range_end: Vec, + page_size: usize, + keys_only: bool, + more: bool, + ) -> Self { + Self { + kv: kv.clone(), + key, + range_end, + page_size, + keys_only, + more, + } + } +} + +impl PaginationStreamFactory { + pub async fn read_next(self) -> Result<(Self, Option)> { + if self.more { + let resp = self + .kv + .range(RangeRequest { + key: self.key.clone(), + range_end: self.range_end.clone(), + limit: self.page_size as i64, + keys_only: self.keys_only, + }) + .await?; + + let key = resp + .kvs + .last() + .map(|kv| kv.key.clone()) + .unwrap_or_else(Vec::new); + + let next_key = get_next_prefix_key(&key); + + Ok(( + Self { + kv: self.kv, + key: next_key, + range_end: self.range_end, + page_size: self.page_size, + keys_only: self.keys_only, + more: resp.more, + }, + Some(resp), + )) + } else { + Ok((self, None)) + } + } +} + +pub struct PaginationStream { + state: PaginationStreamState, + decoder_fn: Arc>, + factory: Option, +} + +impl PaginationStream { + pub fn new( + kv: KvBackendRef, + req: RangeRequest, + page_size: usize, + decoder_fn: Arc>, + ) -> Self { + Self { + state: PaginationStreamState::Init, + decoder_fn, + factory: Some(PaginationStreamFactory::new( + &kv, + req.key, + req.range_end, + page_size, + req.keys_only, + true, + )), + } + } +} + +struct SimpleKeyValueDecoder { + kv: VecDeque, + decoder: Arc>, +} + +impl Iterator for SimpleKeyValueDecoder { + type Item = Result<(K, V)>; + + fn next(&mut self) -> Option { + if let Some(kv) = self.kv.pop_front() { + Some((self.decoder)(kv)) + } else { + None + } + } +} + +impl Stream for PaginationStream { + type Item = Result<(K, V)>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.state { + PaginationStreamState::Decoding(decoder) => match decoder.next() { + Some(Ok(result)) => return Poll::Ready(Some(Ok(result))), + Some(Err(e)) => { + self.state = PaginationStreamState::Error; + return Poll::Ready(Some(Err(e))); + } + None => self.state = PaginationStreamState::Init, + }, + PaginationStreamState::Init => { + let factory = self.factory.take().expect("lost factory"); + if !factory.more { + return Poll::Ready(None); + } + let fut = factory.read_next().boxed(); + self.state = PaginationStreamState::Reading(fut); + } + PaginationStreamState::Reading(f) => match ready!(f.poll_unpin(cx)) { + Ok((factory, Some(resp))) => { + self.factory = Some(factory); + let decoder = SimpleKeyValueDecoder { + kv: resp.kvs.into(), + decoder: self.decoder_fn.clone(), + }; + self.state = PaginationStreamState::Decoding(decoder); + } + Ok((factory, None)) => { + self.factory = Some(factory); + self.state = PaginationStreamState::Init; + } + Err(e) => { + self.state = PaginationStreamState::Error; + return Poll::Ready(Some(Err(e))); + } + }, + PaginationStreamState::Error => return Poll::Ready(None), // Ends the stream as error happens. + } + } + } +} + +#[cfg(test)] +mod tests { + + use std::collections::BTreeMap; + + use futures::TryStreamExt; + + use super::*; + use crate::error::{Error, Result}; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + use crate::rpc::store::PutRequest; + + fn decoder(kv: KeyValue) -> Result<(Vec, Vec)> { + Ok((kv.key.clone(), kv.value)) + } + + #[tokio::test] + async fn test_range_empty() { + let kv_store = Arc::new(MemoryKvBackend::::new()); + + let stream = PaginationStream::new( + kv_store.clone(), + RangeRequest { + key: b"a".to_vec(), + ..Default::default() + }, + DEFAULT_PAGE_SIZE, + Arc::new(decoder), + ); + let kv = stream.try_collect::>().await.unwrap(); + + assert!(kv.is_empty()); + } + + #[tokio::test] + async fn test_range() { + let kv_store = Arc::new(MemoryKvBackend::::new()); + let total = 26; + + let mut expected = BTreeMap::, ()>::new(); + for i in 0..total { + let key = vec![97 + i]; + + assert!(kv_store + .put(PutRequest { + key: key.clone(), + value: key.clone(), + ..Default::default() + }) + .await + .is_ok()); + + expected.insert(key, ()); + } + + let key = b"a".to_vec(); + let range_end = b"f".to_vec(); + + let stream = PaginationStream::new( + kv_store.clone(), + RangeRequest { + key, + range_end, + ..Default::default() + }, + 2, + Arc::new(decoder), + ); + let kv = stream + .try_collect::>() + .await + .unwrap() + .into_iter() + .map(|kv| kv.0) + .collect::>(); + + assert_eq!(vec![vec![97], vec![98], vec![99], vec![100], vec![101]], kv); + } +} diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index e1bdfdac2f..e69a2f87bd 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -612,6 +612,13 @@ impl TryFrom for CompareAndPutResponse { } impl CompareAndPutResponse { + pub fn handle(&self, f: F) -> std::result::Result + where + F: FnOnce(&Self) -> std::result::Result, + { + f(self) + } + pub fn to_proto_resp(self, header: PbResponseHeader) -> PbCompareAndPutResponse { PbCompareAndPutResponse { header: Some(header), diff --git a/src/common/meta/src/util.rs b/src/common/meta/src/util.rs index 8b8c54c1e7..80742cece5 100644 --- a/src/common/meta/src/util.rs +++ b/src/common/meta/src/util.rs @@ -27,10 +27,28 @@ pub fn get_prefix_end_key(key: &[u8]) -> Vec { vec![0] } +/// Get next prefix key of `key`. +#[inline] +pub fn get_next_prefix_key(key: &[u8]) -> Vec { + let mut next = Vec::with_capacity(key.len() + 1); + next.extend_from_slice(key); + next.push(0); + + next +} + #[cfg(test)] mod tests { use super::*; + #[test] + fn test_get_next_prefix() { + let key = b"testa"; + let mut expected = b"testa".to_vec(); + expected.push(0); + assert_eq!(expected, get_next_prefix_key(key)); + } + #[test] fn test_get_prefix() { let key = b"testa";