From 03e30652c833638f145a4a35e79c2f5c1d99c0ca Mon Sep 17 00:00:00 2001 From: LFC Date: Fri, 7 Jul 2023 13:27:43 +0800 Subject: [PATCH] refactor: TableNameKey and DatanodeTableKey (#1868) * refactor: TableNameKey and DatanodeTableKey --- Cargo.lock | 2 + Cargo.toml | 2 + src/common/meta/Cargo.toml | 2 + src/common/meta/src/error.rs | 8 +- src/common/meta/src/key.rs | 54 ++- src/common/meta/src/key/datanode_table.rs | 331 ++++++++++++++++++ src/common/meta/src/key/table_name.rs | 234 +++++++++++++ src/common/meta/src/key/table_route.rs | 5 +- src/frontend/src/table.rs | 4 +- src/meta-srv/src/procedure/create_table.rs | 3 +- .../region_failover/update_metadata.rs | 4 +- src/meta-srv/src/service/router.rs | 4 +- 12 files changed, 639 insertions(+), 14 deletions(-) create mode 100644 src/common/meta/src/key/datanode_table.rs create mode 100644 src/common/meta/src/key/table_name.rs diff --git a/Cargo.lock b/Cargo.lock index 80f793850e..d7ee941b3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,7 +1813,9 @@ dependencies = [ "common-time", "datatypes", "futures", + "lazy_static", "prost", + "regex", "serde", "serde_json", "snafu", diff --git a/Cargo.toml b/Cargo.toml index 0ad03c3a40..02e657ed0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,10 +75,12 @@ futures = "0.3" futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "917ead6274b4dccbaf33c59a7360646ba2f285a9" } itertools = "0.10" +lazy_static = "1.4" parquet = "40.0" paste = "1.0" prost = "0.11" rand = "0.8" +regex = "1.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index f96c2eb412..355d17e51a 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -14,7 +14,9 @@ common-runtime = { path = "../runtime" } common-telemetry = { path = "../telemetry" } common-time = { path = "../time" } futures.workspace = true +lazy_static.workspace = true prost.workspace = true +regex.workspace = true serde.workspace = true serde_json.workspace = true snafu.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 90bc6c03e1..9d541705b9 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -56,6 +56,9 @@ pub enum Error { #[snafu(display("Invalid protobuf message, err: {}", err_msg))] InvalidProtoMsg { err_msg: String, location: Location }, + #[snafu(display("Concurrent modify regions placement: {err_msg}"))] + ConcurrentModifyRegionsPlacement { err_msg: String, location: Location }, + #[snafu(display("Invalid table metadata, err: {}", err_msg))] InvalidTableMetadata { err_msg: String, location: Location }, @@ -85,7 +88,10 @@ impl ErrorExt for Error { | InvalidProtoMsg { .. } | InvalidTableMetadata { .. } => StatusCode::Unexpected, - SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal, + SendMessage { .. } + | GetKvCache { .. } + | CacheNotGet { .. } + | ConcurrentModifyRegionsPlacement { .. } => StatusCode::Internal, EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => { StatusCode::Unexpected diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index ff029c8b75..c691bd0ea5 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -15,12 +15,22 @@ //! This mod defines all the keys used in the metadata store (Metasrv). //! Specifically, there are these kinds of keys: //! -//! 1. Table info key: `__table_info/{table_id}` +//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}` +//! - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that +//! belong to this Datanode. +//! - This key is primary used in the startup of Datanode, to let Datanode know which tables +//! and regions it should open. +//! +//! 2. Table info key: `__table_info/{table_id}` //! - The value is a [TableInfoValue] struct; it contains the whole table info (like column //! schemas). //! - This key is mainly used in constructing the table in Datanode and Frontend. //! -//! 2. Table region key: `__table_region/{table_id}` +//! 3. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}` +//! - The value is a [TableNameValue] struct; it contains the table id. +//! - Used in the table name to table id lookup. +//! +//! 4. Table region key: `__table_region/{table_id}` //! - The value is a [TableRegionValue] struct; it contains the region distribution of the //! table in the Datanodes. //! @@ -31,14 +41,20 @@ //! table metadata manager: [TableMetadataManager]. It contains all the managers defined above. //! It's recommended to just use this manager only. +pub mod datanode_table; pub mod table_info; +pub mod table_name; pub mod table_region; mod table_route; use std::sync::Arc; +use datanode_table::{DatanodeTableManager, DatanodeTableValue}; +use lazy_static::lazy_static; +use regex::Regex; use snafu::ResultExt; use table_info::{TableInfoManager, TableInfoValue}; +use table_name::{TableNameManager, TableNameValue}; use table_region::{TableRegionManager, TableRegionValue}; use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; @@ -47,9 +63,25 @@ use crate::kv_backend::KvBackendRef; pub const REMOVED_PREFIX: &str = "__removed"; +const TABLE_NAME_PATTERN: &str = "[a-zA-Z_:][a-zA-Z0-9_:]*"; + +const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; +const TABLE_NAME_KEY_PREFIX: &str = "__table_name"; const TABLE_REGION_KEY_PREFIX: &str = "__table_region"; +lazy_static! { + static ref DATANODE_TABLE_KEY_PATTERN: Regex = + Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9])/([0-9])$")).unwrap(); +} + +lazy_static! { + static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!( + "^{TABLE_NAME_KEY_PREFIX}/({TABLE_NAME_PATTERN})/({TABLE_NAME_PATTERN})/({TABLE_NAME_PATTERN})$" + )) + .unwrap(); +} + pub fn to_removed_key(key: &str) -> String { format!("{REMOVED_PREFIX}-{key}") } @@ -61,18 +93,26 @@ pub trait TableMetaKey { pub type TableMetadataManagerRef = Arc; pub struct TableMetadataManager { + table_name_manager: TableNameManager, table_info_manager: TableInfoManager, table_region_manager: TableRegionManager, + datanode_table_manager: DatanodeTableManager, } impl TableMetadataManager { pub fn new(kv_backend: KvBackendRef) -> Self { TableMetadataManager { + table_name_manager: TableNameManager::new(kv_backend.clone()), table_info_manager: TableInfoManager::new(kv_backend.clone()), - table_region_manager: TableRegionManager::new(kv_backend), + table_region_manager: TableRegionManager::new(kv_backend.clone()), + datanode_table_manager: DatanodeTableManager::new(kv_backend), } } + pub fn table_name_manager(&self) -> &TableNameManager { + &self.table_name_manager + } + pub fn table_info_manager(&self) -> &TableInfoManager { &self.table_info_manager } @@ -80,6 +120,10 @@ impl TableMetadataManager { pub fn table_region_manager(&self) -> &TableRegionManager { &self.table_region_manager } + + pub fn datanode_table_manager(&self) -> &DatanodeTableManager { + &self.datanode_table_manager + } } macro_rules! impl_table_meta_value { @@ -104,8 +148,10 @@ macro_rules! impl_table_meta_value { } impl_table_meta_value! { + TableNameValue, TableInfoValue, - TableRegionValue + TableRegionValue, + DatanodeTableValue } #[cfg(test)] diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs new file mode 100644 index 0000000000..04bc2867b4 --- /dev/null +++ b/src/common/meta/src/key/datanode_table.rs @@ -0,0 +1,331 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::{StreamExt, TryStreamExt}; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use store_api::storage::RegionNumber; +use table::metadata::TableId; + +use super::{DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX}; +use crate::error::{ConcurrentModifyRegionsPlacementSnafu, InvalidTableMetadataSnafu, Result}; +use crate::key::{to_removed_key, TableMetaKey}; +use crate::kv_backend::KvBackendRef; +use crate::DatanodeId; + +struct DatanodeTableKey { + datanode_id: DatanodeId, + table_id: TableId, +} + +impl DatanodeTableKey { + fn new(datanode_id: DatanodeId, table_id: TableId) -> Self { + Self { + datanode_id, + table_id, + } + } + + fn prefix(datanode_id: DatanodeId) -> String { + format!("{}/{datanode_id}", DATANODE_TABLE_KEY_PREFIX) + } + + #[allow(unused)] + pub fn strip_table_id(raw_key: &[u8]) -> Result { + let key = String::from_utf8(raw_key.to_vec()).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "DatanodeTableKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(raw_key) + ), + } + .build() + })?; + let captures = + DATANODE_TABLE_KEY_PATTERN + .captures(&key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid DatanodeTableKey '{key}'"), + })?; + // Safety: pass the regex check above + let table_id = captures[2].parse::().unwrap(); + Ok(table_id) + } +} + +impl TableMetaKey for DatanodeTableKey { + fn as_raw_key(&self) -> Vec { + format!("{}/{}", Self::prefix(self.datanode_id), self.table_id).into_bytes() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct DatanodeTableValue { + pub table_id: TableId, + pub regions: Vec, + version: u64, +} + +impl DatanodeTableValue { + fn new(table_id: TableId, regions: Vec) -> Self { + Self { + table_id, + regions, + version: 0, + } + } +} + +pub struct DatanodeTableManager { + kv_backend: KvBackendRef, +} + +impl DatanodeTableManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + async fn get(&self, key: &DatanodeTableKey) -> Result> { + self.kv_backend + .get(&key.as_raw_key()) + .await? + .map(|kv| DatanodeTableValue::try_from_raw_value(kv.1)) + .transpose() + } + + pub async fn create( + &self, + datanode_id: DatanodeId, + table_id: TableId, + regions: Vec, + ) -> Result<()> { + let key = DatanodeTableKey::new(datanode_id, table_id).as_raw_key(); + let val = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?; + self.kv_backend + .compare_and_set(&key, &[], &val) + .await? + .map_err(|curr| { + let curr = if let Some(curr) = curr { + DatanodeTableValue::try_from_raw_value(curr).map_or_else( + |e| format!("invalid DatanodeTableValue for Datanode {datanode_id}: {e}"), + |v| format!("{v:?}"), + ) + } else { + "empty".to_string() + }; + ConcurrentModifyRegionsPlacementSnafu { + err_msg: format!("Datanode {datanode_id} already existed {curr}"), + } + .build() + }) + } + + pub async fn remove(&self, datanode_id: DatanodeId, table_id: TableId) -> Result<()> { + let key = DatanodeTableKey::new(datanode_id, table_id); + let removed_key = to_removed_key(&String::from_utf8_lossy(&key.as_raw_key())); + self.kv_backend + .move_value(&key.as_raw_key(), removed_key.as_bytes()) + .await + } + + // TODO(LFC): Use transaction to move region once the KvBackend and KvStore are merged into one. + pub async fn move_region( + &self, + from_datanode: DatanodeId, + to_datanode: DatanodeId, + table_id: TableId, + region: RegionNumber, + ) -> Result<()> { + let from_key = DatanodeTableKey::new(from_datanode, table_id); + let from_value = self.get(&from_key).await?; + if let Some(mut from_value) = from_value { + from_value.regions.retain(|x| *x != region); + from_value.version += 1; + self.kv_backend + .set(&from_key.as_raw_key(), &from_value.try_as_raw_value()?) + .await?; + } + + let to_key = DatanodeTableKey::new(to_datanode, table_id); + let to_value = self.get(&to_key).await?; + if let Some(mut to_value) = to_value { + to_value.regions.push(region); + to_value.version += 1; + self.kv_backend + .set(&to_key.as_raw_key(), &to_value.try_as_raw_value()?) + .await?; + } + Ok(()) + } + + pub async fn tables(&self, datanode_id: DatanodeId) -> Result> { + let prefix = DatanodeTableKey::prefix(datanode_id); + let table_ids = self + .kv_backend + .range(prefix.as_bytes()) + .map(|result| result.map(|kv| DatanodeTableValue::try_from_raw_value(kv.1))) + .try_collect::>() + .await? + .into_iter() + .collect::>>()?; + Ok(table_ids) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + + #[tokio::test] + async fn test_move_region() { + let manager = DatanodeTableManager::new(Arc::new(MemoryKvBackend::default())); + + assert!(manager.create(1, 1, vec![1, 2]).await.is_ok()); + assert!(manager.create(2, 1, vec![3, 4]).await.is_ok()); + + assert!(manager.move_region(1, 2, 1, 1).await.is_ok()); + + let value = manager + .get(&DatanodeTableKey::new(1, 1)) + .await + .unwrap() + .unwrap(); + assert_eq!( + value, + DatanodeTableValue { + table_id: 1, + regions: vec![2], + version: 1, + } + ); + + let value = manager + .get(&DatanodeTableKey::new(2, 1)) + .await + .unwrap() + .unwrap(); + assert_eq!( + value, + DatanodeTableValue { + table_id: 1, + regions: vec![3, 4, 1], + version: 1, + } + ); + } + + #[tokio::test] + async fn test_datanode_table_value_manager() { + let backend = Arc::new(MemoryKvBackend::default()); + let manager = DatanodeTableManager::new(backend.clone()); + + assert!(manager.create(1, 1, vec![1, 2, 3]).await.is_ok()); + assert!(manager.create(1, 2, vec![4, 5, 6]).await.is_ok()); + assert!(manager.create(2, 1, vec![4, 5, 6]).await.is_ok()); + assert!(manager.create(2, 2, vec![1, 2, 3]).await.is_ok()); + + let err_msg = manager + .create(1, 1, vec![4, 5, 6]) + .await + .unwrap_err() + .to_string(); + assert!(err_msg.contains("Concurrent modify regions placement: Datanode 1 already existed DatanodeTableValue { table_id: 1, regions: [1, 2, 3], version: 0 }")); + + let to_be_removed_key = DatanodeTableKey::new(2, 1); + let expected_value = DatanodeTableValue { + table_id: 1, + regions: vec![4, 5, 6], + version: 0, + }; + let value = manager.get(&to_be_removed_key).await.unwrap().unwrap(); + assert_eq!(value, expected_value); + + assert!(manager.remove(2, 1).await.is_ok()); + assert!(manager.get(&to_be_removed_key).await.unwrap().is_none()); + let kv = backend + .get(b"__removed-__dn_table/2/1") + .await + .unwrap() + .unwrap(); + assert_eq!(b"__removed-__dn_table/2/1", kv.0.as_slice()); + let value = DatanodeTableValue::try_from_raw_value(kv.1).unwrap(); + assert_eq!(value, expected_value); + + let values = manager.tables(1).await.unwrap(); + assert_eq!(values.len(), 2); + assert_eq!( + values[0], + DatanodeTableValue { + table_id: 1, + regions: vec![1, 2, 3], + version: 0, + } + ); + assert_eq!( + values[1], + DatanodeTableValue { + table_id: 2, + regions: vec![4, 5, 6], + version: 0, + } + ); + } + + #[test] + fn test_serde() { + let key = DatanodeTableKey { + datanode_id: 1, + table_id: 2, + }; + let raw_key = key.as_raw_key(); + assert_eq!(raw_key, b"__dn_table/1/2"); + + let value = DatanodeTableValue { + table_id: 42, + regions: vec![1, 2, 3], + version: 1, + }; + let literal = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#; + + let raw_value = value.try_as_raw_value().unwrap(); + assert_eq!(raw_value, literal); + + let actual = DatanodeTableValue::try_from_raw_value(literal.to_vec()).unwrap(); + assert_eq!(actual, value); + } + + #[test] + fn test_strip_table_id() { + fn test_err(raw_key: &[u8]) { + let result = DatanodeTableKey::strip_table_id(raw_key); + assert!(result.is_err()); + } + + test_err(b""); + test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string + test_err(b"invalid_prefix/1/2"); + test_err(b"__dn_table/"); + test_err(b"__dn_table/invalid_len_1"); + test_err(b"__dn_table/invalid_len_3/1/2"); + test_err(b"__dn_table/invalid_node_id/2"); + test_err(b"__dn_table/1/invalid_table_id"); + + let table_id = DatanodeTableKey::strip_table_id(b"__dn_table/1/2").unwrap(); + assert_eq!(table_id, 2); + } +} diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs new file mode 100644 index 0000000000..d7cfcb2308 --- /dev/null +++ b/src/common/meta/src/key/table_name.rs @@ -0,0 +1,234 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::{StreamExt, TryStreamExt}; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use table::metadata::TableId; + +use super::{TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX}; +use crate::error::{InvalidTableMetadataSnafu, Result}; +use crate::key::{to_removed_key, TableMetaKey}; +use crate::kv_backend::memory::MemoryKvBackend; +use crate::kv_backend::KvBackendRef; + +#[derive(Debug)] +pub struct TableNameKey<'a> { + pub catalog: &'a str, + pub schema: &'a str, + pub table: &'a str, +} + +impl<'a> TableNameKey<'a> { + pub fn new(catalog: &'a str, schema: &'a str, table: &'a str) -> Self { + Self { + catalog, + schema, + table, + } + } + + pub fn prefix_to_table(catalog: &str, schema: &str) -> String { + format!("{}/{}/{}", TABLE_NAME_KEY_PREFIX, catalog, schema) + } + + fn strip_table_name(raw_key: &[u8]) -> Result { + let key = String::from_utf8(raw_key.to_vec()).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "TableNameKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(raw_key) + ), + } + .build() + })?; + let captures = + TABLE_NAME_KEY_PATTERN + .captures(&key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableNameKey '{key}'"), + })?; + // Safety: pass the regex check above + Ok(captures[3].to_string()) + } +} + +impl TableMetaKey for TableNameKey<'_> { + fn as_raw_key(&self) -> Vec { + format!( + "{}/{}", + Self::prefix_to_table(self.catalog, self.schema), + self.table + ) + .into_bytes() + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +pub struct TableNameValue { + table_id: TableId, +} + +impl TableNameValue { + fn new(table_id: TableId) -> Self { + Self { table_id } + } + + pub fn table_id(&self) -> TableId { + self.table_id + } +} + +pub struct TableNameManager { + kv_backend: KvBackendRef, +} + +impl Default for TableNameManager { + fn default() -> Self { + Self::new(Arc::new(MemoryKvBackend::default())) + } +} + +impl TableNameManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + pub async fn create(&self, key: &TableNameKey<'_>, table_id: TableId) -> Result { + let raw_key = key.as_raw_key(); + let value = TableNameValue::new(table_id); + let raw_value = value.try_as_raw_value()?; + let result = self + .kv_backend + .compare_and_set(&raw_key, &[], &raw_value) + .await?; + Ok(matches!(result, Ok(()))) + } + + pub async fn get(&self, key: &TableNameKey<'_>) -> Result> { + let raw_key = key.as_raw_key(); + self.kv_backend + .get(&raw_key) + .await? + .map(|x| TableNameValue::try_from_raw_value(x.1)) + .transpose() + } + + pub async fn tables(&self, catalog: &str, schema: &str) -> Result> { + let key = TableNameKey::prefix_to_table(catalog, schema).into_bytes(); + let table_names = self + .kv_backend + .range(&key) + .map(|x| x.map(|kv| TableNameKey::strip_table_name(&kv.0))) + .try_collect::>() + .await? + .into_iter() + .collect::>>()?; + Ok(table_names) + } + + pub async fn remove(&self, key: &TableNameKey<'_>) -> Result<()> { + let raw_key = key.as_raw_key(); + let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); + self.kv_backend + .move_value(&raw_key, removed_key.as_bytes()) + .await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + + #[tokio::test] + async fn test_table_name_manager() { + let backend = Arc::new(MemoryKvBackend::default()); + let manager = TableNameManager::new(backend.clone()); + + for i in 1..=3 { + let table_name = format!("table_{}", i); + let key = TableNameKey::new("my_catalog", "my_schema", &table_name); + assert!(manager.create(&key, i).await.unwrap()); + } + + let key = TableNameKey::new("my_catalog", "my_schema", "my_table"); + assert!(manager.create(&key, 99).await.unwrap()); + assert!(!manager.create(&key, 99).await.unwrap()); + + let value = manager.get(&key).await.unwrap().unwrap(); + assert_eq!(value.table_id(), 99); + let not_existed = TableNameKey::new("x", "y", "z"); + assert!(manager.get(¬_existed).await.unwrap().is_none()); + + assert!(manager.remove(&key).await.is_ok()); + let kv = backend + .get(b"__removed-__table_name/my_catalog/my_schema/my_table") + .await + .unwrap() + .unwrap(); + let value = TableNameValue::try_from_raw_value(kv.1).unwrap(); + assert_eq!(value.table_id(), 99); + + let tables = manager.tables("my_catalog", "my_schema").await.unwrap(); + assert_eq!(tables.len(), 3); + assert_eq!(tables, vec!["table_1", "table_2", "table_3"]); + } + + #[test] + fn test_strip_table_name() { + fn test_err(raw_key: &[u8]) { + assert!(TableNameKey::strip_table_name(raw_key).is_err()); + } + + test_err(b""); + test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string + test_err(b"invalid_prefix/my_catalog/my_schema/my_table"); + test_err(b"__table_name/"); + test_err(b"__table_name/invalid_len_1"); + test_err(b"__table_name/invalid_len_2/x"); + test_err(b"__table_name/invalid_len_4/x/y/z"); + test_err(b"__table_name/000_invalid_catalog/y/z"); + test_err(b"__table_name/x/000_invalid_schema/z"); + test_err(b"__table_name/x/y/000_invalid_table"); + + let table_name = + TableNameKey::strip_table_name(b"__table_name/my_catalog/my_schema/my_table").unwrap(); + assert_eq!(table_name, "my_table"); + } + + #[test] + fn test_serde() { + let key = TableNameKey::new("my_catalog", "my_schema", "my_table"); + let raw_key = key.as_raw_key(); + assert_eq!( + b"__table_name/my_catalog/my_schema/my_table", + raw_key.as_slice() + ); + + let value = TableNameValue::new(1); + let literal = br#"{"table_id":1}"#; + + assert_eq!(value.try_as_raw_value().unwrap(), literal); + assert_eq!( + TableNameValue::try_from_raw_value(literal.to_vec()).unwrap(), + value + ); + } +} diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 90c414148a..cdc1cd7d10 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -15,20 +15,21 @@ use std::fmt::Display; use api::v1::meta::TableName; +use table::metadata::TableId; use crate::key::to_removed_key; pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route"; pub struct TableRouteKey<'a> { - pub table_id: u64, + pub table_id: TableId, pub catalog_name: &'a str, pub schema_name: &'a str, pub table_name: &'a str, } impl<'a> TableRouteKey<'a> { - pub fn with_table_name(table_id: u64, t: &'a TableName) -> Self { + pub fn with_table_name(table_id: TableId, t: &'a TableName) -> Self { Self { table_id, catalog_name: &t.catalog_name, diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index cd841a8fb8..0377400f0d 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -296,7 +296,7 @@ impl DistTable { new_table_name: &str, ) -> Result<()> { let old_key = TableRouteKey { - table_id: table_id.into(), + table_id, catalog_name, schema_name, table_name: old_table_name, @@ -304,7 +304,7 @@ impl DistTable { .to_string(); let new_key = TableRouteKey { - table_id: table_id.into(), + table_id, catalog_name, schema_name, table_name: new_table_name, diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs index f341d5ee05..aa7ba12c0e 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/meta-srv/src/procedure/create_table.rs @@ -30,6 +30,7 @@ use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use table::engine::TableReference; +use table::metadata::TableId; use crate::ddl::DdlContext; use crate::error::{self, Result}; @@ -107,7 +108,7 @@ impl CreateTableProcedure { async fn register_metadata(&self) -> Result<()> { let table_name = self.table_name(); - let table_id = self.creator.data.table_route.table.id; + let table_id = self.creator.data.table_route.table.id as TableId; let table_route_key = TableRouteKey::with_table_name(table_id, &table_name.clone().into()) .to_string() diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index f15272bf58..c6d6e9abb6 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -329,7 +329,7 @@ mod tests { .unwrap(); let key = TableRouteKey { - table_id: failed_region.table_ident.table_id as u64, + table_id: failed_region.table_ident.table_id, catalog_name: &failed_region.table_ident.catalog, schema_name: &failed_region.table_ident.schema, table_name: &failed_region.table_ident.table, @@ -465,7 +465,7 @@ mod tests { let catalog_name = failed_region_1.table_ident.catalog.clone(); let schema_name = failed_region_1.table_ident.schema.clone(); let table_name = failed_region_1.table_ident.table.clone(); - let table_id = failed_region_1.table_ident.table_id as u64; + let table_id = failed_region_1.table_ident.table_id; let _ = futures::future::join_all(vec![ tokio::spawn(async move { diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 1ca1345729..2fc4e12b67 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -181,7 +181,7 @@ async fn handle_create( let id = table_id_sequence.next().await?; table_info.ident.table_id = id as u32; - let table_route_key = TableRouteKey::with_table_name(id, &table_name) + let table_route_key = TableRouteKey::with_table_name(id as _, &table_name) .to_string() .into_bytes(); @@ -409,7 +409,7 @@ async fn fetch_tables( pub(crate) fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> { TableRouteKey { - table_id, + table_id: table_id as _, catalog_name: &t.catalog_name, schema_name: &t.schema_name, table_name: &t.table_name,