mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-09 14:52:58 +00:00
refactor: add TableInfoKey and TableRegionKey (#1865)
* refactor: add TableInfoKey and TableRegionKey * refactor: move KvBackend to common-meta * fix: resolve PR comments
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1803,6 +1803,7 @@ name = "common-meta"
|
||||
version = "0.4.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"common-catalog",
|
||||
@@ -1811,6 +1812,7 @@ dependencies = [
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"datatypes",
|
||||
"futures",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
|
||||
@@ -243,6 +243,12 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("A generic error has occurred, msg: {}", msg))]
|
||||
Generic { msg: String, location: Location },
|
||||
|
||||
#[snafu(display("Table metadata manager error: {}", source))]
|
||||
TableMetadataManager {
|
||||
source: common_meta::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -298,6 +304,7 @@ impl ErrorExt for Error {
|
||||
Error::Unimplemented { .. } | Error::NotSupported { .. } => StatusCode::Unsupported,
|
||||
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
|
||||
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
|
||||
Error::TableMetadataManager { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,18 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::fmt::Debug;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use client::{CachedMetaKvBackend, MetaKvBackend};
|
||||
use futures::Stream;
|
||||
use futures_util::StreamExt;
|
||||
pub use manager::RemoteCatalogManager;
|
||||
|
||||
use crate::error::Error;
|
||||
|
||||
mod client;
|
||||
mod manager;
|
||||
|
||||
@@ -31,59 +24,6 @@ mod manager;
|
||||
pub mod mock;
|
||||
pub mod region_alive_keeper;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Kv(pub Vec<u8>, pub Vec<u8>);
|
||||
|
||||
pub type ValueIter<'a, E> = Pin<Box<dyn Stream<Item = Result<Kv, E>> + Send + 'a>>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait KvBackend: Send + Sync {
|
||||
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b;
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error>;
|
||||
|
||||
/// Compare and set value of key. `expect` is the expected value, if backend's current value associated
|
||||
/// with key is the same as `expect`, the value will be updated to `val`.
|
||||
///
|
||||
/// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())`
|
||||
/// - If associated value is not the same as `expect`, no value will be updated and an `Ok(Err(Vec<u8>))`
|
||||
/// will be returned, the `Err(Vec<u8>)` indicates the current associated value of key.
|
||||
/// - If any error happens during operation, an `Err(Error)` will be returned.
|
||||
async fn compare_and_set(
|
||||
&self,
|
||||
key: &[u8],
|
||||
expect: &[u8],
|
||||
val: &[u8],
|
||||
) -> Result<Result<(), Option<Vec<u8>>>, Error>;
|
||||
|
||||
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error>;
|
||||
|
||||
async fn delete(&self, key: &[u8]) -> Result<(), Error> {
|
||||
self.delete_range(key, &[]).await
|
||||
}
|
||||
|
||||
/// Default get is implemented based on `range` method.
|
||||
async fn get(&self, key: &[u8]) -> Result<Option<Kv>, Error> {
|
||||
let mut iter = self.range(key);
|
||||
while let Some(r) = iter.next().await {
|
||||
let kv = r?;
|
||||
if kv.0 == key {
|
||||
return Ok(Some(kv));
|
||||
}
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
/// MoveValue atomically renames the key to the given updated key.
|
||||
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Error>;
|
||||
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
|
||||
pub type KvBackendRef = Arc<dyn KvBackend>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait KvCacheInvalidator: Send + Sync {
|
||||
async fn invalidate_key(&self, key: &[u8]);
|
||||
@@ -93,14 +33,19 @@ pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use async_stream::stream;
|
||||
use std::any::Any;
|
||||
|
||||
use super::*;
|
||||
use async_stream::stream;
|
||||
use common_meta::kv_backend::{Kv, KvBackend, ValueIter};
|
||||
|
||||
use crate::error::Error;
|
||||
|
||||
struct MockKvBackend {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl KvBackend for MockKvBackend {
|
||||
type Error = Error;
|
||||
|
||||
fn range<'a, 'b>(&'a self, _key: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b,
|
||||
|
||||
@@ -18,6 +18,9 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_stream::stream;
|
||||
use common_error::prelude::BoxedError;
|
||||
use common_meta::error::{Error, GetKvCacheSnafu, MetaSrvSnafu, Result};
|
||||
use common_meta::kv_backend::{Kv, KvBackend, KvBackendRef, ValueIter};
|
||||
use common_meta::rpc::store::{
|
||||
CompareAndPutRequest, DeleteRangeRequest, MoveValueRequest, PutRequest, RangeRequest,
|
||||
};
|
||||
@@ -27,9 +30,7 @@ use moka::future::{Cache, CacheBuilder};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use super::KvCacheInvalidator;
|
||||
use crate::error::{Error, GenericSnafu, MetaSrvSnafu, Result};
|
||||
use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET};
|
||||
use crate::remote::{Kv, KvBackend, KvBackendRef, ValueIter};
|
||||
|
||||
const CACHE_MAX_CAPACITY: u64 = 10000;
|
||||
const CACHE_TTL_SECOND: u64 = 10 * 60;
|
||||
@@ -43,6 +44,8 @@ pub struct CachedMetaKvBackend {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl KvBackend for CachedMetaKvBackend {
|
||||
type Error = Error;
|
||||
|
||||
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b,
|
||||
@@ -59,8 +62,15 @@ impl KvBackend for CachedMetaKvBackend {
|
||||
self.kv_backend.get(key).await
|
||||
};
|
||||
|
||||
let schema_provider = self.cache.try_get_with_by_ref(key, init).await;
|
||||
schema_provider.map_err(|e| GenericSnafu { msg: e.to_string() }.build())
|
||||
self.cache
|
||||
.try_get_with_by_ref(key, init)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
GetKvCacheSnafu {
|
||||
err_msg: e.to_string(),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<()> {
|
||||
@@ -165,6 +175,8 @@ pub struct MetaKvBackend {
|
||||
/// comparing to `Accessor`'s list and get method.
|
||||
#[async_trait::async_trait]
|
||||
impl KvBackend for MetaKvBackend {
|
||||
type Error = Error;
|
||||
|
||||
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b,
|
||||
@@ -175,6 +187,7 @@ impl KvBackend for MetaKvBackend {
|
||||
.client
|
||||
.range(RangeRequest::new().with_prefix(key))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(MetaSrvSnafu)?;
|
||||
let kvs = resp.take_kvs();
|
||||
for mut kv in kvs.into_iter() {
|
||||
@@ -188,6 +201,7 @@ impl KvBackend for MetaKvBackend {
|
||||
.client
|
||||
.range(RangeRequest::new().with_key(key))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(MetaSrvSnafu)?;
|
||||
Ok(response
|
||||
.take_kvs()
|
||||
@@ -199,13 +213,23 @@ impl KvBackend for MetaKvBackend {
|
||||
let req = PutRequest::new()
|
||||
.with_key(key.to_vec())
|
||||
.with_value(val.to_vec());
|
||||
let _ = self.client.put(req).await.context(MetaSrvSnafu)?;
|
||||
let _ = self
|
||||
.client
|
||||
.put(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(MetaSrvSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<()> {
|
||||
let req = DeleteRangeRequest::new().with_range(key.to_vec(), end.to_vec());
|
||||
let resp = self.client.delete_range(req).await.context(MetaSrvSnafu)?;
|
||||
let resp = self
|
||||
.client
|
||||
.delete_range(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(MetaSrvSnafu)?;
|
||||
info!(
|
||||
"Delete range, key: {}, end: {}, deleted: {}",
|
||||
String::from_utf8_lossy(key),
|
||||
@@ -230,6 +254,7 @@ impl KvBackend for MetaKvBackend {
|
||||
.client
|
||||
.compare_and_put(request)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(MetaSrvSnafu)?;
|
||||
if response.is_success() {
|
||||
Ok(Ok(()))
|
||||
@@ -240,7 +265,12 @@ impl KvBackend for MetaKvBackend {
|
||||
|
||||
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<()> {
|
||||
let req = MoveValueRequest::new(from_key, to_key);
|
||||
let _ = self.client.move_value(req).await.context(MetaSrvSnafu)?;
|
||||
let _ = self
|
||||
.client
|
||||
.move_value(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(MetaSrvSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ use async_stream::stream;
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::{MAX_SYS_TABLE_ID, MITO_ENGINE};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::kv_backend::{Kv, KvBackendRef};
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use futures::Stream;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
@@ -35,6 +36,7 @@ use tokio::sync::Mutex;
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
|
||||
ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu,
|
||||
TableMetadataManagerSnafu,
|
||||
};
|
||||
use crate::helper::{
|
||||
build_catalog_prefix, build_schema_prefix, build_table_global_prefix,
|
||||
@@ -42,7 +44,6 @@ use crate::helper::{
|
||||
TableGlobalValue, TableRegionalKey, TableRegionalValue, CATALOG_KEY_PREFIX,
|
||||
};
|
||||
use crate::remote::region_alive_keeper::RegionAliveKeepers;
|
||||
use crate::remote::{Kv, KvBackendRef};
|
||||
use crate::{
|
||||
handle_system_table_request, CatalogManager, DeregisterTableRequest, RegisterSchemaRequest,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
|
||||
@@ -80,7 +81,7 @@ impl RemoteCatalogManager {
|
||||
let mut catalogs = self.backend.range(catalog_range_prefix.as_bytes());
|
||||
Box::pin(stream!({
|
||||
while let Some(r) = catalogs.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
let Kv(k, _) = r.context(TableMetadataManagerSnafu)?;
|
||||
if !k.starts_with(catalog_range_prefix.as_bytes()) {
|
||||
debug!("Ignoring non-catalog key: {}", String::from_utf8_lossy(&k));
|
||||
continue;
|
||||
@@ -134,7 +135,8 @@ impl RemoteCatalogManager {
|
||||
.as_bytes()
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
info!("Created schema '{schema_key}'");
|
||||
|
||||
let catalog_key = CatalogKey {
|
||||
@@ -148,7 +150,8 @@ impl RemoteCatalogManager {
|
||||
.as_bytes()
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
info!("Created catalog '{catalog_key}");
|
||||
Ok(())
|
||||
}
|
||||
@@ -316,7 +319,8 @@ impl RemoteCatalogManager {
|
||||
table_key.as_bytes(),
|
||||
&table_value.as_bytes().context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
debug!(
|
||||
"Successfully set catalog table entry, key: {}, table value: {:?}",
|
||||
table_key, table_value
|
||||
@@ -343,7 +347,8 @@ impl RemoteCatalogManager {
|
||||
let engine_opt = self
|
||||
.backend
|
||||
.get(table_key.as_bytes())
|
||||
.await?
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map(|Kv(_, v)| {
|
||||
let TableRegionalValue {
|
||||
table_id,
|
||||
@@ -361,7 +366,10 @@ impl RemoteCatalogManager {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.backend.delete(table_key.as_bytes()).await?;
|
||||
self.backend
|
||||
.delete(table_key.as_bytes())
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
debug!(
|
||||
"Successfully deleted catalog table entry, key: {}",
|
||||
table_key
|
||||
@@ -428,7 +436,7 @@ async fn iter_remote_schemas<'a>(
|
||||
|
||||
Box::pin(stream!({
|
||||
while let Some(r) = schemas.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
let Kv(k, _) = r.context(TableMetadataManagerSnafu)?;
|
||||
if !k.starts_with(schema_prefix.as_bytes()) {
|
||||
debug!("Ignoring non-schema key: {}", String::from_utf8_lossy(&k));
|
||||
continue;
|
||||
@@ -452,7 +460,7 @@ async fn iter_remote_tables<'a>(
|
||||
let mut tables = backend.range(table_prefix.as_bytes());
|
||||
Box::pin(stream!({
|
||||
while let Some(r) = tables.next().await {
|
||||
let Kv(k, v) = r?;
|
||||
let Kv(k, v) = r.context(TableMetadataManagerSnafu)?;
|
||||
if !k.starts_with(table_prefix.as_bytes()) {
|
||||
debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k));
|
||||
continue;
|
||||
@@ -701,7 +709,8 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
.as_bytes()
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
|
||||
Ok(true)
|
||||
@@ -720,7 +729,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
node_id: self.node_id,
|
||||
}
|
||||
.to_string();
|
||||
let Some(Kv(_, value_bytes)) = self.backend.get(old_table_key.as_bytes()).await? else {
|
||||
let Some(Kv(_, value_bytes)) = self.backend.get(old_table_key.as_bytes()).await.context(TableMetadataManagerSnafu)? else {
|
||||
return Ok(false)
|
||||
};
|
||||
let new_table_key = TableRegionalKey {
|
||||
@@ -731,10 +740,12 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
};
|
||||
self.backend
|
||||
.set(new_table_key.to_string().as_bytes(), &value_bytes)
|
||||
.await?;
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
self.backend
|
||||
.delete(old_table_key.to_string().as_bytes())
|
||||
.await?;
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -756,7 +767,12 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
let key = self
|
||||
.build_schema_key(catalog.to_string(), schema.to_string())
|
||||
.to_string();
|
||||
Ok(self.backend.get(key.as_bytes()).await?.is_some())
|
||||
Ok(self
|
||||
.backend
|
||||
.get(key.as_bytes())
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
async fn table(
|
||||
@@ -778,7 +794,8 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
let table_opt = self
|
||||
.backend
|
||||
.get(key.as_bytes())
|
||||
.await?
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map(|Kv(_, v)| {
|
||||
let TableRegionalValue {
|
||||
table_id,
|
||||
@@ -821,7 +838,8 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
Ok(self
|
||||
.backend
|
||||
.get(key.to_string().as_bytes())
|
||||
.await?
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
@@ -836,7 +854,12 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
}
|
||||
.to_string();
|
||||
|
||||
Ok(self.backend.get(key.as_bytes()).await?.is_some())
|
||||
Ok(self
|
||||
.backend
|
||||
.get(key.as_bytes())
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
async fn catalog_names(&self) -> Result<Vec<String>> {
|
||||
@@ -905,7 +928,8 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
.as_bytes()
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
@@ -12,20 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock as StdRwLock};
|
||||
|
||||
use async_stream::stream;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_recordbatch::RecordBatch;
|
||||
use common_telemetry::logging::info;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::vectors::StringVector;
|
||||
use serde::Serializer;
|
||||
use table::engine::{CloseTableResult, EngineContext, TableEngine};
|
||||
use table::metadata::TableId;
|
||||
use table::requests::{
|
||||
@@ -33,135 +26,6 @@ use table::requests::{
|
||||
};
|
||||
use table::test_util::MemTable;
|
||||
use table::TableRef;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
|
||||
use crate::remote::{Kv, KvBackend, ValueIter};
|
||||
|
||||
pub struct MockKvBackend {
|
||||
map: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl Default for MockKvBackend {
|
||||
fn default() -> Self {
|
||||
let catalog_value = CatalogValue {}.as_bytes().unwrap();
|
||||
let schema_value = SchemaValue {}.as_bytes().unwrap();
|
||||
|
||||
let default_catalog_key = CatalogKey {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
}
|
||||
.to_string();
|
||||
|
||||
let default_schema_key = SchemaKey {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
}
|
||||
.to_string();
|
||||
|
||||
let map = RwLock::new(BTreeMap::from([
|
||||
// create default catalog and schema
|
||||
(default_catalog_key.into(), catalog_value),
|
||||
(default_schema_key.into(), schema_value),
|
||||
]));
|
||||
Self { map }
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for MockKvBackend {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
futures::executor::block_on(async {
|
||||
let map = self.map.read().await;
|
||||
for (k, v) in map.iter() {
|
||||
f.serialize_str(&String::from_utf8_lossy(k))?;
|
||||
f.serialize_str(" -> ")?;
|
||||
f.serialize_str(&String::from_utf8_lossy(v))?;
|
||||
f.serialize_str("\n")?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl KvBackend for MockKvBackend {
|
||||
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b,
|
||||
{
|
||||
let prefix = key.to_vec();
|
||||
let prefix_string = String::from_utf8_lossy(&prefix).to_string();
|
||||
Box::pin(stream!({
|
||||
let maps = self.map.read().await.clone();
|
||||
for (k, v) in maps.range(prefix.clone()..) {
|
||||
let key_string = String::from_utf8_lossy(k).to_string();
|
||||
let matches = key_string.starts_with(&prefix_string);
|
||||
if matches {
|
||||
yield Ok(Kv(k.clone(), v.clone()))
|
||||
} else {
|
||||
info!("Stream finished");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
let mut map = self.map.write().await;
|
||||
let _ = map.insert(key.to_vec(), val.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn compare_and_set(
|
||||
&self,
|
||||
key: &[u8],
|
||||
expect: &[u8],
|
||||
val: &[u8],
|
||||
) -> Result<Result<(), Option<Vec<u8>>>, Error> {
|
||||
let mut map = self.map.write().await;
|
||||
let existing = map.entry(key.to_vec());
|
||||
match existing {
|
||||
Entry::Vacant(e) => {
|
||||
if expect.is_empty() {
|
||||
let _ = e.insert(val.to_vec());
|
||||
Ok(Ok(()))
|
||||
} else {
|
||||
Ok(Err(None))
|
||||
}
|
||||
}
|
||||
Entry::Occupied(mut existing) => {
|
||||
if existing.get() == expect {
|
||||
let _ = existing.insert(val.to_vec());
|
||||
Ok(Ok(()))
|
||||
} else {
|
||||
Ok(Err(Some(existing.get().clone())))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
|
||||
let mut map = self.map.write().await;
|
||||
if end.is_empty() {
|
||||
let _ = map.remove(key);
|
||||
} else {
|
||||
let start = key.to_vec();
|
||||
let end = end.to_vec();
|
||||
let range = start..end;
|
||||
|
||||
map.retain(|k, _| !range.contains(k));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<(), Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct MockTableEngine {
|
||||
|
||||
@@ -22,12 +22,14 @@ mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
|
||||
use catalog::remote::mock::{MockKvBackend, MockTableEngine};
|
||||
use catalog::remote::mock::MockTableEngine;
|
||||
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
|
||||
use catalog::remote::{CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager};
|
||||
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
|
||||
use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackend;
|
||||
use datatypes::schema::RawSchema;
|
||||
use futures_util::StreamExt;
|
||||
use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
|
||||
@@ -37,8 +39,6 @@ mod tests {
|
||||
use tokio::time::Instant;
|
||||
|
||||
struct TestingComponents {
|
||||
#[allow(dead_code)]
|
||||
kv_backend: KvBackendRef,
|
||||
catalog_manager: Arc<RemoteCatalogManager>,
|
||||
table_engine_manager: TableEngineManagerRef,
|
||||
region_alive_keepers: Arc<RegionAliveKeepers>,
|
||||
@@ -53,7 +53,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_backend() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let backend = MockKvBackend::default();
|
||||
let backend = MemoryKvBackend::default();
|
||||
|
||||
let default_catalog_key = CatalogKey {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
@@ -92,8 +92,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cached_backend() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let backend = CachedMetaKvBackend::wrap(Arc::new(MockKvBackend::default()));
|
||||
let backend = CachedMetaKvBackend::wrap(Arc::new(MemoryKvBackend::default()));
|
||||
|
||||
let default_catalog_key = CatalogKey {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
@@ -135,9 +134,11 @@ mod tests {
|
||||
}
|
||||
|
||||
async fn prepare_components(node_id: u64) -> TestingComponents {
|
||||
let cached_backend = Arc::new(CachedMetaKvBackend::wrap(
|
||||
Arc::new(MockKvBackend::default()),
|
||||
));
|
||||
let backend = Arc::new(MemoryKvBackend::default());
|
||||
backend.set(b"__c-greptime", b"").await.unwrap();
|
||||
backend.set(b"__s-greptime-public", b"").await.unwrap();
|
||||
|
||||
let cached_backend = Arc::new(CachedMetaKvBackend::wrap(backend));
|
||||
|
||||
let table_engine = Arc::new(MockTableEngine::default());
|
||||
let engine_manager = Arc::new(MemoryTableEngineManager::alias(
|
||||
@@ -156,7 +157,6 @@ mod tests {
|
||||
catalog_manager.start().await.unwrap();
|
||||
|
||||
TestingComponents {
|
||||
kv_backend: cached_backend,
|
||||
catalog_manager: Arc::new(catalog_manager),
|
||||
table_engine_manager: engine_manager,
|
||||
region_alive_keepers,
|
||||
|
||||
@@ -6,12 +6,14 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
api = { path = "../../api" }
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
common-catalog = { path = "../catalog" }
|
||||
common-error = { path = "../error" }
|
||||
common-runtime = { path = "../runtime" }
|
||||
common-telemetry = { path = "../telemetry" }
|
||||
common-time = { path = "../time" }
|
||||
futures.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
snafu.workspace = true
|
||||
|
||||
@@ -55,6 +55,18 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Invalid protobuf message, err: {}", err_msg))]
|
||||
InvalidProtoMsg { err_msg: String, location: Location },
|
||||
|
||||
#[snafu(display("Invalid table metadata, err: {}", err_msg))]
|
||||
InvalidTableMetadata { err_msg: String, location: Location },
|
||||
|
||||
#[snafu(display("Failed to get kv cache, err: {}", err_msg))]
|
||||
GetKvCache { err_msg: String, location: Location },
|
||||
|
||||
#[snafu(display("Failed to request MetaSrv, source: {}", source))]
|
||||
MetaSrv {
|
||||
source: BoxedError,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -65,15 +77,18 @@ impl ErrorExt for Error {
|
||||
match self {
|
||||
IllegalServerState { .. } => StatusCode::Internal,
|
||||
|
||||
SerdeJson { .. } | RouteInfoCorrupted { .. } | InvalidProtoMsg { .. } => {
|
||||
StatusCode::Unexpected
|
||||
}
|
||||
SerdeJson { .. }
|
||||
| RouteInfoCorrupted { .. }
|
||||
| InvalidProtoMsg { .. }
|
||||
| InvalidTableMetadata { .. } => StatusCode::Unexpected,
|
||||
|
||||
SendMessage { .. } => StatusCode::Internal,
|
||||
SendMessage { .. } | GetKvCache { .. } => StatusCode::Internal,
|
||||
|
||||
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => {
|
||||
StatusCode::Unexpected
|
||||
}
|
||||
|
||||
MetaSrv { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,16 +12,102 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! This mod defines all the keys used in the metadata store (Metasrv).
|
||||
//! Specifically, there are these kinds of keys:
|
||||
//!
|
||||
//! 1. Table info key: `__table_info/{table_id}`
|
||||
//! - The value is a [TableInfoValue] struct; it contains the whole table info (like column
|
||||
//! schemas).
|
||||
//! - This key is mainly used in constructing the table in Datanode and Frontend.
|
||||
//!
|
||||
//! 2. Table region key: `__table_region/{table_id}`
|
||||
//! - The value is a [TableRegionValue] struct; it contains the region distribution of the
|
||||
//! table in the Datanodes.
|
||||
//!
|
||||
//! All keys have related managers. The managers take care of the serialization and deserialization
|
||||
//! of keys and values, and the interaction with the underlying KV store backend.
|
||||
//!
|
||||
//! To simplify the managers used in struct fields and function parameters, we define a "unify"
|
||||
//! table metadata manager: [TableMetadataManager]. It contains all the managers defined above.
|
||||
//! It's recommended to just use this manager only.
|
||||
|
||||
pub mod table_info;
|
||||
pub mod table_region;
|
||||
mod table_route;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use snafu::ResultExt;
|
||||
use table_info::{TableInfoManager, TableInfoValue};
|
||||
use table_region::{TableRegionManager, TableRegionValue};
|
||||
|
||||
use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu};
|
||||
pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
|
||||
pub const REMOVED_PREFIX: &str = "__removed";
|
||||
|
||||
const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
|
||||
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
|
||||
|
||||
pub fn to_removed_key(key: &str) -> String {
|
||||
format!("{REMOVED_PREFIX}-{key}")
|
||||
}
|
||||
|
||||
pub trait TableMetaKey {
|
||||
fn as_raw_key(&self) -> Vec<u8>;
|
||||
}
|
||||
|
||||
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
|
||||
|
||||
pub struct TableMetadataManager {
|
||||
table_info_manager: TableInfoManager,
|
||||
table_region_manager: TableRegionManager,
|
||||
}
|
||||
|
||||
impl TableMetadataManager {
|
||||
pub fn new(kv_backend: KvBackendRef) -> Self {
|
||||
TableMetadataManager {
|
||||
table_info_manager: TableInfoManager::new(kv_backend.clone()),
|
||||
table_region_manager: TableRegionManager::new(kv_backend),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn table_info_manager(&self) -> &TableInfoManager {
|
||||
&self.table_info_manager
|
||||
}
|
||||
|
||||
pub fn table_region_manager(&self) -> &TableRegionManager {
|
||||
&self.table_region_manager
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_table_meta_value {
|
||||
( $($val_ty: ty), *) => {
|
||||
$(
|
||||
impl $val_ty {
|
||||
pub fn try_from_raw_value(raw_value: Vec<u8>) -> Result<Self> {
|
||||
let raw_value = String::from_utf8(raw_value).map_err(|e| {
|
||||
InvalidTableMetadataSnafu { err_msg: e.to_string() }.build()
|
||||
})?;
|
||||
serde_json::from_str(&raw_value).context(SerdeJsonSnafu)
|
||||
}
|
||||
|
||||
pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
|
||||
serde_json::to_string(self)
|
||||
.map(|x| x.into_bytes())
|
||||
.context(SerdeJsonSnafu)
|
||||
}
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
impl_table_meta_value! {
|
||||
TableInfoValue,
|
||||
TableRegionValue
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::key::to_removed_key;
|
||||
|
||||
230
src/common/meta/src/key/table_info.rs
Normal file
230
src/common/meta/src/key/table_info.rs
Normal file
@@ -0,0 +1,230 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
|
||||
use super::TABLE_INFO_KEY_PREFIX;
|
||||
use crate::error::Result;
|
||||
use crate::key::{to_removed_key, TableMetaKey};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
|
||||
pub struct TableInfoKey {
|
||||
table_id: TableId,
|
||||
}
|
||||
|
||||
impl TableInfoKey {
|
||||
pub fn new(table_id: TableId) -> Self {
|
||||
Self { table_id }
|
||||
}
|
||||
}
|
||||
|
||||
impl TableMetaKey for TableInfoKey {
|
||||
fn as_raw_key(&self) -> Vec<u8> {
|
||||
format!("{}/{}", TABLE_INFO_KEY_PREFIX, self.table_id).into_bytes()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct TableInfoValue {
|
||||
pub table_info: RawTableInfo,
|
||||
version: u64,
|
||||
}
|
||||
|
||||
pub struct TableInfoManager {
|
||||
kv_backend: KvBackendRef,
|
||||
}
|
||||
|
||||
impl TableInfoManager {
|
||||
pub fn new(kv_backend: KvBackendRef) -> Self {
|
||||
Self { kv_backend }
|
||||
}
|
||||
|
||||
pub async fn get(&self, table_id: TableId) -> Result<Option<TableInfoValue>> {
|
||||
let key = TableInfoKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
self.kv_backend
|
||||
.get(&raw_key)
|
||||
.await?
|
||||
.map(|x| TableInfoValue::try_from_raw_value(x.1))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
pub async fn compare_and_set(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
expect: Option<TableInfoValue>,
|
||||
table_info: RawTableInfo,
|
||||
) -> Result<std::result::Result<(), Option<Vec<u8>>>> {
|
||||
let key = TableInfoKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
|
||||
let (expect, version) = if let Some(x) = expect {
|
||||
(x.try_as_raw_value()?, x.version + 1)
|
||||
} else {
|
||||
(vec![], 0)
|
||||
};
|
||||
|
||||
let value = TableInfoValue {
|
||||
table_info,
|
||||
version,
|
||||
};
|
||||
let raw_value = value.try_as_raw_value()?;
|
||||
|
||||
self.kv_backend
|
||||
.compare_and_set(&raw_key, &expect, &raw_value)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn remove(&self, table_id: TableId) -> Result<()> {
|
||||
let key = TableInfoKey::new(table_id);
|
||||
let removed_key = to_removed_key(&String::from_utf8_lossy(key.as_raw_key().as_slice()));
|
||||
self.kv_backend
|
||||
.move_value(&key.as_raw_key(), removed_key.as_bytes())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
|
||||
use table::metadata::{RawTableMeta, TableIdent, TableType};
|
||||
|
||||
use super::*;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::kv_backend::KvBackend;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_info_manager() {
|
||||
let backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
for i in 1..=3 {
|
||||
let key = TableInfoKey::new(i).as_raw_key();
|
||||
let val = TableInfoValue {
|
||||
table_info: new_table_info(i),
|
||||
version: 1,
|
||||
}
|
||||
.try_as_raw_value()
|
||||
.unwrap();
|
||||
backend.set(&key, &val).await.unwrap();
|
||||
}
|
||||
|
||||
let manager = TableInfoManager::new(backend.clone());
|
||||
|
||||
let val = manager.get(1).await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
val,
|
||||
TableInfoValue {
|
||||
table_info: new_table_info(1),
|
||||
version: 1,
|
||||
}
|
||||
);
|
||||
assert!(manager.get(4).await.unwrap().is_none());
|
||||
|
||||
let table_info = new_table_info(4);
|
||||
let result = manager
|
||||
.compare_and_set(4, None, table_info.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.is_ok());
|
||||
|
||||
// test cas failed, the new table info is not set
|
||||
let new_table_info = new_table_info(4);
|
||||
let result = manager
|
||||
.compare_and_set(4, None, new_table_info.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let actual = TableInfoValue::try_from_raw_value(result.unwrap_err().unwrap()).unwrap();
|
||||
assert_eq!(
|
||||
actual,
|
||||
TableInfoValue {
|
||||
table_info: table_info.clone(),
|
||||
version: 0,
|
||||
}
|
||||
);
|
||||
|
||||
// test cas success
|
||||
let result = manager
|
||||
.compare_and_set(4, Some(actual), new_table_info.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.is_ok());
|
||||
|
||||
assert!(manager.remove(4).await.is_ok());
|
||||
|
||||
let kv = backend
|
||||
.get(b"__removed-__table_info/4")
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(b"__removed-__table_info/4", kv.0.as_slice());
|
||||
let value = TableInfoValue::try_from_raw_value(kv.1).unwrap();
|
||||
assert_eq!(value.table_info, new_table_info);
|
||||
assert_eq!(value.version, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_key_serde() {
|
||||
let key = TableInfoKey::new(42);
|
||||
let raw_key = key.as_raw_key();
|
||||
assert_eq!(raw_key, b"__table_info/42");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_value_serde() {
|
||||
let value = TableInfoValue {
|
||||
table_info: new_table_info(42),
|
||||
version: 1,
|
||||
};
|
||||
let serialized = value.try_as_raw_value().unwrap();
|
||||
let deserialized = TableInfoValue::try_from_raw_value(serialized).unwrap();
|
||||
assert_eq!(value, deserialized);
|
||||
}
|
||||
|
||||
fn new_table_info(table_id: TableId) -> RawTableInfo {
|
||||
let schema = Schema::new(vec![ColumnSchema::new(
|
||||
"name",
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
)]);
|
||||
|
||||
let meta = RawTableMeta {
|
||||
schema: RawSchema::from(&schema),
|
||||
engine: "mito".to_string(),
|
||||
created_on: chrono::DateTime::default(),
|
||||
primary_key_indices: vec![0, 1],
|
||||
next_column_id: 3,
|
||||
engine_options: Default::default(),
|
||||
value_indices: vec![2, 3],
|
||||
options: Default::default(),
|
||||
region_numbers: vec![1],
|
||||
};
|
||||
|
||||
RawTableInfo {
|
||||
ident: TableIdent {
|
||||
table_id,
|
||||
version: 1,
|
||||
},
|
||||
name: "table_1".to_string(),
|
||||
desc: Some("blah".to_string()),
|
||||
catalog_name: "catalog_1".to_string(),
|
||||
schema_name: "schema_1".to_string(),
|
||||
meta,
|
||||
table_type: TableType::Base,
|
||||
}
|
||||
}
|
||||
}
|
||||
190
src/common/meta/src/key/table_region.rs
Normal file
190
src/common/meta/src/key/table_region.rs
Normal file
@@ -0,0 +1,190 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use super::TABLE_REGION_KEY_PREFIX;
|
||||
use crate::error::Result;
|
||||
use crate::key::{to_removed_key, TableMetaKey};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::DatanodeId;
|
||||
|
||||
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
|
||||
|
||||
pub struct TableRegionKey {
|
||||
table_id: TableId,
|
||||
}
|
||||
|
||||
impl TableRegionKey {
|
||||
pub fn new(table_id: TableId) -> Self {
|
||||
Self { table_id }
|
||||
}
|
||||
}
|
||||
|
||||
impl TableMetaKey for TableRegionKey {
|
||||
fn as_raw_key(&self) -> Vec<u8> {
|
||||
format!("{}/{}", TABLE_REGION_KEY_PREFIX, self.table_id).into_bytes()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct TableRegionValue {
|
||||
pub region_distribution: RegionDistribution,
|
||||
version: u64,
|
||||
}
|
||||
|
||||
pub struct TableRegionManager {
|
||||
kv_backend: KvBackendRef,
|
||||
}
|
||||
|
||||
impl TableRegionManager {
|
||||
pub fn new(kv_backend: KvBackendRef) -> Self {
|
||||
Self { kv_backend }
|
||||
}
|
||||
|
||||
pub async fn get(&self, table_id: TableId) -> Result<Option<TableRegionValue>> {
|
||||
let key = TableRegionKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
self.kv_backend
|
||||
.get(&raw_key)
|
||||
.await?
|
||||
.map(|x| TableRegionValue::try_from_raw_value(x.1))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
pub async fn compare_and_set(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
expect: Option<TableRegionValue>,
|
||||
region_distribution: RegionDistribution,
|
||||
) -> Result<std::result::Result<(), Option<Vec<u8>>>> {
|
||||
let key = TableRegionKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
|
||||
let (expect, version) = if let Some(x) = expect {
|
||||
(x.try_as_raw_value()?, x.version + 1)
|
||||
} else {
|
||||
(vec![], 0)
|
||||
};
|
||||
|
||||
let value = TableRegionValue {
|
||||
region_distribution,
|
||||
version,
|
||||
};
|
||||
let raw_value = value.try_as_raw_value()?;
|
||||
|
||||
self.kv_backend
|
||||
.compare_and_set(&raw_key, &expect, &raw_value)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn remove(&self, table_id: TableId) -> Result<()> {
|
||||
let key = TableRegionKey::new(table_id);
|
||||
let remove_key = to_removed_key(&String::from_utf8_lossy(key.as_raw_key().as_slice()));
|
||||
self.kv_backend
|
||||
.move_value(&key.as_raw_key(), remove_key.as_bytes())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::kv_backend::KvBackend;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_region_manager() {
|
||||
let backend = Arc::new(MemoryKvBackend::default());
|
||||
let manager = TableRegionManager::new(backend.clone());
|
||||
|
||||
let region_distribution =
|
||||
RegionDistribution::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6])]);
|
||||
let result = manager
|
||||
.compare_and_set(1, None, region_distribution.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.is_ok());
|
||||
|
||||
let new_region_distribution =
|
||||
RegionDistribution::from([(1, vec![4, 5, 6]), (2, vec![1, 2, 3])]);
|
||||
let curr = manager
|
||||
.compare_and_set(1, None, new_region_distribution.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap_err()
|
||||
.unwrap();
|
||||
let curr = TableRegionValue::try_from_raw_value(curr).unwrap();
|
||||
assert_eq!(
|
||||
curr,
|
||||
TableRegionValue {
|
||||
region_distribution,
|
||||
version: 0
|
||||
}
|
||||
);
|
||||
|
||||
assert!(manager
|
||||
.compare_and_set(1, Some(curr), new_region_distribution.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.is_ok());
|
||||
|
||||
let value = manager.get(1).await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
value,
|
||||
TableRegionValue {
|
||||
region_distribution: new_region_distribution.clone(),
|
||||
version: 1
|
||||
}
|
||||
);
|
||||
assert!(manager.get(2).await.unwrap().is_none());
|
||||
|
||||
assert!(manager.remove(1).await.is_ok());
|
||||
|
||||
let kv = backend
|
||||
.get(b"__removed-__table_region/1")
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(b"__removed-__table_region/1", kv.0.as_slice());
|
||||
let value = TableRegionValue::try_from_raw_value(kv.1).unwrap();
|
||||
assert_eq!(value.region_distribution, new_region_distribution);
|
||||
assert_eq!(value.version, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_serde() {
|
||||
let key = TableRegionKey::new(1);
|
||||
let raw_key = key.as_raw_key();
|
||||
assert_eq!(raw_key, b"__table_region/1");
|
||||
|
||||
let value = TableRegionValue {
|
||||
region_distribution: RegionDistribution::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6])]),
|
||||
version: 0,
|
||||
};
|
||||
let literal = br#"{"region_distribution":{"1":[1,2,3],"2":[4,5,6]},"version":0}"#;
|
||||
|
||||
assert_eq!(value.try_as_raw_value().unwrap(), literal);
|
||||
assert_eq!(
|
||||
TableRegionValue::try_from_raw_value(literal.to_vec()).unwrap(),
|
||||
value,
|
||||
);
|
||||
}
|
||||
}
|
||||
80
src/common/meta/src/kv_backend.rs
Normal file
80
src/common/meta/src/kv_backend.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod memory;
|
||||
|
||||
use std::any::Any;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use futures::{Stream, StreamExt};
|
||||
|
||||
use crate::error::Error;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct Kv(pub Vec<u8>, pub Vec<u8>);
|
||||
|
||||
pub type ValueIter<'a, E> = Pin<Box<dyn Stream<Item = Result<Kv, E>> + Send + 'a>>;
|
||||
|
||||
pub type KvBackendRef = Arc<dyn KvBackend<Error = Error>>;
|
||||
|
||||
#[async_trait]
|
||||
pub trait KvBackend: Send + Sync {
|
||||
type Error: ErrorExt;
|
||||
|
||||
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Self::Error>
|
||||
where
|
||||
'a: 'b;
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Self::Error>;
|
||||
|
||||
/// Compare and set value of key. `expect` is the expected value, if backend's current value associated
|
||||
/// with key is the same as `expect`, the value will be updated to `val`.
|
||||
///
|
||||
/// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())`
|
||||
/// - If associated value is not the same as `expect`, no value will be updated and an `Ok(Err(Vec<u8>))`
|
||||
/// will be returned, the `Err(Vec<u8>)` indicates the current associated value of key.
|
||||
/// - If any error happens during operation, an `Err(Error)` will be returned.
|
||||
async fn compare_and_set(
|
||||
&self,
|
||||
key: &[u8],
|
||||
expect: &[u8],
|
||||
val: &[u8],
|
||||
) -> Result<Result<(), Option<Vec<u8>>>, Self::Error>;
|
||||
|
||||
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Self::Error>;
|
||||
|
||||
async fn delete(&self, key: &[u8]) -> Result<(), Self::Error> {
|
||||
self.delete_range(key, &[]).await
|
||||
}
|
||||
|
||||
/// Default get is implemented based on `range` method.
|
||||
async fn get(&self, key: &[u8]) -> Result<Option<Kv>, Self::Error> {
|
||||
let mut iter = self.range(key);
|
||||
while let Some(r) = iter.next().await {
|
||||
let kv = r?;
|
||||
if kv.0 == key {
|
||||
return Ok(Some(kv));
|
||||
}
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
/// MoveValue atomically renames the key to the given updated key.
|
||||
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Self::Error>;
|
||||
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
197
src/common/meta/src/kv_backend/memory.rs
Normal file
197
src/common/meta/src/kv_backend/memory.rs
Normal file
@@ -0,0 +1,197 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::sync::RwLock;
|
||||
|
||||
use async_stream::stream;
|
||||
use async_trait::async_trait;
|
||||
use serde::Serializer;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::kv_backend::{Kv, KvBackend, ValueIter};
|
||||
|
||||
pub struct MemoryKvBackend {
|
||||
kvs: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl Display for MemoryKvBackend {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
let kvs = self.kvs.read().unwrap();
|
||||
for (k, v) in kvs.iter() {
|
||||
f.serialize_str(&String::from_utf8_lossy(k))?;
|
||||
f.serialize_str(" -> ")?;
|
||||
f.serialize_str(&String::from_utf8_lossy(v))?;
|
||||
f.serialize_str("\n")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for MemoryKvBackend {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
kvs: RwLock::new(BTreeMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl KvBackend for MemoryKvBackend {
|
||||
type Error = Error;
|
||||
|
||||
fn range<'a, 'b>(&'a self, prefix: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b,
|
||||
{
|
||||
let kvs = self.kvs.read().unwrap();
|
||||
let kvs = kvs.clone();
|
||||
|
||||
let prefix = prefix.to_vec();
|
||||
Box::pin(stream!({
|
||||
for (k, v) in kvs.range(prefix.clone()..) {
|
||||
if !k.starts_with(&prefix) {
|
||||
break;
|
||||
}
|
||||
yield Ok(Kv(k.clone(), v.clone()));
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
let mut kvs = self.kvs.write().unwrap();
|
||||
let _ = kvs.insert(key.to_vec(), val.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn compare_and_set(
|
||||
&self,
|
||||
key: &[u8],
|
||||
expect: &[u8],
|
||||
val: &[u8],
|
||||
) -> Result<Result<(), Option<Vec<u8>>>, Error> {
|
||||
let key = key.to_vec();
|
||||
let val = val.to_vec();
|
||||
|
||||
let mut kvs = self.kvs.write().unwrap();
|
||||
let existed = kvs.entry(key);
|
||||
Ok(match existed {
|
||||
Entry::Vacant(e) => {
|
||||
if expect.is_empty() {
|
||||
let _ = e.insert(val);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(None)
|
||||
}
|
||||
}
|
||||
Entry::Occupied(mut existed) => {
|
||||
if existed.get() == expect {
|
||||
let _ = existed.insert(val);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Some(existed.get().clone()))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
|
||||
let mut kvs = self.kvs.write().unwrap();
|
||||
if end.is_empty() {
|
||||
let _ = kvs.remove(key);
|
||||
} else {
|
||||
let start = key.to_vec();
|
||||
let end = end.to_vec();
|
||||
let range = start..end;
|
||||
|
||||
kvs.retain(|k, _| !range.contains(k));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Error> {
|
||||
let mut kvs = self.kvs.write().unwrap();
|
||||
if let Some(v) = kvs.remove(from_key) {
|
||||
let _ = kvs.insert(to_key.to_vec(), v);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::TryStreamExt;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_memory_kv_backend() {
|
||||
let backend = MemoryKvBackend::default();
|
||||
|
||||
for i in 1..10 {
|
||||
let key = format!("key{}", i);
|
||||
let val = format!("val{}", i);
|
||||
assert!(backend.set(key.as_bytes(), val.as_bytes()).await.is_ok());
|
||||
}
|
||||
|
||||
let result = backend
|
||||
.compare_and_set(b"hello", b"what", b"world")
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.unwrap_err().is_none());
|
||||
|
||||
let result = backend
|
||||
.compare_and_set(b"hello", b"", b"world")
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.is_ok());
|
||||
|
||||
let result = backend
|
||||
.compare_and_set(b"hello", b"world", b"greptime")
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.is_ok());
|
||||
|
||||
let result = backend
|
||||
.compare_and_set(b"hello", b"world", b"what")
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.unwrap_err().unwrap(), b"greptime");
|
||||
|
||||
assert!(backend.delete_range(b"key1", &[]).await.is_ok());
|
||||
assert!(backend.delete_range(b"key3", b"key9").await.is_ok());
|
||||
|
||||
assert!(backend.move_value(b"key9", b"key10").await.is_ok());
|
||||
|
||||
assert_eq!(
|
||||
backend.to_string(),
|
||||
r#"hello -> greptime
|
||||
key10 -> val9
|
||||
key2 -> val2
|
||||
"#
|
||||
);
|
||||
|
||||
let range = backend.range(b"key").try_collect::<Vec<_>>().await.unwrap();
|
||||
assert_eq!(range.len(), 2);
|
||||
assert_eq!(range[0], Kv(b"key10".to_vec(), b"val9".to_vec()));
|
||||
assert_eq!(range[1], Kv(b"key2".to_vec(), b"val2".to_vec()));
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,7 @@ pub mod heartbeat;
|
||||
pub mod ident;
|
||||
pub mod instruction;
|
||||
pub mod key;
|
||||
pub mod kv_backend;
|
||||
pub mod peer;
|
||||
pub mod rpc;
|
||||
pub mod table_name;
|
||||
|
||||
@@ -19,14 +19,14 @@ use std::sync::Arc;
|
||||
use api::v1::CreateTableExpr;
|
||||
use catalog::error::{
|
||||
self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu,
|
||||
Result as CatalogResult, UnimplementedSnafu,
|
||||
Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu,
|
||||
};
|
||||
use catalog::helper::{
|
||||
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
|
||||
TableGlobalKey, TableGlobalValue,
|
||||
};
|
||||
use catalog::information_schema::InformationSchemaProvider;
|
||||
use catalog::remote::{Kv, KvBackendRef, KvCacheInvalidatorRef};
|
||||
use catalog::remote::KvCacheInvalidatorRef;
|
||||
use catalog::{
|
||||
CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, RenameTableRequest,
|
||||
@@ -34,6 +34,7 @@ use catalog::{
|
||||
use client::client_manager::DatanodeClients;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME};
|
||||
use common_error::prelude::BoxedError;
|
||||
use common_meta::kv_backend::{Kv, KvBackendRef};
|
||||
use common_meta::table_name::TableName;
|
||||
use common_telemetry::warn;
|
||||
use futures::StreamExt;
|
||||
@@ -254,7 +255,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
let mut iter = self.backend.range(key.as_bytes());
|
||||
let mut res = HashSet::new();
|
||||
while let Some(r) = iter.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
let Kv(k, _) = r.context(TableMetadataManagerSnafu)?;
|
||||
let catalog_key = String::from_utf8_lossy(&k);
|
||||
if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) {
|
||||
let _ = res.insert(key.catalog_name);
|
||||
@@ -270,7 +271,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
let mut iter = self.backend.range(key.as_bytes());
|
||||
let mut res = HashSet::new();
|
||||
while let Some(r) = iter.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
let Kv(k, _) = r.context(TableMetadataManagerSnafu)?;
|
||||
let key =
|
||||
SchemaKey::parse(String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?;
|
||||
let _ = res.insert(key.schema_name);
|
||||
@@ -287,7 +288,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
let iter = self.backend.range(key.as_bytes());
|
||||
let result = iter
|
||||
.map(|r| {
|
||||
let Kv(k, _) = r?;
|
||||
let Kv(k, _) = r.context(TableMetadataManagerSnafu)?;
|
||||
let key = TableGlobalKey::parse(String::from_utf8_lossy(&k))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
Ok(key.table_name)
|
||||
@@ -304,7 +305,12 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
}
|
||||
.to_string();
|
||||
|
||||
Ok(self.backend.get(key.as_bytes()).await?.is_some())
|
||||
Ok(self
|
||||
.backend
|
||||
.get(key.as_bytes())
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
|
||||
@@ -314,7 +320,12 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
}
|
||||
.to_string();
|
||||
|
||||
Ok(self.backend().get(schema_key.as_bytes()).await?.is_some())
|
||||
Ok(self
|
||||
.backend()
|
||||
.get(schema_key.as_bytes())
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
|
||||
@@ -326,7 +337,8 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
Ok(self
|
||||
.backend()
|
||||
.get(table_global_key.to_string().as_bytes())
|
||||
.await?
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
@@ -362,7 +374,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
schema_name: schema.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
};
|
||||
let Some(kv) = self.backend().get(table_global_key.to_string().as_bytes()).await? else {
|
||||
let Some(kv) = self.backend().get(table_global_key.to_string().as_bytes()).await.context(TableMetadataManagerSnafu)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let v = TableGlobalValue::from_bytes(kv.1).context(InvalidCatalogValueSnafu)?;
|
||||
|
||||
@@ -565,6 +565,12 @@ pub enum Error {
|
||||
value: String,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Table metadata manager error: {}", source))]
|
||||
TableMetadataManager {
|
||||
source: common_meta::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -688,6 +694,7 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::WriteParquet { source, .. } => source.status_code(),
|
||||
Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments,
|
||||
Error::TableMetadataManager { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -179,10 +179,10 @@ mod tests {
|
||||
use catalog::helper::{
|
||||
CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue,
|
||||
};
|
||||
use catalog::remote::mock::MockKvBackend;
|
||||
use catalog::remote::{KvBackend, KvBackendRef};
|
||||
use client::client_manager::DatanodeClients;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::{KvBackend, KvBackendRef};
|
||||
use datatypes::prelude::{ConcreteDataType, VectorRef};
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
|
||||
use datatypes::vectors::Int32Vector;
|
||||
@@ -193,7 +193,7 @@ mod tests {
|
||||
use crate::table::test::create_partition_rule_manager;
|
||||
|
||||
async fn prepare_mocked_backend() -> KvBackendRef {
|
||||
let backend = Arc::new(MockKvBackend::default());
|
||||
let backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
let default_catalog = CatalogKey {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
|
||||
@@ -55,7 +55,9 @@ use table::Table;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::catalog::FrontendCatalogManager;
|
||||
use crate::error::{self, FindDatanodeSnafu, FindTableRouteSnafu, Result};
|
||||
use crate::error::{
|
||||
self, FindDatanodeSnafu, FindTableRouteSnafu, Result, TableMetadataManagerSnafu,
|
||||
};
|
||||
use crate::instance::distributed::inserter::DistInserter;
|
||||
use crate::table::delete::to_grpc_delete_request;
|
||||
use crate::table::scan::{DatanodeInstance, TableScanPlan};
|
||||
@@ -256,7 +258,7 @@ impl DistTable {
|
||||
.backend()
|
||||
.get(key.to_string().as_bytes())
|
||||
.await
|
||||
.context(error::CatalogSnafu)?;
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
Ok(if let Some(raw) = raw {
|
||||
Some(TableGlobalValue::from_bytes(raw.1).context(error::CatalogEntrySerdeSnafu)?)
|
||||
} else {
|
||||
@@ -274,7 +276,7 @@ impl DistTable {
|
||||
.backend()
|
||||
.set(key.to_string().as_bytes(), &value)
|
||||
.await
|
||||
.context(error::CatalogSnafu)
|
||||
.context(TableMetadataManagerSnafu)
|
||||
}
|
||||
|
||||
async fn delete_table_global_value(&self, key: TableGlobalKey) -> Result<()> {
|
||||
@@ -282,7 +284,7 @@ impl DistTable {
|
||||
.backend()
|
||||
.delete(key.to_string().as_bytes())
|
||||
.await
|
||||
.context(error::CatalogSnafu)
|
||||
.context(TableMetadataManagerSnafu)
|
||||
}
|
||||
|
||||
async fn move_table_route_value(
|
||||
@@ -313,7 +315,7 @@ impl DistTable {
|
||||
.backend()
|
||||
.move_value(old_key.as_bytes(), new_key.as_bytes())
|
||||
.await
|
||||
.context(error::CatalogSnafu)?;
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
self.catalog_manager
|
||||
.partition_manager()
|
||||
|
||||
@@ -18,9 +18,10 @@ use std::time::Duration;
|
||||
|
||||
use api::v1::meta::Peer;
|
||||
use catalog::helper::TableGlobalKey;
|
||||
use catalog::remote::{CachedMetaKvBackend, Kv};
|
||||
use catalog::remote::CachedMetaKvBackend;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::kv_backend::Kv;
|
||||
use common_meta::rpc::router::TableRoute;
|
||||
use common_meta::table_name::TableName;
|
||||
use common_meta::RegionIdent;
|
||||
|
||||
Reference in New Issue
Block a user