Files
greptimedb/src/common/meta/src/key/table_name.rs
Weny Xu 41656c8635 refactor: allocate table id in the procedure (#3271)
* refactor: replace TableMetadataManager with TableNameManager

* refactor: allocate table id in the procedure

* refactor: refactor client logical of handling retries

* feat(test_util): add TestCreateTableExprBuilder

* feat(test_util): add MockDatanodeManager

* feat(test_util): add new_ddl_context

* feat(test_util): add build_raw_table_info_from_expr

* feat(test_util): add MockDatanodeManager::new

* feat(procedure): add downcast_output_ref to Status

* test(create_table): add tests for CreateTableProcedure on_prepare

* refactor(ddl): rename handle_operate_region_error to add_peer_context_if_need

* test(create_table): add tests for CreateTableProcedure on_datanode_create_regions

* test(create_table): add tests for CreateTableProcedure on_create_metadata

* refactor(meta): use CreateTableExprBuilder

* feat(create_table): ensure number of partitions is greater than 0

* refactor: rename to add_peer_context_if_needed

* feat: add context for panic

* refactor: simplify the should_retry

* refactor: use Option<&T> instead of &Option<T>

* refactor: move downcast_output_ref under cfg(test)

* chore: fmt toml
2024-02-21 04:38:46 +00:00

328 lines
9.8 KiB
Rust

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use futures_util::stream::BoxStream;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use super::{TableMetaValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use crate::error::{Error, InvalidTableMetadataSnafu, Result};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{BatchGetRequest, RangeRequest};
use crate::rpc::KeyValue;
use crate::table_name::TableName;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TableNameKey<'a> {
pub catalog: &'a str,
pub schema: &'a str,
pub table: &'a str,
}
impl<'a> TableNameKey<'a> {
pub fn new(catalog: &'a str, schema: &'a str, table: &'a str) -> Self {
Self {
catalog,
schema,
table,
}
}
pub fn prefix_to_table(catalog: &str, schema: &str) -> String {
format!("{}/{}/{}", TABLE_NAME_KEY_PREFIX, catalog, schema)
}
fn strip_table_name(raw_key: &[u8]) -> Result<String> {
let key = String::from_utf8(raw_key.to_vec()).map_err(|e| {
InvalidTableMetadataSnafu {
err_msg: format!(
"TableNameKey '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(raw_key)
),
}
.build()
})?;
let captures =
TABLE_NAME_KEY_PATTERN
.captures(&key)
.context(InvalidTableMetadataSnafu {
err_msg: format!("Invalid TableNameKey '{key}'"),
})?;
// Safety: pass the regex check above
Ok(captures[3].to_string())
}
}
impl TableMetaKey for TableNameKey<'_> {
fn as_raw_key(&self) -> Vec<u8> {
format!(
"{}/{}",
Self::prefix_to_table(self.catalog, self.schema),
self.table
)
.into_bytes()
}
}
/// Decodes `KeyValue` to ({table_name}, TableNameValue)
pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> {
let table_name = TableNameKey::strip_table_name(kv.key())?;
let table_name_value = TableNameValue::try_from_raw_value(&kv.value)?;
Ok((table_name, table_name_value))
}
impl<'a> From<&'a TableName> for TableNameKey<'a> {
fn from(value: &'a TableName) -> Self {
Self {
catalog: &value.catalog_name,
schema: &value.schema_name,
table: &value.table_name,
}
}
}
impl From<TableNameKey<'_>> for TableName {
fn from(value: TableNameKey<'_>) -> Self {
Self {
catalog_name: value.catalog.to_string(),
schema_name: value.schema.to_string(),
table_name: value.table.to_string(),
}
}
}
impl<'a> TryFrom<&'a str> for TableNameKey<'a> {
type Error = Error;
fn try_from(s: &'a str) -> Result<Self> {
let captures = TABLE_NAME_KEY_PATTERN
.captures(s)
.context(InvalidTableMetadataSnafu {
err_msg: format!("Illegal TableNameKey format: '{s}'"),
})?;
// Safety: pass the regex check above
Ok(Self {
catalog: captures.get(1).unwrap().as_str(),
schema: captures.get(2).unwrap().as_str(),
table: captures.get(3).unwrap().as_str(),
})
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct TableNameValue {
table_id: TableId,
}
impl TableNameValue {
pub fn new(table_id: TableId) -> Self {
Self { table_id }
}
pub fn table_id(&self) -> TableId {
self.table_id
}
}
#[derive(Clone)]
pub struct TableNameManager {
kv_backend: KvBackendRef,
}
impl Default for TableNameManager {
fn default() -> Self {
Self::new(Arc::new(MemoryKvBackend::default()))
}
}
impl TableNameManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Builds a create table name transaction. It only executes while the primary keys comparing successes.
pub(crate) fn build_create_txn(
&self,
key: &TableNameKey<'_>,
table_id: TableId,
) -> Result<Txn> {
let raw_key = key.as_raw_key();
let value = TableNameValue::new(table_id);
let raw_value = value.try_as_raw_value()?;
let txn = Txn::new().and_then(vec![TxnOp::Put(raw_key, raw_value)]);
Ok(txn)
}
/// Builds a update table name transaction. It only executes while the primary keys comparing successes.
pub(crate) fn build_update_txn(
&self,
key: &TableNameKey<'_>,
new_key: &TableNameKey<'_>,
table_id: TableId,
) -> Result<Txn> {
let raw_key = key.as_raw_key();
let new_raw_key = new_key.as_raw_key();
let value = TableNameValue::new(table_id);
let raw_value = value.try_as_raw_value()?;
let txn = Txn::new().and_then(vec![
TxnOp::Delete(raw_key),
TxnOp::Put(new_raw_key, raw_value),
]);
Ok(txn)
}
/// Builds a delete table name transaction. It only executes while the primary keys comparing successes.
pub(crate) fn build_delete_txn(
&self,
key: &TableNameKey<'_>,
table_id: TableId,
) -> Result<Txn> {
let raw_key = key.as_raw_key();
let value = TableNameValue::new(table_id);
let raw_value = value.try_as_raw_value()?;
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
let txn = Txn::new().and_then(vec![
TxnOp::Delete(raw_key),
TxnOp::Put(removed_key.into_bytes(), raw_value),
]);
Ok(txn)
}
pub async fn get(&self, key: TableNameKey<'_>) -> Result<Option<TableNameValue>> {
let raw_key = key.as_raw_key();
self.kv_backend
.get(&raw_key)
.await?
.map(|x| TableNameValue::try_from_raw_value(&x.value))
.transpose()
}
pub async fn batch_get(
&self,
keys: Vec<TableNameKey<'_>>,
) -> Result<Vec<Option<TableNameValue>>> {
let raw_keys = keys
.into_iter()
.map(|key| key.as_raw_key())
.collect::<Vec<_>>();
let req = BatchGetRequest::new().with_keys(raw_keys.clone());
let res = self.kv_backend.batch_get(req).await?;
let kvs = res
.kvs
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect::<HashMap<_, _>>();
let mut array = vec![None; raw_keys.len()];
for (i, key) in raw_keys.into_iter().enumerate() {
let v = kvs.get(&key);
array[i] = v
.map(|v| TableNameValue::try_from_raw_value(v))
.transpose()?;
}
Ok(array)
}
pub async fn exists(&self, key: TableNameKey<'_>) -> Result<bool> {
let raw_key = key.as_raw_key();
self.kv_backend.exists(&raw_key).await
}
pub async fn tables(
&self,
catalog: &str,
schema: &str,
) -> BoxStream<'static, Result<(String, TableNameValue)>> {
let key = TableNameKey::prefix_to_table(catalog, schema).into_bytes();
let req = RangeRequest::new().with_prefix(key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
Arc::new(table_decoder),
);
Box::pin(stream)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_strip_table_name() {
fn test_err(raw_key: &[u8]) {
assert!(TableNameKey::strip_table_name(raw_key).is_err());
}
test_err(b"");
test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string
test_err(b"invalid_prefix/my_catalog/my_schema/my_table");
test_err(b"__table_name/");
test_err(b"__table_name/invalid_len_1");
test_err(b"__table_name/invalid_len_2/x");
test_err(b"__table_name/invalid_len_4/x/y/z");
test_err(b"__table_name/000_invalid_catalog/y/z");
test_err(b"__table_name/x/000_invalid_schema/z");
test_err(b"__table_name/x/y/000_invalid_table");
fn test_ok(table_name: &str) {
assert_eq!(
table_name,
TableNameKey::strip_table_name(
format!("__table_name/my_catalog/my_schema/{}", table_name).as_bytes()
)
.unwrap()
);
}
test_ok("my_table");
test_ok("cpu:metrics");
test_ok(":cpu:metrics");
test_ok("sys.cpu.system");
test_ok("foo-bar");
}
#[test]
fn test_serde() {
let key = TableNameKey::new("my_catalog", "my_schema", "my_table");
let raw_key = key.as_raw_key();
assert_eq!(
b"__table_name/my_catalog/my_schema/my_table",
raw_key.as_slice()
);
let value = TableNameValue::new(1);
let literal = br#"{"table_id":1}"#;
assert_eq!(value.try_as_raw_value().unwrap(), literal);
assert_eq!(TableNameValue::try_from_raw_value(literal).unwrap(), value);
}
}