diff --git a/Cargo.lock b/Cargo.lock index 51dc20eb6d..ad11c98c85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1803,6 +1803,7 @@ name = "common-meta" version = "0.4.0" dependencies = [ "api", + "async-stream", "async-trait", "chrono", "common-catalog", @@ -1811,6 +1812,7 @@ dependencies = [ "common-telemetry", "common-time", "datatypes", + "futures", "serde", "serde_json", "snafu", diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 34143793ea..ffbdc79f0f 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -243,6 +243,12 @@ pub enum Error { #[snafu(display("A generic error has occurred, msg: {}", msg))] Generic { msg: String, location: Location }, + + #[snafu(display("Table metadata manager error: {}", source))] + TableMetadataManager { + source: common_meta::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -298,6 +304,7 @@ impl ErrorExt for Error { Error::Unimplemented { .. } | Error::NotSupported { .. } => StatusCode::Unsupported, Error::QueryAccessDenied { .. } => StatusCode::AccessDenied, Error::Datafusion { .. } => StatusCode::EngineExecuteQuery, + Error::TableMetadataManager { source, .. } => source.status_code(), } } diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs index e82dd32715..a69c8b28a8 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/remote.rs @@ -12,18 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; -use std::fmt::Debug; -use std::pin::Pin; use std::sync::Arc; pub use client::{CachedMetaKvBackend, MetaKvBackend}; -use futures::Stream; -use futures_util::StreamExt; pub use manager::RemoteCatalogManager; -use crate::error::Error; - mod client; mod manager; @@ -31,59 +24,6 @@ mod manager; pub mod mock; pub mod region_alive_keeper; -#[derive(Debug, Clone)] -pub struct Kv(pub Vec, pub Vec); - -pub type ValueIter<'a, E> = Pin> + Send + 'a>>; - -#[async_trait::async_trait] -pub trait KvBackend: Send + Sync { - fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error> - where - 'a: 'b; - - async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error>; - - /// Compare and set value of key. `expect` is the expected value, if backend's current value associated - /// with key is the same as `expect`, the value will be updated to `val`. - /// - /// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())` - /// - If associated value is not the same as `expect`, no value will be updated and an `Ok(Err(Vec))` - /// will be returned, the `Err(Vec)` indicates the current associated value of key. - /// - If any error happens during operation, an `Err(Error)` will be returned. - async fn compare_and_set( - &self, - key: &[u8], - expect: &[u8], - val: &[u8], - ) -> Result>>, Error>; - - async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error>; - - async fn delete(&self, key: &[u8]) -> Result<(), Error> { - self.delete_range(key, &[]).await - } - - /// Default get is implemented based on `range` method. - async fn get(&self, key: &[u8]) -> Result, Error> { - let mut iter = self.range(key); - while let Some(r) = iter.next().await { - let kv = r?; - if kv.0 == key { - return Ok(Some(kv)); - } - } - return Ok(None); - } - - /// MoveValue atomically renames the key to the given updated key. - async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Error>; - - fn as_any(&self) -> &dyn Any; -} - -pub type KvBackendRef = Arc; - #[async_trait::async_trait] pub trait KvCacheInvalidator: Send + Sync { async fn invalidate_key(&self, key: &[u8]); @@ -93,14 +33,19 @@ pub type KvCacheInvalidatorRef = Arc; #[cfg(test)] mod tests { - use async_stream::stream; + use std::any::Any; - use super::*; + use async_stream::stream; + use common_meta::kv_backend::{Kv, KvBackend, ValueIter}; + + use crate::error::Error; struct MockKvBackend {} #[async_trait::async_trait] impl KvBackend for MockKvBackend { + type Error = Error; + fn range<'a, 'b>(&'a self, _key: &[u8]) -> ValueIter<'b, Error> where 'a: 'b, diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs index 66e4709323..9d87fa37a9 100644 --- a/src/catalog/src/remote/client.rs +++ b/src/catalog/src/remote/client.rs @@ -18,6 +18,9 @@ use std::sync::Arc; use std::time::Duration; use async_stream::stream; +use common_error::prelude::BoxedError; +use common_meta::error::{Error, GetKvCacheSnafu, MetaSrvSnafu, Result}; +use common_meta::kv_backend::{Kv, KvBackend, KvBackendRef, ValueIter}; use common_meta::rpc::store::{ CompareAndPutRequest, DeleteRangeRequest, MoveValueRequest, PutRequest, RangeRequest, }; @@ -27,9 +30,7 @@ use moka::future::{Cache, CacheBuilder}; use snafu::ResultExt; use super::KvCacheInvalidator; -use crate::error::{Error, GenericSnafu, MetaSrvSnafu, Result}; use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET}; -use crate::remote::{Kv, KvBackend, KvBackendRef, ValueIter}; const CACHE_MAX_CAPACITY: u64 = 10000; const CACHE_TTL_SECOND: u64 = 10 * 60; @@ -43,6 +44,8 @@ pub struct CachedMetaKvBackend { #[async_trait::async_trait] impl KvBackend for CachedMetaKvBackend { + type Error = Error; + fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error> where 'a: 'b, @@ -59,8 +62,15 @@ impl KvBackend for CachedMetaKvBackend { self.kv_backend.get(key).await }; - let schema_provider = self.cache.try_get_with_by_ref(key, init).await; - schema_provider.map_err(|e| GenericSnafu { msg: e.to_string() }.build()) + self.cache + .try_get_with_by_ref(key, init) + .await + .map_err(|e| { + GetKvCacheSnafu { + err_msg: e.to_string(), + } + .build() + }) } async fn set(&self, key: &[u8], val: &[u8]) -> Result<()> { @@ -165,6 +175,8 @@ pub struct MetaKvBackend { /// comparing to `Accessor`'s list and get method. #[async_trait::async_trait] impl KvBackend for MetaKvBackend { + type Error = Error; + fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error> where 'a: 'b, @@ -175,6 +187,7 @@ impl KvBackend for MetaKvBackend { .client .range(RangeRequest::new().with_prefix(key)) .await + .map_err(BoxedError::new) .context(MetaSrvSnafu)?; let kvs = resp.take_kvs(); for mut kv in kvs.into_iter() { @@ -188,6 +201,7 @@ impl KvBackend for MetaKvBackend { .client .range(RangeRequest::new().with_key(key)) .await + .map_err(BoxedError::new) .context(MetaSrvSnafu)?; Ok(response .take_kvs() @@ -199,13 +213,23 @@ impl KvBackend for MetaKvBackend { let req = PutRequest::new() .with_key(key.to_vec()) .with_value(val.to_vec()); - let _ = self.client.put(req).await.context(MetaSrvSnafu)?; + let _ = self + .client + .put(req) + .await + .map_err(BoxedError::new) + .context(MetaSrvSnafu)?; Ok(()) } async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<()> { let req = DeleteRangeRequest::new().with_range(key.to_vec(), end.to_vec()); - let resp = self.client.delete_range(req).await.context(MetaSrvSnafu)?; + let resp = self + .client + .delete_range(req) + .await + .map_err(BoxedError::new) + .context(MetaSrvSnafu)?; info!( "Delete range, key: {}, end: {}, deleted: {}", String::from_utf8_lossy(key), @@ -230,6 +254,7 @@ impl KvBackend for MetaKvBackend { .client .compare_and_put(request) .await + .map_err(BoxedError::new) .context(MetaSrvSnafu)?; if response.is_success() { Ok(Ok(())) @@ -240,7 +265,12 @@ impl KvBackend for MetaKvBackend { async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<()> { let req = MoveValueRequest::new(from_key, to_key); - let _ = self.client.move_value(req).await.context(MetaSrvSnafu)?; + let _ = self + .client + .move_value(req) + .await + .map_err(BoxedError::new) + .context(MetaSrvSnafu)?; Ok(()) } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 127be693e8..0085aaf2ee 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -21,6 +21,7 @@ use async_stream::stream; use async_trait::async_trait; use common_catalog::consts::{MAX_SYS_TABLE_ID, MITO_ENGINE}; use common_meta::ident::TableIdent; +use common_meta::kv_backend::{Kv, KvBackendRef}; use common_telemetry::{debug, error, info, warn}; use futures::Stream; use futures_util::{StreamExt, TryStreamExt}; @@ -35,6 +36,7 @@ use tokio::sync::Mutex; use crate::error::{ CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu, + TableMetadataManagerSnafu, }; use crate::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, @@ -42,7 +44,6 @@ use crate::helper::{ TableGlobalValue, TableRegionalKey, TableRegionalValue, CATALOG_KEY_PREFIX, }; use crate::remote::region_alive_keeper::RegionAliveKeepers; -use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, @@ -80,7 +81,7 @@ impl RemoteCatalogManager { let mut catalogs = self.backend.range(catalog_range_prefix.as_bytes()); Box::pin(stream!({ while let Some(r) = catalogs.next().await { - let Kv(k, _) = r?; + let Kv(k, _) = r.context(TableMetadataManagerSnafu)?; if !k.starts_with(catalog_range_prefix.as_bytes()) { debug!("Ignoring non-catalog key: {}", String::from_utf8_lossy(&k)); continue; @@ -134,7 +135,8 @@ impl RemoteCatalogManager { .as_bytes() .context(InvalidCatalogValueSnafu)?, ) - .await?; + .await + .context(TableMetadataManagerSnafu)?; info!("Created schema '{schema_key}'"); let catalog_key = CatalogKey { @@ -148,7 +150,8 @@ impl RemoteCatalogManager { .as_bytes() .context(InvalidCatalogValueSnafu)?, ) - .await?; + .await + .context(TableMetadataManagerSnafu)?; info!("Created catalog '{catalog_key}"); Ok(()) } @@ -316,7 +319,8 @@ impl RemoteCatalogManager { table_key.as_bytes(), &table_value.as_bytes().context(InvalidCatalogValueSnafu)?, ) - .await?; + .await + .context(TableMetadataManagerSnafu)?; debug!( "Successfully set catalog table entry, key: {}, table value: {:?}", table_key, table_value @@ -343,7 +347,8 @@ impl RemoteCatalogManager { let engine_opt = self .backend .get(table_key.as_bytes()) - .await? + .await + .context(TableMetadataManagerSnafu)? .map(|Kv(_, v)| { let TableRegionalValue { table_id, @@ -361,7 +366,10 @@ impl RemoteCatalogManager { return Ok(None); }; - self.backend.delete(table_key.as_bytes()).await?; + self.backend + .delete(table_key.as_bytes()) + .await + .context(TableMetadataManagerSnafu)?; debug!( "Successfully deleted catalog table entry, key: {}", table_key @@ -428,7 +436,7 @@ async fn iter_remote_schemas<'a>( Box::pin(stream!({ while let Some(r) = schemas.next().await { - let Kv(k, _) = r?; + let Kv(k, _) = r.context(TableMetadataManagerSnafu)?; if !k.starts_with(schema_prefix.as_bytes()) { debug!("Ignoring non-schema key: {}", String::from_utf8_lossy(&k)); continue; @@ -452,7 +460,7 @@ async fn iter_remote_tables<'a>( let mut tables = backend.range(table_prefix.as_bytes()); Box::pin(stream!({ while let Some(r) = tables.next().await { - let Kv(k, v) = r?; + let Kv(k, v) = r.context(TableMetadataManagerSnafu)?; if !k.starts_with(table_prefix.as_bytes()) { debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k)); continue; @@ -701,7 +709,8 @@ impl CatalogManager for RemoteCatalogManager { .as_bytes() .context(InvalidCatalogValueSnafu)?, ) - .await?; + .await + .context(TableMetadataManagerSnafu)?; increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0); Ok(true) @@ -720,7 +729,7 @@ impl CatalogManager for RemoteCatalogManager { node_id: self.node_id, } .to_string(); - let Some(Kv(_, value_bytes)) = self.backend.get(old_table_key.as_bytes()).await? else { + let Some(Kv(_, value_bytes)) = self.backend.get(old_table_key.as_bytes()).await.context(TableMetadataManagerSnafu)? else { return Ok(false) }; let new_table_key = TableRegionalKey { @@ -731,10 +740,12 @@ impl CatalogManager for RemoteCatalogManager { }; self.backend .set(new_table_key.to_string().as_bytes(), &value_bytes) - .await?; + .await + .context(TableMetadataManagerSnafu)?; self.backend .delete(old_table_key.to_string().as_bytes()) - .await?; + .await + .context(TableMetadataManagerSnafu)?; Ok(true) } @@ -756,7 +767,12 @@ impl CatalogManager for RemoteCatalogManager { let key = self .build_schema_key(catalog.to_string(), schema.to_string()) .to_string(); - Ok(self.backend.get(key.as_bytes()).await?.is_some()) + Ok(self + .backend + .get(key.as_bytes()) + .await + .context(TableMetadataManagerSnafu)? + .is_some()) } async fn table( @@ -778,7 +794,8 @@ impl CatalogManager for RemoteCatalogManager { let table_opt = self .backend .get(key.as_bytes()) - .await? + .await + .context(TableMetadataManagerSnafu)? .map(|Kv(_, v)| { let TableRegionalValue { table_id, @@ -821,7 +838,8 @@ impl CatalogManager for RemoteCatalogManager { Ok(self .backend .get(key.to_string().as_bytes()) - .await? + .await + .context(TableMetadataManagerSnafu)? .is_some()) } @@ -836,7 +854,12 @@ impl CatalogManager for RemoteCatalogManager { } .to_string(); - Ok(self.backend.get(key.as_bytes()).await?.is_some()) + Ok(self + .backend + .get(key.as_bytes()) + .await + .context(TableMetadataManagerSnafu)? + .is_some()) } async fn catalog_names(&self) -> Result> { @@ -905,7 +928,8 @@ impl CatalogManager for RemoteCatalogManager { .as_bytes() .context(InvalidCatalogValueSnafu)?, ) - .await?; + .await + .context(TableMetadataManagerSnafu)?; increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0); Ok(false) } diff --git a/src/catalog/src/remote/mock.rs b/src/catalog/src/remote/mock.rs index 248ee4a430..b321f8818e 100644 --- a/src/catalog/src/remote/mock.rs +++ b/src/catalog/src/remote/mock.rs @@ -12,20 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; -use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, HashMap}; -use std::fmt::{Display, Formatter}; +use std::collections::HashMap; use std::sync::{Arc, RwLock as StdRwLock}; -use async_stream::stream; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_recordbatch::RecordBatch; -use common_telemetry::logging::info; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::StringVector; -use serde::Serializer; use table::engine::{CloseTableResult, EngineContext, TableEngine}; use table::metadata::TableId; use table::requests::{ @@ -33,135 +26,6 @@ use table::requests::{ }; use table::test_util::MemTable; use table::TableRef; -use tokio::sync::RwLock; - -use crate::error::Error; -use crate::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; -use crate::remote::{Kv, KvBackend, ValueIter}; - -pub struct MockKvBackend { - map: RwLock, Vec>>, -} - -impl Default for MockKvBackend { - fn default() -> Self { - let catalog_value = CatalogValue {}.as_bytes().unwrap(); - let schema_value = SchemaValue {}.as_bytes().unwrap(); - - let default_catalog_key = CatalogKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - } - .to_string(); - - let default_schema_key = SchemaKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - } - .to_string(); - - let map = RwLock::new(BTreeMap::from([ - // create default catalog and schema - (default_catalog_key.into(), catalog_value), - (default_schema_key.into(), schema_value), - ])); - Self { map } - } -} - -impl Display for MockKvBackend { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - futures::executor::block_on(async { - let map = self.map.read().await; - for (k, v) in map.iter() { - f.serialize_str(&String::from_utf8_lossy(k))?; - f.serialize_str(" -> ")?; - f.serialize_str(&String::from_utf8_lossy(v))?; - f.serialize_str("\n")?; - } - Ok(()) - }) - } -} - -#[async_trait::async_trait] -impl KvBackend for MockKvBackend { - fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error> - where - 'a: 'b, - { - let prefix = key.to_vec(); - let prefix_string = String::from_utf8_lossy(&prefix).to_string(); - Box::pin(stream!({ - let maps = self.map.read().await.clone(); - for (k, v) in maps.range(prefix.clone()..) { - let key_string = String::from_utf8_lossy(k).to_string(); - let matches = key_string.starts_with(&prefix_string); - if matches { - yield Ok(Kv(k.clone(), v.clone())) - } else { - info!("Stream finished"); - return; - } - } - })) - } - - async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> { - let mut map = self.map.write().await; - let _ = map.insert(key.to_vec(), val.to_vec()); - Ok(()) - } - - async fn compare_and_set( - &self, - key: &[u8], - expect: &[u8], - val: &[u8], - ) -> Result>>, Error> { - let mut map = self.map.write().await; - let existing = map.entry(key.to_vec()); - match existing { - Entry::Vacant(e) => { - if expect.is_empty() { - let _ = e.insert(val.to_vec()); - Ok(Ok(())) - } else { - Ok(Err(None)) - } - } - Entry::Occupied(mut existing) => { - if existing.get() == expect { - let _ = existing.insert(val.to_vec()); - Ok(Ok(())) - } else { - Ok(Err(Some(existing.get().clone()))) - } - } - } - } - - async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> { - let mut map = self.map.write().await; - if end.is_empty() { - let _ = map.remove(key); - } else { - let start = key.to_vec(); - let end = end.to_vec(); - let range = start..end; - - map.retain(|k, _| !range.contains(k)); - } - Ok(()) - } - - async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<(), Error> { - unimplemented!() - } - - fn as_any(&self) -> &dyn Any { - self - } -} #[derive(Default)] pub struct MockTableEngine { diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 909af77af6..fb0d3fd598 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -22,12 +22,14 @@ mod tests { use std::time::Duration; use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; - use catalog::remote::mock::{MockKvBackend, MockTableEngine}; + use catalog::remote::mock::MockTableEngine; use catalog::remote::region_alive_keeper::RegionAliveKeepers; - use catalog::remote::{CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager}; + use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::ident::TableIdent; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::KvBackend; use datatypes::schema::RawSchema; use futures_util::StreamExt; use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; @@ -37,8 +39,6 @@ mod tests { use tokio::time::Instant; struct TestingComponents { - #[allow(dead_code)] - kv_backend: KvBackendRef, catalog_manager: Arc, table_engine_manager: TableEngineManagerRef, region_alive_keepers: Arc, @@ -53,7 +53,7 @@ mod tests { #[tokio::test] async fn test_backend() { common_telemetry::init_default_ut_logging(); - let backend = MockKvBackend::default(); + let backend = MemoryKvBackend::default(); let default_catalog_key = CatalogKey { catalog_name: DEFAULT_CATALOG_NAME.to_string(), @@ -92,8 +92,7 @@ mod tests { #[tokio::test] async fn test_cached_backend() { - common_telemetry::init_default_ut_logging(); - let backend = CachedMetaKvBackend::wrap(Arc::new(MockKvBackend::default())); + let backend = CachedMetaKvBackend::wrap(Arc::new(MemoryKvBackend::default())); let default_catalog_key = CatalogKey { catalog_name: DEFAULT_CATALOG_NAME.to_string(), @@ -135,9 +134,11 @@ mod tests { } async fn prepare_components(node_id: u64) -> TestingComponents { - let cached_backend = Arc::new(CachedMetaKvBackend::wrap( - Arc::new(MockKvBackend::default()), - )); + let backend = Arc::new(MemoryKvBackend::default()); + backend.set(b"__c-greptime", b"").await.unwrap(); + backend.set(b"__s-greptime-public", b"").await.unwrap(); + + let cached_backend = Arc::new(CachedMetaKvBackend::wrap(backend)); let table_engine = Arc::new(MockTableEngine::default()); let engine_manager = Arc::new(MemoryTableEngineManager::alias( @@ -156,7 +157,6 @@ mod tests { catalog_manager.start().await.unwrap(); TestingComponents { - kv_backend: cached_backend, catalog_manager: Arc::new(catalog_manager), table_engine_manager: engine_manager, region_alive_keepers, diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index e91a9bbbad..26bee6c672 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -6,12 +6,14 @@ license.workspace = true [dependencies] api = { path = "../../api" } +async-stream.workspace = true async-trait.workspace = true common-catalog = { path = "../catalog" } common-error = { path = "../error" } common-runtime = { path = "../runtime" } common-telemetry = { path = "../telemetry" } common-time = { path = "../time" } +futures.workspace = true serde.workspace = true serde_json.workspace = true snafu.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index a75234fd92..804b69dd6e 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -55,6 +55,18 @@ pub enum Error { #[snafu(display("Invalid protobuf message, err: {}", err_msg))] InvalidProtoMsg { err_msg: String, location: Location }, + + #[snafu(display("Invalid table metadata, err: {}", err_msg))] + InvalidTableMetadata { err_msg: String, location: Location }, + + #[snafu(display("Failed to get kv cache, err: {}", err_msg))] + GetKvCache { err_msg: String, location: Location }, + + #[snafu(display("Failed to request MetaSrv, source: {}", source))] + MetaSrv { + source: BoxedError, + location: Location, + }, } pub type Result = std::result::Result; @@ -65,15 +77,18 @@ impl ErrorExt for Error { match self { IllegalServerState { .. } => StatusCode::Internal, - SerdeJson { .. } | RouteInfoCorrupted { .. } | InvalidProtoMsg { .. } => { - StatusCode::Unexpected - } + SerdeJson { .. } + | RouteInfoCorrupted { .. } + | InvalidProtoMsg { .. } + | InvalidTableMetadata { .. } => StatusCode::Unexpected, - SendMessage { .. } => StatusCode::Internal, + SendMessage { .. } | GetKvCache { .. } => StatusCode::Internal, EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => { StatusCode::Unexpected } + + MetaSrv { source, .. } => source.status_code(), } } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 86ec38cc12..ff029c8b75 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -12,16 +12,102 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! This mod defines all the keys used in the metadata store (Metasrv). +//! Specifically, there are these kinds of keys: +//! +//! 1. Table info key: `__table_info/{table_id}` +//! - The value is a [TableInfoValue] struct; it contains the whole table info (like column +//! schemas). +//! - This key is mainly used in constructing the table in Datanode and Frontend. +//! +//! 2. Table region key: `__table_region/{table_id}` +//! - The value is a [TableRegionValue] struct; it contains the region distribution of the +//! table in the Datanodes. +//! +//! All keys have related managers. The managers take care of the serialization and deserialization +//! of keys and values, and the interaction with the underlying KV store backend. +//! +//! To simplify the managers used in struct fields and function parameters, we define a "unify" +//! table metadata manager: [TableMetadataManager]. It contains all the managers defined above. +//! It's recommended to just use this manager only. + +pub mod table_info; +pub mod table_region; mod table_route; +use std::sync::Arc; + +use snafu::ResultExt; +use table_info::{TableInfoManager, TableInfoValue}; +use table_region::{TableRegionManager, TableRegionValue}; + +use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX}; +use crate::kv_backend::KvBackendRef; pub const REMOVED_PREFIX: &str = "__removed"; +const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; +const TABLE_REGION_KEY_PREFIX: &str = "__table_region"; + pub fn to_removed_key(key: &str) -> String { format!("{REMOVED_PREFIX}-{key}") } +pub trait TableMetaKey { + fn as_raw_key(&self) -> Vec; +} + +pub type TableMetadataManagerRef = Arc; + +pub struct TableMetadataManager { + table_info_manager: TableInfoManager, + table_region_manager: TableRegionManager, +} + +impl TableMetadataManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + TableMetadataManager { + table_info_manager: TableInfoManager::new(kv_backend.clone()), + table_region_manager: TableRegionManager::new(kv_backend), + } + } + + pub fn table_info_manager(&self) -> &TableInfoManager { + &self.table_info_manager + } + + pub fn table_region_manager(&self) -> &TableRegionManager { + &self.table_region_manager + } +} + +macro_rules! impl_table_meta_value { + ( $($val_ty: ty), *) => { + $( + impl $val_ty { + pub fn try_from_raw_value(raw_value: Vec) -> Result { + let raw_value = String::from_utf8(raw_value).map_err(|e| { + InvalidTableMetadataSnafu { err_msg: e.to_string() }.build() + })?; + serde_json::from_str(&raw_value).context(SerdeJsonSnafu) + } + + pub fn try_as_raw_value(&self) -> Result> { + serde_json::to_string(self) + .map(|x| x.into_bytes()) + .context(SerdeJsonSnafu) + } + } + )* + } +} + +impl_table_meta_value! { + TableInfoValue, + TableRegionValue +} + #[cfg(test)] mod tests { use crate::key::to_removed_key; diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs new file mode 100644 index 0000000000..1920863436 --- /dev/null +++ b/src/common/meta/src/key/table_info.rs @@ -0,0 +1,230 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; +use table::metadata::{RawTableInfo, TableId}; + +use super::TABLE_INFO_KEY_PREFIX; +use crate::error::Result; +use crate::key::{to_removed_key, TableMetaKey}; +use crate::kv_backend::KvBackendRef; + +pub struct TableInfoKey { + table_id: TableId, +} + +impl TableInfoKey { + pub fn new(table_id: TableId) -> Self { + Self { table_id } + } +} + +impl TableMetaKey for TableInfoKey { + fn as_raw_key(&self) -> Vec { + format!("{}/{}", TABLE_INFO_KEY_PREFIX, self.table_id).into_bytes() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TableInfoValue { + pub table_info: RawTableInfo, + version: u64, +} + +pub struct TableInfoManager { + kv_backend: KvBackendRef, +} + +impl TableInfoManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + pub async fn get(&self, table_id: TableId) -> Result> { + let key = TableInfoKey::new(table_id); + let raw_key = key.as_raw_key(); + self.kv_backend + .get(&raw_key) + .await? + .map(|x| TableInfoValue::try_from_raw_value(x.1)) + .transpose() + } + + pub async fn compare_and_set( + &self, + table_id: TableId, + expect: Option, + table_info: RawTableInfo, + ) -> Result>>> { + let key = TableInfoKey::new(table_id); + let raw_key = key.as_raw_key(); + + let (expect, version) = if let Some(x) = expect { + (x.try_as_raw_value()?, x.version + 1) + } else { + (vec![], 0) + }; + + let value = TableInfoValue { + table_info, + version, + }; + let raw_value = value.try_as_raw_value()?; + + self.kv_backend + .compare_and_set(&raw_key, &expect, &raw_value) + .await + } + + pub async fn remove(&self, table_id: TableId) -> Result<()> { + let key = TableInfoKey::new(table_id); + let removed_key = to_removed_key(&String::from_utf8_lossy(key.as_raw_key().as_slice())); + self.kv_backend + .move_value(&key.as_raw_key(), removed_key.as_bytes()) + .await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, RawSchema, Schema}; + use table::metadata::{RawTableMeta, TableIdent, TableType}; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + + #[tokio::test] + async fn test_table_info_manager() { + let backend = Arc::new(MemoryKvBackend::default()); + + for i in 1..=3 { + let key = TableInfoKey::new(i).as_raw_key(); + let val = TableInfoValue { + table_info: new_table_info(i), + version: 1, + } + .try_as_raw_value() + .unwrap(); + backend.set(&key, &val).await.unwrap(); + } + + let manager = TableInfoManager::new(backend.clone()); + + let val = manager.get(1).await.unwrap().unwrap(); + assert_eq!( + val, + TableInfoValue { + table_info: new_table_info(1), + version: 1, + } + ); + assert!(manager.get(4).await.unwrap().is_none()); + + let table_info = new_table_info(4); + let result = manager + .compare_and_set(4, None, table_info.clone()) + .await + .unwrap(); + assert!(result.is_ok()); + + // test cas failed, the new table info is not set + let new_table_info = new_table_info(4); + let result = manager + .compare_and_set(4, None, new_table_info.clone()) + .await + .unwrap(); + let actual = TableInfoValue::try_from_raw_value(result.unwrap_err().unwrap()).unwrap(); + assert_eq!( + actual, + TableInfoValue { + table_info: table_info.clone(), + version: 0, + } + ); + + // test cas success + let result = manager + .compare_and_set(4, Some(actual), new_table_info.clone()) + .await + .unwrap(); + assert!(result.is_ok()); + + assert!(manager.remove(4).await.is_ok()); + + let kv = backend + .get(b"__removed-__table_info/4") + .await + .unwrap() + .unwrap(); + assert_eq!(b"__removed-__table_info/4", kv.0.as_slice()); + let value = TableInfoValue::try_from_raw_value(kv.1).unwrap(); + assert_eq!(value.table_info, new_table_info); + assert_eq!(value.version, 1); + } + + #[test] + fn test_key_serde() { + let key = TableInfoKey::new(42); + let raw_key = key.as_raw_key(); + assert_eq!(raw_key, b"__table_info/42"); + } + + #[test] + fn test_value_serde() { + let value = TableInfoValue { + table_info: new_table_info(42), + version: 1, + }; + let serialized = value.try_as_raw_value().unwrap(); + let deserialized = TableInfoValue::try_from_raw_value(serialized).unwrap(); + assert_eq!(value, deserialized); + } + + fn new_table_info(table_id: TableId) -> RawTableInfo { + let schema = Schema::new(vec![ColumnSchema::new( + "name", + ConcreteDataType::string_datatype(), + true, + )]); + + let meta = RawTableMeta { + schema: RawSchema::from(&schema), + engine: "mito".to_string(), + created_on: chrono::DateTime::default(), + primary_key_indices: vec![0, 1], + next_column_id: 3, + engine_options: Default::default(), + value_indices: vec![2, 3], + options: Default::default(), + region_numbers: vec![1], + }; + + RawTableInfo { + ident: TableIdent { + table_id, + version: 1, + }, + name: "table_1".to_string(), + desc: Some("blah".to_string()), + catalog_name: "catalog_1".to_string(), + schema_name: "schema_1".to_string(), + meta, + table_type: TableType::Base, + } + } +} diff --git a/src/common/meta/src/key/table_region.rs b/src/common/meta/src/key/table_region.rs new file mode 100644 index 0000000000..7d6254d3fb --- /dev/null +++ b/src/common/meta/src/key/table_region.rs @@ -0,0 +1,190 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; +use store_api::storage::RegionNumber; +use table::metadata::TableId; + +use super::TABLE_REGION_KEY_PREFIX; +use crate::error::Result; +use crate::key::{to_removed_key, TableMetaKey}; +use crate::kv_backend::KvBackendRef; +use crate::DatanodeId; + +pub type RegionDistribution = BTreeMap>; + +pub struct TableRegionKey { + table_id: TableId, +} + +impl TableRegionKey { + pub fn new(table_id: TableId) -> Self { + Self { table_id } + } +} + +impl TableMetaKey for TableRegionKey { + fn as_raw_key(&self) -> Vec { + format!("{}/{}", TABLE_REGION_KEY_PREFIX, self.table_id).into_bytes() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TableRegionValue { + pub region_distribution: RegionDistribution, + version: u64, +} + +pub struct TableRegionManager { + kv_backend: KvBackendRef, +} + +impl TableRegionManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + pub async fn get(&self, table_id: TableId) -> Result> { + let key = TableRegionKey::new(table_id); + let raw_key = key.as_raw_key(); + self.kv_backend + .get(&raw_key) + .await? + .map(|x| TableRegionValue::try_from_raw_value(x.1)) + .transpose() + } + + pub async fn compare_and_set( + &self, + table_id: TableId, + expect: Option, + region_distribution: RegionDistribution, + ) -> Result>>> { + let key = TableRegionKey::new(table_id); + let raw_key = key.as_raw_key(); + + let (expect, version) = if let Some(x) = expect { + (x.try_as_raw_value()?, x.version + 1) + } else { + (vec![], 0) + }; + + let value = TableRegionValue { + region_distribution, + version, + }; + let raw_value = value.try_as_raw_value()?; + + self.kv_backend + .compare_and_set(&raw_key, &expect, &raw_value) + .await + } + + pub async fn remove(&self, table_id: TableId) -> Result<()> { + let key = TableRegionKey::new(table_id); + let remove_key = to_removed_key(&String::from_utf8_lossy(key.as_raw_key().as_slice())); + self.kv_backend + .move_value(&key.as_raw_key(), remove_key.as_bytes()) + .await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + + #[tokio::test] + async fn test_table_region_manager() { + let backend = Arc::new(MemoryKvBackend::default()); + let manager = TableRegionManager::new(backend.clone()); + + let region_distribution = + RegionDistribution::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6])]); + let result = manager + .compare_and_set(1, None, region_distribution.clone()) + .await + .unwrap(); + assert!(result.is_ok()); + + let new_region_distribution = + RegionDistribution::from([(1, vec![4, 5, 6]), (2, vec![1, 2, 3])]); + let curr = manager + .compare_and_set(1, None, new_region_distribution.clone()) + .await + .unwrap() + .unwrap_err() + .unwrap(); + let curr = TableRegionValue::try_from_raw_value(curr).unwrap(); + assert_eq!( + curr, + TableRegionValue { + region_distribution, + version: 0 + } + ); + + assert!(manager + .compare_and_set(1, Some(curr), new_region_distribution.clone()) + .await + .unwrap() + .is_ok()); + + let value = manager.get(1).await.unwrap().unwrap(); + assert_eq!( + value, + TableRegionValue { + region_distribution: new_region_distribution.clone(), + version: 1 + } + ); + assert!(manager.get(2).await.unwrap().is_none()); + + assert!(manager.remove(1).await.is_ok()); + + let kv = backend + .get(b"__removed-__table_region/1") + .await + .unwrap() + .unwrap(); + assert_eq!(b"__removed-__table_region/1", kv.0.as_slice()); + let value = TableRegionValue::try_from_raw_value(kv.1).unwrap(); + assert_eq!(value.region_distribution, new_region_distribution); + assert_eq!(value.version, 1); + } + + #[test] + fn test_serde() { + let key = TableRegionKey::new(1); + let raw_key = key.as_raw_key(); + assert_eq!(raw_key, b"__table_region/1"); + + let value = TableRegionValue { + region_distribution: RegionDistribution::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6])]), + version: 0, + }; + let literal = br#"{"region_distribution":{"1":[1,2,3],"2":[4,5,6]},"version":0}"#; + + assert_eq!(value.try_as_raw_value().unwrap(), literal); + assert_eq!( + TableRegionValue::try_from_raw_value(literal.to_vec()).unwrap(), + value, + ); + } +} diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs new file mode 100644 index 0000000000..81ff2a3cb3 --- /dev/null +++ b/src/common/meta/src/kv_backend.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod memory; + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use common_error::ext::ErrorExt; +use futures::{Stream, StreamExt}; + +use crate::error::Error; + +#[derive(Debug, Clone, PartialEq)] +pub struct Kv(pub Vec, pub Vec); + +pub type ValueIter<'a, E> = Pin> + Send + 'a>>; + +pub type KvBackendRef = Arc>; + +#[async_trait] +pub trait KvBackend: Send + Sync { + type Error: ErrorExt; + + fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Self::Error> + where + 'a: 'b; + + async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Self::Error>; + + /// Compare and set value of key. `expect` is the expected value, if backend's current value associated + /// with key is the same as `expect`, the value will be updated to `val`. + /// + /// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())` + /// - If associated value is not the same as `expect`, no value will be updated and an `Ok(Err(Vec))` + /// will be returned, the `Err(Vec)` indicates the current associated value of key. + /// - If any error happens during operation, an `Err(Error)` will be returned. + async fn compare_and_set( + &self, + key: &[u8], + expect: &[u8], + val: &[u8], + ) -> Result>>, Self::Error>; + + async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Self::Error>; + + async fn delete(&self, key: &[u8]) -> Result<(), Self::Error> { + self.delete_range(key, &[]).await + } + + /// Default get is implemented based on `range` method. + async fn get(&self, key: &[u8]) -> Result, Self::Error> { + let mut iter = self.range(key); + while let Some(r) = iter.next().await { + let kv = r?; + if kv.0 == key { + return Ok(Some(kv)); + } + } + return Ok(None); + } + + /// MoveValue atomically renames the key to the given updated key. + async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Self::Error>; + + fn as_any(&self) -> &dyn Any; +} diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs new file mode 100644 index 0000000000..6108120e16 --- /dev/null +++ b/src/common/meta/src/kv_backend/memory.rs @@ -0,0 +1,197 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::fmt::{Display, Formatter}; +use std::sync::RwLock; + +use async_stream::stream; +use async_trait::async_trait; +use serde::Serializer; + +use crate::error::Error; +use crate::kv_backend::{Kv, KvBackend, ValueIter}; + +pub struct MemoryKvBackend { + kvs: RwLock, Vec>>, +} + +impl Display for MemoryKvBackend { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let kvs = self.kvs.read().unwrap(); + for (k, v) in kvs.iter() { + f.serialize_str(&String::from_utf8_lossy(k))?; + f.serialize_str(" -> ")?; + f.serialize_str(&String::from_utf8_lossy(v))?; + f.serialize_str("\n")?; + } + Ok(()) + } +} + +impl Default for MemoryKvBackend { + fn default() -> Self { + Self { + kvs: RwLock::new(BTreeMap::new()), + } + } +} + +#[async_trait] +impl KvBackend for MemoryKvBackend { + type Error = Error; + + fn range<'a, 'b>(&'a self, prefix: &[u8]) -> ValueIter<'b, Error> + where + 'a: 'b, + { + let kvs = self.kvs.read().unwrap(); + let kvs = kvs.clone(); + + let prefix = prefix.to_vec(); + Box::pin(stream!({ + for (k, v) in kvs.range(prefix.clone()..) { + if !k.starts_with(&prefix) { + break; + } + yield Ok(Kv(k.clone(), v.clone())); + } + })) + } + + async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> { + let mut kvs = self.kvs.write().unwrap(); + let _ = kvs.insert(key.to_vec(), val.to_vec()); + Ok(()) + } + + async fn compare_and_set( + &self, + key: &[u8], + expect: &[u8], + val: &[u8], + ) -> Result>>, Error> { + let key = key.to_vec(); + let val = val.to_vec(); + + let mut kvs = self.kvs.write().unwrap(); + let existed = kvs.entry(key); + Ok(match existed { + Entry::Vacant(e) => { + if expect.is_empty() { + let _ = e.insert(val); + Ok(()) + } else { + Err(None) + } + } + Entry::Occupied(mut existed) => { + if existed.get() == expect { + let _ = existed.insert(val); + Ok(()) + } else { + Err(Some(existed.get().clone())) + } + } + }) + } + + async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> { + let mut kvs = self.kvs.write().unwrap(); + if end.is_empty() { + let _ = kvs.remove(key); + } else { + let start = key.to_vec(); + let end = end.to_vec(); + let range = start..end; + + kvs.retain(|k, _| !range.contains(k)); + } + Ok(()) + } + + async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Error> { + let mut kvs = self.kvs.write().unwrap(); + if let Some(v) = kvs.remove(from_key) { + let _ = kvs.insert(to_key.to_vec(), v); + } + Ok(()) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +#[cfg(test)] +mod tests { + use futures::TryStreamExt; + + use super::*; + + #[tokio::test] + async fn test_memory_kv_backend() { + let backend = MemoryKvBackend::default(); + + for i in 1..10 { + let key = format!("key{}", i); + let val = format!("val{}", i); + assert!(backend.set(key.as_bytes(), val.as_bytes()).await.is_ok()); + } + + let result = backend + .compare_and_set(b"hello", b"what", b"world") + .await + .unwrap(); + assert!(result.unwrap_err().is_none()); + + let result = backend + .compare_and_set(b"hello", b"", b"world") + .await + .unwrap(); + assert!(result.is_ok()); + + let result = backend + .compare_and_set(b"hello", b"world", b"greptime") + .await + .unwrap(); + assert!(result.is_ok()); + + let result = backend + .compare_and_set(b"hello", b"world", b"what") + .await + .unwrap(); + assert_eq!(result.unwrap_err().unwrap(), b"greptime"); + + assert!(backend.delete_range(b"key1", &[]).await.is_ok()); + assert!(backend.delete_range(b"key3", b"key9").await.is_ok()); + + assert!(backend.move_value(b"key9", b"key10").await.is_ok()); + + assert_eq!( + backend.to_string(), + r#"hello -> greptime +key10 -> val9 +key2 -> val2 +"# + ); + + let range = backend.range(b"key").try_collect::>().await.unwrap(); + assert_eq!(range.len(), 2); + assert_eq!(range[0], Kv(b"key10".to_vec(), b"val9".to_vec())); + assert_eq!(range[1], Kv(b"key2".to_vec(), b"val2".to_vec())); + } +} diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 7659bfed2c..bd099570a3 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -17,6 +17,7 @@ pub mod heartbeat; pub mod ident; pub mod instruction; pub mod key; +pub mod kv_backend; pub mod peer; pub mod rpc; pub mod table_name; diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 69aa5cf0cc..5973f66211 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -19,14 +19,14 @@ use std::sync::Arc; use api::v1::CreateTableExpr; use catalog::error::{ self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu, - Result as CatalogResult, UnimplementedSnafu, + Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu, }; use catalog::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue, }; use catalog::information_schema::InformationSchemaProvider; -use catalog::remote::{Kv, KvBackendRef, KvCacheInvalidatorRef}; +use catalog::remote::KvCacheInvalidatorRef; use catalog::{ CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, @@ -34,6 +34,7 @@ use catalog::{ use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME}; use common_error::prelude::BoxedError; +use common_meta::kv_backend::{Kv, KvBackendRef}; use common_meta::table_name::TableName; use common_telemetry::warn; use futures::StreamExt; @@ -254,7 +255,7 @@ impl CatalogManager for FrontendCatalogManager { let mut iter = self.backend.range(key.as_bytes()); let mut res = HashSet::new(); while let Some(r) = iter.next().await { - let Kv(k, _) = r?; + let Kv(k, _) = r.context(TableMetadataManagerSnafu)?; let catalog_key = String::from_utf8_lossy(&k); if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) { let _ = res.insert(key.catalog_name); @@ -270,7 +271,7 @@ impl CatalogManager for FrontendCatalogManager { let mut iter = self.backend.range(key.as_bytes()); let mut res = HashSet::new(); while let Some(r) = iter.next().await { - let Kv(k, _) = r?; + let Kv(k, _) = r.context(TableMetadataManagerSnafu)?; let key = SchemaKey::parse(String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?; let _ = res.insert(key.schema_name); @@ -287,7 +288,7 @@ impl CatalogManager for FrontendCatalogManager { let iter = self.backend.range(key.as_bytes()); let result = iter .map(|r| { - let Kv(k, _) = r?; + let Kv(k, _) = r.context(TableMetadataManagerSnafu)?; let key = TableGlobalKey::parse(String::from_utf8_lossy(&k)) .context(InvalidCatalogValueSnafu)?; Ok(key.table_name) @@ -304,7 +305,12 @@ impl CatalogManager for FrontendCatalogManager { } .to_string(); - Ok(self.backend.get(key.as_bytes()).await?.is_some()) + Ok(self + .backend + .get(key.as_bytes()) + .await + .context(TableMetadataManagerSnafu)? + .is_some()) } async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult { @@ -314,7 +320,12 @@ impl CatalogManager for FrontendCatalogManager { } .to_string(); - Ok(self.backend().get(schema_key.as_bytes()).await?.is_some()) + Ok(self + .backend() + .get(schema_key.as_bytes()) + .await + .context(TableMetadataManagerSnafu)? + .is_some()) } async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { @@ -326,7 +337,8 @@ impl CatalogManager for FrontendCatalogManager { Ok(self .backend() .get(table_global_key.to_string().as_bytes()) - .await? + .await + .context(TableMetadataManagerSnafu)? .is_some()) } @@ -362,7 +374,7 @@ impl CatalogManager for FrontendCatalogManager { schema_name: schema.to_string(), table_name: table_name.to_string(), }; - let Some(kv) = self.backend().get(table_global_key.to_string().as_bytes()).await? else { + let Some(kv) = self.backend().get(table_global_key.to_string().as_bytes()).await.context(TableMetadataManagerSnafu)? else { return Ok(None); }; let v = TableGlobalValue::from_bytes(kv.1).context(InvalidCatalogValueSnafu)?; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 415bb3f3ac..046a132ec3 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -565,6 +565,12 @@ pub enum Error { value: String, location: Location, }, + + #[snafu(display("Table metadata manager error: {}", source))] + TableMetadataManager { + source: common_meta::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -688,6 +694,7 @@ impl ErrorExt for Error { Error::WriteParquet { source, .. } => source.status_code(), Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments, + Error::TableMetadataManager { source, .. } => source.status_code(), } } diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index 87761e3ebe..5d00f6e17c 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -179,10 +179,10 @@ mod tests { use catalog::helper::{ CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, }; - use catalog::remote::mock::MockKvBackend; - use catalog::remote::{KvBackend, KvBackendRef}; use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::{KvBackend, KvBackendRef}; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; @@ -193,7 +193,7 @@ mod tests { use crate::table::test::create_partition_rule_manager; async fn prepare_mocked_backend() -> KvBackendRef { - let backend = Arc::new(MockKvBackend::default()); + let backend = Arc::new(MemoryKvBackend::default()); let default_catalog = CatalogKey { catalog_name: DEFAULT_CATALOG_NAME.to_string(), diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 4fac281fc1..32949a58ad 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -55,7 +55,9 @@ use table::Table; use tokio::sync::RwLock; use crate::catalog::FrontendCatalogManager; -use crate::error::{self, FindDatanodeSnafu, FindTableRouteSnafu, Result}; +use crate::error::{ + self, FindDatanodeSnafu, FindTableRouteSnafu, Result, TableMetadataManagerSnafu, +}; use crate::instance::distributed::inserter::DistInserter; use crate::table::delete::to_grpc_delete_request; use crate::table::scan::{DatanodeInstance, TableScanPlan}; @@ -256,7 +258,7 @@ impl DistTable { .backend() .get(key.to_string().as_bytes()) .await - .context(error::CatalogSnafu)?; + .context(TableMetadataManagerSnafu)?; Ok(if let Some(raw) = raw { Some(TableGlobalValue::from_bytes(raw.1).context(error::CatalogEntrySerdeSnafu)?) } else { @@ -274,7 +276,7 @@ impl DistTable { .backend() .set(key.to_string().as_bytes(), &value) .await - .context(error::CatalogSnafu) + .context(TableMetadataManagerSnafu) } async fn delete_table_global_value(&self, key: TableGlobalKey) -> Result<()> { @@ -282,7 +284,7 @@ impl DistTable { .backend() .delete(key.to_string().as_bytes()) .await - .context(error::CatalogSnafu) + .context(TableMetadataManagerSnafu) } async fn move_table_route_value( @@ -313,7 +315,7 @@ impl DistTable { .backend() .move_value(old_key.as_bytes(), new_key.as_bytes()) .await - .context(error::CatalogSnafu)?; + .context(TableMetadataManagerSnafu)?; self.catalog_manager .partition_manager() diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 2bde845075..621fbb25dc 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -18,9 +18,10 @@ use std::time::Duration; use api::v1::meta::Peer; use catalog::helper::TableGlobalKey; -use catalog::remote::{CachedMetaKvBackend, Kv}; +use catalog::remote::CachedMetaKvBackend; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::ident::TableIdent; +use common_meta::kv_backend::Kv; use common_meta::rpc::router::TableRoute; use common_meta::table_name::TableName; use common_meta::RegionIdent;