mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-16 02:02:56 +00:00
feat: remote catalog (#315)
* chore: refactor dir for local catalog manager * refactor: CatalogProvider returns Result * refactor: SchemaProvider returns Result * feat: add kv operations to remote catalog * chore: refactor some code * feat: impl catalog initialization * feat: add register table and register system table function * refactor: add table_info method for Table trait * chore: add some tests * chore: add register schema test * chore: fix build issue after rebase onto develop * refactor: mock to separate file * build: failed to compile * fix: use a container struct to bridge KvBackend and Accessor trait * feat: upgrade opendal to 0.17 * test: add more tests * chore: add catalog name and schema name to table info * chore: add catalog name and schema name to table info * chore: rebase onto develop * refactor: common-catalog crate * chore: refactor dir for local catalog manager * refactor: CatalogProvider returns Result * refactor: SchemaProvider returns Result * feat: add kv operations to remote catalog * chore: refactor some code * feat: impl catalog initialization * feat: add register table and register system table function * refactor: add table_info method for Table trait * chore: add some tests * chore: add register schema test * chore: fix build issue after rebase onto develop * refactor: mock to separate file * build: failed to compile * fix: use a container struct to bridge KvBackend and Accessor trait * feat: upgrade opendal to 0.17 * test: add more tests * chore: add catalog name and schema name to table info * chore: add catalog name and schema name to table info * chore: rebase onto develop * refactor: common-catalog crate * refactor: remove remote catalog related files * fix: compilation * feat: add table version to TableKey * feat: add node id to TableValue * fix: some CR comments * chore: change async fn create_expr_to_request to sync * fix: add backtrace to errors * fix: code style * refactor: merge refactor/catalog-crate * feat: table key with version * feat: impl KvBackend for MetaClient * fix: integrate metaclient * fix: catalog use local table info as baseline * fix: sync metsrv * fix: wip * fix: update remote catalog on register and deregister * refactor: CatalogProvider * refactor: CatalogManager * fix: catalog key filtering * fix: pass some test * refactor: catalog iterating * fix: CatalogManager::table also requires both catalog_name and schema_name * chore: merge develop * chore: merge catalog crate * fix: adapt to recent meta-client api change * feat: databode lease * feat: remote catalog (#356) * chore: refactor dir for local catalog manager * refactor: CatalogProvider returns Result * refactor: SchemaProvider returns Result * feat: add kv operations to remote catalog * chore: refactor some code * feat: impl catalog initialization * feat: add register table and register system table function * refactor: add table_info method for Table trait * chore: add some tests * chore: add register schema test * chore: fix build issue after rebase onto develop * refactor: mock to separate file * build: failed to compile * fix: use a container struct to bridge KvBackend and Accessor trait * feat: upgrade opendal to 0.17 * test: add more tests * chore: add catalog name and schema name to table info * chore: add catalog name and schema name to table info * chore: rebase onto develop * refactor: common-catalog crate * chore: refactor dir for local catalog manager * refactor: CatalogProvider returns Result * refactor: SchemaProvider returns Result * feat: add kv operations to remote catalog * chore: refactor some code * feat: impl catalog initialization * feat: add register table and register system table function * refactor: add table_info method for Table trait * chore: add some tests * chore: add register schema test * chore: fix build issue after rebase onto develop * refactor: mock to separate file * build: failed to compile * fix: use a container struct to bridge KvBackend and Accessor trait * feat: upgrade opendal to 0.17 * test: add more tests * chore: add catalog name and schema name to table info * chore: add catalog name and schema name to table info * chore: rebase onto develop * refactor: common-catalog crate * refactor: remove remote catalog related files * fix: compilation * feat: add table version to TableKey * feat: add node id to TableValue * fix: some CR comments * chore: change async fn create_expr_to_request to sync * fix: add backtrace to errors * fix: code style * refactor: merge refactor/catalog-crate * feat: table key with version * feat: impl KvBackend for MetaClient * fix: integrate metaclient * fix: catalog use local table info as baseline * fix: sync metsrv * fix: wip * fix: update remote catalog on register and deregister * refactor: CatalogProvider * refactor: CatalogManager * fix: catalog key filtering * fix: pass some test * refactor: catalog iterating * fix: CatalogManager::table also requires both catalog_name and schema_name * chore: merge develop * chore: merge catalog crate * fix: adapt to recent meta-client api change * feat: datanode heartbeat (#355) * feat: add heartbeat task to instance * feat: add node_id datanode opts * fix: use real node id in heartbeat and meta client * feat: distribute table in frontend * test: distribute read demo * test: distribute read demo * test: distribute read demo * add write spliter * fix: node id changed to u64 * feat: datanode uses remote catalog implementation * dist insert integrate table * feat: specify region ids on creating table (#359) * fix: compiling issues * feat: datanode lease (#354) * Some glue code about dist_insert * fix: correctly wrap string value with quotes * feat: create route * feat: frontend catalog (#362) * feat: integrate catalog to frontend * feat: preserve partition rule on create * fix: print tables on start * chore: log in create route * test: distribute read demo * feat: support metasrv addr command line options * feat: optimize DataNodeInstance creation (#368) * chore: remove unnecessary changes * chore: revert changes to src/api * chore: revert changes to src/datanode/src/server.rs * chore: remove opendal backend * chore: optimize imports * chore: revert changes to instance and region ids * refactor: MetaKvBackend range * fix: remove some wrap * refactor: initiation of catalog * fix: next range request start key * fix: mock delete range * refactor: simplify range response handling Co-authored-by: jiachun <jiachun_fjc@163.com> Co-authored-by: luofucong <luofucong@greptime.com> Co-authored-by: fys <1113014250@qq.com> Co-authored-by: Jiachun Feng <jiachun_feng@proton.me>
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -659,11 +659,14 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
name = "catalog"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-grpc",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
@@ -675,6 +678,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"lazy_static",
|
||||
"log-store",
|
||||
"meta-client",
|
||||
"object-store",
|
||||
"opendal",
|
||||
"regex",
|
||||
|
||||
@@ -5,10 +5,13 @@ edition = "2021"
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
api = { path = "../api" }
|
||||
arc-swap = "1.0"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
common-catalog = { path = "../common/catalog" }
|
||||
common-error = { path = "../common/error" }
|
||||
common-grpc = { path = "../common/grpc" }
|
||||
common-query = { path = "../common/query" }
|
||||
common-recordbatch = { path = "../common/recordbatch" }
|
||||
common-runtime = { path = "../common/runtime" }
|
||||
@@ -21,6 +24,7 @@ datatypes = { path = "../datatypes" }
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
lazy_static = "1.4"
|
||||
meta-client = { path = "../meta-client" }
|
||||
opendal = "0.17"
|
||||
regex = "1.6"
|
||||
serde = "1.0"
|
||||
|
||||
@@ -126,6 +126,15 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Local and remote catalog data are inconsistent, msg: {}", msg))]
|
||||
CatalogStateInconsistent { msg: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to perform metasrv operation, source: {}", source))]
|
||||
MetaSrv {
|
||||
#[snafu(backtrace)]
|
||||
source: meta_client::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -138,7 +147,8 @@ impl ErrorExt for Error {
|
||||
| Error::TableNotFound { .. }
|
||||
| Error::IllegalManagerState { .. }
|
||||
| Error::CatalogNotFound { .. }
|
||||
| Error::InvalidEntryType { .. } => StatusCode::Unexpected,
|
||||
| Error::InvalidEntryType { .. }
|
||||
| Error::CatalogStateInconsistent { .. } => StatusCode::Unexpected,
|
||||
|
||||
Error::SystemCatalog { .. }
|
||||
| Error::EmptyValue
|
||||
@@ -156,9 +166,9 @@ impl ErrorExt for Error {
|
||||
| Error::CreateSystemCatalog { source, .. }
|
||||
| Error::InsertTableRecord { source, .. }
|
||||
| Error::OpenTable { source, .. }
|
||||
| Error::CreateTable { source, .. }
|
||||
| Error::SystemCatalogTableScan { source } => source.status_code(),
|
||||
|
||||
| Error::CreateTable { source, .. } => source.status_code(),
|
||||
Error::MetaSrv { source, .. } => source.status_code(),
|
||||
Error::SystemCatalogTableScan { source } => source.status_code(),
|
||||
Error::SystemCatalogTableScanExec { source } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ pub use crate::schema::{SchemaProvider, SchemaProviderRef};
|
||||
|
||||
pub mod error;
|
||||
pub mod local;
|
||||
pub mod remote;
|
||||
pub mod schema;
|
||||
pub mod system;
|
||||
pub mod tables;
|
||||
|
||||
94
src/catalog/src/remote.rs
Normal file
94
src/catalog/src/remote.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
use std::fmt::Debug;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use client::MetaKvBackend;
|
||||
use futures::Stream;
|
||||
use futures_util::StreamExt;
|
||||
pub use manager::{RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider};
|
||||
|
||||
use crate::error::Error;
|
||||
|
||||
mod client;
|
||||
mod manager;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Kv(pub Vec<u8>, pub Vec<u8>);
|
||||
|
||||
pub type ValueIter<'a, E> = Pin<Box<dyn Stream<Item = Result<Kv, E>> + Send + 'a>>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait KvBackend: Send + Sync {
|
||||
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, crate::error::Error>
|
||||
where
|
||||
'a: 'b;
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), crate::error::Error>;
|
||||
|
||||
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), crate::error::Error>;
|
||||
|
||||
async fn delete(&self, key: &[u8]) -> Result<(), Error> {
|
||||
self.delete_range(key, &[]).await
|
||||
}
|
||||
|
||||
/// Default get is implemented based on `range` method.
|
||||
async fn get(&self, key: &[u8]) -> Result<Option<Kv>, Error> {
|
||||
let mut iter = self.range(key);
|
||||
while let Some(r) = iter.next().await {
|
||||
let kv = r?;
|
||||
if kv.0 == key {
|
||||
return Ok(Some(kv));
|
||||
}
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
pub type KvBackendRef = Arc<dyn KvBackend>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use async_stream::stream;
|
||||
|
||||
use super::*;
|
||||
|
||||
struct MockKvBackend {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl KvBackend for MockKvBackend {
|
||||
fn range<'a, 'b>(&'a self, _key: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b,
|
||||
{
|
||||
Box::pin(stream!({
|
||||
for i in 0..3 {
|
||||
yield Ok(Kv(
|
||||
i.to_string().as_bytes().to_vec(),
|
||||
i.to_string().as_bytes().to_vec(),
|
||||
))
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
async fn set(&self, _key: &[u8], _val: &[u8]) -> Result<(), Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete_range(&self, _key: &[u8], _end: &[u8]) -> Result<(), Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get() {
|
||||
let backend = MockKvBackend {};
|
||||
let result = backend.get(0.to_string().as_bytes()).await;
|
||||
assert_eq!(0.to_string().as_bytes(), result.unwrap().unwrap().0);
|
||||
let result = backend.get(1.to_string().as_bytes()).await;
|
||||
assert_eq!(1.to_string().as_bytes(), result.unwrap().unwrap().0);
|
||||
let result = backend.get(2.to_string().as_bytes()).await;
|
||||
assert_eq!(2.to_string().as_bytes(), result.unwrap().unwrap().0);
|
||||
let result = backend.get(3.to_string().as_bytes()).await;
|
||||
assert!(result.unwrap().is_none());
|
||||
}
|
||||
}
|
||||
71
src/catalog/src/remote/client.rs
Normal file
71
src/catalog/src/remote/client.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use async_stream::stream;
|
||||
use common_telemetry::info;
|
||||
use meta_client::client::MetaClient;
|
||||
use meta_client::rpc::{DeleteRangeRequest, PutRequest, RangeRequest};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{Error, MetaSrvSnafu};
|
||||
use crate::remote::{Kv, KvBackend, ValueIter};
|
||||
#[derive(Debug)]
|
||||
pub struct MetaKvBackend {
|
||||
pub client: MetaClient,
|
||||
}
|
||||
|
||||
/// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since
|
||||
/// `MetaClient`'s range method can return both keys and values, which can reduce IO overhead
|
||||
/// comparing to `Accessor`'s list and get method.
|
||||
#[async_trait::async_trait]
|
||||
impl KvBackend for MetaKvBackend {
|
||||
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b,
|
||||
{
|
||||
let key = key.to_vec();
|
||||
Box::pin(stream!({
|
||||
let mut resp = self
|
||||
.client
|
||||
.range(RangeRequest::new().with_prefix(key))
|
||||
.await
|
||||
.context(MetaSrvSnafu)?;
|
||||
let kvs = resp.take_kvs();
|
||||
for mut kv in kvs.into_iter() {
|
||||
yield Ok(Kv(kv.take_key(), kv.take_value()))
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
async fn get(&self, key: &[u8]) -> Result<Option<Kv>, Error> {
|
||||
let mut response = self
|
||||
.client
|
||||
.range(RangeRequest::new().with_key(key))
|
||||
.await
|
||||
.context(MetaSrvSnafu)?;
|
||||
Ok(response
|
||||
.take_kvs()
|
||||
.get_mut(0)
|
||||
.map(|kv| Kv(kv.take_key(), kv.take_value())))
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
let req = PutRequest::new()
|
||||
.with_key(key.to_vec())
|
||||
.with_value(val.to_vec());
|
||||
let _ = self.client.put(req).await.context(MetaSrvSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
|
||||
let req = DeleteRangeRequest::new().with_range(key.to_vec(), end.to_vec());
|
||||
let resp = self.client.delete_range(req).await.context(MetaSrvSnafu)?;
|
||||
info!(
|
||||
"Delete range, key: {}, end: {}, deleted: {}",
|
||||
String::from_utf8_lossy(key),
|
||||
String::from_utf8_lossy(end),
|
||||
resp.deleted()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
669
src/catalog/src/remote/manager.rs
Normal file
669
src/catalog/src/remote/manager.rs
Normal file
@@ -0,0 +1,669 @@
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use async_stream::stream;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
|
||||
use common_catalog::{
|
||||
build_catalog_prefix, build_schema_prefix, build_table_prefix, CatalogKey, CatalogValue,
|
||||
SchemaKey, SchemaValue, TableKey, TableValue,
|
||||
};
|
||||
use common_telemetry::{debug, info};
|
||||
use datatypes::schema::Schema;
|
||||
use futures::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::metadata::{TableId, TableVersion};
|
||||
use table::requests::{CreateTableRequest, OpenTableRequest};
|
||||
use table::TableRef;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
|
||||
SchemaNotFoundSnafu, TableExistsSnafu,
|
||||
};
|
||||
use crate::remote::{Kv, KvBackendRef};
|
||||
use crate::{
|
||||
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Catalog manager based on metasrv.
|
||||
pub struct RemoteCatalogManager {
|
||||
node_id: u64,
|
||||
backend: KvBackendRef,
|
||||
catalogs: Arc<ArcSwap<HashMap<String, CatalogProviderRef>>>,
|
||||
next_table_id: Arc<AtomicU32>,
|
||||
engine: TableEngineRef,
|
||||
system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
|
||||
mutex: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
impl RemoteCatalogManager {
|
||||
pub fn new(engine: TableEngineRef, node_id: u64, backend: KvBackendRef) -> Self {
|
||||
Self {
|
||||
engine,
|
||||
node_id,
|
||||
backend,
|
||||
catalogs: Default::default(),
|
||||
next_table_id: Default::default(),
|
||||
system_table_requests: Default::default(),
|
||||
mutex: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_catalog_key(&self, catalog_name: impl AsRef<str>) -> CatalogKey {
|
||||
CatalogKey {
|
||||
catalog_name: catalog_name.as_ref().to_string(),
|
||||
node_id: self.node_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn new_catalog_provider(&self, catalog_name: &str) -> CatalogProviderRef {
|
||||
Arc::new(RemoteCatalogProvider {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
node_id: self.node_id,
|
||||
backend: self.backend.clone(),
|
||||
schemas: Default::default(),
|
||||
mutex: Default::default(),
|
||||
}) as _
|
||||
}
|
||||
|
||||
fn new_schema_provider(&self, catalog_name: &str, schema_name: &str) -> SchemaProviderRef {
|
||||
Arc::new(RemoteSchemaProvider {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.to_string(),
|
||||
tables: Default::default(),
|
||||
node_id: self.node_id,
|
||||
backend: self.backend.clone(),
|
||||
mutex: Default::default(),
|
||||
}) as _
|
||||
}
|
||||
|
||||
async fn iter_remote_catalogs(
|
||||
&self,
|
||||
) -> Pin<Box<dyn Stream<Item = Result<CatalogKey>> + Send + '_>> {
|
||||
let catalog_range_prefix = build_catalog_prefix();
|
||||
info!("catalog_range_prefix: {}", catalog_range_prefix);
|
||||
let mut catalogs = self.backend.range(catalog_range_prefix.as_bytes());
|
||||
Box::pin(stream!({
|
||||
while let Some(r) = catalogs.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
if !k.starts_with(catalog_range_prefix.as_bytes()) {
|
||||
debug!("Ignoring non-catalog key: {}", String::from_utf8_lossy(&k));
|
||||
continue;
|
||||
}
|
||||
let key = CatalogKey::parse(&String::from_utf8_lossy(&k))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
if key.node_id == self.node_id {
|
||||
yield Ok(key)
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
async fn iter_remote_schemas(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
) -> Pin<Box<dyn Stream<Item = Result<SchemaKey>> + Send + '_>> {
|
||||
let schema_prefix = build_schema_prefix(catalog_name);
|
||||
let mut schemas = self.backend.range(schema_prefix.as_bytes());
|
||||
|
||||
Box::pin(stream!({
|
||||
while let Some(r) = schemas.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
if !k.starts_with(schema_prefix.as_bytes()) {
|
||||
debug!("Ignoring non-schema key: {}", String::from_utf8_lossy(&k));
|
||||
continue;
|
||||
}
|
||||
|
||||
let schema_key = SchemaKey::parse(&String::from_utf8_lossy(&k))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
|
||||
if schema_key.node_id == self.node_id {
|
||||
yield Ok(schema_key)
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
/// Iterate over all table entries on metasrv
|
||||
/// TODO(hl): table entries with different version is not currently considered.
|
||||
/// Ideally deprecated table entry must be deleted when deregistering from catalog.
|
||||
async fn iter_remote_tables(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
) -> Pin<Box<dyn Stream<Item = Result<(TableKey, TableValue)>> + Send + '_>> {
|
||||
let table_prefix = build_table_prefix(catalog_name, schema_name);
|
||||
let mut tables = self.backend.range(table_prefix.as_bytes());
|
||||
Box::pin(stream!({
|
||||
while let Some(r) = tables.next().await {
|
||||
let Kv(k, v) = r?;
|
||||
if !k.starts_with(table_prefix.as_bytes()) {
|
||||
debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k));
|
||||
continue;
|
||||
}
|
||||
let table_key = TableKey::parse(&String::from_utf8_lossy(&k))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
let table_value = TableValue::parse(&String::from_utf8_lossy(&v))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
|
||||
if table_value.node_id == self.node_id {
|
||||
yield Ok((table_key, table_value))
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
/// Fetch catalogs/schemas/tables from remote catalog manager along with max table id allocated.
|
||||
async fn initiate_catalogs(&self) -> Result<(HashMap<String, CatalogProviderRef>, TableId)> {
|
||||
let mut res = HashMap::new();
|
||||
let max_table_id = MIN_USER_TABLE_ID;
|
||||
|
||||
// initiate default catalog and schema
|
||||
let default_catalog = self.initiate_default_catalog().await?;
|
||||
res.insert(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
|
||||
info!("Default catalog and schema registered");
|
||||
|
||||
let mut catalogs = self.iter_remote_catalogs().await;
|
||||
while let Some(r) = catalogs.next().await {
|
||||
let CatalogKey { catalog_name, .. } = r?;
|
||||
info!("Fetch catalog from metasrv: {}", catalog_name);
|
||||
let catalog = res
|
||||
.entry(catalog_name.clone())
|
||||
.or_insert_with(|| self.new_catalog_provider(&catalog_name))
|
||||
.clone();
|
||||
|
||||
self.initiate_schemas(catalog_name, catalog, max_table_id)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok((res, max_table_id))
|
||||
}
|
||||
|
||||
async fn initiate_schemas(
|
||||
&self,
|
||||
catalog_name: String,
|
||||
catalog: CatalogProviderRef,
|
||||
max_table_id: TableId,
|
||||
) -> Result<()> {
|
||||
let mut schemas = self.iter_remote_schemas(&catalog_name).await;
|
||||
while let Some(r) = schemas.next().await {
|
||||
let SchemaKey {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
..
|
||||
} = r?;
|
||||
info!("Found schema: {}.{}", catalog_name, schema_name);
|
||||
let schema = match catalog.schema(&schema_name)? {
|
||||
None => {
|
||||
let schema = self.new_schema_provider(&catalog_name, &schema_name);
|
||||
catalog.register_schema(schema_name.clone(), schema.clone())?;
|
||||
info!("Registered schema: {}", &schema_name);
|
||||
schema
|
||||
}
|
||||
Some(schema) => schema,
|
||||
};
|
||||
|
||||
info!(
|
||||
"Fetch schema from metasrv: {}.{}",
|
||||
&catalog_name, &schema_name
|
||||
);
|
||||
self.initiate_tables(&catalog_name, &schema_name, schema, max_table_id)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initiates all tables inside a catalog by fetching data from metasrv.
|
||||
async fn initiate_tables<'a>(
|
||||
&'a self,
|
||||
catalog_name: &'a str,
|
||||
schema_name: &'a str,
|
||||
schema: SchemaProviderRef,
|
||||
mut max_table_id: TableId,
|
||||
) -> Result<()> {
|
||||
let mut tables = self.iter_remote_tables(catalog_name, schema_name).await;
|
||||
while let Some(r) = tables.next().await {
|
||||
let (table_key, table_value) = r?;
|
||||
let table_ref = self.open_or_create_table(&table_key, &table_value).await?;
|
||||
schema.register_table(table_key.table_name.to_string(), table_ref)?;
|
||||
info!("Registered table {}", &table_key.table_name);
|
||||
if table_value.id > max_table_id {
|
||||
info!("Max table id: {} -> {}", max_table_id, table_value.id);
|
||||
max_table_id = table_value.id;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn initiate_default_catalog(&self) -> Result<CatalogProviderRef> {
|
||||
let default_catalog = self.new_catalog_provider(DEFAULT_CATALOG_NAME);
|
||||
let default_schema = self.new_schema_provider(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
|
||||
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?;
|
||||
let schema_key = SchemaKey {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
node_id: self.node_id,
|
||||
}
|
||||
.to_string();
|
||||
self.backend
|
||||
.set(
|
||||
schema_key.as_bytes(),
|
||||
&SchemaValue {}
|
||||
.to_bytes()
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
info!("Registered default schema");
|
||||
|
||||
let catalog_key = CatalogKey {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
node_id: self.node_id,
|
||||
}
|
||||
.to_string();
|
||||
self.backend
|
||||
.set(
|
||||
catalog_key.as_bytes(),
|
||||
&CatalogValue {}
|
||||
.to_bytes()
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
info!("Registered default catalog");
|
||||
Ok(default_catalog)
|
||||
}
|
||||
|
||||
async fn open_or_create_table(
|
||||
&self,
|
||||
table_key: &TableKey,
|
||||
table_value: &TableValue,
|
||||
) -> Result<TableRef> {
|
||||
let context = EngineContext {};
|
||||
let TableKey {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
..
|
||||
} = table_key;
|
||||
|
||||
let TableValue { id, meta, .. } = table_value;
|
||||
|
||||
let request = OpenTableRequest {
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
table_id: *id,
|
||||
};
|
||||
match self
|
||||
.engine
|
||||
.open_table(&context, request)
|
||||
.await
|
||||
.with_context(|_| OpenTableSnafu {
|
||||
table_info: format!("{}.{}.{}, id:{}", catalog_name, schema_name, table_name, id,),
|
||||
})? {
|
||||
Some(table) => Ok(table),
|
||||
None => {
|
||||
let req = CreateTableRequest {
|
||||
id: *id,
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
desc: None,
|
||||
schema: Arc::new(Schema::new(meta.schema.column_schemas.clone())),
|
||||
primary_key_indices: meta.primary_key_indices.clone(),
|
||||
create_if_not_exists: true,
|
||||
table_options: meta.options.clone(),
|
||||
};
|
||||
|
||||
self.engine
|
||||
.create_table(&context, req)
|
||||
.await
|
||||
.context(CreateTableSnafu {
|
||||
table_info: format!(
|
||||
"{}.{}.{}, id:{}",
|
||||
&catalog_name, &schema_name, &table_name, id
|
||||
),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CatalogManager for RemoteCatalogManager {
|
||||
async fn start(&self) -> Result<()> {
|
||||
let (catalogs, max_table_id) = self.initiate_catalogs().await?;
|
||||
info!(
|
||||
"Initialized catalogs: {:?}",
|
||||
catalogs.keys().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
self.catalogs.store(Arc::new(catalogs));
|
||||
self.next_table_id
|
||||
.store(max_table_id + 1, Ordering::Relaxed);
|
||||
info!("Max table id allocated: {}", max_table_id);
|
||||
|
||||
let mut system_table_requests = self.system_table_requests.lock().await;
|
||||
handle_system_table_request(self, self.engine.clone(), &mut system_table_requests).await?;
|
||||
info!("All system table opened");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn next_table_id(&self) -> TableId {
|
||||
self.next_table_id.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
|
||||
let catalog_name = request.catalog;
|
||||
let schema_name = request.schema;
|
||||
let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &catalog_name,
|
||||
})?;
|
||||
let schema_provider =
|
||||
catalog_provider
|
||||
.schema(&schema_name)?
|
||||
.with_context(|| SchemaNotFoundSnafu {
|
||||
schema_info: format!("{}.{}", &catalog_name, &schema_name),
|
||||
})?;
|
||||
if schema_provider.table_exist(&request.table_name)? {
|
||||
return TableExistsSnafu {
|
||||
table: format!("{}.{}.{}", &catalog_name, &schema_name, &request.table_name),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
schema_provider.register_table(request.table_name, request.table)?;
|
||||
Ok(1)
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
|
||||
let mut requests = self.system_table_requests.lock().await;
|
||||
requests.push(request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn table(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
) -> Result<Option<TableRef>> {
|
||||
let catalog = self
|
||||
.catalog(catalog_name)?
|
||||
.with_context(|| CatalogNotFoundSnafu { catalog_name })?;
|
||||
let schema = catalog
|
||||
.schema(schema_name)?
|
||||
.with_context(|| SchemaNotFoundSnafu {
|
||||
schema_info: format!("{}.{}", catalog_name, schema_name),
|
||||
})?;
|
||||
schema.table(table_name)
|
||||
}
|
||||
}
|
||||
|
||||
impl CatalogList for RemoteCatalogManager {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn register_catalog(
|
||||
&self,
|
||||
name: String,
|
||||
catalog: CatalogProviderRef,
|
||||
) -> Result<Option<CatalogProviderRef>> {
|
||||
let key = self.build_catalog_key(&name).to_string();
|
||||
let backend = self.backend.clone();
|
||||
let mutex = self.mutex.clone();
|
||||
let catalogs = self.catalogs.clone();
|
||||
|
||||
std::thread::spawn(|| {
|
||||
common_runtime::block_on_write(async move {
|
||||
let _guard = mutex.lock().await;
|
||||
backend
|
||||
.set(
|
||||
key.as_bytes(),
|
||||
&CatalogValue {}
|
||||
.to_bytes()
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
let prev_catalogs = catalogs.load();
|
||||
let mut new_catalogs = HashMap::with_capacity(prev_catalogs.len() + 1);
|
||||
new_catalogs.clone_from(&prev_catalogs);
|
||||
let prev = new_catalogs.insert(name, catalog);
|
||||
catalogs.store(Arc::new(new_catalogs));
|
||||
Ok(prev)
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// List all catalogs from metasrv
|
||||
fn catalog_names(&self) -> Result<Vec<String>> {
|
||||
Ok(self.catalogs.load().keys().cloned().collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
/// Read catalog info of given name from metasrv.
|
||||
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {
|
||||
Ok(self.catalogs.load().get(name).cloned())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RemoteCatalogProvider {
|
||||
catalog_name: String,
|
||||
node_id: u64,
|
||||
backend: KvBackendRef,
|
||||
schemas: Arc<ArcSwap<HashMap<String, SchemaProviderRef>>>,
|
||||
mutex: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
impl RemoteCatalogProvider {
|
||||
pub fn new(catalog_name: String, node_id: u64, backend: KvBackendRef) -> Self {
|
||||
Self {
|
||||
catalog_name,
|
||||
node_id,
|
||||
backend,
|
||||
schemas: Default::default(),
|
||||
mutex: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_schema_key(&self, schema_name: impl AsRef<str>) -> SchemaKey {
|
||||
SchemaKey {
|
||||
catalog_name: self.catalog_name.clone(),
|
||||
schema_name: schema_name.as_ref().to_string(),
|
||||
node_id: self.node_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CatalogProvider for RemoteCatalogProvider {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema_names(&self) -> Result<Vec<String>> {
|
||||
Ok(self.schemas.load().keys().cloned().collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
fn register_schema(
|
||||
&self,
|
||||
name: String,
|
||||
schema: SchemaProviderRef,
|
||||
) -> Result<Option<SchemaProviderRef>> {
|
||||
let key = self.build_schema_key(&name).to_string();
|
||||
let backend = self.backend.clone();
|
||||
let mutex = self.mutex.clone();
|
||||
let schemas = self.schemas.clone();
|
||||
|
||||
std::thread::spawn(|| {
|
||||
common_runtime::block_on_write(async move {
|
||||
let _guard = mutex.lock().await;
|
||||
backend
|
||||
.set(
|
||||
key.as_bytes(),
|
||||
&SchemaValue {}
|
||||
.to_bytes()
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
let prev_schemas = schemas.load();
|
||||
let mut new_schemas = HashMap::with_capacity(prev_schemas.len() + 1);
|
||||
new_schemas.clone_from(&prev_schemas);
|
||||
let prev_schema = new_schemas.insert(name, schema);
|
||||
schemas.store(Arc::new(new_schemas));
|
||||
Ok(prev_schema)
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
|
||||
Ok(self.schemas.load().get(name).cloned())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RemoteSchemaProvider {
|
||||
catalog_name: String,
|
||||
schema_name: String,
|
||||
node_id: u64,
|
||||
backend: KvBackendRef,
|
||||
tables: Arc<ArcSwap<HashMap<String, TableRef>>>,
|
||||
mutex: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
impl RemoteSchemaProvider {
|
||||
pub fn new(
|
||||
catalog_name: String,
|
||||
schema_name: String,
|
||||
node_id: u64,
|
||||
backend: KvBackendRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
node_id,
|
||||
backend,
|
||||
tables: Default::default(),
|
||||
mutex: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_table_key(
|
||||
&self,
|
||||
table_name: impl AsRef<str>,
|
||||
table_version: TableVersion,
|
||||
) -> TableKey {
|
||||
TableKey {
|
||||
catalog_name: self.catalog_name.clone(),
|
||||
schema_name: self.schema_name.clone(),
|
||||
table_name: table_name.as_ref().to_string(),
|
||||
version: table_version,
|
||||
node_id: self.node_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SchemaProvider for RemoteSchemaProvider {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn table_names(&self) -> Result<Vec<String>> {
|
||||
Ok(self.tables.load().keys().cloned().collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
fn table(&self, name: &str) -> Result<Option<TableRef>> {
|
||||
Ok(self.tables.load().get(name).cloned())
|
||||
}
|
||||
|
||||
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
|
||||
let table_info = table.table_info();
|
||||
let table_version = table_info.ident.version;
|
||||
let table_value = TableValue {
|
||||
meta: table_info.meta.clone().into(),
|
||||
id: table_info.ident.table_id,
|
||||
node_id: self.node_id,
|
||||
regions_ids: vec![],
|
||||
};
|
||||
let backend = self.backend.clone();
|
||||
let mutex = self.mutex.clone();
|
||||
let tables = self.tables.clone();
|
||||
|
||||
let table_key = self
|
||||
.build_table_key(name.clone(), table_version)
|
||||
.to_string();
|
||||
|
||||
let prev = std::thread::spawn(move || {
|
||||
common_runtime::block_on_read(async move {
|
||||
let _guard = mutex.lock().await;
|
||||
backend
|
||||
.set(
|
||||
table_key.as_bytes(),
|
||||
&table_value.as_bytes().context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
debug!(
|
||||
"Successfully set catalog table entry, key: {}, table value: {:?}",
|
||||
table_key, table_value
|
||||
);
|
||||
|
||||
let prev_tables = tables.load();
|
||||
let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1);
|
||||
new_tables.clone_from(&prev_tables);
|
||||
let prev = new_tables.insert(name, table);
|
||||
tables.store(Arc::new(new_tables));
|
||||
Ok(prev)
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
prev
|
||||
}
|
||||
|
||||
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
|
||||
let table_version = match self.tables.load().get(name) {
|
||||
None => return Ok(None),
|
||||
Some(t) => t.table_info().ident.version,
|
||||
};
|
||||
|
||||
let table_name = name.to_string();
|
||||
let table_key = self.build_table_key(&table_name, table_version).to_string();
|
||||
|
||||
let backend = self.backend.clone();
|
||||
let mutex = self.mutex.clone();
|
||||
let tables = self.tables.clone();
|
||||
|
||||
let prev = std::thread::spawn(move || {
|
||||
common_runtime::block_on_read(async move {
|
||||
let _guard = mutex.lock().await;
|
||||
backend.delete(table_key.as_bytes()).await?;
|
||||
debug!(
|
||||
"Successfully deleted catalog table entry, key: {}",
|
||||
table_key
|
||||
);
|
||||
|
||||
let prev_tables = tables.load();
|
||||
let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1);
|
||||
new_tables.clone_from(&prev_tables);
|
||||
let prev = new_tables.remove(&table_name);
|
||||
tables.store(Arc::new(new_tables));
|
||||
Ok(prev)
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
prev
|
||||
}
|
||||
|
||||
/// Checks if table exists in schema provider based on locally opened table map.
|
||||
fn table_exist(&self, name: &str) -> Result<bool> {
|
||||
Ok(self.tables.load().contains_key(name))
|
||||
}
|
||||
}
|
||||
163
src/catalog/tests/mock.rs
Normal file
163
src/catalog/tests/mock.rs
Normal file
@@ -0,0 +1,163 @@
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::stream;
|
||||
use catalog::error::Error;
|
||||
use catalog::remote::{Kv, KvBackend, ValueIter};
|
||||
use common_recordbatch::RecordBatch;
|
||||
use common_telemetry::logging::info;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::vectors::StringVector;
|
||||
use serde::Serializer;
|
||||
use table::engine::{EngineContext, TableEngine};
|
||||
use table::metadata::TableId;
|
||||
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
|
||||
use table::test_util::MemTable;
|
||||
use table::TableRef;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct MockKvBackend {
|
||||
map: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl Display for MockKvBackend {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
futures::executor::block_on(async {
|
||||
let map = self.map.read().await;
|
||||
for (k, v) in map.iter() {
|
||||
f.serialize_str(&String::from_utf8_lossy(k))?;
|
||||
f.serialize_str(" -> ")?;
|
||||
f.serialize_str(&String::from_utf8_lossy(v))?;
|
||||
f.serialize_str("\n")?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl KvBackend for MockKvBackend {
|
||||
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error>
|
||||
where
|
||||
'a: 'b,
|
||||
{
|
||||
let prefix = key.to_vec();
|
||||
let prefix_string = String::from_utf8_lossy(&prefix).to_string();
|
||||
Box::pin(stream!({
|
||||
let maps = self.map.read().await.clone();
|
||||
for (k, v) in maps.range(prefix.clone()..) {
|
||||
let key_string = String::from_utf8_lossy(k).to_string();
|
||||
let matches = key_string.starts_with(&prefix_string);
|
||||
if matches {
|
||||
yield Ok(Kv(k.clone(), v.clone()))
|
||||
} else {
|
||||
info!("Stream finished");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
let mut map = self.map.write().await;
|
||||
map.insert(key.to_vec(), val.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
|
||||
let start = key.to_vec();
|
||||
let end = end.to_vec();
|
||||
let range = start..end;
|
||||
|
||||
let mut map = self.map.write().await;
|
||||
map.retain(|k, _| !range.contains(k));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct MockTableEngine {
|
||||
tables: RwLock<HashMap<String, TableRef>>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TableEngine for MockTableEngine {
|
||||
fn name(&self) -> &str {
|
||||
"MockTableEngine"
|
||||
}
|
||||
|
||||
/// Create a table with only one column
|
||||
async fn create_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
request: CreateTableRequest,
|
||||
) -> table::Result<TableRef> {
|
||||
let table_name = request.table_name.clone();
|
||||
let catalog_name = request.catalog_name.clone();
|
||||
let schema_name = request.schema_name.clone();
|
||||
|
||||
let default_table_id = "0".to_owned();
|
||||
let table_id = TableId::from_str(
|
||||
request
|
||||
.table_options
|
||||
.get("table_id")
|
||||
.unwrap_or(&default_table_id),
|
||||
)
|
||||
.unwrap();
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
"name",
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
)]));
|
||||
|
||||
let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _];
|
||||
let record_batch = RecordBatch::new(schema, data).unwrap();
|
||||
let table: TableRef = Arc::new(MemTable::new_with_catalog(
|
||||
&table_name,
|
||||
record_batch,
|
||||
table_id,
|
||||
catalog_name,
|
||||
schema_name,
|
||||
)) as Arc<_>;
|
||||
|
||||
let mut tables = self.tables.write().await;
|
||||
tables.insert(table_name, table.clone() as TableRef);
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
async fn open_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
request: OpenTableRequest,
|
||||
) -> table::Result<Option<TableRef>> {
|
||||
Ok(self.tables.read().await.get(&request.table_name).cloned())
|
||||
}
|
||||
|
||||
async fn alter_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
_request: AlterTableRequest,
|
||||
) -> table::Result<TableRef> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn get_table(&self, _ctx: &EngineContext, name: &str) -> table::Result<Option<TableRef>> {
|
||||
futures::executor::block_on(async { Ok(self.tables.read().await.get(name).cloned()) })
|
||||
}
|
||||
|
||||
fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool {
|
||||
futures::executor::block_on(async { self.tables.read().await.contains_key(name) })
|
||||
}
|
||||
|
||||
async fn drop_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
_request: DropTableRequest,
|
||||
) -> table::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
274
src/catalog/tests/remote_catalog_tests.rs
Normal file
274
src/catalog/tests/remote_catalog_tests.rs
Normal file
@@ -0,0 +1,274 @@
|
||||
#![feature(assert_matches)]
|
||||
|
||||
mod mock;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::remote::{
|
||||
KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
|
||||
};
|
||||
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
|
||||
use datatypes::schema::Schema;
|
||||
use futures_util::StreamExt;
|
||||
use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::requests::CreateTableRequest;
|
||||
|
||||
use crate::mock::{MockKvBackend, MockTableEngine};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_backend() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let node_id = 42;
|
||||
let backend = MockKvBackend::default();
|
||||
|
||||
let default_catalog_key = CatalogKey {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
node_id,
|
||||
}
|
||||
.to_string();
|
||||
|
||||
backend
|
||||
.set(
|
||||
default_catalog_key.as_bytes(),
|
||||
&CatalogValue {}.to_bytes().unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let schema_key = SchemaKey {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
node_id,
|
||||
}
|
||||
.to_string();
|
||||
backend
|
||||
.set(schema_key.as_bytes(), &SchemaValue {}.to_bytes().unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut iter = backend.range("__c-".as_bytes());
|
||||
let mut res = HashSet::new();
|
||||
while let Some(r) = iter.next().await {
|
||||
let kv = r.unwrap();
|
||||
res.insert(String::from_utf8_lossy(&kv.0).to_string());
|
||||
}
|
||||
assert_eq!(
|
||||
vec!["__c-greptime-42".to_string()],
|
||||
res.into_iter().collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
async fn prepare_components(node_id: u64) -> (KvBackendRef, TableEngineRef, CatalogManagerRef) {
|
||||
let backend = Arc::new(MockKvBackend::default()) as KvBackendRef;
|
||||
let table_engine = Arc::new(MockTableEngine::default());
|
||||
let catalog_manager =
|
||||
RemoteCatalogManager::new(table_engine.clone(), node_id, backend.clone());
|
||||
catalog_manager.start().await.unwrap();
|
||||
(backend, table_engine, Arc::new(catalog_manager))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remote_catalog_default() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let node_id = 42;
|
||||
let (_, _, catalog_manager) = prepare_components(node_id).await;
|
||||
assert_eq!(
|
||||
vec![DEFAULT_CATALOG_NAME.to_string()],
|
||||
catalog_manager.catalog_names().unwrap()
|
||||
);
|
||||
|
||||
let default_catalog = catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
vec![DEFAULT_SCHEMA_NAME.to_string()],
|
||||
default_catalog.schema_names().unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remote_catalog_register_nonexistent() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let node_id = 42;
|
||||
let (_, table_engine, catalog_manager) = prepare_components(node_id).await;
|
||||
// register a new table with an nonexistent catalog
|
||||
let catalog_name = "nonexistent_catalog".to_string();
|
||||
let schema_name = "nonexistent_schema".to_string();
|
||||
let table_name = "fail_table".to_string();
|
||||
// this schema has no effect
|
||||
let table_schema = Arc::new(Schema::new(vec![]));
|
||||
let table = table_engine
|
||||
.create_table(
|
||||
&EngineContext {},
|
||||
CreateTableRequest {
|
||||
id: 1,
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
desc: None,
|
||||
schema: table_schema.clone(),
|
||||
primary_key_indices: vec![],
|
||||
create_if_not_exists: false,
|
||||
table_options: Default::default(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let reg_req = RegisterTableRequest {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table_name,
|
||||
table_id: 1,
|
||||
table,
|
||||
};
|
||||
let res = catalog_manager.register_table(reg_req).await;
|
||||
|
||||
// because nonexistent_catalog does not exist yet.
|
||||
assert_matches!(
|
||||
res.err().unwrap(),
|
||||
catalog::error::Error::CatalogNotFound { .. }
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_register_table() {
|
||||
let node_id = 42;
|
||||
let (_, table_engine, catalog_manager) = prepare_components(node_id).await;
|
||||
let default_catalog = catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
vec![DEFAULT_SCHEMA_NAME.to_string()],
|
||||
default_catalog.schema_names().unwrap()
|
||||
);
|
||||
|
||||
let default_schema = default_catalog
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(Vec::<String>::new(), default_schema.table_names().unwrap());
|
||||
|
||||
// register a new table with an nonexistent catalog
|
||||
let catalog_name = DEFAULT_CATALOG_NAME.to_string();
|
||||
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
|
||||
let table_name = "test_table".to_string();
|
||||
let table_id = 1;
|
||||
// this schema has no effect
|
||||
let table_schema = Arc::new(Schema::new(vec![]));
|
||||
let table = table_engine
|
||||
.create_table(
|
||||
&EngineContext {},
|
||||
CreateTableRequest {
|
||||
id: table_id,
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
desc: None,
|
||||
schema: table_schema.clone(),
|
||||
primary_key_indices: vec![],
|
||||
create_if_not_exists: false,
|
||||
table_options: Default::default(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let reg_req = RegisterTableRequest {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table_name: table_name.clone(),
|
||||
table_id,
|
||||
table,
|
||||
};
|
||||
assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap());
|
||||
assert_eq!(vec![table_name], default_schema.table_names().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_register_catalog_schema_table() {
|
||||
let node_id = 42;
|
||||
let (backend, table_engine, catalog_manager) = prepare_components(node_id).await;
|
||||
|
||||
let catalog_name = "test_catalog".to_string();
|
||||
let schema_name = "nonexistent_schema".to_string();
|
||||
let catalog = Arc::new(RemoteCatalogProvider::new(
|
||||
catalog_name.clone(),
|
||||
node_id,
|
||||
backend.clone(),
|
||||
));
|
||||
|
||||
// register catalog to catalog manager
|
||||
catalog_manager
|
||||
.register_catalog(catalog_name.clone(), catalog)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
HashSet::<String>::from_iter(
|
||||
vec![DEFAULT_CATALOG_NAME.to_string(), catalog_name.clone()].into_iter()
|
||||
),
|
||||
HashSet::from_iter(catalog_manager.catalog_names().unwrap().into_iter())
|
||||
);
|
||||
|
||||
let table_to_register = table_engine
|
||||
.create_table(
|
||||
&EngineContext {},
|
||||
CreateTableRequest {
|
||||
id: 2,
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: "".to_string(),
|
||||
desc: None,
|
||||
schema: Arc::new(Schema::new(vec![])),
|
||||
primary_key_indices: vec![],
|
||||
create_if_not_exists: false,
|
||||
table_options: Default::default(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let reg_req = RegisterTableRequest {
|
||||
catalog: catalog_name.clone(),
|
||||
schema: schema_name.clone(),
|
||||
table_name: " fail_table".to_string(),
|
||||
table_id: 2,
|
||||
table: table_to_register,
|
||||
};
|
||||
// this register will fail since schema does not exist yet
|
||||
assert_matches!(
|
||||
catalog_manager
|
||||
.register_table(reg_req.clone())
|
||||
.await
|
||||
.unwrap_err(),
|
||||
catalog::error::Error::SchemaNotFound { .. }
|
||||
);
|
||||
|
||||
let new_catalog = catalog_manager
|
||||
.catalog(&catalog_name)
|
||||
.unwrap()
|
||||
.expect("catalog should exist since it's already registered");
|
||||
let schema = Arc::new(RemoteSchemaProvider::new(
|
||||
catalog_name.clone(),
|
||||
schema_name.clone(),
|
||||
node_id,
|
||||
backend.clone(),
|
||||
));
|
||||
|
||||
let prev = new_catalog
|
||||
.register_schema(schema_name.clone(), schema.clone())
|
||||
.expect("Register schema should not fail");
|
||||
assert!(prev.is_none());
|
||||
assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap());
|
||||
|
||||
assert_eq!(
|
||||
HashSet::from([schema_name.clone()]),
|
||||
new_catalog.schema_names().unwrap().into_iter().collect()
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,9 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
source: serde_json::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse node id: {}", key))]
|
||||
ParseNodeId { key: String, backtrace: Backtrace },
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -30,6 +33,7 @@ impl ErrorExt for Error {
|
||||
Error::InvalidCatalog { .. }
|
||||
| Error::DeserializeCatalogEntryValue { .. }
|
||||
| Error::SerializeCatalogEntryValue { .. } => StatusCode::Unexpected,
|
||||
Error::ParseNodeId { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,20 +9,18 @@ use table::metadata::{RawTableMeta, TableId, TableVersion};
|
||||
|
||||
use crate::consts::{CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_KEY_PREFIX};
|
||||
use crate::error::{
|
||||
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu,
|
||||
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, ParseNodeIdSnafu,
|
||||
SerializeCatalogEntryValueSnafu,
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
static ref CATALOG_KEY_PATTERN: Regex = Regex::new(&format!(
|
||||
"^{}-([a-zA-Z_]+)-([a-zA-Z_]+)$",
|
||||
CATALOG_KEY_PREFIX
|
||||
))
|
||||
.unwrap();
|
||||
static ref CATALOG_KEY_PATTERN: Regex =
|
||||
Regex::new(&format!("^{}-([a-zA-Z_]+)-([0-9]+)$", CATALOG_KEY_PREFIX)).unwrap();
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref SCHEMA_KEY_PATTERN: Regex = Regex::new(&format!(
|
||||
"^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)$",
|
||||
"^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)$",
|
||||
SCHEMA_KEY_PREFIX
|
||||
))
|
||||
.unwrap();
|
||||
@@ -30,7 +28,7 @@ lazy_static! {
|
||||
|
||||
lazy_static! {
|
||||
static ref TABLE_KEY_PATTERN: Regex = Regex::new(&format!(
|
||||
"^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)-([a-zA-Z_]+)$",
|
||||
"^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)-([0-9]+)$",
|
||||
TABLE_KEY_PREFIX
|
||||
))
|
||||
.unwrap();
|
||||
@@ -58,7 +56,7 @@ pub struct TableKey {
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub version: TableVersion,
|
||||
pub node_id: String,
|
||||
pub node_id: u64,
|
||||
}
|
||||
|
||||
impl Display for TableKey {
|
||||
@@ -73,7 +71,7 @@ impl Display for TableKey {
|
||||
f.write_str("-")?;
|
||||
f.serialize_u64(self.version)?;
|
||||
f.write_str("-")?;
|
||||
f.write_str(&self.node_id)
|
||||
f.serialize_u64(self.node_id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,12 +85,15 @@ impl TableKey {
|
||||
|
||||
let version =
|
||||
u64::from_str(&captures[4]).map_err(|_| InvalidCatalogSnafu { key }.build())?;
|
||||
let node_id_str = captures[5].to_string();
|
||||
let node_id = u64::from_str(&node_id_str)
|
||||
.map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?;
|
||||
Ok(Self {
|
||||
catalog_name: captures[1].to_string(),
|
||||
schema_name: captures[2].to_string(),
|
||||
table_name: captures[3].to_string(),
|
||||
version,
|
||||
node_id: captures[5].to_string(),
|
||||
node_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -100,7 +101,8 @@ impl TableKey {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct TableValue {
|
||||
pub id: TableId,
|
||||
pub node_id: String,
|
||||
pub node_id: u64,
|
||||
pub regions_ids: Vec<u64>,
|
||||
pub meta: RawTableMeta,
|
||||
}
|
||||
|
||||
@@ -119,7 +121,7 @@ impl TableValue {
|
||||
|
||||
pub struct CatalogKey {
|
||||
pub catalog_name: String,
|
||||
pub node_id: String,
|
||||
pub node_id: u64,
|
||||
}
|
||||
|
||||
impl Display for CatalogKey {
|
||||
@@ -128,7 +130,7 @@ impl Display for CatalogKey {
|
||||
f.write_str("-")?;
|
||||
f.write_str(&self.catalog_name)?;
|
||||
f.write_str("-")?;
|
||||
f.write_str(&self.node_id)
|
||||
f.serialize_u64(self.node_id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,9 +141,14 @@ impl CatalogKey {
|
||||
.captures(key)
|
||||
.context(InvalidCatalogSnafu { key })?;
|
||||
ensure!(captures.len() == 3, InvalidCatalogSnafu { key });
|
||||
|
||||
let node_id_str = captures[2].to_string();
|
||||
let node_id = u64::from_str(&node_id_str)
|
||||
.map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?;
|
||||
|
||||
Ok(Self {
|
||||
catalog_name: captures[1].to_string(),
|
||||
node_id: captures[2].to_string(),
|
||||
node_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -160,7 +167,7 @@ impl CatalogValue {
|
||||
pub struct SchemaKey {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub node_id: String,
|
||||
pub node_id: u64,
|
||||
}
|
||||
|
||||
impl Display for SchemaKey {
|
||||
@@ -171,7 +178,7 @@ impl Display for SchemaKey {
|
||||
f.write_str("-")?;
|
||||
f.write_str(&self.schema_name)?;
|
||||
f.write_str("-")?;
|
||||
f.write_str(&self.node_id)
|
||||
f.serialize_u64(self.node_id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -183,10 +190,14 @@ impl SchemaKey {
|
||||
.context(InvalidCatalogSnafu { key })?;
|
||||
ensure!(captures.len() == 4, InvalidCatalogSnafu { key });
|
||||
|
||||
let node_id_str = captures[3].to_string();
|
||||
let node_id = u64::from_str(&node_id_str)
|
||||
.map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?;
|
||||
|
||||
Ok(Self {
|
||||
catalog_name: captures[1].to_string(),
|
||||
schema_name: captures[2].to_string(),
|
||||
node_id: captures[3].to_string(),
|
||||
node_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -211,31 +222,31 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_parse_catalog_key() {
|
||||
let key = "__c-C-N";
|
||||
let key = "__c-C-2";
|
||||
let catalog_key = CatalogKey::parse(key).unwrap();
|
||||
assert_eq!("C", catalog_key.catalog_name);
|
||||
assert_eq!("N", catalog_key.node_id);
|
||||
assert_eq!(2, catalog_key.node_id);
|
||||
assert_eq!(key, catalog_key.to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_schema_key() {
|
||||
let key = "__s-C-S-N";
|
||||
let key = "__s-C-S-3";
|
||||
let schema_key = SchemaKey::parse(key).unwrap();
|
||||
assert_eq!("C", schema_key.catalog_name);
|
||||
assert_eq!("S", schema_key.schema_name);
|
||||
assert_eq!("N", schema_key.node_id);
|
||||
assert_eq!(3, schema_key.node_id);
|
||||
assert_eq!(key, schema_key.to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_table_key() {
|
||||
let key = "__t-C-S-T-42-N";
|
||||
let key = "__t-C-S-T-42-1";
|
||||
let entry = TableKey::parse(key).unwrap();
|
||||
assert_eq!("C", entry.catalog_name);
|
||||
assert_eq!("S", entry.schema_name);
|
||||
assert_eq!("T", entry.table_name);
|
||||
assert_eq!("N", entry.node_id);
|
||||
assert_eq!(1, entry.node_id);
|
||||
assert_eq!(42, entry.version);
|
||||
assert_eq!(key, &entry.to_string());
|
||||
}
|
||||
@@ -271,7 +282,8 @@ mod tests {
|
||||
|
||||
let value = TableValue {
|
||||
id: 42,
|
||||
node_id: "localhost".to_string(),
|
||||
node_id: 32,
|
||||
regions_ids: vec![1, 2, 3],
|
||||
meta,
|
||||
};
|
||||
let serialized = serde_json::to_string(&value).unwrap();
|
||||
|
||||
@@ -18,7 +18,7 @@ tonic = "0.8"
|
||||
|
||||
[dev-dependencies]
|
||||
futures = "0.3"
|
||||
meta-srv = { path = "../meta-srv", features = ["mock"]}
|
||||
meta-srv = { path = "../meta-srv", features = ["mock"] }
|
||||
tower = "0.4"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
Reference in New Issue
Block a user