From 7cf6c2bd5ccf48862338a571beecff184ad3a161 Mon Sep 17 00:00:00 2001 From: LFC Date: Mon, 17 Jul 2023 11:32:46 +0800 Subject: [PATCH] refactor: trying to replace TableGlobalValue, part 1 (#1956) * refactor: trying to replace TableGlobalValue, part 1 * fix: resolve PR comments --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/helper.rs | 2 + src/catalog/src/lib.rs | 1 - src/catalog/src/remote/manager.rs | 8 +- src/catalog/tests/remote_catalog_tests.rs | 2 +- src/client/src/database.rs | 2 + src/common/grpc-expr/src/insert.rs | 1 + src/common/meta/src/error.rs | 8 + src/{catalog => common/meta}/src/helper.rs | 0 src/common/meta/src/key/table_info.rs | 89 +++++- src/common/meta/src/key/table_name.rs | 91 +++++- src/common/meta/src/key/table_region.rs | 70 ++++- src/common/meta/src/lib.rs | 1 + src/common/meta/src/rpc/router.rs | 181 ++---------- src/frontend/src/catalog.rs | 128 +++++---- .../handler/invalidate_table_cache.rs | 34 +-- src/frontend/src/heartbeat/handler/tests.rs | 2 +- src/frontend/src/instance.rs | 1 - src/frontend/src/instance/distributed.rs | 14 +- .../src/instance/distributed/inserter.rs | 60 ++-- src/frontend/src/table.rs | 217 +------------- src/meta-client/examples/meta_client.rs | 64 ----- src/meta-client/src/client.rs | 110 +------ src/meta-client/src/client/router.rs | 28 +- src/meta-client/src/mocks.rs | 6 - .../src/handler/region_lease_handler.rs | 118 +++++--- src/meta-srv/src/keys.rs | 5 - src/meta-srv/src/lock.rs | 6 +- src/meta-srv/src/lock/keys.rs | 5 - src/meta-srv/src/metadata_service.rs | 4 +- src/meta-srv/src/metasrv/builder.rs | 1 - src/meta-srv/src/procedure/create_table.rs | 2 +- src/meta-srv/src/procedure/region_failover.rs | 92 +++--- .../region_failover/update_metadata.rs | 144 +++++----- src/meta-srv/src/procedure/utils.rs | 2 +- src/meta-srv/src/service/admin.rs | 2 - src/meta-srv/src/service/admin/meta.rs | 82 ++++-- src/meta-srv/src/service/ddl.rs | 5 +- src/meta-srv/src/service/router.rs | 269 ++---------------- src/meta-srv/src/table_routes.rs | 237 +++++---------- src/partition/src/route.rs | 1 + tests-integration/src/grpc.rs | 51 ++-- tests-integration/src/instance.rs | 30 +- tests-integration/src/tests.rs | 1 - tests-integration/tests/region_failover.rs | 2 +- 46 files changed, 810 insertions(+), 1373 deletions(-) rename src/{catalog => common/meta}/src/helper.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 4348785381..c59bd9e789 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4135,7 +4135,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9b26cca2132df83916692cc6fc092a26a1a76f0b#9b26cca2132df83916692cc6fc092a26a1a76f0b" +source = "git+https://github.com/MichaelScofield/greptime-proto.git?rev=a98b81b660080e13f01b9cedc9778de9aa1c19b9#a98b81b660080e13f01b9cedc9778de9aa1c19b9" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index de889a5179..c500f1c61a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git etcd-client = "0.11" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b26cca2132df83916692cc6fc092a26a1a76f0b" } +greptime-proto = { git = "https://github.com/MichaelScofield/greptime-proto.git", rev = "a98b81b660080e13f01b9cedc9778de9aa1c19b9" } itertools = "0.10" lazy_static = "1.4" parquet = "40.0" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 8fb6e4609a..d666decfcc 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -71,6 +71,7 @@ impl From for ConcreteDataType { ColumnDataType::TimestampNanosecond => { ConcreteDataType::timestamp_nanosecond_datatype() } + _ => unimplemented!("Implemented in #1961"), } } } @@ -189,6 +190,7 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values ts_nanosecond_values: Vec::with_capacity(capacity), ..Default::default() }, + _ => unimplemented!("Implemented in #1961"), } } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 3331cb9e1e..2f808f697c 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -32,7 +32,6 @@ use table::TableRef; use crate::error::{CreateTableSnafu, Result}; pub mod error; -pub mod helper; pub mod information_schema; pub mod local; mod metrics; diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 46de3929b2..b7ff3f7d4e 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -17,6 +17,10 @@ use std::sync::Arc; use async_trait::async_trait; use common_catalog::consts::MITO_ENGINE; +use common_meta::helper::{ + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, + TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, +}; use common_meta::ident::TableIdent; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; @@ -36,10 +40,6 @@ use crate::error::{ TableEngineNotFoundSnafu, TableExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnimplementedSnafu, }; -use crate::helper::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, - TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, -}; use crate::local::MemoryCatalogManager; use crate::remote::region_alive_keeper::RegionAliveKeepers; use crate::{ diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index a37c4f0a64..075234da3e 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -22,12 +22,12 @@ mod tests { use std::time::Duration; use catalog::error::Error; - use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use catalog::remote::mock::MockTableEngine; use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; + use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use common_meta::ident::TableIdent; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 47ebad0c7b..e2bfed224e 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -162,6 +162,8 @@ impl Database { schema: self.schema.clone(), authorization: self.ctx.auth_header.clone(), dbname: self.dbname.clone(), + trace_id: None, + span_id: None, }), request: Some(request), } diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index d964921856..6b6e9e38be 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -194,6 +194,7 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve Timestamp::new_nanosecond(*v) )) } + _ => unimplemented!("Implemented in #1961"), } } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index ef33c5c3b4..4c0aa34e1f 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -92,6 +92,12 @@ pub enum Error { err_msg: String, location: Location, }, + + #[snafu(display("Invalid catalog value, source: {}", source))] + InvalidCatalogValue { + source: common_catalog::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -118,6 +124,8 @@ impl ErrorExt for Error { } MetaSrv { source, .. } => source.status_code(), + + InvalidCatalogValue { source, .. } => source.status_code(), } } diff --git a/src/catalog/src/helper.rs b/src/common/meta/src/helper.rs similarity index 100% rename from src/catalog/src/helper.rs rename to src/common/meta/src/helper.rs diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 9f3e9974b8..ba46ab8932 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -13,13 +13,16 @@ // limitations under the License. use serde::{Deserialize, Serialize}; +use snafu::ResultExt; use table::metadata::{RawTableInfo, TableId}; use super::TABLE_INFO_KEY_PREFIX; -use crate::error::Result; +use crate::error::{InvalidCatalogValueSnafu, Result}; +use crate::helper::{TableGlobalKey, TableGlobalValue}; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest}; +use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, PutRequest}; +use crate::table_name::TableName; pub struct TableInfoKey { table_id: TableId, @@ -52,6 +55,60 @@ impl TableInfoManager { Self { kv_backend } } + // TODO(LFC): Remove this method when table metadata refactor is done. + pub async fn get_old(&self, table_name: &TableName) -> Result> { + let table_global_key = TableGlobalKey { + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + }; + self.kv_backend + .get(table_global_key.to_string().as_bytes()) + .await? + .map(|kv| TableGlobalValue::from_bytes(kv.value)) + .transpose() + .map(|v| { + v.map(|v| TableInfoValue { + table_info: v.table_info, + version: 0, + }) + }) + .context(InvalidCatalogValueSnafu) + } + + // TODO(LFC): Remove this method when table metadata refactor is done. + pub async fn put_old(&self, table_info: RawTableInfo) -> Result<()> { + let key = TableGlobalKey { + catalog_name: table_info.catalog_name.clone(), + schema_name: table_info.schema_name.clone(), + table_name: table_info.name.clone(), + } + .to_string(); + let raw_key = key.as_bytes(); + + let regions_id_map = self + .kv_backend + .get(raw_key) + .await? + .map(|kv| TableGlobalValue::from_bytes(kv.value())) + .transpose() + .context(InvalidCatalogValueSnafu)? + .map(|v| v.regions_id_map) + .unwrap_or_default(); + + let raw_value = TableGlobalValue { + node_id: 0, + regions_id_map, + table_info, + } + .as_bytes() + .context(InvalidCatalogValueSnafu)?; + + let req = PutRequest::new().with_key(raw_key).with_value(raw_value); + self.kv_backend.put(req).await?; + Ok(()) + } + pub async fn get(&self, table_id: TableId) -> Result> { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); @@ -66,15 +123,16 @@ impl TableInfoManager { /// with key is the same as `expect`, the value will be updated to `val`. /// /// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())` - /// - If associated value is not the same as `expect`, no value will be updated and an `Ok(Err(Vec))` - /// will be returned, the `Err(Vec)` indicates the current associated value of key. + /// - If associated value is not the same as `expect`, no value will be updated and an + /// `Ok(Err(Option))` will be returned. The `Option` indicates + /// the current associated value of key. /// - If any error happens during operation, an `Err(Error)` will be returned. pub async fn compare_and_put( &self, table_id: TableId, expect: Option, table_info: RawTableInfo, - ) -> Result>>> { + ) -> Result>> { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); @@ -98,7 +156,10 @@ impl TableInfoManager { Ok(if resp.success { Ok(()) } else { - Err(resp.prev_kv.map(|x| x.value)) + Err(resp + .prev_kv + .map(|x| TableInfoValue::try_from_raw_value(x.value)) + .transpose()?) }) } @@ -152,7 +213,21 @@ mod tests { ); assert!(manager.get(4).await.unwrap().is_none()); + // test cas failed, current value is not set let table_info = new_table_info(4); + let result = manager + .compare_and_put( + 4, + Some(TableInfoValue { + table_info: table_info.clone(), + version: 0, + }), + table_info.clone(), + ) + .await + .unwrap(); + assert!(result.unwrap_err().is_none()); + let result = manager .compare_and_put(4, None, table_info.clone()) .await @@ -165,7 +240,7 @@ mod tests { .compare_and_put(4, None, new_table_info.clone()) .await .unwrap(); - let actual = TableInfoValue::try_from_raw_value(result.unwrap_err().unwrap()).unwrap(); + let actual = result.unwrap_err().unwrap(); assert_eq!( actual, TableInfoValue { diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 4c1f1eda8b..2c16db0b92 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -15,18 +15,19 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; use super::{TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX}; -use crate::error::{InvalidTableMetadataSnafu, Result}; +use crate::error::{Error, InvalidCatalogValueSnafu, InvalidTableMetadataSnafu, Result}; +use crate::helper::{build_table_global_prefix, TableGlobalKey, TableGlobalValue}; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest}; use crate::table_name::TableName; -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct TableNameKey<'a> { pub catalog: &'a str, pub schema: &'a str, @@ -88,6 +89,34 @@ impl<'a> From<&'a TableName> for TableNameKey<'a> { } } +impl From> for TableName { + fn from(value: TableNameKey<'_>) -> Self { + Self { + catalog_name: value.catalog.to_string(), + schema_name: value.schema.to_string(), + table_name: value.table.to_string(), + } + } +} + +impl<'a> TryFrom<&'a str> for TableNameKey<'a> { + type Error = Error; + + fn try_from(s: &'a str) -> Result { + let captures = TABLE_NAME_KEY_PATTERN + .captures(s) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Illegal TableNameKey format: '{s}'"), + })?; + // Safety: pass the regex check above + Ok(Self { + catalog: captures.get(1).unwrap().as_str(), + schema: captures.get(2).unwrap().as_str(), + table: captures.get(3).unwrap().as_str(), + }) + } +} + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] pub struct TableNameValue { table_id: TableId, @@ -118,7 +147,12 @@ impl TableNameManager { Self { kv_backend } } - pub async fn create(&self, key: &TableNameKey<'_>, table_id: TableId) -> Result { + /// Creates a new table name entry. Returns the current [TableNameValue] if the entry already existed. + pub async fn create( + &self, + key: &TableNameKey<'_>, + table_id: TableId, + ) -> Result> { let raw_key = key.as_raw_key(); let value = TableNameValue::new(table_id); let raw_value = value.try_as_raw_value()?; @@ -126,7 +160,46 @@ impl TableNameManager { .with_key(raw_key) .with_value(raw_value); let result = self.kv_backend.compare_and_put(req).await?; - Ok(result.success) + Ok(if result.success { + None + } else { + result + .prev_kv + .map(|x| TableNameValue::try_from_raw_value(x.value)) + .transpose()? + }) + } + + // TODO(LFC): Remove this method when table metadata refactor is done. + pub async fn get_old(&self, key: &TableNameKey<'_>) -> Result> { + let table_global_key = TableGlobalKey { + catalog_name: key.catalog.to_string(), + schema_name: key.schema.to_string(), + table_name: key.table.to_string(), + }; + self.kv_backend + .get(table_global_key.to_string().as_bytes()) + .await? + .map(|kv| TableGlobalValue::from_bytes(kv.value())) + .transpose() + .map(|v| v.map(|v| TableNameValue::new(v.table_id()))) + .context(InvalidCatalogValueSnafu) + } + + // TODO(LFC): Remove this method when table metadata refactor is done. + pub async fn tables_old(&self, catalog: &str, schema: &str) -> Result> { + let key = build_table_global_prefix(catalog, schema); + let req = RangeRequest::new().with_prefix(key.as_bytes()); + + let resp = self.kv_backend.range(req).await?; + + let mut table_names = Vec::with_capacity(resp.kvs.len()); + for kv in resp.kvs { + let key = TableGlobalKey::parse(String::from_utf8_lossy(kv.key())) + .context(InvalidCatalogValueSnafu)?; + table_names.push(key.table_name); + } + Ok(table_names) } pub async fn get(&self, key: &TableNameKey<'_>) -> Result> { @@ -175,12 +248,14 @@ mod tests { for i in 1..=3 { let table_name = format!("table_{}", i); let key = TableNameKey::new("my_catalog", "my_schema", &table_name); - assert!(manager.create(&key, i).await.unwrap()); + assert!(manager.create(&key, i).await.unwrap().is_none()); } let key = TableNameKey::new("my_catalog", "my_schema", "my_table"); - assert!(manager.create(&key, 99).await.unwrap()); - assert!(!manager.create(&key, 99).await.unwrap()); + assert!(manager.create(&key, 99).await.unwrap().is_none()); + + let curr = manager.create(&key, 9).await.unwrap(); + assert_eq!(Some(TableNameValue::new(99)), curr); let value = manager.get(&key).await.unwrap().unwrap(); assert_eq!(value.table_id(), 99); diff --git a/src/common/meta/src/key/table_region.rs b/src/common/meta/src/key/table_region.rs index 50c17a2443..21beeb5817 100644 --- a/src/common/meta/src/key/table_region.rs +++ b/src/common/meta/src/key/table_region.rs @@ -15,14 +15,17 @@ use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionNumber; use table::metadata::TableId; use super::TABLE_REGION_KEY_PREFIX; -use crate::error::Result; +use crate::error::{InvalidCatalogValueSnafu, InvalidTableMetadataSnafu, Result}; +use crate::helper::{TableGlobalKey, TableGlobalValue}; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest}; +use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, PutRequest}; +use crate::table_name::TableName; use crate::DatanodeId; pub type RegionDistribution = BTreeMap>; @@ -68,6 +71,69 @@ impl TableRegionManager { .transpose() } + // TODO(LFC): Remove this method when table metadata refactor is done. + pub async fn get_old(&self, table_name: &TableName) -> Result> { + let key = TableGlobalKey { + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + } + .to_string(); + let raw_key = key.as_bytes(); + + self.kv_backend + .get(raw_key) + .await? + .map(|kv| TableGlobalValue::from_bytes(kv.value())) + .transpose() + .map(|v| { + v.map(|v| TableRegionValue { + region_distribution: v.regions_id_map.into_iter().collect(), + version: 0, + }) + }) + .context(InvalidCatalogValueSnafu) + } + + // TODO(LFC): Remove this method when table metadata refactor is done. + pub async fn put_old( + &self, + table_name: &TableName, + region_distribution: RegionDistribution, + ) -> Result<()> { + let key = TableGlobalKey { + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + } + .to_string(); + let raw_key = key.as_bytes(); + + let table_info = self + .kv_backend + .get(raw_key) + .await? + .map(|kv| TableGlobalValue::from_bytes(kv.value())) + .transpose() + .context(InvalidCatalogValueSnafu)? + .map(|v| v.table_info) + .with_context(|| InvalidTableMetadataSnafu { + err_msg: format!("table global value for {table_name} is empty"), + })?; + + let raw_value = TableGlobalValue { + node_id: 0, + regions_id_map: region_distribution.into_iter().collect(), + table_info, + } + .as_bytes() + .context(InvalidCatalogValueSnafu)?; + + let req = PutRequest::new().with_key(raw_key).with_value(raw_value); + self.kv_backend.put(req).await?; + Ok(()) + } + /// Compare and put value of key. `expect` is the expected value, if backend's current value associated /// with key is the same as `expect`, the value will be updated to `val`. /// diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index aa93de1df1..5906708c36 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -16,6 +16,7 @@ pub mod error; pub mod heartbeat; +pub mod helper; pub mod ident; pub mod instruction; pub mod key; diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index e2b44108fc..b850a31247 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -15,61 +15,24 @@ use std::collections::{HashMap, HashSet}; use api::v1::meta::{ - CreateRequest as PbCreateRequest, DeleteRequest as PbDeleteRequest, Partition as PbPartition, - Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute, + Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute, RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable, - TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue, + TableId as PbTableId, TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue, }; use serde::{Deserialize, Serialize, Serializer}; -use snafu::{OptionExt, ResultExt}; +use snafu::OptionExt; use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::RawTableInfo; +use table::metadata::TableId; use crate::error::{self, Result}; use crate::peer::Peer; use crate::rpc::util; use crate::table_name::TableName; -#[derive(Debug, Clone)] -pub struct CreateRequest<'a> { - pub table_name: TableName, - pub partitions: Vec, - pub table_info: &'a RawTableInfo, -} - -impl TryFrom> for PbCreateRequest { - type Error = error::Error; - - fn try_from(mut req: CreateRequest) -> Result { - Ok(Self { - header: None, - table_name: Some(req.table_name.into()), - partitions: req.partitions.drain(..).map(Into::into).collect(), - table_info: serde_json::to_vec(&req.table_info).context(error::SerdeJsonSnafu)?, - }) - } -} - -impl<'a> CreateRequest<'a> { - #[inline] - pub fn new(table_name: TableName, table_info: &'a RawTableInfo) -> Self { - Self { - table_name, - partitions: vec![], - table_info, - } - } - - #[inline] - pub fn add_partition(mut self, partition: Partition) -> Self { - self.partitions.push(partition); - self - } -} - #[derive(Debug, Clone, Default)] pub struct RouteRequest { pub table_names: Vec, + pub table_ids: Vec, } impl From for PbRouteRequest { @@ -77,15 +40,17 @@ impl From for PbRouteRequest { Self { header: None, table_names: req.table_names.drain(..).map(Into::into).collect(), + table_ids: req.table_ids.drain(..).map(|id| PbTableId { id }).collect(), } } } impl RouteRequest { #[inline] - pub fn new() -> Self { + pub fn new(table_id: TableId) -> Self { Self { table_names: vec![], + table_ids: vec![table_id], } } @@ -96,27 +61,6 @@ impl RouteRequest { } } -#[derive(Debug, Clone)] -pub struct DeleteRequest { - pub table_name: TableName, -} - -impl From for PbDeleteRequest { - fn from(req: DeleteRequest) -> Self { - Self { - header: None, - table_name: Some(req.table_name.into()), - } - } -} - -impl DeleteRequest { - #[inline] - pub fn new(table_name: TableName) -> Self { - Self { table_name } - } -} - #[derive(Debug, Clone)] pub struct RouteResponse { pub table_routes: Vec, @@ -423,97 +367,13 @@ impl From for Partition { #[cfg(test)] mod tests { use api::v1::meta::{ - DeleteRequest as PbDeleteRequest, Partition as PbPartition, Peer as PbPeer, - Region as PbRegion, RegionRoute as PbRegionRoute, RouteRequest as PbRouteRequest, - RouteResponse as PbRouteResponse, Table as PbTable, TableName as PbTableName, - TableRoute as PbTableRoute, + Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute, + RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable, + TableName as PbTableName, TableRoute as PbTableRoute, }; - use chrono::DateTime; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, RawSchema}; - use table::metadata::{RawTableMeta, TableIdent, TableType}; - use table::requests::TableOptions; use super::*; - fn new_table_info() -> RawTableInfo { - RawTableInfo { - ident: TableIdent { - table_id: 0, - version: 0, - }, - name: "t1".to_string(), - desc: None, - catalog_name: "c1".to_string(), - schema_name: "s1".to_string(), - meta: RawTableMeta { - schema: RawSchema { - column_schemas: vec![ - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ColumnSchema::new("c1", ConcreteDataType::string_datatype(), true), - ColumnSchema::new("c2", ConcreteDataType::string_datatype(), true), - ], - timestamp_index: Some(0), - version: 0, - }, - primary_key_indices: vec![], - value_indices: vec![], - engine: "mito".to_string(), - next_column_id: 0, - region_numbers: vec![], - engine_options: HashMap::new(), - options: TableOptions::default(), - created_on: DateTime::default(), - }, - table_type: TableType::Base, - } - } - - #[test] - fn test_create_request_trans() { - let req = CreateRequest { - table_name: TableName::new("c1", "s1", "t1"), - partitions: vec![ - Partition { - column_list: vec![b"c1".to_vec(), b"c2".to_vec()], - value_list: vec![b"v1".to_vec(), b"v2".to_vec()], - }, - Partition { - column_list: vec![b"c1".to_vec(), b"c2".to_vec()], - value_list: vec![b"v11".to_vec(), b"v22".to_vec()], - }, - ], - table_info: &new_table_info(), - }; - let into_req: PbCreateRequest = req.try_into().unwrap(); - - assert!(into_req.header.is_none()); - let table_name = into_req.table_name; - assert_eq!("c1", table_name.as_ref().unwrap().catalog_name); - assert_eq!("s1", table_name.as_ref().unwrap().schema_name); - assert_eq!("t1", table_name.as_ref().unwrap().table_name); - assert_eq!( - vec![b"c1".to_vec(), b"c2".to_vec()], - into_req.partitions.get(0).unwrap().column_list - ); - assert_eq!( - vec![b"v1".to_vec(), b"v2".to_vec()], - into_req.partitions.get(0).unwrap().value_list - ); - assert_eq!( - vec![b"c1".to_vec(), b"c2".to_vec()], - into_req.partitions.get(1).unwrap().column_list - ); - assert_eq!( - vec![b"v11".to_vec(), b"v22".to_vec()], - into_req.partitions.get(1).unwrap().value_list - ); - } - #[test] fn test_route_request_trans() { let req = RouteRequest { @@ -521,6 +381,7 @@ mod tests { TableName::new("c1", "s1", "t1"), TableName::new("c2", "s2", "t2"), ], + table_ids: vec![1, 2, 3], }; let into_req: PbRouteRequest = req.into(); @@ -532,20 +393,10 @@ mod tests { assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name); assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name); assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name); - } - - #[test] - fn test_delete_request_trans() { - let req = DeleteRequest { - table_name: TableName::new("c1", "s1", "t1"), - }; - - let into_req: PbDeleteRequest = req.into(); - - assert!(into_req.header.is_none()); - assert_eq!("c1", into_req.table_name.as_ref().unwrap().catalog_name); - assert_eq!("s1", into_req.table_name.as_ref().unwrap().schema_name); - assert_eq!("t1", into_req.table_name.as_ref().unwrap().table_name); + assert_eq!( + (1..=3).map(|id| PbTableId { id }).collect::>(), + into_req.table_ids + ); } #[test] diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 08e2396223..21891336b1 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -21,10 +21,6 @@ use catalog::error::{ self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu, Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu, }; -use catalog::helper::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, - TableGlobalKey, TableGlobalValue, -}; use catalog::information_schema::InformationSchemaProvider; use catalog::remote::KvCacheInvalidatorRef; use catalog::{ @@ -34,14 +30,21 @@ use catalog::{ use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME}; use common_error::ext::BoxedError; -use common_meta::key::TableMetadataManagerRef; +use common_meta::helper::{ + build_catalog_prefix, build_schema_prefix, CatalogKey, SchemaKey, TableGlobalKey, +}; +use common_meta::key::table_info::TableInfoKey; +use common_meta::key::table_name::TableNameKey; +use common_meta::key::table_region::TableRegionKey; +use common_meta::key::{TableMetaKey, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::rpc::store::RangeRequest; use common_meta::rpc::KeyValue; use common_meta::table_name::TableName; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use partition::manager::PartitionRuleManagerRef; use snafu::prelude::*; +use table::metadata::TableId; use table::table::numbers::NumbersTable; use table::TableRef; @@ -110,7 +113,13 @@ impl FrontendCatalogManager { self.backend_cache_invalidator.invalidate_key(key).await; } - pub async fn invalidate_table(&self, catalog: &str, schema: &str, table: &str) { + pub async fn invalidate_table( + &self, + catalog: &str, + schema: &str, + table: &str, + table_id: TableId, + ) { let tg_key = TableGlobalKey { catalog_name: catalog.into(), schema_name: schema.into(), @@ -121,6 +130,38 @@ impl FrontendCatalogManager { let tg_key = tg_key.as_bytes(); self.backend_cache_invalidator.invalidate_key(tg_key).await; + + let key = TableNameKey::new(catalog, schema, table); + self.backend_cache_invalidator + .invalidate_key(&key.as_raw_key()) + .await; + debug!( + "invalidated cache key: {}", + String::from_utf8_lossy(&key.as_raw_key()) + ); + + let key = TableInfoKey::new(table_id); + self.backend_cache_invalidator + .invalidate_key(&key.as_raw_key()) + .await; + debug!( + "invalidated cache key: {}", + String::from_utf8_lossy(&key.as_raw_key()) + ); + + let key = TableRegionKey::new(table_id); + self.backend_cache_invalidator + .invalidate_key(&key.as_raw_key()) + .await; + debug!( + "invalidated cache key: {}", + String::from_utf8_lossy(&key.as_raw_key()) + ); + + self.partition_manager + .table_routes() + .invalidate_table_route(&TableName::new(catalog, schema, table)) + .await; } } @@ -141,12 +182,7 @@ impl CatalogManager for FrontendCatalogManager { Ok(true) } - async fn deregister_table(&self, request: DeregisterTableRequest) -> CatalogResult<()> { - let table_name = TableName::new(request.catalog, request.schema, request.table_name); - self.partition_manager - .table_routes() - .invalidate_table_route(&table_name) - .await; + async fn deregister_table(&self, _request: DeregisterTableRequest) -> CatalogResult<()> { Ok(()) } @@ -304,29 +340,17 @@ impl CatalogManager for FrontendCatalogManager { } async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult> { - let mut tables = vec![]; + let mut tables = self + .table_metadata_manager + .table_name_manager() + .tables_old(catalog, schema) + .await + .context(TableMetadataManagerSnafu)?; + if catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME { tables.push("numbers".to_string()); } - let key = build_table_global_prefix(catalog, schema); - let req = RangeRequest::new().with_prefix(key.as_bytes()); - let iter = self - .backend - .range(req) - .await - .context(TableMetadataManagerSnafu)? - .kvs - .into_iter(); - - let result = iter - .map(|KeyValue { key: k, value: _ }| { - let key = TableGlobalKey::parse(String::from_utf8_lossy(&k)) - .context(InvalidCatalogValueSnafu)?; - Ok(key.table_name) - }) - .collect::>>()?; - tables.extend(result); Ok(tables) } @@ -360,17 +384,13 @@ impl CatalogManager for FrontendCatalogManager { } async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { - let table_global_key = TableGlobalKey { - catalog_name: catalog.to_string(), - schema_name: schema.to_string(), - table_name: table.to_string(), - }; - Ok(self - .backend() - .get(table_global_key.to_string().as_bytes()) + let key = TableNameKey::new(catalog, schema, table); + self.table_metadata_manager + .table_name_manager() + .get_old(&key) .await - .context(TableMetadataManagerSnafu)? - .is_some()) + .context(TableMetadataManagerSnafu) + .map(|x| x.is_some()) } async fn table( @@ -400,15 +420,20 @@ impl CatalogManager for FrontendCatalogManager { return provider.table(table_name); } - let table_global_key = TableGlobalKey { - catalog_name: catalog.to_string(), - schema_name: schema.to_string(), - table_name: table_name.to_string(), - }; - let Some(kv) = self.backend().get(table_global_key.to_string().as_bytes()).await.context(TableMetadataManagerSnafu)? else { - return Ok(None); - }; - let v = TableGlobalValue::from_bytes(kv.value).context(InvalidCatalogValueSnafu)?; + let key = TableNameKey::new(catalog, schema, table_name); + let Some(table_name_value) = self.table_metadata_manager + .table_name_manager() + .get_old(&key) + .await + .context(TableMetadataManagerSnafu)? else { return Ok(None) }; + let _table_id = table_name_value.table_id(); + + let Some(v) = self.table_metadata_manager + .table_info_manager() + .get_old(&key.into()) + .await + .context(TableMetadataManagerSnafu)? else { return Ok(None) }; + let table_info = Arc::new( v.table_info .try_into() @@ -418,7 +443,6 @@ impl CatalogManager for FrontendCatalogManager { TableName::new(catalog, schema, table_name), table_info, Arc::new(self.clone()), - self.table_metadata_manager.clone(), )); Ok(Some(table)) } diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 38de5c20e6..d85fe4f1f7 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -13,14 +13,16 @@ // limitations under the License. use async_trait::async_trait; -use catalog::helper::TableGlobalKey; use catalog::remote::KvCacheInvalidatorRef; use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; +use common_meta::helper::TableGlobalKey; use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::key::table_region::TableRegionKey; +use common_meta::key::TableMetaKey; use common_meta::table_name::TableName; use common_telemetry::{error, info}; use partition::manager::TableRouteCacheInvalidatorRef; @@ -48,15 +50,8 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { let mailbox = ctx.mailbox.clone(); let self_ref = self.clone(); - let TableIdent { - catalog, - schema, - table, - .. - } = table_ident; - let _handle = common_runtime::spawn_bg(async move { - self_ref.invalidate_table(&catalog, &schema, &table).await; + self_ref.invalidate_table_cache(table_ident).await; if let Err(e) = mailbox .send(( @@ -87,11 +82,11 @@ impl InvalidateTableCacheHandler { } } - async fn invalidate_table(&self, catalog_name: &str, schema_name: &str, table_name: &str) { + async fn invalidate_table_cache(&self, table_ident: TableIdent) { let tg_key = TableGlobalKey { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), + catalog_name: table_ident.catalog.clone(), + schema_name: table_ident.schema.clone(), + table_name: table_ident.table.clone(), } .to_string(); info!("invalidate table cache: {}", tg_key); @@ -99,14 +94,15 @@ impl InvalidateTableCacheHandler { self.backend_cache_invalidator.invalidate_key(tg_key).await; - let table = &TableName { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - }; + let key = &TableRegionKey::new(table_ident.table_id).as_raw_key(); + self.backend_cache_invalidator.invalidate_key(key).await; self.table_route_cache_invalidator - .invalidate_table_route(table) + .invalidate_table_route(&TableName::new( + table_ident.catalog, + table_ident.schema, + table_ident.table, + )) .await; } } diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index 411da89377..2d712f8a8e 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -17,12 +17,12 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use api::v1::meta::HeartbeatResponse; -use catalog::helper::TableGlobalKey; use catalog::remote::KvCacheInvalidator; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; +use common_meta::helper::TableGlobalKey; use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::table_name::TableName; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 7c2de4eae2..1a907bff19 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -163,7 +163,6 @@ impl Instance { meta_client.clone(), Arc::new(catalog_manager.clone()), datanode_clients.clone(), - table_metadata_manager.clone(), ); let dist_instance = Arc::new(dist_instance); diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index dd2be04286..4b83432bcd 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -25,7 +25,6 @@ use api::v1::{ FlushTableExpr, InsertRequests, }; use async_trait::async_trait; -use catalog::helper::{SchemaKey, SchemaValue}; use catalog::{CatalogManager, RegisterTableRequest}; use chrono::DateTime; use client::client_manager::DatanodeClients; @@ -33,7 +32,7 @@ use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; -use common_meta::key::TableMetadataManagerRef; +use common_meta::helper::{SchemaKey, SchemaValue}; use common_meta::peer::Peer; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition as MetaPartition, RouteRequest}; @@ -82,7 +81,6 @@ pub struct DistInstance { meta_client: Arc, catalog_manager: Arc, datanode_clients: Arc, - table_metadata_manager: TableMetadataManagerRef, } impl DistInstance { @@ -90,13 +88,11 @@ impl DistInstance { meta_client: Arc, catalog_manager: Arc, datanode_clients: Arc, - table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { meta_client, catalog_manager, datanode_clients, - table_metadata_manager, } } @@ -132,7 +128,6 @@ impl DistInstance { table_name.clone(), table_info, self.catalog_manager.clone(), - self.table_metadata_manager.clone(), )); let request = RegisterTableRequest { @@ -161,6 +156,7 @@ impl DistInstance { &table_name.catalog_name, &table_name.schema_name, &table_name.table_name, + table_id, ) .await; @@ -194,6 +190,7 @@ impl DistInstance { &table_name.catalog_name, &table_name.schema_name, &table_name.table_name, + table_id, ) .await; @@ -265,7 +262,7 @@ impl DistInstance { table_name: &TableName, region_number: Option, ) -> Result> { - let _ = self + let table = self .catalog_manager .table( &table_name.catalog_name, @@ -277,11 +274,13 @@ impl DistInstance { .with_context(|| TableNotFoundSnafu { table_name: table_name.to_string(), })?; + let table_id = table.table_info().table_id(); let route_response = self .meta_client .route(RouteRequest { table_names: vec![table_name.clone()], + table_ids: vec![table_id], }) .await .context(RequestMetaSnafu)?; @@ -521,6 +520,7 @@ impl DistInstance { &table_info.catalog_name, &table_info.schema_name, &table_info.name, + table_info.table_id(), ) .await; diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index f41b9c9fc9..cc2beb977c 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -176,19 +176,19 @@ impl DistInserter { mod tests { use api::v1::column::{SemanticType, Values}; use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; - use catalog::helper::{ - CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, - }; use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_meta::key::TableMetadataManager; + use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; + use common_meta::key::table_name::TableNameKey; + use common_meta::key::table_region::RegionDistribution; + use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackend, KvBackendRef}; use common_meta::rpc::store::PutRequest; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; - use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder}; use super::*; use crate::heartbeat::handler::tests::MockKvCacheInvalidator; @@ -219,13 +219,10 @@ mod tests { backend } - async fn create_testing_table(backend: &KvBackendRef, table_name: &str) { - let table_global_key = TableGlobalKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - }; - + async fn create_testing_table( + table_name: &str, + table_metadata_manager: &TableMetadataManagerRef, + ) { let schema = Arc::new(Schema::new(vec![ ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) .with_time_index(true) @@ -243,20 +240,35 @@ mod tests { .build() .unwrap(); - let table_info = TableInfoBuilder::new(table_name, table_meta) + let table_id = 1; + let table_info: RawTableInfo = TableInfoBuilder::new(table_name, table_meta) + .table_id(table_id) .build() - .unwrap(); + .unwrap() + .into(); - let table_global_value = TableGlobalValue { - node_id: 1, - regions_id_map: HashMap::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]), - table_info: table_info.into(), - }; + let key = TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); + assert!(table_metadata_manager + .table_name_manager() + .create(&key, table_id) + .await + .unwrap() + .is_none()); - let req = PutRequest::new() - .with_key(table_global_key.to_string().as_bytes()) - .with_value(table_global_value.as_bytes().unwrap()); - backend.put(req).await.unwrap(); + assert!(table_metadata_manager + .table_info_manager() + .put_old(table_info) + .await + .is_ok()); + + assert!(table_metadata_manager + .table_region_manager() + .put_old( + &key.into(), + RegionDistribution::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]), + ) + .await + .is_ok()); } #[tokio::test] @@ -265,7 +277,7 @@ mod tests { let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone())); let table_name = "one_column_partitioning_table"; - create_testing_table(&backend, table_name).await; + create_testing_table(table_name, &table_metadata_manager).await; let catalog_manager = Arc::new(FrontendCatalogManager::new( backend, diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 67c6279808..67a66714df 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -17,13 +17,9 @@ use std::iter; use std::pin::Pin; use std::sync::Arc; -use api::v1::AlterExpr; use async_trait::async_trait; -use catalog::helper::{TableGlobalKey, TableGlobalValue}; use client::Database; use common_error::ext::BoxedError; -use common_meta::key::{TableMetadataManagerRef, TableRouteKey}; -use common_meta::rpc::store::{MoveValueRequest, PutRequest}; use common_meta::table_name::TableName; use common_query::error::Result as QueryResult; use common_query::logical_plan::Expr; @@ -36,7 +32,6 @@ use common_recordbatch::error::{ use common_recordbatch::{ RecordBatch, RecordBatchStreamAdaptor, RecordBatches, SendableRecordBatchStream, }; -use common_telemetry::debug; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::{ Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream, @@ -44,21 +39,17 @@ use datafusion::physical_plan::{ use datafusion_common::DataFusionError; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use futures_util::{Stream, StreamExt}; -use partition::manager::TableRouteCacheInvalidator; use partition::splitter::WriteSplitter; use snafu::prelude::*; use store_api::storage::{RegionNumber, ScanRequest}; use table::error::TableOperationSnafu; -use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; -use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest}; -use table::table::AlterContext; +use table::metadata::{FilterPushDownType, TableInfoRef}; +use table::requests::{DeleteRequest, InsertRequest}; use table::Table; use tokio::sync::RwLock; use crate::catalog::FrontendCatalogManager; -use crate::error::{ - self, FindDatanodeSnafu, FindTableRouteSnafu, Result, TableMetadataManagerSnafu, -}; +use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, Result}; use crate::instance::distributed::inserter::DistInserter; use crate::table::delete::to_grpc_delete_request; use crate::table::scan::{DatanodeInstance, TableScanPlan}; @@ -72,8 +63,6 @@ pub struct DistTable { table_name: TableName, table_info: TableInfoRef, catalog_manager: Arc, - #[allow(unused)] - table_metadata_manager: TableMetadataManagerRef, } #[async_trait] @@ -179,13 +168,6 @@ impl Table for DistTable { Ok(vec![FilterPushDownType::Inexact; filters.len()]) } - async fn alter(&self, context: AlterContext, request: &AlterTableRequest) -> table::Result<()> { - self.handle_alter(context, request) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu) - } - async fn delete(&self, request: DeleteRequest) -> table::Result { let partition_manager = self.catalog_manager.partition_manager(); @@ -244,205 +226,14 @@ impl DistTable { table_name: TableName, table_info: TableInfoRef, catalog_manager: Arc, - table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { table_name, table_info, catalog_manager, - table_metadata_manager, } } - pub async fn table_global_value( - &self, - key: &TableGlobalKey, - ) -> Result> { - let raw = self - .catalog_manager - .backend() - .get(key.to_string().as_bytes()) - .await - .context(TableMetadataManagerSnafu)?; - Ok(if let Some(raw) = raw { - Some(TableGlobalValue::from_bytes(raw.value).context(error::CatalogEntrySerdeSnafu)?) - } else { - None - }) - } - - async fn set_table_global_value( - &self, - key: TableGlobalKey, - value: TableGlobalValue, - ) -> Result<()> { - let value = value.as_bytes().context(error::CatalogEntrySerdeSnafu)?; - let req = PutRequest::new() - .with_key(key.to_string().as_bytes()) - .with_value(value); - let _ = self - .catalog_manager - .backend() - .put(req) - .await - .context(TableMetadataManagerSnafu)?; - Ok(()) - } - - async fn delete_table_global_value(&self, key: TableGlobalKey) -> Result<()> { - let _ = self - .catalog_manager - .backend() - .delete(key.to_string().as_bytes(), false) - .await - .context(TableMetadataManagerSnafu)?; - Ok(()) - } - - async fn move_table_route_value( - &self, - catalog_name: &str, - schema_name: &str, - table_id: u32, - old_table_name: &str, - new_table_name: &str, - ) -> Result<()> { - let old_key = TableRouteKey { - table_id, - catalog_name, - schema_name, - table_name: old_table_name, - } - .to_string(); - - let new_key = TableRouteKey { - table_id, - catalog_name, - schema_name, - table_name: new_table_name, - } - .to_string(); - - let req = MoveValueRequest::new(old_key.as_bytes(), new_key.as_bytes()); - self.catalog_manager - .backend() - .move_value(req) - .await - .context(TableMetadataManagerSnafu)?; - - self.catalog_manager - .partition_manager() - .invalidate_table_route(&TableName { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: old_table_name.to_string(), - }) - .await; - - Ok(()) - } - - async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> { - let AlterTableRequest { - catalog_name, - schema_name, - table_name, - alter_kind, - table_id: _table_id, - .. - } = request; - - let alter_expr = context - .get::() - .context(error::ContextValueNotFoundSnafu { key: "AlterExpr" })?; - - self.alter_by_expr(alter_expr).await?; - - let table_info = self.table_info(); - let new_meta = table_info - .meta - .builder_with_alter_kind(table_name, &request.alter_kind) - .context(error::TableSnafu)? - .build() - .context(error::BuildTableMetaSnafu { - table_name: table_name.clone(), - })?; - - let mut new_info = TableInfo::clone(&*table_info); - new_info.ident.version = table_info.ident.version + 1; - new_info.meta = new_meta; - - let key = TableGlobalKey { - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: table_name.clone(), - }; - let mut value = self - .table_global_value(&key) - .await? - .context(error::TableNotFoundSnafu { table_name })?; - - value.table_info = new_info.into(); - - if let AlterKind::RenameTable { new_table_name } = alter_kind { - let new_key = TableGlobalKey { - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: new_table_name.clone(), - }; - self.set_table_global_value(new_key, value).await?; - self.delete_table_global_value(key).await?; - self.move_table_route_value( - catalog_name, - schema_name, - table_info.ident.table_id, - table_name, - new_table_name, - ) - .await?; - Ok(()) - } else { - self.set_table_global_value(key, value).await - } - } - - /// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between - /// [`table::requests::AlterTableRequest`] and [`AlterExpr`]. - async fn alter_by_expr(&self, expr: &AlterExpr) -> Result<()> { - let table_routes = self - .catalog_manager - .partition_manager() - .find_table_route(&self.table_name) - .await - .with_context(|_| error::FindTableRouteSnafu { - table_name: self.table_name.to_string(), - })?; - let leaders = table_routes.find_leaders(); - ensure!( - !leaders.is_empty(), - error::LeaderNotFoundSnafu { - table: format!( - "{:?}.{:?}.{}", - expr.catalog_name, expr.schema_name, expr.table_name - ) - } - ); - - let datanode_clients = self.catalog_manager.datanode_clients(); - for datanode in leaders { - let client = datanode_clients.get_client(&datanode).await; - let db = Database::new(&expr.catalog_name, &expr.schema_name, client); - debug!("Sending {:?} to {:?}", expr, db); - let result = db - .alter(expr.clone()) - .await - .context(error::RequestDatanodeSnafu)?; - debug!("Alter table result: {:?}", result); - // TODO(hl): We should further check and track alter result in some global DDL task tracker - } - Ok(()) - } - async fn find_datanode_instances( &self, regions: &[RegionNumber], @@ -698,7 +489,7 @@ pub(crate) mod test { ); let table_route = TableRoute::new( Table { - id: 1, + id: 2, table_name: table_name.clone(), table_schema: vec![], }, diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index 88958eaaf6..304199812e 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -12,23 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::time::Duration; use api::v1::meta::{HeartbeatRequest, Peer, Role}; -use chrono::DateTime; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::rpc::router::{CreateRequest, Partition}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest, }; -use common_meta::table_name::TableName; -use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, RawSchema}; use meta_client::client::MetaClientBuilder; -use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; -use table::requests::TableOptions; use tracing::{event, subscriber, Level}; use tracing_subscriber::FmtSubscriber; @@ -78,25 +70,6 @@ async fn run() { } }); - let p1 = Partition { - column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()], - value_list: vec![b"k1".to_vec(), b"k2".to_vec()], - }; - - let p2 = Partition { - column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()], - value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()], - }; - - let table_name = TableName::new("test_catalog", "test_schema", "test_table"); - let table_info = new_table_info(); - let create_req = CreateRequest::new(table_name, &table_info) - .add_partition(p1) - .add_partition(p2); - - let res = meta_client.create_route(create_req).await; - event!(Level::INFO, "create_route result: {:#?}", res); - // put let put = PutRequest::new() .with_key(b"key1".to_vec()) @@ -173,40 +146,3 @@ async fn run() { let res = meta_client.batch_get(batch_get).await.unwrap(); event!(Level::INFO, "batch get result: {:#?}", res); } - -fn new_table_info() -> RawTableInfo { - RawTableInfo { - ident: TableIdent { - table_id: 0, - version: 0, - }, - name: "test_table".to_string(), - desc: None, - catalog_name: "test_catalog".to_string(), - schema_name: "test_schema".to_string(), - meta: RawTableMeta { - schema: RawSchema { - column_schemas: vec![ - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true), - ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true), - ], - timestamp_index: Some(0), - version: 0, - }, - primary_key_indices: vec![], - value_indices: vec![], - engine: "mito".to_string(), - next_column_id: 0, - region_numbers: vec![], - engine_options: HashMap::new(), - options: TableOptions::default(), - created_on: DateTime::default(), - }, - table_type: TableType::Base, - } -} diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index e394aa2a09..d82078145d 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -24,7 +24,7 @@ use api::v1::meta::Role; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; -use common_meta::rpc::router::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse}; +use common_meta::rpc::router::{RouteRequest, RouteResponse}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, @@ -41,7 +41,7 @@ use store::Client as StoreClient; pub use self::heartbeat::{HeartbeatSender, HeartbeatStream}; use crate::error; -use crate::error::{ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Result}; +use crate::error::{ConvertMetaResponseSnafu, Result}; pub type Id = (u64, u64); @@ -224,21 +224,6 @@ impl MetaClient { self.heartbeat_client()?.heartbeat().await } - /// Provides routing information for distributed create table requests. - /// - /// When a distributed create table request is received, this method returns - /// a list of `datanode` addresses that are generated based on the partition - /// information contained in the request and using some intelligent policies, - /// such as load-based. - pub async fn create_route(&self, req: CreateRequest<'_>) -> Result { - let req = req.try_into().context(ConvertMetaRequestSnafu)?; - self.router_client()? - .create(req) - .await? - .try_into() - .context(ConvertMetaResponseSnafu) - } - /// Fetch routing information for tables. The smallest unit is the complete /// routing information(all regions) of a table. /// @@ -266,17 +251,6 @@ impl MetaClient { .context(ConvertMetaResponseSnafu) } - /// Can be called repeatedly, the first call will delete and return the - /// table of routing information, the nth call can still return the - /// deleted route information. - pub async fn delete_route(&self, req: DeleteRequest) -> Result { - self.router_client()? - .delete(req.into()) - .await? - .try_into() - .context(ConvertMetaResponseSnafu) - } - /// Range gets the keys in the range from the key-value store. pub async fn range(&self, req: RangeRequest) -> Result { self.store_client()? @@ -424,20 +398,10 @@ impl MetaClient { #[cfg(test)] mod tests { - use std::collections::HashMap; - use std::sync::Arc; - use api::v1::meta::{HeartbeatRequest, Peer}; - use chrono::DateTime; - use common_meta::rpc::router::Partition; - use common_meta::table_name::TableName; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, RawSchema}; use meta_srv::metasrv::SelectorContext; use meta_srv::selector::{Namespace, Selector}; use meta_srv::Result as MetaResult; - use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; - use table::requests::TableOptions; use super::*; use crate::mocks; @@ -547,39 +511,6 @@ mod tests { assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } - fn new_table_info(table_name: &TableName) -> RawTableInfo { - RawTableInfo { - ident: TableIdent { - table_id: 0, - version: 0, - }, - name: table_name.table_name.clone(), - desc: None, - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - meta: RawTableMeta { - schema: RawSchema { - column_schemas: vec![ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - )], - timestamp_index: Some(0), - version: 0, - }, - primary_key_indices: vec![], - value_indices: vec![], - engine: "mito".to_string(), - next_column_id: 0, - region_numbers: vec![], - engine_options: HashMap::new(), - options: TableOptions::default(), - created_on: DateTime::default(), - }, - table_type: TableType::Base, - } - } - #[tokio::test] async fn test_not_start_router_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; @@ -589,10 +520,8 @@ mod tests { .build(); meta_client.start(urls).await.unwrap(); - let table_name = TableName::new("c", "s", "t"); - let table_info = new_table_info(&table_name); - let req = CreateRequest::new(table_name, &table_info); - let res = meta_client.create_route(req).await; + let req = RouteRequest::new(1); + let res = meta_client.route(req).await; assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } @@ -671,37 +600,6 @@ mod tests { } } - #[tokio::test] - async fn test_route() { - let selector = Arc::new(MockSelector {}); - let client = mocks::mock_client_with_memorystore_and_selector(selector).await; - - let p1 = Partition { - column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()], - value_list: vec![b"k1".to_vec(), b"k2".to_vec()], - }; - let p2 = Partition { - column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()], - value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()], - }; - let table_name = TableName::new("test_catalog", "test_schema", "test_table"); - let table_info = new_table_info(&table_name); - let req = CreateRequest::new(table_name.clone(), &table_info) - .add_partition(p1) - .add_partition(p2); - - let res = client.create_route(req).await.unwrap(); - assert_eq!(1, res.table_routes.len()); - - let req = RouteRequest::new().add_table_name(table_name.clone()); - let res = client.route(req).await.unwrap(); - assert!(!res.table_routes.is_empty()); - - let req = DeleteRequest::new(table_name.clone()); - let res = client.delete_route(req).await; - let _ = res.unwrap(); - } - #[tokio::test] async fn test_range_get() { let tc = new_client("test_range_get").await; diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs index a32020e9cf..66fd94f684 100644 --- a/src/meta-client/src/client/router.rs +++ b/src/meta-client/src/client/router.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::router_client::RouterClient; -use api::v1::meta::{CreateRequest, DeleteRequest, Role, RouteRequest, RouteResponse}; +use api::v1::meta::{Role, RouteRequest, RouteResponse}; use common_grpc::channel_manager::ChannelManager; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::RwLock; @@ -57,20 +57,10 @@ impl Client { inner.is_started() } - pub async fn create(&self, req: CreateRequest) -> Result { - let inner = self.inner.read().await; - inner.create(req).await - } - pub async fn route(&self, req: RouteRequest) -> Result { let inner = self.inner.read().await; inner.route(req).await } - - pub async fn delete(&self, req: DeleteRequest) -> Result { - let inner = self.inner.read().await; - inner.delete(req).await - } } #[derive(Debug)] @@ -105,14 +95,6 @@ impl Inner { Ok(()) } - async fn create(&self, mut req: CreateRequest) -> Result { - let mut client = self.random_client()?; - req.set_header(self.id, self.role); - let res = client.create(req).await.context(error::TonicStatusSnafu)?; - - Ok(res.into_inner()) - } - async fn route(&self, mut req: RouteRequest) -> Result { let mut client = self.random_client()?; req.set_header(self.id, self.role); @@ -121,14 +103,6 @@ impl Inner { Ok(res.into_inner()) } - async fn delete(&self, mut req: DeleteRequest) -> Result { - let mut client = self.random_client()?; - req.set_header(self.id, self.role); - let res = client.delete(req).await.context(error::TonicStatusSnafu)?; - - Ok(res.into_inner()) - } - fn random_client(&self) -> Result> { let len = self.peers.len(); let peer = lb::random_get(len, |i| Some(&self.peers[i])).context( diff --git a/src/meta-client/src/mocks.rs b/src/meta-client/src/mocks.rs index 943b9d579d..3054e84be3 100644 --- a/src/meta-client/src/mocks.rs +++ b/src/meta-client/src/mocks.rs @@ -13,7 +13,6 @@ // limitations under the License. use api::v1::meta::Role; -use meta_srv::metasrv::SelectorRef; use meta_srv::mocks as server_mock; use meta_srv::mocks::MockInfo; @@ -30,11 +29,6 @@ pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient { mock_client_by(mock_info).await } -pub async fn mock_client_with_memorystore_and_selector(selector: SelectorRef) -> MetaClient { - let mock_info = server_mock::mock_with_memstore_and_selector(selector).await; - mock_client_by(mock_info).await -} - pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient { let MockInfo { server_addr, diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 0d0997e2d8..a6229afc6e 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -17,18 +17,19 @@ use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; -use catalog::helper::TableGlobalKey; use common_meta::ident::TableIdent; use common_meta::key::TableMetadataManagerRef; +use common_meta::table_name::TableName; use common_meta::ClusterId; +use common_telemetry::warn; +use snafu::ResultExt; use store_api::storage::{RegionId, RegionNumber}; +use table::metadata::TableId; -use crate::error::Result; +use crate::error::{Result, TableMetadataManagerSnafu}; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; use crate::procedure::region_failover::{RegionFailoverKey, RegionFailoverManager}; -use crate::service::store::kv::KvStoreRef; -use crate::table_routes; /// The lease seconds of a region. It's set by two default heartbeat intervals (5 second × 2) plus /// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second). @@ -36,25 +37,44 @@ use crate::table_routes; pub(crate) const REGION_LEASE_SECONDS: u64 = 20; pub(crate) struct RegionLeaseHandler { - kv_store: KvStoreRef, region_failover_manager: Option>, - #[allow(unused)] table_metadata_manager: TableMetadataManagerRef, } impl RegionLeaseHandler { pub(crate) fn new( - kv_store: KvStoreRef, region_failover_manager: Option>, table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { - kv_store, region_failover_manager, table_metadata_manager, } } + async fn find_table_ident( + &self, + table_id: TableId, + table_name: &TableName, + ) -> Result> { + let value = self + .table_metadata_manager + .table_info_manager() + .get_old(table_name) + .await + .context(TableMetadataManagerSnafu)?; + Ok(value.map(|x| { + let table_info = &x.table_info; + TableIdent { + catalog: table_info.catalog_name.clone(), + schema: table_info.schema_name.clone(), + table: table_info.name.clone(), + table_id, + engine: table_info.meta.engine.clone(), + } + })) + } + /// Filter out the regions that are currently in failover. /// It's meaningless to extend the lease of a region if it is in failover. fn filter_failover_regions( @@ -96,46 +116,55 @@ impl HeartbeatHandler for RegionLeaseHandler { acc: &mut HeartbeatAccumulator, ) -> Result<()> { let Some(stat) = acc.stat.as_ref() else { return Ok(()) }; + let datanode_id = stat.id; let mut datanode_regions = HashMap::new(); stat.region_stats.iter().for_each(|x| { - let key = TableGlobalKey { - catalog_name: x.catalog.to_string(), - schema_name: x.schema.to_string(), - table_name: x.table.to_string(), - }; + let region_id: RegionId = x.id.into(); + let table_id = region_id.table_id(); + let table_name = TableName::new( + x.catalog.to_string(), + x.schema.to_string(), + x.table.to_string(), + ); datanode_regions - .entry(key) + .entry((table_id, table_name)) .or_insert_with(Vec::new) .push(RegionId::from(x.id).region_number()); }); - // TODO(LFC): Retrieve table global values from some cache here. - let table_global_values = table_routes::batch_get_table_global_value( - &self.kv_store, - datanode_regions.keys().collect::>(), - ) - .await?; - let mut region_leases = Vec::with_capacity(datanode_regions.len()); - for (table_global_key, local_regions) in datanode_regions { - let Some(Some(table_global_value)) = table_global_values.get(&table_global_key) else { continue }; + for ((table_id, table_name), local_regions) in datanode_regions { + let Some(table_ident) = self.find_table_ident(table_id, &table_name).await? else { + warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ + Reason: table not found."); + continue; + }; - let Some(global_regions) = table_global_value.regions_id_map.get(&stat.id) else { continue }; + let Some(table_region_value) = self + .table_metadata_manager + .table_region_manager() + .get_old(&table_name) + .await + .context(TableMetadataManagerSnafu)? else { + warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ + Reason: table region value not found."); + continue; + }; + let Some(global_regions) = table_region_value + .region_distribution + .get(&datanode_id) else { + warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ + Reason: not expected to place the region on it."); + continue; + }; - // Filter out the designated regions from table global metadata for the given table on the given Datanode. + // Filter out the designated regions from table info value for the given table on the given Datanode. let designated_regions = local_regions .into_iter() .filter(|x| global_regions.contains(x)) .collect::>(); - let table_ident = TableIdent { - catalog: table_global_key.catalog_name.to_string(), - schema: table_global_key.schema_name.to_string(), - table: table_global_key.table_name.to_string(), - table_id: table_global_value.table_id(), - engine: table_global_value.engine().to_string(), - }; let designated_regions = self.filter_failover_regions(stat.cluster_id, &table_ident, designated_regions); @@ -160,7 +189,7 @@ mod test { use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; use crate::service::store::kv::KvBackendAdapter; - use crate::test_util; + use crate::{table_routes, test_util}; #[tokio::test] async fn test_handle_region_lease() { @@ -171,17 +200,22 @@ mod test { .kv_store .clone(); + let table_id = 1; let table_name = "my_table"; let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( kv_store.clone(), ))); - let _ = table_routes::tests::prepare_table_global_value(&kv_store, table_name).await; + table_routes::tests::prepare_table_region_and_info_value( + &table_metadata_manager, + table_name, + ) + .await; let table_ident = TableIdent { catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), table: table_name.to_string(), - table_id: 1, + table_id, engine: "mito".to_string(), }; let _ = region_failover_manager @@ -194,11 +228,8 @@ mod test { region_number: 1, }); - let handler = RegionLeaseHandler::new( - kv_store, - Some(region_failover_manager), - table_metadata_manager, - ); + let handler = + RegionLeaseHandler::new(Some(region_failover_manager), table_metadata_manager); let req = HeartbeatRequest { duration_since_epoch: 1234, @@ -210,9 +241,10 @@ mod test { let ctx = &mut metasrv.new_ctx(); let acc = &mut HeartbeatAccumulator::default(); - let new_region_stat = |region_id: u64| -> RegionStat { + let new_region_stat = |region_number: RegionNumber| -> RegionStat { + let region_id = RegionId::new(table_id, region_number); RegionStat { - id: region_id, + id: region_id.as_u64(), catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), table: table_name.to_string(), @@ -228,7 +260,7 @@ mod test { handler.handle(&req, ctx, acc).await.unwrap(); - // region 1 is during failover and region 3 is not in table global value, + // region 1 is during failover and region 3 is not in table region value, // so only region 2's lease is extended. assert_eq!(acc.region_leases.len(), 1); let lease = acc.region_leases.remove(0); diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index 1b5f13d0d6..3553ffcf4e 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -24,7 +24,6 @@ use crate::error; use crate::error::Result; use crate::handler::node_stat::Stat; -pub(crate) const REMOVED_PREFIX: &str = "__removed"; pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease"; pub(crate) const SEQ_PREFIX: &str = "__meta_seq"; @@ -128,10 +127,6 @@ impl TryFrom for Vec { } } -pub(crate) fn to_removed_key(key: &str) -> String { - format!("{REMOVED_PREFIX}-{key}") -} - pub fn build_table_route_prefix(catalog: impl AsRef, schema: impl AsRef) -> String { format!( "{}-{}-{}-", diff --git a/src/meta-srv/src/lock.rs b/src/meta-srv/src/lock.rs index ded2cd3483..fb607b192b 100644 --- a/src/meta-srv/src/lock.rs +++ b/src/meta-srv/src/lock.rs @@ -52,14 +52,14 @@ pub trait DistLock: Send + Sync { pub type DistLockRef = Arc; -pub(crate) struct DistLockGuard<'a> { +pub struct DistLockGuard<'a> { lock: &'a DistLockRef, name: Vec, key: Option, } impl<'a> DistLockGuard<'a> { - pub(crate) fn new(lock: &'a DistLockRef, name: Vec) -> Self { + pub fn new(lock: &'a DistLockRef, name: Vec) -> Self { Self { lock, name, @@ -67,7 +67,7 @@ impl<'a> DistLockGuard<'a> { } } - pub(crate) async fn lock(&mut self) -> Result<()> { + pub async fn lock(&mut self) -> Result<()> { if self.key.is_some() { return Ok(()); } diff --git a/src/meta-srv/src/lock/keys.rs b/src/meta-srv/src/lock/keys.rs index 8020ab768b..513b308ac0 100644 --- a/src/meta-srv/src/lock/keys.rs +++ b/src/meta-srv/src/lock/keys.rs @@ -15,7 +15,6 @@ //! All keys used for distributed locking in the Metasrv. //! Place them in this unified module for better maintenance. -use common_meta::table_name::TableName; use common_meta::RegionIdent; use crate::lock::Key; @@ -31,7 +30,3 @@ pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key { ) .into_bytes() } - -pub(crate) fn table_creation_lock_key(table_name: &TableName) -> Key { - format!("table_creation_lock_({})", table_name).into_bytes() -} diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs index f27020a5ba..137b6942d9 100644 --- a/src/meta-srv/src/metadata_service.rs +++ b/src/meta-srv/src/metadata_service.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use async_trait::async_trait; -use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; +use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use common_meta::rpc::store::CompareAndPutRequest; use common_telemetry::{info, timer}; use metrics::increment_counter; @@ -119,7 +119,7 @@ impl MetadataService for DefaultMetadataService { mod tests { use std::sync::Arc; - use catalog::helper::{CatalogKey, SchemaKey}; + use common_meta::helper::{CatalogKey, SchemaKey}; use super::{DefaultMetadataService, MetadataService}; use crate::service::store::kv::KvStoreRef; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 7b857df16c..a8bcc066a7 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -234,7 +234,6 @@ impl MetaSrvBuilder { }; let region_lease_handler = RegionLeaseHandler::new( - kv_store.clone(), region_failover_handler .as_ref() .map(|x| x.region_failover_manager().clone()), diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs index c1f08d1085..8e641e494d 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/meta-srv/src/procedure/create_table.rs @@ -14,10 +14,10 @@ use api::v1::meta::TableRouteValue; use async_trait::async_trait; -use catalog::helper::TableGlobalKey; use client::Database; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use common_meta::helper::TableGlobalKey; use common_meta::key::TableRouteKey; use common_meta::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use common_meta::rpc::ddl::CreateTableTask; diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 175dfc6a76..1fd02e24c9 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -25,9 +25,9 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use async_trait::async_trait; -use catalog::helper::TableGlobalKey; use common_meta::ident::TableIdent; use common_meta::key::TableMetadataManagerRef; +use common_meta::table_name::TableName; use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -42,7 +42,7 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::storage::RegionNumber; -use crate::error::{Error, RegisterProcedureLoaderSnafu, Result}; +use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu}; use crate::lock::DistLockRef; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::service::mailbox::MailboxRef; @@ -204,17 +204,17 @@ impl RegionFailoverManager { async fn table_exists(&self, failed_region: &RegionIdent) -> Result { let table_ident = &failed_region.table_ident; - let table_global_key = TableGlobalKey { - catalog_name: table_ident.catalog.clone(), - schema_name: table_ident.schema.clone(), - table_name: table_ident.table.clone(), - }; - let table_global_value = self - .selector_ctx - .kv_store - .get(&table_global_key.to_raw_key()) - .await?; - Ok(table_global_value.is_some()) + Ok(self + .table_metadata_manager + .table_region_manager() + .get_old(&TableName::new( + &table_ident.catalog, + &table_ident.schema, + &table_ident.table, + )) + .await + .context(TableMetadataManagerSnafu)? + .is_some()) } } @@ -376,7 +376,6 @@ mod tests { use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader}; - use catalog::helper::TableGlobalKey; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; @@ -422,20 +421,21 @@ mod tests { impl TestingEnv { pub async fn failed_region(&self, region_number: u32) -> RegionIdent { - let table = "my_table"; - let key = TableGlobalKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table.to_string(), - }; - let value = - table_routes::get_table_global_value(&self.context.selector_ctx.kv_store, &key) - .await - .unwrap() - .unwrap(); + let value = self + .context + .table_metadata_manager + .table_region_manager() + .get_old(&TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "my_table", + )) + .await + .unwrap() + .unwrap(); let failed_datanode = value - .regions_id_map + .region_distribution .iter() .find_map(|(&datanode_id, regions)| { if regions.contains(®ion_number) { @@ -454,7 +454,7 @@ mod tests { engine: MITO_ENGINE.to_string(), catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), - table: table.to_string(), + table: "my_table".to_string(), }, } } @@ -489,8 +489,21 @@ mod tests { let table_metadata_manager = Arc::new(TableMetadataManager::new( KvBackendAdapter::wrap(kv_store.clone()), )); - let (_, table_global_value) = - table_routes::tests::prepare_table_global_value(&kv_store, table).await; + table_routes::tests::prepare_table_region_and_info_value( + &table_metadata_manager, + table, + ) + .await; + let table_region_value = table_metadata_manager + .table_region_manager() + .get_old(&TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table, + )) + .await + .unwrap() + .unwrap(); let _ = table_routes::tests::prepare_table_route_value(&kv_store, table).await; @@ -511,7 +524,7 @@ mod tests { let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); let selector = self.selector.unwrap_or_else(|| { - let nodes = (1..=table_global_value.regions_id_map.len()) + let nodes = (1..=table_region_value.region_distribution.len()) .map(|id| Peer { id: id as u64, addr: "".to_string(), @@ -650,24 +663,27 @@ mod tests { ); // Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode. - let key = TableGlobalKey { - catalog_name: failed_region.table_ident.catalog.clone(), - schema_name: failed_region.table_ident.schema.clone(), - table_name: failed_region.table_ident.table.clone(), - }; - let value = table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key) + let value = env + .context + .table_metadata_manager + .table_region_manager() + .get_old(&TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "my_table", + )) .await .unwrap() .unwrap(); assert_eq!( value - .regions_id_map + .region_distribution .get(&failed_region.datanode_id) .unwrap(), &vec![2] ); assert!(value - .regions_id_map + .region_distribution .get(&candidate_rx.recv().await.unwrap()) .unwrap() .contains(&1)); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index c6d6e9abb6..aafefec9d2 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{TableName, TableRouteValue}; +use api::v1::meta::{TableName as PbTableName, TableRouteValue}; use async_trait::async_trait; -use catalog::helper::TableGlobalKey; use common_meta::key::TableRouteKey; use common_meta::peer::Peer; use common_meta::rpc::router::TableRoute; +use common_meta::table_name::TableName; use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; @@ -26,8 +26,8 @@ use snafu::{OptionExt, ResultExt}; use super::invalidate_cache::InvalidateCache; use super::{RegionFailoverContext, State}; use crate::error::{ - CorruptedTableRouteSnafu, Result, RetryLaterSnafu, TableNotFoundSnafu, - TableRouteConversionSnafu, + CorruptedTableRouteSnafu, Result, RetryLaterSnafu, TableMetadataManagerSnafu, + TableNotFoundSnafu, TableRouteConversionSnafu, }; use crate::lock::keys::table_metadata_lock_key; use crate::lock::Opts; @@ -43,7 +43,7 @@ impl UpdateRegionMetadata { Self { candidate } } - /// Updates the metadata of the table. Specifically, the [TableGlobalValue] and [TableRouteValue]. + /// Updates the metadata of the table. async fn update_metadata( &self, ctx: &RegionFailoverContext, @@ -52,54 +52,60 @@ impl UpdateRegionMetadata { let key = table_metadata_lock_key(failed_region); let key = ctx.dist_lock.lock(key, Opts::default()).await?; - self.update_table_global_value(ctx, failed_region).await?; + self.update_table_region_value(ctx, failed_region).await?; + self.update_table_route(ctx, failed_region).await?; ctx.dist_lock.unlock(key).await?; Ok(()) } - async fn update_table_global_value( + async fn update_table_region_value( &self, ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result<()> { - let key = TableGlobalKey { - catalog_name: failed_region.table_ident.catalog.clone(), - schema_name: failed_region.table_ident.schema.clone(), - table_name: failed_region.table_ident.table.clone(), - }; - let mut value = table_routes::get_table_global_value(&ctx.selector_ctx.kv_store, &key) - .await? + let table_ident = &failed_region.table_ident; + let table_id = table_ident.table_id; + let table_name = TableName::new( + &table_ident.catalog, + &table_ident.schema, + &table_ident.table, + ); + let value = ctx + .table_metadata_manager + .table_region_manager() + .get_old(&table_name) + .await + .context(TableMetadataManagerSnafu)? .with_context(|| TableNotFoundSnafu { - name: common_catalog::format_full_table_name( - &key.catalog_name, - &key.schema_name, - &key.table_name, - ), + name: table_ident.to_string(), })?; + let mut region_distribution = value.region_distribution.clone(); - if let Some(mut region_numbers) = value.regions_id_map.remove(&failed_region.datanode_id) { + if let Some(mut region_numbers) = region_distribution.remove(&failed_region.datanode_id) { region_numbers.retain(|x| *x != failed_region.region_number); if !region_numbers.is_empty() { - let _ = value - .regions_id_map - .insert(failed_region.datanode_id, region_numbers); + region_distribution.insert(failed_region.datanode_id, region_numbers); } } - let region_numbers = value - .regions_id_map + let region_numbers = region_distribution .entry(self.candidate.id) .or_insert_with(Vec::new); region_numbers.push(failed_region.region_number); - table_routes::put_table_global_value(&ctx.selector_ctx.kv_store, &key, &value).await?; + ctx.table_metadata_manager + .table_region_manager() + .put_old(&table_name, region_distribution.clone()) + .await + .context(TableMetadataManagerSnafu)?; + info!( - "Region mappings in table global value (key = '{key}') are updated to {:?}. \ + "Region distribution of table (id = {table_id}) is updated to {:?}. \ Failed region {} was on Datanode {}.", - value.regions_id_map, failed_region.region_number, failed_region.datanode_id, + region_distribution, failed_region.region_number, failed_region.datanode_id, ); Ok(()) } @@ -109,7 +115,7 @@ impl UpdateRegionMetadata { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result<()> { - let table_name = TableName { + let table_name = PbTableName { catalog_name: failed_region.table_ident.catalog.clone(), schema_name: failed_region.table_ident.schema.clone(), table_name: failed_region.table_ident.table.clone(), @@ -210,8 +216,10 @@ impl State for UpdateRegionMetadata { #[cfg(test)] mod tests { use api::v1::meta::TableRouteValue; - use catalog::helper::TableGlobalValue; + use common_meta::key::table_region::TableRegionValue; use common_meta::key::TableRouteKey; + use common_meta::DatanodeId; + use store_api::storage::RegionNumber; use super::super::tests::{TestingEnv, TestingEnvBuilder}; use super::{State, *}; @@ -232,40 +240,34 @@ mod tests { } #[tokio::test] - async fn test_update_table_global_value() { + async fn test_update_table_info_value() { common_telemetry::init_default_ut_logging(); - async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> TableGlobalValue { + async fn test( + env: TestingEnv, + failed_region: RegionNumber, + candidate: DatanodeId, + ) -> TableRegionValue { let failed_region = env.failed_region(failed_region).await; - let key = TableGlobalKey { - catalog_name: failed_region.table_ident.catalog.clone(), - schema_name: failed_region.table_ident.schema.clone(), - table_name: failed_region.table_ident.table.clone(), - }; - - let original = - table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key) - .await - .unwrap() - .unwrap(); - let state = UpdateRegionMetadata::new(Peer::new(candidate, "")); state - .update_table_global_value(&env.context, &failed_region) + .update_table_region_value(&env.context, &failed_region) .await .unwrap(); - let updated = - table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key) - .await - .unwrap() - .unwrap(); - - // verifies that other data stay untouched - assert_eq!(original.node_id, updated.node_id); - assert_eq!(original.table_info, updated.table_info); - updated + let table_ident = failed_region.table_ident; + env.context + .table_metadata_manager + .table_region_manager() + .get_old(&TableName::new( + &table_ident.catalog, + &table_ident.schema, + &table_ident.table, + )) + .await + .unwrap() + .unwrap() } // Region distribution: @@ -278,7 +280,7 @@ mod tests { let env = TestingEnvBuilder::new().build().await; let updated = test(env, 1, 2).await; - let new_region_id_map = updated.regions_id_map; + let new_region_id_map = updated.region_distribution; assert_eq!(new_region_id_map.len(), 3); assert_eq!(new_region_id_map.get(&1), Some(&vec![2])); assert_eq!(new_region_id_map.get(&2), Some(&vec![3, 1])); @@ -288,7 +290,7 @@ mod tests { let env = TestingEnvBuilder::new().build().await; let updated = test(env, 3, 3).await; - let new_region_id_map = updated.regions_id_map; + let new_region_id_map = updated.region_distribution; assert_eq!(new_region_id_map.len(), 2); assert_eq!(new_region_id_map.get(&1), Some(&vec![1, 2])); assert_eq!(new_region_id_map.get(&3), Some(&vec![4, 3])); @@ -297,7 +299,7 @@ mod tests { let env = TestingEnvBuilder::new().build().await; let updated = test(env, 1, 4).await; - let new_region_id_map = updated.regions_id_map; + let new_region_id_map = updated.region_distribution; assert_eq!(new_region_id_map.len(), 4); assert_eq!(new_region_id_map.get(&1), Some(&vec![2])); assert_eq!(new_region_id_map.get(&2), Some(&vec![3])); @@ -308,7 +310,7 @@ mod tests { let env = TestingEnvBuilder::new().build().await; let updated = test(env, 3, 4).await; - let new_region_id_map = updated.regions_id_map; + let new_region_id_map = updated.region_distribution; assert_eq!(new_region_id_map.len(), 3); assert_eq!(new_region_id_map.get(&1), Some(&vec![1, 2])); assert_eq!(new_region_id_map.get(&3), Some(&vec![4])); @@ -435,7 +437,7 @@ mod tests { async fn test_update_metadata_concurrently() { common_telemetry::init_default_ut_logging(); - // Test the correctness of concurrently updating the region distribution in table global + // Test the correctness of concurrently updating the region distribution in table region // value, and region routes in table route value. Region 1 moves to Datanode 2; region 2 // moves to Datanode 3. // @@ -512,19 +514,15 @@ mod tests { assert_eq!(peers.len(), 2); assert_eq!(actual, expected); - let table_global_key = TableGlobalKey { - catalog_name, - schema_name, - table_name, - }; - let table_global_value = table_routes::get_table_global_value( - &env.context.selector_ctx.kv_store, - &table_global_key, - ) - .await - .unwrap() - .unwrap(); - let map = table_global_value.regions_id_map; + let map = env + .context + .table_metadata_manager + .table_region_manager() + .get_old(&TableName::new(&catalog_name, &schema_name, &table_name)) + .await + .unwrap() + .unwrap() + .region_distribution; assert_eq!(map.len(), 2); assert_eq!(map.get(&2), Some(&vec![3, 1])); assert_eq!(map.get(&3), Some(&vec![4, 2])); diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 3823135169..16cfa7f17a 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::meta::TableRouteValue; -use catalog::helper::TableGlobalKey; +use common_meta::helper::TableGlobalKey; use common_meta::key::TableRouteKey; use common_meta::peer::Peer; use common_meta::rpc::router::TableRoute; diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index a2c4b314c6..63f1a21019 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -64,7 +64,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let router = router.route( "/tables", meta::TablesHandler { - kv_store: meta_srv.kv_store().clone(), table_metadata_manager: meta_srv.table_metadata_manager().clone(), }, ); @@ -72,7 +71,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let router = router.route( "/table", meta::TableHandler { - kv_store: meta_srv.kv_store().clone(), table_metadata_manager: meta_srv.table_metadata_manager().clone(), }, ); diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs index 3a8c5c9e7c..8530b589f6 100644 --- a/src/meta-srv/src/service/admin/meta.rs +++ b/src/meta-srv/src/service/admin/meta.rs @@ -14,9 +14,8 @@ use std::collections::HashMap; -use catalog::helper::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, TABLE_GLOBAL_KEY_PREFIX, -}; +use common_meta::helper::{build_catalog_prefix, build_schema_prefix}; +use common_meta::key::table_name::TableNameKey; use common_meta::key::TableMetadataManagerRef; use common_meta::rpc::store::{RangeRequest, RangeResponse}; use common_meta::util; @@ -24,7 +23,7 @@ use snafu::{OptionExt, ResultExt}; use tonic::codegen::http; use crate::error; -use crate::error::Result; +use crate::error::{Result, TableMetadataManagerSnafu}; use crate::service::admin::HttpHandler; use crate::service::store::kv::KvStoreRef; @@ -37,12 +36,10 @@ pub struct SchemasHandler { } pub struct TablesHandler { - pub kv_store: KvStoreRef, pub table_metadata_manager: TableMetadataManagerRef, } pub struct TableHandler { - pub kv_store: KvStoreRef, pub table_metadata_manager: TableMetadataManagerRef, } @@ -87,8 +84,15 @@ impl HttpHandler for TablesHandler { .context(error::MissingRequiredParameterSnafu { param: "schema_name", })?; - get_http_response_by_prefix(build_table_global_prefix(catalog, schema), &self.kv_store) + + let tables = self + .table_metadata_manager + .table_name_manager() + .tables_old(catalog, schema) .await + .context(TableMetadataManagerSnafu)?; + + to_http_response(tables) } } @@ -99,22 +103,56 @@ impl HttpHandler for TableHandler { _: &str, params: &HashMap, ) -> Result> { - let table_name = params - .get("full_table_name") - .map(|full_table_name| full_table_name.replace('.', "-")) - .context(error::MissingRequiredParameterSnafu { - param: "full_table_name", - })?; - let table_key = format!("{TABLE_GLOBAL_KEY_PREFIX}-{table_name}"); + let table_name = + params + .get("full_table_name") + .context(error::MissingRequiredParameterSnafu { + param: "full_table_name", + })?; - let response = self.kv_store.get(table_key.as_bytes()).await?; - let mut value: String = "Not found result".to_string(); - if let Some(key_value) = response { - value = String::from_utf8(key_value.value).context(error::InvalidUtf8ValueSnafu)?; + let key: TableNameKey = table_name + .as_str() + .try_into() + .context(TableMetadataManagerSnafu)?; + + let mut result = HashMap::with_capacity(2); + + let table_id = self + .table_metadata_manager + .table_name_manager() + .get_old(&key) + .await + .context(TableMetadataManagerSnafu)? + .map(|x| x.table_id()); + + if let Some(_table_id) = table_id { + let table_info_value = self + .table_metadata_manager + .table_info_manager() + .get_old(&key.into()) + .await + .context(TableMetadataManagerSnafu)? + .map(|x| format!("{x:?}")) + .unwrap_or_else(|| "Not Found".to_string()); + result.insert("table_info_value", table_info_value); } + + if let Some(_table_id) = table_id { + let table_region_value = self + .table_metadata_manager + .table_region_manager() + .get_old(&key.into()) + .await + .context(TableMetadataManagerSnafu)? + .map(|x| format!("{x:?}")) + .unwrap_or_else(|| "Not Found".to_string()); + result.insert("table_region_value", table_region_value); + } + http::Response::builder() .status(http::StatusCode::OK) - .body(value) + // Safety: HashMap is definitely "serde-json"-able. + .body(serde_json::to_string(&result).unwrap()) .context(error::InvalidHttpBodySnafu) } } @@ -125,6 +163,10 @@ async fn get_http_response_by_prefix( kv_store: &KvStoreRef, ) -> Result> { let keys = get_keys_by_prefix(key_prefix, kv_store).await?; + to_http_response(keys) +} + +fn to_http_response(keys: Vec) -> Result> { let body = serde_json::to_string(&keys).context(error::SerializeToJsonSnafu { input: format!("{keys:?}"), })?; @@ -164,7 +206,7 @@ async fn get_keys_by_prefix(key_prefix: String, kv_store: &KvStoreRef) -> Result mod tests { use std::sync::Arc; - use catalog::helper::{ + use common_meta::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, TableGlobalKey, }; diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs index 8292ff0e38..5d0ca6b914 100644 --- a/src/meta-srv/src/service/ddl.rs +++ b/src/meta-srv/src/service/ddl.rs @@ -14,11 +14,10 @@ use api::v1::meta::{ ddl_task_server, Partition, Region, RegionRoute, SubmitDdlTaskRequest, SubmitDdlTaskResponse, - Table, TableRoute, + Table, TableId, TableRoute, }; -use api::v1::TableId; -use catalog::helper::TableGlobalKey; use common_grpc_expr::alter_expr_to_request; +use common_meta::helper::TableGlobalKey; use common_meta::key::TableRouteKey; use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DdlTask, DropTableTask}; use common_meta::rpc::router; diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 3cdf69a118..0f57a762e3 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -15,91 +15,26 @@ use std::collections::HashMap; use api::v1::meta::{ - router_server, CreateRequest, DeleteRequest, Error, Peer, PeerDict, Region, RegionRoute, - ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute, TableRouteValue, + router_server, Peer, PeerDict, ResponseHeader, RouteRequest, RouteResponse, TableRoute, + TableRouteValue, }; -use catalog::helper::{TableGlobalKey, TableGlobalValue}; -use common_meta::key::TableRouteKey; -use common_meta::rpc::store::BatchPutRequest; -use common_meta::rpc::KeyValue; +use common_meta::helper::TableGlobalValue; +use common_meta::key::table_info::TableInfoValue; use common_meta::table_name::TableName; -use common_telemetry::{timer, warn}; -use snafu::{ensure, OptionExt, ResultExt}; +use common_telemetry::timer; +use snafu::{OptionExt, ResultExt}; use table::metadata::RawTableInfo; use tonic::{Request, Response}; use crate::error; -use crate::error::Result; -use crate::lock::{keys, DistLockGuard}; -use crate::metasrv::{Context, MetaSrv, SelectorContext, SelectorRef}; +use crate::error::{Result, TableMetadataManagerSnafu}; +use crate::metasrv::{Context, MetaSrv}; use crate::metrics::METRIC_META_ROUTE_REQUEST; -use crate::sequence::SequenceRef; use crate::service::GrpcResult; -use crate::table_routes::{ - fetch_tables, get_table_global_value, remove_table_global_value, remove_table_route_value, - table_route_key, -}; +use crate::table_routes::fetch_tables; #[async_trait::async_trait] impl router_server::Router for MetaSrv { - async fn create(&self, req: Request) -> GrpcResult { - let req = req.into_inner(); - - let CreateRequest { - header, table_name, .. - } = &req; - let table_name: TableName = table_name - .as_ref() - .context(error::EmptyTableNameSnafu)? - .clone() - .into(); - - // TODO(LFC): Use procedure to create table, and get rid of locks here. - let mut guard = DistLockGuard::new(self.lock(), keys::table_creation_lock_key(&table_name)); - guard.lock().await?; - - let table_global_key = TableGlobalKey { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - } - .to_string() - .into_bytes(); - ensure!( - self.kv_store().get(&table_global_key).await?.is_none(), - error::TableAlreadyExistsSnafu { - table_name: table_name.to_string(), - } - ); - - let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); - - let _timer = timer!( - METRIC_META_ROUTE_REQUEST, - &[ - ("op", "create".to_string()), - ("cluster_id", cluster_id.to_string()) - ] - ); - - let ctx = SelectorContext { - datanode_lease_secs: self.options().datanode_lease_secs, - server_addr: self.options().server_addr.clone(), - kv_store: self.kv_store().clone(), - meta_peer_client: self.meta_peer_client().clone(), - catalog: Some(table_name.catalog_name.clone()), - schema: Some(table_name.schema_name.clone()), - table: Some(table_name.table_name.clone()), - }; - - let selector = self.selector(); - let table_id_sequence = self.table_id_sequence(); - - let res = handle_create(req, ctx, selector, table_id_sequence).await?; - - Ok(Response::new(res)) - } - async fn route(&self, req: Request) -> GrpcResult { let req = req.into_inner(); let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id); @@ -117,143 +52,6 @@ impl router_server::Router for MetaSrv { Ok(Response::new(res)) } - - async fn delete(&self, req: Request) -> GrpcResult { - let req = req.into_inner(); - let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id); - - let _timer = timer!( - METRIC_META_ROUTE_REQUEST, - &[ - ("op", "delete".to_string()), - ("cluster_id", cluster_id.to_string()) - ] - ); - - let ctx = self.new_ctx(); - let res = handle_delete(req, ctx).await?; - - Ok(Response::new(res)) - } -} - -async fn handle_create( - req: CreateRequest, - ctx: SelectorContext, - selector: &SelectorRef, - table_id_sequence: &SequenceRef, -) -> Result { - let CreateRequest { - header, - table_name, - partitions, - table_info, - } = req; - let table_name = table_name.context(error::EmptyTableNameSnafu)?; - - let mut table_info: RawTableInfo = - serde_json::from_slice(&table_info).with_context(|_| error::DeserializeFromJsonSnafu { - input: format!( - "Corrupted table info: {}", - String::from_utf8_lossy(&table_info) - ), - })?; - - let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); - let mut peers = selector.select(cluster_id, &ctx).await?; - - if peers.len() < partitions.len() { - warn!("Create table failed due to no enough available datanodes, table: {table_name:?}, partition number: {}, datanode number: {}", partitions.len(), peers.len()); - return Ok(RouteResponse { - header: Some(ResponseHeader::failed( - cluster_id, - Error::not_enough_available_datanodes(partitions.len(), peers.len()), - )), - ..Default::default() - }); - } - - // We don't need to keep all peers, just truncate it to the number of partitions. - // If the peers are not enough, some peers will be used for multiple partitions. - peers.truncate(partitions.len()); - - let id = table_id_sequence.next().await?; - table_info.ident.table_id = id as u32; - - let table_route_key = TableRouteKey::with_table_name(id as _, &table_name) - .to_string() - .into_bytes(); - - let table = Table { - id, - table_name: Some(table_name.clone()), - ..Default::default() - }; - let mut region_routes = Vec::with_capacity(partitions.len()); - for (i, partition) in partitions.into_iter().enumerate() { - let region = Region { - id: i as u64, - partition: Some(partition), - ..Default::default() - }; - let region_route = RegionRoute { - region: Some(region), - leader_peer_index: (i % peers.len()) as u64, - follower_peer_indexes: vec![], // follower_peers is not supported at the moment - }; - region_routes.push(region_route); - } - let table_route = TableRoute { - table: Some(table), - region_routes, - }; - - // save table route data into meta store - let table_route_value = TableRouteValue { - peers: peers.clone(), - table_route: Some(table_route.clone()), - }; - - let table_global_key = TableGlobalKey { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - } - .to_string() - .into_bytes(); - - let table_global_value = create_table_global_value(&table_route_value, table_info)? - .as_bytes() - .context(error::InvalidCatalogValueSnafu)?; - - let req = BatchPutRequest { - kvs: vec![ - KeyValue { - key: table_global_key, - value: table_global_value, - }, - KeyValue { - key: table_route_key, - value: table_route_value.into(), - }, - ], - prev_kv: true, - }; - - let resp = ctx.kv_store.batch_put(req).await?; - if !resp.prev_kvs.is_empty() { - warn!( - "Caution: table meta values are replaced! \ - Maybe last creation procedure for table {table_name:?} was aborted?" - ); - } - - let header = Some(ResponseHeader::success(cluster_id)); - Ok(RouteResponse { - header, - peers, - table_routes: vec![table_route], - }) } pub(crate) fn create_table_global_value( @@ -298,14 +96,16 @@ async fn handle_route(req: RouteRequest, ctx: Context) -> Result let RouteRequest { header, table_names, + table_ids: _, } = req; let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); - let table_global_keys = table_names.into_iter().map(|t| TableGlobalKey { - catalog_name: t.catalog_name, - schema_name: t.schema_name, - table_name: t.table_name, - }); - let tables = fetch_tables(&ctx.kv_store, table_global_keys).await?; + + let table_names = table_names + .into_iter() + .map(Into::into) + .collect::>(); + let tables = fetch_tables(&ctx, table_names).await?; + let (peers, table_routes) = fill_table_routes(tables)?; let header = Some(ResponseHeader::success(cluster_id)); @@ -316,39 +116,8 @@ async fn handle_route(req: RouteRequest, ctx: Context) -> Result }) } -async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result { - let DeleteRequest { header, table_name } = req; - let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); - let tgk = table_name - .map(|t| TableGlobalKey { - catalog_name: t.catalog_name, - schema_name: t.schema_name, - table_name: t.table_name, - }) - .context(error::EmptyTableNameSnafu)?; - - let tgv = get_table_global_value(&ctx.kv_store, &tgk) - .await? - .with_context(|| error::TableNotFoundSnafu { - name: format!("{tgk}"), - })?; - - let _ = remove_table_global_value(&ctx.kv_store, &tgk).await?; - - let trk = table_route_key(tgv.table_id(), &tgk); - let (_, trv) = remove_table_route_value(&ctx.kv_store, &trk).await?; - let (peers, table_routes) = fill_table_routes(vec![(tgv, trv)])?; - - let header = Some(ResponseHeader::success(cluster_id)); - Ok(RouteResponse { - header, - peers, - table_routes, - }) -} - pub(crate) fn fill_table_routes( - tables: Vec<(TableGlobalValue, TableRouteValue)>, + tables: Vec<(TableInfoValue, TableRouteValue)>, ) -> Result<(Vec, Vec)> { let mut peer_dict = PeerDict::default(); let mut table_routes = vec![]; @@ -370,7 +139,7 @@ pub(crate) fn fill_table_routes( } if let Some(table) = &mut table_route.table { - table.table_schema = tgv.as_bytes().context(error::InvalidCatalogValueSnafu)?; + table.table_schema = tgv.try_as_raw_value().context(TableMetadataManagerSnafu)?; } } if let Some(table_route) = table_route { diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 8d40f276d7..8730d67af4 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -12,20 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use api::v1::meta::TableRouteValue; -use catalog::helper::{TableGlobalKey, TableGlobalValue}; +use common_meta::helper::{TableGlobalKey, TableGlobalValue}; +use common_meta::key::table_info::TableInfoValue; use common_meta::key::TableRouteKey; -use common_meta::rpc::store::{BatchGetRequest, MoveValueRequest, PutRequest}; -use common_telemetry::warn; +use common_meta::rpc::store::PutRequest; +use common_meta::table_name::TableName; use snafu::{OptionExt, ResultExt}; use table::engine::TableReference; use crate::error::{ - DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableNotFoundSnafu, + DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableMetadataManagerSnafu, TableRouteNotFoundSnafu, }; +use crate::metasrv::Context; use crate::service::store::kv::KvStoreRef; pub async fn get_table_global_value( @@ -37,58 +37,6 @@ pub async fn get_table_global_value( .transpose() } -pub(crate) async fn batch_get_table_global_value( - kv_store: &KvStoreRef, - keys: Vec<&TableGlobalKey>, -) -> Result>> { - let req = BatchGetRequest { - keys: keys.iter().map(|x| x.to_raw_key()).collect::>(), - }; - let kvs = kv_store.batch_get(req).await?.kvs; - - let mut result = HashMap::with_capacity(kvs.len()); - for kv in kvs { - let key = TableGlobalKey::try_from_raw_key(kv.key()).context(InvalidCatalogValueSnafu)?; - let value = TableGlobalValue::from_bytes(kv.value()).context(InvalidCatalogValueSnafu)?; - let _ = result.insert(key, Some(value)); - } - - for key in keys { - if !result.contains_key(key) { - let _ = result.insert(key.clone(), None); - } - } - Ok(result) -} - -pub(crate) async fn put_table_global_value( - kv_store: &KvStoreRef, - key: &TableGlobalKey, - value: &TableGlobalValue, -) -> Result<()> { - let req = PutRequest { - key: key.to_raw_key(), - value: value.as_bytes().context(InvalidCatalogValueSnafu)?, - prev_kv: false, - }; - let _ = kv_store.put(req).await; - Ok(()) -} - -pub(crate) async fn remove_table_global_value( - kv_store: &KvStoreRef, - key: &TableGlobalKey, -) -> Result<(Vec, TableGlobalValue)> { - let key = key.to_string(); - let removed_key = crate::keys::to_removed_key(&key); - let kv = move_value(kv_store, key.as_bytes(), removed_key) - .await? - .context(TableNotFoundSnafu { name: key })?; - let value: TableGlobalValue = - TableGlobalValue::from_bytes(&kv.1).context(InvalidCatalogValueSnafu)?; - Ok((kv.0, value)) -} - pub(crate) async fn get_table_route_value( kv_store: &KvStoreRef, key: &TableRouteKey<'_>, @@ -116,35 +64,6 @@ pub(crate) async fn put_table_route_value( Ok(()) } -pub(crate) async fn remove_table_route_value( - kv_store: &KvStoreRef, - key: &TableRouteKey<'_>, -) -> Result<(Vec, TableRouteValue)> { - let from_key = key.to_string().into_bytes(); - let to_key = key.removed_key().into_bytes(); - let v = move_value(kv_store, from_key, to_key) - .await? - .context(TableRouteNotFoundSnafu { - key: key.to_string(), - })?; - let trv: TableRouteValue = v.1.as_slice().try_into().context(DecodeTableRouteSnafu)?; - - Ok((v.0, trv)) -} - -async fn move_value( - kv_store: &KvStoreRef, - from_key: impl Into>, - to_key: impl Into>, -) -> Result, Vec)>> { - let from_key = from_key.into(); - let to_key = to_key.into(); - let move_req = MoveValueRequest { from_key, to_key }; - let res = kv_store.move_value(move_req).await?; - - Ok(res.0.map(Into::into)) -} - pub(crate) fn table_route_key(table_id: u32, t: &TableGlobalKey) -> TableRouteKey<'_> { TableRouteKey { table_id, @@ -177,21 +96,30 @@ pub(crate) async fn fetch_table( } pub(crate) async fn fetch_tables( - kv_store: &KvStoreRef, - keys: impl Iterator, -) -> Result> { + ctx: &Context, + table_names: Vec, +) -> Result> { + let kv_store = &ctx.kv_store; + let mut tables = vec![]; // Maybe we can optimize the for loop in the future, but in general, // there won't be many keys, in fact, there is usually just one. - for tgk in keys { - let tgv = get_table_global_value(kv_store, &tgk).await?; - let Some(tgv) = tgv else { - warn!("Table global value is absent: {}", tgk); + for table_name in table_names { + let Some(tgv) = ctx.table_metadata_manager + .table_info_manager() + .get_old(&table_name) + .await + .context(TableMetadataManagerSnafu)? else { continue; }; + let table_info = &tgv.table_info; - let trk = table_route_key(tgv.table_id(), &tgk); - + let trk = TableRouteKey { + table_id: table_info.ident.table_id, + catalog_name: &table_info.catalog_name, + schema_name: &table_info.schema_name, + table_name: &table_info.name, + }; let trv = get_table_route_value(kv_store, &trk).await?; tables.push((tgv, trv)); @@ -205,9 +133,11 @@ pub(crate) mod tests { use std::collections::HashMap; use std::sync::Arc; - use api::v1::meta::{Peer, Region, RegionRoute, Table, TableName, TableRoute}; + use api::v1::meta::{Peer, Region, RegionRoute, Table, TableRoute}; use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; + use common_meta::key::table_region::RegionDistribution; + use common_meta::key::TableMetadataManagerRef; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; @@ -217,53 +147,52 @@ pub(crate) mod tests { use crate::error; use crate::service::store::memory::MemStore; - pub(crate) async fn prepare_table_global_value( - kv_store: &KvStoreRef, + pub(crate) async fn prepare_table_region_and_info_value( + table_metadata_manager: &TableMetadataManagerRef, table: &str, - ) -> (TableGlobalKey, TableGlobalValue) { + ) { + let table_info = RawTableInfo { + ident: TableIdent::new(1), + name: table.to_string(), + desc: None, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + meta: RawTableMeta { + schema: RawSchema::new(vec![ColumnSchema::new( + "a", + ConcreteDataType::string_datatype(), + true, + )]), + primary_key_indices: vec![], + value_indices: vec![], + engine: MITO_ENGINE.to_string(), + next_column_id: 1, + region_numbers: vec![1, 2, 3, 4], + engine_options: HashMap::new(), + options: TableOptions::default(), + created_on: DateTime::default(), + }, + table_type: TableType::Base, + }; + table_metadata_manager + .table_info_manager() + .put_old(table_info) + .await + .unwrap(); + // Region distribution: // Datanode => Regions // 1 => 1, 2 // 2 => 3 // 3 => 4 - let regions_id_map = HashMap::from([(1, vec![1, 2]), (2, vec![3]), (3, vec![4])]); - - let key = TableGlobalKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table.to_string(), - }; - let value = TableGlobalValue { - node_id: 1, - regions_id_map, - table_info: RawTableInfo { - ident: TableIdent::new(1), - name: table.to_string(), - desc: None, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - meta: RawTableMeta { - schema: RawSchema::new(vec![ColumnSchema::new( - "a", - ConcreteDataType::string_datatype(), - true, - )]), - primary_key_indices: vec![], - value_indices: vec![], - engine: MITO_ENGINE.to_string(), - next_column_id: 1, - region_numbers: vec![1, 2, 3, 4], - engine_options: HashMap::new(), - options: TableOptions::default(), - created_on: DateTime::default(), - }, - table_type: TableType::Base, - }, - }; - put_table_global_value(kv_store, &key, &value) + table_metadata_manager + .table_region_manager() + .put_old( + &TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table), + RegionDistribution::from([(1, vec![1, 2]), (2, vec![3]), (3, vec![4])]), + ) .await .unwrap(); - (key, value) } pub(crate) async fn prepare_table_route_value<'a>( @@ -299,11 +228,9 @@ pub(crate) mod tests { let table_route = TableRoute { table: Some(Table { id: 1, - table_name: Some(TableName { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table.to_string(), - }), + table_name: Some( + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table).into(), + ), table_schema: vec![], }), region_routes, @@ -347,34 +274,6 @@ pub(crate) mod tests { } } - #[tokio::test] - async fn test_put_and_get_table_global_value() { - let kv_store = Arc::new(MemStore::new()) as _; - - let not_exist_key = TableGlobalKey { - catalog_name: "not_exist_catalog".to_string(), - schema_name: "not_exist_schema".to_string(), - table_name: "not_exist_table".to_string(), - }; - assert!(get_table_global_value(&kv_store, ¬_exist_key) - .await - .unwrap() - .is_none()); - - let (key, value) = prepare_table_global_value(&kv_store, "my_table").await; - let actual = get_table_global_value(&kv_store, &key) - .await - .unwrap() - .unwrap(); - assert_eq!(actual, value); - - let keys = vec![¬_exist_key, &key]; - let result = batch_get_table_global_value(&kv_store, keys).await.unwrap(); - assert_eq!(result.len(), 2); - assert!(result.get(¬_exist_key).unwrap().is_none()); - assert_eq!(result.get(&key).unwrap().as_ref().unwrap(), &value); - } - #[tokio::test] async fn test_put_and_get_table_route_value() { let kv_store = Arc::new(MemStore::new()) as _; diff --git a/src/partition/src/route.rs b/src/partition/src/route.rs index 6554054151..13343069b1 100644 --- a/src/partition/src/route.rs +++ b/src/partition/src/route.rs @@ -66,6 +66,7 @@ impl TableRoutes { .meta_client .route(RouteRequest { table_names: vec![table_name.clone()], + table_ids: vec![], }) .await .context(error::RequestMetaSnafu)?; diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 4f5860edd0..d001b5e308 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -25,8 +25,8 @@ mod test { CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr, FlushTableExpr, InsertRequest, InsertRequests, QueryRequest, }; - use catalog::helper::{TableGlobalKey, TableGlobalValue}; - use common_catalog::consts::MITO_ENGINE; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; + use common_meta::table_name::TableName; use common_query::Output; use common_recordbatch::RecordBatches; use frontend::instance::Instance; @@ -35,6 +35,7 @@ mod test { use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContext; use store_api::storage::RegionNumber; + use table::Table; use tests::{has_parquet_file, test_region_dir}; use crate::tests; @@ -332,20 +333,22 @@ CREATE TABLE {table_name} ( .unwrap() .unwrap(); let table = table.as_any().downcast_ref::().unwrap(); + let table_id = table.table_info().table_id(); - let tgv = table - .table_global_value(&TableGlobalKey { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - }) + let table_region_value = instance + .table_metadata_manager() + .table_region_manager() + .get_old(&TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table_name, + )) .await .unwrap() .unwrap(); - let table_id = tgv.table_id(); - let region_to_dn_map = tgv - .regions_id_map + let region_to_dn_map = table_region_value + .region_distribution .iter() .map(|(k, v)| (v[0], *k)) .collect::>(); @@ -603,25 +606,19 @@ CREATE TABLE {table_name} ( table_name: &str, expected_distribution: HashMap, ) { - let table = instance - .frontend() - .catalog_manager() - .table("greptime", "public", table_name) + let table_region_value = instance + .table_metadata_manager() + .table_region_manager() + .get_old(&TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table_name, + )) .await .unwrap() .unwrap(); - let table = table.as_any().downcast_ref::().unwrap(); - - let TableGlobalValue { regions_id_map, .. } = table - .table_global_value(&TableGlobalKey { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - }) - .await - .unwrap() - .unwrap(); - let region_to_dn_map = regions_id_map + let region_to_dn_map = table_region_value + .region_distribution .iter() .map(|(k, v)| (v[0], *k)) .collect::>(); diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 21b1edd73b..de9ffd8815 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -19,13 +19,13 @@ mod tests { use std::sync::atomic::AtomicU32; use std::sync::Arc; - use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_base::Plugins; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_meta::table_name::TableName; use common_query::Output; use common_recordbatch::RecordBatches; use frontend::error::{self, Error, Result}; use frontend::instance::Instance; - use frontend::table::DistTable; use query::parser::QueryLanguageParser; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; use servers::query_handler::sql::SqlQueryHandler; @@ -177,25 +177,19 @@ mod tests { instance: &MockDistributedInstance, expected_distribution: HashMap, ) { - let table = instance - .frontend() - .catalog_manager() - .table("greptime", "public", "demo") + let table_region_value = instance + .table_metadata_manager() + .table_region_manager() + .get_old(&TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "demo", + )) .await .unwrap() .unwrap(); - let table = table.as_any().downcast_ref::().unwrap(); - - let TableGlobalValue { regions_id_map, .. } = table - .table_global_value(&TableGlobalKey { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: "demo".to_string(), - }) - .await - .unwrap() - .unwrap(); - let region_to_dn_map = regions_id_map + let region_to_dn_map = table_region_value + .region_distribution .iter() .map(|(k, v)| (v[0], *k)) .collect::>(); diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 26f0ae7a82..7d4d508eaa 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -51,7 +51,6 @@ impl MockDistributedInstance { &self.0.datanode_instances } - #[allow(unused)] pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { self.0.meta_srv.table_metadata_manager() } diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 34637c4938..917f81ec2b 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -17,9 +17,9 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Peer; -use catalog::helper::TableGlobalKey; use catalog::remote::CachedMetaKvBackend; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; +use common_meta::helper::TableGlobalKey; use common_meta::ident::TableIdent; use common_meta::rpc::router::TableRoute; use common_meta::rpc::KeyValue;