refactor: TableNameKey and DatanodeTableKey (#1868)
* refactor: TableNameKey and DatanodeTableKey
Cargo.lock (generated, 2 lines changed)
@@ -1813,7 +1813,9 @@ dependencies = [
"common-time",
"datatypes",
"futures",
"lazy_static",
"prost",
"regex",
"serde",
"serde_json",
"snafu",
@@ -75,10 +75,12 @@ futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "917ead6274b4dccbaf33c59a7360646ba2f285a9" }
itertools = "0.10"
lazy_static = "1.4"
parquet = "40.0"
paste = "1.0"
prost = "0.11"
rand = "0.8"
regex = "1.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
@@ -14,7 +14,9 @@ common-runtime = { path = "../runtime" }
common-telemetry = { path = "../telemetry" }
common-time = { path = "../time" }
futures.workspace = true
lazy_static.workspace = true
prost.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
@@ -56,6 +56,9 @@ pub enum Error {
    #[snafu(display("Invalid protobuf message, err: {}", err_msg))]
    InvalidProtoMsg { err_msg: String, location: Location },

    #[snafu(display("Concurrent modify regions placement: {err_msg}"))]
    ConcurrentModifyRegionsPlacement { err_msg: String, location: Location },

    #[snafu(display("Invalid table metadata, err: {}", err_msg))]
    InvalidTableMetadata { err_msg: String, location: Location },

@@ -85,7 +88,10 @@ impl ErrorExt for Error {
            | InvalidProtoMsg { .. }
            | InvalidTableMetadata { .. } => StatusCode::Unexpected,

            SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
            SendMessage { .. }
            | GetKvCache { .. }
            | CacheNotGet { .. }
            | ConcurrentModifyRegionsPlacement { .. } => StatusCode::Internal,

            EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => {
                StatusCode::Unexpected
@@ -15,12 +15,22 @@
//! This mod defines all the keys used in the metadata store (Metasrv).
//! Specifically, there are these kinds of keys:
//!
//! 1. Table info key: `__table_info/{table_id}`
//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
//!       belong to this Datanode.
//!     - This key is primarily used in the startup of Datanode, to let Datanode know which tables
//!       and regions it should open.
//!
//! 2. Table info key: `__table_info/{table_id}`
//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
//!       schemas).
//!     - This key is mainly used in constructing the table in Datanode and Frontend.
//!
//! 2. Table region key: `__table_region/{table_id}`
//! 3. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
//!     - The value is a [TableNameValue] struct; it contains the table id.
//!     - Used in the table name to table id lookup.
//!
//! 4. Table region key: `__table_region/{table_id}`
//!     - The value is a [TableRegionValue] struct; it contains the region distribution of the
//!       table in the Datanodes.
//!
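As an aside (not part of the diff), a minimal sketch of how these four key kinds render as flat strings in the KV store; the prefixes match the constants introduced below, while the IDs and names are made-up values:

```rust
fn main() {
    let (datanode_id, table_id) = (2u64, 42u32);
    let (catalog, schema, table) = ("greptime", "public", "metrics");

    // Which regions of a table a given Datanode should open.
    let datanode_table_key = format!("__dn_table/{datanode_id}/{table_id}");
    // The whole table info (column schemas, ...), addressed by table id.
    let table_info_key = format!("__table_info/{table_id}");
    // Name -> id lookup, addressed by the fully qualified table name.
    let table_name_key = format!("__table_name/{catalog}/{schema}/{table}");
    // Region distribution of the table across Datanodes.
    let table_region_key = format!("__table_region/{table_id}");

    assert_eq!(datanode_table_key, "__dn_table/2/42");
    assert_eq!(table_info_key, "__table_info/42");
    assert_eq!(table_name_key, "__table_name/greptime/public/metrics");
    assert_eq!(table_region_key, "__table_region/42");
}
```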
@@ -31,14 +41,20 @@
//! table metadata manager: [TableMetadataManager]. It contains all the managers defined above.
//! It's recommended to just use this manager only.

pub mod datanode_table;
pub mod table_info;
pub mod table_name;
pub mod table_region;
mod table_route;

use std::sync::Arc;

use datanode_table::{DatanodeTableManager, DatanodeTableValue};
use lazy_static::lazy_static;
use regex::Regex;
use snafu::ResultExt;
use table_info::{TableInfoManager, TableInfoValue};
use table_name::{TableNameManager, TableNameValue};
use table_region::{TableRegionManager, TableRegionValue};

use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu};
@@ -47,9 +63,25 @@ use crate::kv_backend::KvBackendRef;

pub const REMOVED_PREFIX: &str = "__removed";

const TABLE_NAME_PATTERN: &str = "[a-zA-Z_:][a-zA-Z0-9_:]*";

const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";

lazy_static! {
    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9])/([0-9])$")).unwrap();
}

lazy_static! {
    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
        "^{TABLE_NAME_KEY_PREFIX}/({TABLE_NAME_PATTERN})/({TABLE_NAME_PATTERN})/({TABLE_NAME_PATTERN})$"
    ))
    .unwrap();
}

pub fn to_removed_key(key: &str) -> String {
    format!("{REMOVED_PREFIX}-{key}")
}
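To see how the pattern and the tombstone helper behave, here is a standalone sketch (not part of the commit) that mirrors DATANODE_TABLE_KEY_PATTERN and to_removed_key; note the character classes match one digit per id, which is also what the unit tests below exercise:

```rust
// Requires the regex crate, as the module above does.
use regex::Regex;

fn main() {
    // Mirrors DATANODE_TABLE_KEY_PATTERN with the "__dn_table" prefix inlined.
    let pattern = Regex::new("^__dn_table/([0-9])/([0-9])$").unwrap();

    let captures = pattern.captures("__dn_table/2/1").unwrap();
    let datanode_id: u64 = captures[1].parse().unwrap();
    let table_id: u32 = captures[2].parse().unwrap();
    assert_eq!((datanode_id, table_id), (2, 1));

    // to_removed_key does not delete anything; it renames a key under the "__removed" prefix.
    let removed = format!("{}-{}", "__removed", "__dn_table/2/1");
    assert_eq!(removed, "__removed-__dn_table/2/1");
}
```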
@@ -61,18 +93,26 @@ pub trait TableMetaKey {
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    table_region_manager: TableRegionManager,
    datanode_table_manager: DatanodeTableManager,
}

impl TableMetadataManager {
    pub fn new(kv_backend: KvBackendRef) -> Self {
        TableMetadataManager {
            table_name_manager: TableNameManager::new(kv_backend.clone()),
            table_info_manager: TableInfoManager::new(kv_backend.clone()),
            table_region_manager: TableRegionManager::new(kv_backend),
            table_region_manager: TableRegionManager::new(kv_backend.clone()),
            datanode_table_manager: DatanodeTableManager::new(kv_backend),
        }
    }

    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }
@@ -80,6 +120,10 @@ impl TableMetadataManager {
    pub fn table_region_manager(&self) -> &TableRegionManager {
        &self.table_region_manager
    }

    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }
}

macro_rules! impl_table_meta_value {
@@ -104,8 +148,10 @@ macro_rules! impl_table_meta_value {
}

impl_table_meta_value! {
    TableNameValue,
    TableInfoValue,
    TableRegionValue
    TableRegionValue,
    DatanodeTableValue
}

#[cfg(test)]
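For context, a rough usage sketch of the new facade (not part of the diff); the crate and module paths are assumed from the imports above, and the runtime setup (tokio, the in-memory backend used by the tests) is only for illustration:

```rust
use std::sync::Arc;

use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;

#[tokio::main]
async fn main() {
    // One shared KV backend; TableMetadataManager hands a clone of it to every sub-manager.
    let manager = TableMetadataManager::new(Arc::new(MemoryKvBackend::default()));

    // Register a name -> id mapping through the facade, then read it back.
    let key = TableNameKey::new("greptime", "public", "metrics");
    assert!(manager.table_name_manager().create(&key, 42).await.unwrap());
    let value = manager.table_name_manager().get(&key).await.unwrap().unwrap();
    assert_eq!(value.table_id(), 42);
}
```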
src/common/meta/src/key/datanode_table.rs (new file, 331 lines)
@@ -0,0 +1,331 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::storage::RegionNumber;
use table::metadata::TableId;

use super::{DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX};
use crate::error::{ConcurrentModifyRegionsPlacementSnafu, InvalidTableMetadataSnafu, Result};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::KvBackendRef;
use crate::DatanodeId;

struct DatanodeTableKey {
    datanode_id: DatanodeId,
    table_id: TableId,
}

impl DatanodeTableKey {
    fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
        Self {
            datanode_id,
            table_id,
        }
    }

    fn prefix(datanode_id: DatanodeId) -> String {
        format!("{}/{datanode_id}", DATANODE_TABLE_KEY_PREFIX)
    }

    #[allow(unused)]
    pub fn strip_table_id(raw_key: &[u8]) -> Result<TableId> {
        let key = String::from_utf8(raw_key.to_vec()).map_err(|e| {
            InvalidTableMetadataSnafu {
                err_msg: format!(
                    "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(raw_key)
                ),
            }
            .build()
        })?;
        let captures =
            DATANODE_TABLE_KEY_PATTERN
                .captures(&key)
                .context(InvalidTableMetadataSnafu {
                    err_msg: format!("Invalid DatanodeTableKey '{key}'"),
                })?;
        // Safety: pass the regex check above
        let table_id = captures[2].parse::<TableId>().unwrap();
        Ok(table_id)
    }
}

impl TableMetaKey for DatanodeTableKey {
    fn as_raw_key(&self) -> Vec<u8> {
        format!("{}/{}", Self::prefix(self.datanode_id), self.table_id).into_bytes()
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DatanodeTableValue {
    pub table_id: TableId,
    pub regions: Vec<RegionNumber>,
    version: u64,
}

impl DatanodeTableValue {
    fn new(table_id: TableId, regions: Vec<RegionNumber>) -> Self {
        Self {
            table_id,
            regions,
            version: 0,
        }
    }
}

pub struct DatanodeTableManager {
    kv_backend: KvBackendRef,
}

impl DatanodeTableManager {
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }

    async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
        self.kv_backend
            .get(&key.as_raw_key())
            .await?
            .map(|kv| DatanodeTableValue::try_from_raw_value(kv.1))
            .transpose()
    }

    pub async fn create(
        &self,
        datanode_id: DatanodeId,
        table_id: TableId,
        regions: Vec<RegionNumber>,
    ) -> Result<()> {
        let key = DatanodeTableKey::new(datanode_id, table_id).as_raw_key();
        let val = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?;
        self.kv_backend
            .compare_and_set(&key, &[], &val)
            .await?
            .map_err(|curr| {
                let curr = if let Some(curr) = curr {
                    DatanodeTableValue::try_from_raw_value(curr).map_or_else(
                        |e| format!("invalid DatanodeTableValue for Datanode {datanode_id}: {e}"),
                        |v| format!("{v:?}"),
                    )
                } else {
                    "empty".to_string()
                };
                ConcurrentModifyRegionsPlacementSnafu {
                    err_msg: format!("Datanode {datanode_id} already existed {curr}"),
                }
                .build()
            })
    }

    pub async fn remove(&self, datanode_id: DatanodeId, table_id: TableId) -> Result<()> {
        let key = DatanodeTableKey::new(datanode_id, table_id);
        let removed_key = to_removed_key(&String::from_utf8_lossy(&key.as_raw_key()));
        self.kv_backend
            .move_value(&key.as_raw_key(), removed_key.as_bytes())
            .await
    }

    // TODO(LFC): Use transaction to move region once the KvBackend and KvStore are merged into one.
    pub async fn move_region(
        &self,
        from_datanode: DatanodeId,
        to_datanode: DatanodeId,
        table_id: TableId,
        region: RegionNumber,
    ) -> Result<()> {
        let from_key = DatanodeTableKey::new(from_datanode, table_id);
        let from_value = self.get(&from_key).await?;
        if let Some(mut from_value) = from_value {
            from_value.regions.retain(|x| *x != region);
            from_value.version += 1;
            self.kv_backend
                .set(&from_key.as_raw_key(), &from_value.try_as_raw_value()?)
                .await?;
        }

        let to_key = DatanodeTableKey::new(to_datanode, table_id);
        let to_value = self.get(&to_key).await?;
        if let Some(mut to_value) = to_value {
            to_value.regions.push(region);
            to_value.version += 1;
            self.kv_backend
                .set(&to_key.as_raw_key(), &to_value.try_as_raw_value()?)
                .await?;
        }
        Ok(())
    }

    pub async fn tables(&self, datanode_id: DatanodeId) -> Result<Vec<DatanodeTableValue>> {
        let prefix = DatanodeTableKey::prefix(datanode_id);
        let table_ids = self
            .kv_backend
            .range(prefix.as_bytes())
            .map(|result| result.map(|kv| DatanodeTableValue::try_from_raw_value(kv.1)))
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .collect::<Result<Vec<_>>>()?;
        Ok(table_ids)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;

    #[tokio::test]
    async fn test_move_region() {
        let manager = DatanodeTableManager::new(Arc::new(MemoryKvBackend::default()));

        assert!(manager.create(1, 1, vec![1, 2]).await.is_ok());
        assert!(manager.create(2, 1, vec![3, 4]).await.is_ok());

        assert!(manager.move_region(1, 2, 1, 1).await.is_ok());

        let value = manager
            .get(&DatanodeTableKey::new(1, 1))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            value,
            DatanodeTableValue {
                table_id: 1,
                regions: vec![2],
                version: 1,
            }
        );

        let value = manager
            .get(&DatanodeTableKey::new(2, 1))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            value,
            DatanodeTableValue {
                table_id: 1,
                regions: vec![3, 4, 1],
                version: 1,
            }
        );
    }

    #[tokio::test]
    async fn test_datanode_table_value_manager() {
        let backend = Arc::new(MemoryKvBackend::default());
        let manager = DatanodeTableManager::new(backend.clone());

        assert!(manager.create(1, 1, vec![1, 2, 3]).await.is_ok());
        assert!(manager.create(1, 2, vec![4, 5, 6]).await.is_ok());
        assert!(manager.create(2, 1, vec![4, 5, 6]).await.is_ok());
        assert!(manager.create(2, 2, vec![1, 2, 3]).await.is_ok());

        let err_msg = manager
            .create(1, 1, vec![4, 5, 6])
            .await
            .unwrap_err()
            .to_string();
        assert!(err_msg.contains("Concurrent modify regions placement: Datanode 1 already existed DatanodeTableValue { table_id: 1, regions: [1, 2, 3], version: 0 }"));

        let to_be_removed_key = DatanodeTableKey::new(2, 1);
        let expected_value = DatanodeTableValue {
            table_id: 1,
            regions: vec![4, 5, 6],
            version: 0,
        };
        let value = manager.get(&to_be_removed_key).await.unwrap().unwrap();
        assert_eq!(value, expected_value);

        assert!(manager.remove(2, 1).await.is_ok());
        assert!(manager.get(&to_be_removed_key).await.unwrap().is_none());
        let kv = backend
            .get(b"__removed-__dn_table/2/1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(b"__removed-__dn_table/2/1", kv.0.as_slice());
        let value = DatanodeTableValue::try_from_raw_value(kv.1).unwrap();
        assert_eq!(value, expected_value);

        let values = manager.tables(1).await.unwrap();
        assert_eq!(values.len(), 2);
        assert_eq!(
            values[0],
            DatanodeTableValue {
                table_id: 1,
                regions: vec![1, 2, 3],
                version: 0,
            }
        );
        assert_eq!(
            values[1],
            DatanodeTableValue {
                table_id: 2,
                regions: vec![4, 5, 6],
                version: 0,
            }
        );
    }

    #[test]
    fn test_serde() {
        let key = DatanodeTableKey {
            datanode_id: 1,
            table_id: 2,
        };
        let raw_key = key.as_raw_key();
        assert_eq!(raw_key, b"__dn_table/1/2");

        let value = DatanodeTableValue {
            table_id: 42,
            regions: vec![1, 2, 3],
            version: 1,
        };
        let literal = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;

        let raw_value = value.try_as_raw_value().unwrap();
        assert_eq!(raw_value, literal);

        let actual = DatanodeTableValue::try_from_raw_value(literal.to_vec()).unwrap();
        assert_eq!(actual, value);
    }

    #[test]
    fn test_strip_table_id() {
        fn test_err(raw_key: &[u8]) {
            let result = DatanodeTableKey::strip_table_id(raw_key);
            assert!(result.is_err());
        }

        test_err(b"");
        test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string
        test_err(b"invalid_prefix/1/2");
        test_err(b"__dn_table/");
        test_err(b"__dn_table/invalid_len_1");
        test_err(b"__dn_table/invalid_len_3/1/2");
        test_err(b"__dn_table/invalid_node_id/2");
        test_err(b"__dn_table/1/invalid_table_id");

        let table_id = DatanodeTableKey::strip_table_id(b"__dn_table/1/2").unwrap();
        assert_eq!(table_id, 2);
    }
}
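DatanodeTableManager::create above (and TableNameManager::create below) inserts its key with a compare-and-set whose expected value is empty, i.e. "insert only if absent". A small standalone sketch of that contract (the real KvBackend trait is async and backend-specific; this only mocks the behaviour with a HashMap):

```rust
use std::collections::HashMap;

struct MockKv(HashMap<Vec<u8>, Vec<u8>>);

impl MockKv {
    /// Succeeds only if the stored value equals `expected`; an empty `expected` means
    /// "the key must not exist yet". On failure the current value is handed back,
    /// which DatanodeTableManager::create turns into ConcurrentModifyRegionsPlacement.
    fn compare_and_set(
        &mut self,
        key: &[u8],
        expected: &[u8],
        value: &[u8],
    ) -> Result<(), Option<Vec<u8>>> {
        let current = self.0.get(key).cloned();
        let matches = match &current {
            None => expected.is_empty(),
            Some(curr) => curr.as_slice() == expected,
        };
        if matches {
            self.0.insert(key.to_vec(), value.to_vec());
            Ok(())
        } else {
            Err(current)
        }
    }
}

fn main() {
    let mut kv = MockKv(HashMap::new());
    // The first create succeeds because the key is absent.
    assert!(kv.compare_and_set(b"__dn_table/1/1", b"", br#"{"table_id":1}"#).is_ok());
    // A concurrent second create fails and reports the value that is already there.
    assert_eq!(
        kv.compare_and_set(b"__dn_table/1/1", b"", br#"{"table_id":1}"#),
        Err(Some(br#"{"table_id":1}"#.to_vec()))
    );
}
```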
src/common/meta/src/key/table_name.rs (new file, 234 lines)
@@ -0,0 +1,234 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;

use super::{TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use crate::error::{InvalidTableMetadataSnafu, Result};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;

#[derive(Debug)]
pub struct TableNameKey<'a> {
    pub catalog: &'a str,
    pub schema: &'a str,
    pub table: &'a str,
}

impl<'a> TableNameKey<'a> {
    pub fn new(catalog: &'a str, schema: &'a str, table: &'a str) -> Self {
        Self {
            catalog,
            schema,
            table,
        }
    }

    pub fn prefix_to_table(catalog: &str, schema: &str) -> String {
        format!("{}/{}/{}", TABLE_NAME_KEY_PREFIX, catalog, schema)
    }

    fn strip_table_name(raw_key: &[u8]) -> Result<String> {
        let key = String::from_utf8(raw_key.to_vec()).map_err(|e| {
            InvalidTableMetadataSnafu {
                err_msg: format!(
                    "TableNameKey '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(raw_key)
                ),
            }
            .build()
        })?;
        let captures =
            TABLE_NAME_KEY_PATTERN
                .captures(&key)
                .context(InvalidTableMetadataSnafu {
                    err_msg: format!("Invalid TableNameKey '{key}'"),
                })?;
        // Safety: pass the regex check above
        Ok(captures[3].to_string())
    }
}

impl TableMetaKey for TableNameKey<'_> {
    fn as_raw_key(&self) -> Vec<u8> {
        format!(
            "{}/{}",
            Self::prefix_to_table(self.catalog, self.schema),
            self.table
        )
        .into_bytes()
    }
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct TableNameValue {
    table_id: TableId,
}

impl TableNameValue {
    fn new(table_id: TableId) -> Self {
        Self { table_id }
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }
}

pub struct TableNameManager {
    kv_backend: KvBackendRef,
}

impl Default for TableNameManager {
    fn default() -> Self {
        Self::new(Arc::new(MemoryKvBackend::default()))
    }
}

impl TableNameManager {
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }

    pub async fn create(&self, key: &TableNameKey<'_>, table_id: TableId) -> Result<bool> {
        let raw_key = key.as_raw_key();
        let value = TableNameValue::new(table_id);
        let raw_value = value.try_as_raw_value()?;
        let result = self
            .kv_backend
            .compare_and_set(&raw_key, &[], &raw_value)
            .await?;
        Ok(matches!(result, Ok(())))
    }

    pub async fn get(&self, key: &TableNameKey<'_>) -> Result<Option<TableNameValue>> {
        let raw_key = key.as_raw_key();
        self.kv_backend
            .get(&raw_key)
            .await?
            .map(|x| TableNameValue::try_from_raw_value(x.1))
            .transpose()
    }

    pub async fn tables(&self, catalog: &str, schema: &str) -> Result<Vec<String>> {
        let key = TableNameKey::prefix_to_table(catalog, schema).into_bytes();
        let table_names = self
            .kv_backend
            .range(&key)
            .map(|x| x.map(|kv| TableNameKey::strip_table_name(&kv.0)))
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .collect::<Result<Vec<_>>>()?;
        Ok(table_names)
    }

    pub async fn remove(&self, key: &TableNameKey<'_>) -> Result<()> {
        let raw_key = key.as_raw_key();
        let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
        self.kv_backend
            .move_value(&raw_key, removed_key.as_bytes())
            .await
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;

    #[tokio::test]
    async fn test_table_name_manager() {
        let backend = Arc::new(MemoryKvBackend::default());
        let manager = TableNameManager::new(backend.clone());

        for i in 1..=3 {
            let table_name = format!("table_{}", i);
            let key = TableNameKey::new("my_catalog", "my_schema", &table_name);
            assert!(manager.create(&key, i).await.unwrap());
        }

        let key = TableNameKey::new("my_catalog", "my_schema", "my_table");
        assert!(manager.create(&key, 99).await.unwrap());
        assert!(!manager.create(&key, 99).await.unwrap());

        let value = manager.get(&key).await.unwrap().unwrap();
        assert_eq!(value.table_id(), 99);
        let not_existed = TableNameKey::new("x", "y", "z");
        assert!(manager.get(&not_existed).await.unwrap().is_none());

        assert!(manager.remove(&key).await.is_ok());
        let kv = backend
            .get(b"__removed-__table_name/my_catalog/my_schema/my_table")
            .await
            .unwrap()
            .unwrap();
        let value = TableNameValue::try_from_raw_value(kv.1).unwrap();
        assert_eq!(value.table_id(), 99);

        let tables = manager.tables("my_catalog", "my_schema").await.unwrap();
        assert_eq!(tables.len(), 3);
        assert_eq!(tables, vec!["table_1", "table_2", "table_3"]);
    }

    #[test]
    fn test_strip_table_name() {
        fn test_err(raw_key: &[u8]) {
            assert!(TableNameKey::strip_table_name(raw_key).is_err());
        }

        test_err(b"");
        test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string
        test_err(b"invalid_prefix/my_catalog/my_schema/my_table");
        test_err(b"__table_name/");
        test_err(b"__table_name/invalid_len_1");
        test_err(b"__table_name/invalid_len_2/x");
        test_err(b"__table_name/invalid_len_4/x/y/z");
        test_err(b"__table_name/000_invalid_catalog/y/z");
        test_err(b"__table_name/x/000_invalid_schema/z");
        test_err(b"__table_name/x/y/000_invalid_table");

        let table_name =
            TableNameKey::strip_table_name(b"__table_name/my_catalog/my_schema/my_table").unwrap();
        assert_eq!(table_name, "my_table");
    }

    #[test]
    fn test_serde() {
        let key = TableNameKey::new("my_catalog", "my_schema", "my_table");
        let raw_key = key.as_raw_key();
        assert_eq!(
            b"__table_name/my_catalog/my_schema/my_table",
            raw_key.as_slice()
        );

        let value = TableNameValue::new(1);
        let literal = br#"{"table_id":1}"#;

        assert_eq!(value.try_as_raw_value().unwrap(), literal);
        assert_eq!(
            TableNameValue::try_from_raw_value(literal.to_vec()).unwrap(),
            value
        );
    }
}
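The tables() listings above rely on the keys being ordered: every table name of a schema shares the `__table_name/{catalog_name}/{schema_name}` prefix, so listing a schema is a prefix range scan. An illustrative sketch with a BTreeMap standing in for the sorted KV store (not the real KvBackend API):

```rust
use std::collections::BTreeMap;

fn main() {
    let mut kv = BTreeMap::new();
    kv.insert("__table_name/my_catalog/my_schema/table_1".to_string(), r#"{"table_id":1}"#);
    kv.insert("__table_name/my_catalog/my_schema/table_2".to_string(), r#"{"table_id":2}"#);
    kv.insert("__table_name/my_catalog/other_schema/table_9".to_string(), r#"{"table_id":9}"#);

    // Scan everything under the schema prefix and keep only the trailing table name,
    // similar to what TableNameManager::tables plus TableNameKey::strip_table_name do.
    let prefix = "__table_name/my_catalog/my_schema/";
    let tables: Vec<&str> = kv
        .range(prefix.to_string()..)
        .take_while(|(k, _)| k.starts_with(prefix))
        .map(|(k, _)| k.rsplit('/').next().unwrap())
        .collect();

    assert_eq!(tables, vec!["table_1", "table_2"]);
}
```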
@@ -15,20 +15,21 @@
use std::fmt::Display;

use api::v1::meta::TableName;
use table::metadata::TableId;

use crate::key::to_removed_key;

pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";

pub struct TableRouteKey<'a> {
    pub table_id: u64,
    pub table_id: TableId,
    pub catalog_name: &'a str,
    pub schema_name: &'a str,
    pub table_name: &'a str,
}

impl<'a> TableRouteKey<'a> {
    pub fn with_table_name(table_id: u64, t: &'a TableName) -> Self {
    pub fn with_table_name(table_id: TableId, t: &'a TableName) -> Self {
        Self {
            table_id,
            catalog_name: &t.catalog_name,
@@ -296,7 +296,7 @@ impl DistTable {
        new_table_name: &str,
    ) -> Result<()> {
        let old_key = TableRouteKey {
            table_id: table_id.into(),
            table_id,
            catalog_name,
            schema_name,
            table_name: old_table_name,
@@ -304,7 +304,7 @@
        .to_string();

        let new_key = TableRouteKey {
            table_id: table_id.into(),
            table_id,
            catalog_name,
            schema_name,
            table_name: new_table_name,
@@ -30,6 +30,7 @@ use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use table::metadata::TableId;

use crate::ddl::DdlContext;
use crate::error::{self, Result};
@@ -107,7 +108,7 @@ impl CreateTableProcedure {
    async fn register_metadata(&self) -> Result<()> {
        let table_name = self.table_name();

        let table_id = self.creator.data.table_route.table.id;
        let table_id = self.creator.data.table_route.table.id as TableId;

        let table_route_key = TableRouteKey::with_table_name(table_id, &table_name.clone().into())
            .to_string()
@@ -329,7 +329,7 @@ mod tests {
        .unwrap();

        let key = TableRouteKey {
            table_id: failed_region.table_ident.table_id as u64,
            table_id: failed_region.table_ident.table_id,
            catalog_name: &failed_region.table_ident.catalog,
            schema_name: &failed_region.table_ident.schema,
            table_name: &failed_region.table_ident.table,
@@ -465,7 +465,7 @@ mod tests {
        let catalog_name = failed_region_1.table_ident.catalog.clone();
        let schema_name = failed_region_1.table_ident.schema.clone();
        let table_name = failed_region_1.table_ident.table.clone();
        let table_id = failed_region_1.table_ident.table_id as u64;
        let table_id = failed_region_1.table_ident.table_id;

        let _ = futures::future::join_all(vec![
            tokio::spawn(async move {
@@ -181,7 +181,7 @@ async fn handle_create(
    let id = table_id_sequence.next().await?;
    table_info.ident.table_id = id as u32;

    let table_route_key = TableRouteKey::with_table_name(id, &table_name)
    let table_route_key = TableRouteKey::with_table_name(id as _, &table_name)
        .to_string()
        .into_bytes();

@@ -409,7 +409,7 @@ async fn fetch_tables(

pub(crate) fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> {
    TableRouteKey {
        table_id,
        table_id: table_id as _,
        catalog_name: &t.catalog_name,
        schema_name: &t.schema_name,
        table_name: &t.table_name,