From d9faa5c8015eee4c913667b5de2add74d05b4165 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 19 Jun 2025 14:48:00 +0800 Subject: [PATCH] feat(cli): add metadata del commands (#6339) * feat: introduce cli for deleting metadata Signed-off-by: WenyXu * refactor(cli): flatten metadata command structure Signed-off-by: WenyXu * chore: add alias Signed-off-by: WenyXu * refactor(meta): implement logical deletion for CLI tool Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/cli/Cargo.toml | 1 + src/cli/src/error.rs | 9 + src/cli/src/metadata.rs | 9 +- src/cli/src/metadata/control.rs | 25 +- src/cli/src/metadata/control/del.rs | 42 ++++ src/cli/src/metadata/control/del/key.rs | 133 ++++++++++ src/cli/src/metadata/control/del/table.rs | 235 ++++++++++++++++++ src/cli/src/metadata/control/get.rs | 37 +-- src/cli/src/metadata/control/test_utils.rs | 51 ++++ src/cli/src/metadata/control/utils.rs | 21 ++ src/cli/src/metadata/repair.rs | 4 +- .../meta/src/ddl/drop_database/executor.rs | 20 +- src/common/meta/src/ddl/utils.rs | 22 +- src/common/meta/src/key.rs | 34 ++- src/common/meta/src/key/tombstone.rs | 71 ++++-- src/common/meta/src/kv_backend/memory.rs | 6 +- 16 files changed, 632 insertions(+), 88 deletions(-) create mode 100644 src/cli/src/metadata/control/del.rs create mode 100644 src/cli/src/metadata/control/del/key.rs create mode 100644 src/cli/src/metadata/control/del/table.rs create mode 100644 src/cli/src/metadata/control/test_utils.rs diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index 204817465c..5d5fa1038b 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -67,6 +67,7 @@ tokio.workspace = true tracing-appender.workspace = true [dev-dependencies] +common-meta = { workspace = true, features = ["testing"] } common-version.workspace = true serde.workspace = true tempfile.workspace = true diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs index 0ee6d55b30..bdc2c65806 100644 --- a/src/cli/src/error.rs +++ b/src/cli/src/error.rs @@ -20,6 +20,7 @@ use common_macro::stack_trace_debug; use common_meta::peer::Peer; use object_store::Error as ObjectStoreError; use snafu::{Location, Snafu}; +use store_api::storage::TableId; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -237,6 +238,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Table not found: {table_id}"))] + TableNotFound { + table_id: TableId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("OpenDAL operator failed"))] OpenDal { #[snafu(implicit)] @@ -356,6 +364,7 @@ impl ErrorExt for Error { Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal, Error::MetaClientInit { source, .. } => source.status_code(), + Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound, } } diff --git a/src/cli/src/metadata.rs b/src/cli/src/metadata.rs index f8ec28414a..80486214da 100644 --- a/src/cli/src/metadata.rs +++ b/src/cli/src/metadata.rs @@ -21,7 +21,7 @@ mod utils; use clap::Subcommand; use common_error::ext::BoxedError; -use crate::metadata::control::ControlCommand; +use crate::metadata::control::{DelCommand, GetCommand}; use crate::metadata::repair::RepairLogicalTablesCommand; use crate::metadata::snapshot::SnapshotCommand; use crate::Tool; @@ -34,7 +34,9 @@ pub enum MetadataCommand { #[clap(subcommand)] Snapshot(SnapshotCommand), #[clap(subcommand)] - Control(ControlCommand), + Get(GetCommand), + #[clap(subcommand)] + Del(DelCommand), RepairLogicalTables(RepairLogicalTablesCommand), } @@ -42,8 +44,9 @@ impl MetadataCommand { pub async fn build(&self) -> Result, BoxedError> { match self { MetadataCommand::Snapshot(cmd) => cmd.build().await, - MetadataCommand::Control(cmd) => cmd.build().await, MetadataCommand::RepairLogicalTables(cmd) => cmd.build().await, + MetadataCommand::Get(cmd) => cmd.build().await, + MetadataCommand::Del(cmd) => cmd.build().await, } } } diff --git a/src/cli/src/metadata/control.rs b/src/cli/src/metadata/control.rs index 3d10093b04..3fba582ce5 100644 --- a/src/cli/src/metadata/control.rs +++ b/src/cli/src/metadata/control.rs @@ -12,26 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod del; mod get; +#[cfg(test)] +mod test_utils; mod utils; -use clap::Subcommand; -use common_error::ext::BoxedError; -use get::GetCommand; - -use crate::Tool; - -/// Subcommand for metadata control, including getting metadata from metadata store -#[derive(Subcommand)] -pub enum ControlCommand { - #[clap(subcommand)] - Get(GetCommand), -} - -impl ControlCommand { - pub async fn build(&self) -> Result, BoxedError> { - match self { - ControlCommand::Get(cmd) => cmd.build().await, - } - } -} +pub(crate) use del::DelCommand; +pub(crate) use get::GetCommand; diff --git a/src/cli/src/metadata/control/del.rs b/src/cli/src/metadata/control/del.rs new file mode 100644 index 0000000000..6afe472607 --- /dev/null +++ b/src/cli/src/metadata/control/del.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod key; +mod table; + +use clap::Subcommand; +use common_error::ext::BoxedError; + +use crate::metadata::control::del::key::DelKeyCommand; +use crate::metadata::control::del::table::DelTableCommand; +use crate::Tool; + +/// The prefix of the tombstone keys. +pub(crate) const CLI_TOMBSTONE_PREFIX: &str = "__cli_tombstone/"; + +/// Subcommand for deleting metadata from the metadata store. +#[derive(Subcommand)] +pub enum DelCommand { + Key(DelKeyCommand), + Table(DelTableCommand), +} + +impl DelCommand { + pub async fn build(&self) -> Result, BoxedError> { + match self { + DelCommand::Key(cmd) => cmd.build().await, + DelCommand::Table(cmd) => cmd.build().await, + } + } +} diff --git a/src/cli/src/metadata/control/del/key.rs b/src/cli/src/metadata/control/del/key.rs new file mode 100644 index 0000000000..78700dbeab --- /dev/null +++ b/src/cli/src/metadata/control/del/key.rs @@ -0,0 +1,133 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use clap::Parser; +use common_error::ext::BoxedError; +use common_meta::key::tombstone::TombstoneManager; +use common_meta::kv_backend::KvBackendRef; +use common_meta::rpc::store::RangeRequest; + +use crate::metadata::common::StoreConfig; +use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX; +use crate::Tool; + +/// Delete key-value pairs logically from the metadata store. +#[derive(Debug, Default, Parser)] +pub struct DelKeyCommand { + /// The key to delete from the metadata store. + #[clap(long)] + key: String, + + /// Delete key-value pairs with the given prefix. + #[clap(long)] + prefix: bool, + + #[clap(flatten)] + store: StoreConfig, +} + +impl DelKeyCommand { + pub async fn build(&self) -> Result, BoxedError> { + let kv_backend = self.store.build().await?; + Ok(Box::new(DelKeyTool { + key: self.key.to_string(), + prefix: self.prefix, + key_deleter: KeyDeleter::new(kv_backend), + })) + } +} + +struct KeyDeleter { + kv_backend: KvBackendRef, + tombstone_manager: TombstoneManager, +} + +impl KeyDeleter { + fn new(kv_backend: KvBackendRef) -> Self { + Self { + kv_backend: kv_backend.clone(), + tombstone_manager: TombstoneManager::new_with_prefix(kv_backend, CLI_TOMBSTONE_PREFIX), + } + } + + async fn delete(&self, key: &str, prefix: bool) -> Result { + let mut req = RangeRequest::default().with_keys_only(); + if prefix { + req = req.with_prefix(key.as_bytes()); + } else { + req = req.with_key(key.as_bytes()); + } + let resp = self.kv_backend.range(req).await.map_err(BoxedError::new)?; + let keys = resp.kvs.iter().map(|kv| kv.key.clone()).collect::>(); + self.tombstone_manager + .create(keys) + .await + .map_err(BoxedError::new) + } +} + +struct DelKeyTool { + key: String, + prefix: bool, + key_deleter: KeyDeleter, +} + +#[async_trait] +impl Tool for DelKeyTool { + async fn do_work(&self) -> Result<(), BoxedError> { + let deleted = self.key_deleter.delete(&self.key, self.prefix).await?; + // Print the number of deleted keys. + println!("{}", deleted); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_meta::kv_backend::chroot::ChrootKvBackend; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::{KvBackend, KvBackendRef}; + use common_meta::rpc::store::RangeRequest; + + use crate::metadata::control::del::key::KeyDeleter; + use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX; + use crate::metadata::control::test_utils::put_key; + + #[tokio::test] + async fn test_delete_keys() { + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let key_deleter = KeyDeleter::new(kv_backend.clone()); + put_key(&kv_backend, "foo", "bar").await; + put_key(&kv_backend, "foo/bar", "baz").await; + put_key(&kv_backend, "foo/baz", "qux").await; + let deleted = key_deleter.delete("foo", true).await.unwrap(); + assert_eq!(deleted, 3); + let deleted = key_deleter.delete("foo/bar", false).await.unwrap(); + assert_eq!(deleted, 0); + + let chroot = ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend); + let req = RangeRequest::default().with_prefix(b"foo"); + let resp = chroot.range(req).await.unwrap(); + assert_eq!(resp.kvs.len(), 3); + assert_eq!(resp.kvs[0].key, b"foo"); + assert_eq!(resp.kvs[0].value, b"bar"); + assert_eq!(resp.kvs[1].key, b"foo/bar"); + assert_eq!(resp.kvs[1].value, b"baz"); + assert_eq!(resp.kvs[2].key, b"foo/baz"); + assert_eq!(resp.kvs[2].value, b"qux"); + } +} diff --git a/src/cli/src/metadata/control/del/table.rs b/src/cli/src/metadata/control/del/table.rs new file mode 100644 index 0000000000..d62ab1ed31 --- /dev/null +++ b/src/cli/src/metadata/control/del/table.rs @@ -0,0 +1,235 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use clap::Parser; +use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::format_full_table_name; +use common_error::ext::BoxedError; +use common_meta::ddl::utils::get_region_wal_options; +use common_meta::key::table_name::TableNameManager; +use common_meta::key::TableMetadataManager; +use common_meta::kv_backend::KvBackendRef; +use store_api::storage::TableId; + +use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu}; +use crate::metadata::common::StoreConfig; +use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX; +use crate::metadata::control::utils::get_table_id_by_name; +use crate::Tool; + +/// Delete table metadata logically from the metadata store. +#[derive(Debug, Default, Parser)] +pub struct DelTableCommand { + /// The table id to delete from the metadata store. + #[clap(long)] + table_id: Option, + + /// The table name to delete from the metadata store. + #[clap(long)] + table_name: Option, + + /// The schema name of the table. + #[clap(long, default_value = DEFAULT_SCHEMA_NAME)] + schema_name: String, + + /// The catalog name of the table. + #[clap(long, default_value = DEFAULT_CATALOG_NAME)] + catalog_name: String, + + #[clap(flatten)] + store: StoreConfig, +} + +impl DelTableCommand { + fn validate(&self) -> Result<(), BoxedError> { + if matches!( + (&self.table_id, &self.table_name), + (Some(_), Some(_)) | (None, None) + ) { + return Err(BoxedError::new( + InvalidArgumentsSnafu { + msg: "You must specify either --table-id or --table-name.", + } + .build(), + )); + } + Ok(()) + } +} + +impl DelTableCommand { + pub async fn build(&self) -> Result, BoxedError> { + self.validate()?; + let kv_backend = self.store.build().await?; + Ok(Box::new(DelTableTool { + table_id: self.table_id, + table_name: self.table_name.clone(), + schema_name: self.schema_name.clone(), + catalog_name: self.catalog_name.clone(), + table_name_manager: TableNameManager::new(kv_backend.clone()), + table_metadata_deleter: TableMetadataDeleter::new(kv_backend), + })) + } +} + +struct DelTableTool { + table_id: Option, + table_name: Option, + schema_name: String, + catalog_name: String, + table_name_manager: TableNameManager, + table_metadata_deleter: TableMetadataDeleter, +} + +#[async_trait] +impl Tool for DelTableTool { + async fn do_work(&self) -> Result<(), BoxedError> { + let table_id = if let Some(table_name) = &self.table_name { + let catalog_name = &self.catalog_name; + let schema_name = &self.schema_name; + + let Some(table_id) = get_table_id_by_name( + &self.table_name_manager, + catalog_name, + schema_name, + table_name, + ) + .await? + else { + println!( + "Table({}) not found", + format_full_table_name(catalog_name, schema_name, table_name) + ); + return Ok(()); + }; + table_id + } else { + // Safety: we have validated that table_id or table_name is not None + self.table_id.unwrap() + }; + self.table_metadata_deleter.delete(table_id).await?; + println!("Table({}) deleted", table_id); + + Ok(()) + } +} + +struct TableMetadataDeleter { + table_metadata_manager: TableMetadataManager, +} + +impl TableMetadataDeleter { + fn new(kv_backend: KvBackendRef) -> Self { + Self { + table_metadata_manager: TableMetadataManager::new_with_custom_tombstone_prefix( + kv_backend, + CLI_TOMBSTONE_PREFIX, + ), + } + } + + async fn delete(&self, table_id: TableId) -> Result<(), BoxedError> { + let (table_info, table_route) = self + .table_metadata_manager + .get_full_table_info(table_id) + .await + .map_err(BoxedError::new)?; + let Some(table_info) = table_info else { + return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build())); + }; + let Some(table_route) = table_route else { + return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build())); + }; + let physical_table_id = self + .table_metadata_manager + .table_route_manager() + .get_physical_table_id(table_id) + .await + .map_err(BoxedError::new)?; + + let table_name = table_info.table_name(); + let region_wal_options = get_region_wal_options( + &self.table_metadata_manager, + &table_route, + physical_table_id, + ) + .await + .map_err(BoxedError::new)?; + + self.table_metadata_manager + .delete_table_metadata(table_id, &table_name, &table_route, ®ion_wal_options) + .await + .map_err(BoxedError::new)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; + use common_meta::key::table_route::TableRouteValue; + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::chroot::ChrootKvBackend; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::{KvBackend, KvBackendRef}; + use common_meta::rpc::store::RangeRequest; + + use crate::metadata::control::del::table::TableMetadataDeleter; + use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX; + use crate::metadata::control::test_utils::prepare_physical_table_metadata; + + #[tokio::test] + async fn test_delete_table_not_found() { + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + + let table_metadata_deleter = TableMetadataDeleter::new(kv_backend); + let table_id = 1; + let err = table_metadata_deleter.delete(table_id).await.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::TableNotFound); + } + + #[tokio::test] + async fn test_delete_table_metadata() { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); + let table_id = 1024; + let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await; + table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::Physical(table_route), + HashMap::new(), + ) + .await + .unwrap(); + + let total_keys = kv_backend.len(); + assert!(total_keys > 0); + + let table_metadata_deleter = TableMetadataDeleter::new(kv_backend.clone()); + table_metadata_deleter.delete(table_id).await.unwrap(); + + // Check the tombstone keys are deleted + let chroot = + ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend.clone()); + let req = RangeRequest::default().with_range(vec![0], vec![0]); + let resp = chroot.range(req).await.unwrap(); + assert_eq!(resp.kvs.len(), total_keys); + } +} diff --git a/src/cli/src/metadata/control/get.rs b/src/cli/src/metadata/control/get.rs index 875512689d..04ada521d5 100644 --- a/src/cli/src/metadata/control/get.rs +++ b/src/cli/src/metadata/control/get.rs @@ -20,7 +20,6 @@ use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::key::table_info::TableInfoKey; -use common_meta::key::table_name::TableNameKey; use common_meta::key::table_route::TableRouteKey; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; @@ -30,7 +29,7 @@ use futures::TryStreamExt; use crate::error::InvalidArgumentsSnafu; use crate::metadata::common::StoreConfig; -use crate::metadata::control::utils::{decode_key_value, json_fromatter}; +use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter}; use crate::Tool; /// Getting metadata from metadata store. @@ -130,8 +129,12 @@ pub struct GetTableCommand { table_name: Option, /// The schema name of the table. - #[clap(long)] - schema_name: Option, + #[clap(long, default_value = DEFAULT_SCHEMA_NAME)] + schema_name: String, + + /// The catalog name of the table. + #[clap(long, default_value = DEFAULT_CATALOG_NAME)] + catalog_name: String, /// Pretty print the output. #[clap(long, default_value = "false")] @@ -143,7 +146,10 @@ pub struct GetTableCommand { impl GetTableCommand { pub fn validate(&self) -> Result<(), BoxedError> { - if self.table_id.is_none() && self.table_name.is_none() { + if matches!( + (&self.table_id, &self.table_name), + (Some(_), Some(_)) | (None, None) + ) { return Err(BoxedError::new( InvalidArgumentsSnafu { msg: "You must specify either --table-id or --table-name.", @@ -159,7 +165,8 @@ struct GetTableTool { kvbackend: KvBackendRef, table_id: Option, table_name: Option, - schema_name: Option, + schema_name: String, + catalog_name: String, pretty: bool, } @@ -172,23 +179,20 @@ impl Tool for GetTableTool { let table_route_manager = table_metadata_manager.table_route_manager(); let table_id = if let Some(table_name) = &self.table_name { - let catalog = DEFAULT_CATALOG_NAME.to_string(); - let schema_name = self - .schema_name - .clone() - .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); - let key = TableNameKey::new(&catalog, &schema_name, table_name); + let catalog_name = &self.catalog_name; + let schema_name = &self.schema_name; - let Some(table_name) = table_name_manager.get(key).await.map_err(BoxedError::new)? + let Some(table_id) = + get_table_id_by_name(table_name_manager, catalog_name, schema_name, table_name) + .await? else { println!( "Table({}) not found", - format_full_table_name(&catalog, &schema_name, table_name) + format_full_table_name(catalog_name, schema_name, table_name) ); return Ok(()); }; - - table_name.table_id() + table_id } else { // Safety: we have validated that table_id or table_name is not None self.table_id.unwrap() @@ -236,6 +240,7 @@ impl GetTableCommand { table_id: self.table_id, table_name: self.table_name.clone(), schema_name: self.schema_name.clone(), + catalog_name: self.catalog_name.clone(), pretty: self.pretty, })) } diff --git a/src/cli/src/metadata/control/test_utils.rs b/src/cli/src/metadata/control/test_utils.rs new file mode 100644 index 0000000000..8f7d236ec7 --- /dev/null +++ b/src/cli/src/metadata/control/test_utils.rs @@ -0,0 +1,51 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::ddl::test_util::test_create_physical_table_task; +use common_meta::key::table_route::PhysicalTableRouteValue; +use common_meta::kv_backend::KvBackendRef; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; +use common_meta::rpc::store::PutRequest; +use store_api::storage::{RegionId, TableId}; +use table::metadata::RawTableInfo; + +/// Puts a key-value pair into the kv backend. +pub async fn put_key(kv_backend: &KvBackendRef, key: &str, value: &str) { + let put_req = PutRequest::new() + .with_key(key.as_bytes()) + .with_value(value.as_bytes()); + kv_backend.put(put_req).await.unwrap(); +} + +/// Prepares the physical table metadata for testing. +/// +/// Returns the table info and the table route. +pub async fn prepare_physical_table_metadata( + table_name: &str, + table_id: TableId, +) -> (RawTableInfo, PhysicalTableRouteValue) { + let mut create_physical_table_task = test_create_physical_table_task(table_name); + let table_route = PhysicalTableRouteValue::new(vec![RegionRoute { + region: Region { + id: RegionId::new(table_id, 1), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]); + create_physical_table_task.set_table_id(table_id); + + (create_physical_table_task.table_info, table_route) +} diff --git a/src/cli/src/metadata/control/utils.rs b/src/cli/src/metadata/control/utils.rs index 64e6ae8e1a..f557cb6be3 100644 --- a/src/cli/src/metadata/control/utils.rs +++ b/src/cli/src/metadata/control/utils.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_error::ext::BoxedError; use common_meta::error::Result as CommonMetaResult; +use common_meta::key::table_name::{TableNameKey, TableNameManager}; use common_meta::rpc::KeyValue; use serde::Serialize; +use store_api::storage::TableId; /// Decodes a key-value pair into a string. pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> { @@ -34,3 +37,21 @@ where serde_json::to_string(value).unwrap() } } + +/// Gets the table id by table name. +pub async fn get_table_id_by_name( + table_name_manager: &TableNameManager, + catalog_name: &str, + schema_name: &str, + table_name: &str, +) -> Result, BoxedError> { + let table_name_key = TableNameKey::new(catalog_name, schema_name, table_name); + let Some(table_name_value) = table_name_manager + .get(table_name_key) + .await + .map_err(BoxedError::new)? + else { + return Ok(None); + }; + Ok(Some(table_name_value.table_id())) +} diff --git a/src/cli/src/metadata/repair.rs b/src/cli/src/metadata/repair.rs index 8e853956a0..212a52d6e1 100644 --- a/src/cli/src/metadata/repair.rs +++ b/src/cli/src/metadata/repair.rs @@ -48,11 +48,11 @@ use crate::Tool; #[derive(Debug, Default, Parser)] pub struct RepairLogicalTablesCommand { /// The names of the tables to repair. - #[clap(long, value_delimiter = ',')] + #[clap(long, value_delimiter = ',', alias = "table-name")] table_names: Vec, /// The id of the table to repair. - #[clap(long, value_delimiter = ',')] + #[clap(long, value_delimiter = ',', alias = "table-id")] table_ids: Vec, /// The schema of the tables to repair. diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 216962a87a..36fb196819 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::any::Any; -use std::collections::HashMap; use common_procedure::Status; use common_telemetry::info; @@ -25,7 +24,7 @@ use table::table_name::TableName; use crate::ddl::drop_database::cursor::DropDatabaseCursor; use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State}; use crate::ddl::drop_table::executor::DropTableExecutor; -use crate::ddl::utils::extract_region_wal_options; +use crate::ddl::utils::get_region_wal_options; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::table_route::TableRouteValue; @@ -109,17 +108,12 @@ impl State for DropDatabaseExecutor { ); // Deletes topic-region mapping if dropping physical table - let region_wal_options = - if let TableRouteValue::Physical(table_route_value) = &table_route_value { - let datanode_table_values = ddl_ctx - .table_metadata_manager - .datanode_table_manager() - .regions(self.physical_table_id, table_route_value) - .await?; - extract_region_wal_options(&datanode_table_values)? - } else { - HashMap::new() - }; + let region_wal_options = get_region_wal_options( + &ddl_ctx.table_metadata_manager, + &table_route_value, + self.physical_table_id, + ) + .await?; executor .on_destroy_metadata(ddl_ctx, &table_route_value, ®ion_wal_options) diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index 4ee165541f..eb1299334f 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -42,7 +42,8 @@ use crate::error::{ }; use crate::key::datanode_table::DatanodeTableValue; use crate::key::table_name::TableNameKey; -use crate::key::TableMetadataManagerRef; +use crate::key::table_route::TableRouteValue; +use crate::key::{TableMetadataManager, TableMetadataManagerRef}; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute}; @@ -187,6 +188,25 @@ pub fn parse_region_wal_options( Ok(region_wal_options) } +/// Gets the wal options for a table. +pub async fn get_region_wal_options( + table_metadata_manager: &TableMetadataManager, + table_route_value: &TableRouteValue, + physical_table_id: TableId, +) -> Result> { + let region_wal_options = + if let TableRouteValue::Physical(table_route_value) = &table_route_value { + let datanode_table_values = table_metadata_manager + .datanode_table_manager() + .regions(physical_table_id, table_route_value) + .await?; + extract_region_wal_options(&datanode_table_values)? + } else { + HashMap::new() + }; + Ok(region_wal_options) +} + /// Extracts region wal options from [DatanodeTableValue]s. pub fn extract_region_wal_options( datanode_table_values: &Vec, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index fdc2de38b6..6a50baf381 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -109,7 +109,7 @@ pub mod table_name; pub mod table_route; #[cfg(any(test, feature = "testing"))] pub mod test_utils; -mod tombstone; +pub mod tombstone; pub mod topic_name; pub mod topic_region; pub mod txn_helper; @@ -535,6 +535,29 @@ impl TableMetadataManager { } } + /// Creates a new `TableMetadataManager` with a custom tombstone prefix. + pub fn new_with_custom_tombstone_prefix( + kv_backend: KvBackendRef, + tombstone_prefix: &str, + ) -> Self { + Self { + table_name_manager: TableNameManager::new(kv_backend.clone()), + table_info_manager: TableInfoManager::new(kv_backend.clone()), + view_info_manager: ViewInfoManager::new(kv_backend.clone()), + datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()), + catalog_manager: CatalogManager::new(kv_backend.clone()), + schema_manager: SchemaManager::new(kv_backend.clone()), + table_route_manager: TableRouteManager::new(kv_backend.clone()), + tombstone_manager: TombstoneManager::new_with_prefix( + kv_backend.clone(), + tombstone_prefix, + ), + topic_name_manager: TopicNameManager::new(kv_backend.clone()), + topic_region_manager: TopicRegionManager::new(kv_backend.clone()), + kv_backend, + } + } + pub async fn init(&self) -> Result<()> { let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME); @@ -925,7 +948,7 @@ impl TableMetadataManager { ) -> Result<()> { let keys = self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?; - self.tombstone_manager.create(keys).await + self.tombstone_manager.create(keys).await.map(|_| ()) } /// Deletes metadata tombstone for table **permanently**. @@ -939,7 +962,10 @@ impl TableMetadataManager { ) -> Result<()> { let table_metadata_keys = self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?; - self.tombstone_manager.delete(table_metadata_keys).await + self.tombstone_manager + .delete(table_metadata_keys) + .await + .map(|_| ()) } /// Restores metadata for table. @@ -953,7 +979,7 @@ impl TableMetadataManager { ) -> Result<()> { let keys = self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?; - self.tombstone_manager.restore(keys).await + self.tombstone_manager.restore(keys).await.map(|_| ()) } /// Deletes metadata for table **permanently**. diff --git a/src/common/meta/src/key/tombstone.rs b/src/common/meta/src/key/tombstone.rs index 9aa2dd69ee..44f17bc794 100644 --- a/src/common/meta/src/key/tombstone.rs +++ b/src/common/meta/src/key/tombstone.rs @@ -25,20 +25,32 @@ use crate::rpc::store::BatchGetRequest; /// [TombstoneManager] provides the ability to: /// - logically delete values /// - restore the deleted values -pub(crate) struct TombstoneManager { +pub struct TombstoneManager { kv_backend: KvBackendRef, + tombstone_prefix: String, } const TOMBSTONE_PREFIX: &str = "__tombstone/"; -fn to_tombstone(key: &[u8]) -> Vec { - [TOMBSTONE_PREFIX.as_bytes(), key].concat() -} - impl TombstoneManager { /// Returns [TombstoneManager]. pub fn new(kv_backend: KvBackendRef) -> Self { - Self { kv_backend } + Self { + kv_backend, + tombstone_prefix: TOMBSTONE_PREFIX.to_string(), + } + } + + /// Returns [TombstoneManager] with a custom tombstone prefix. + pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self { + Self { + kv_backend, + tombstone_prefix: prefix.to_string(), + } + } + + pub fn to_tombstone(&self, key: &[u8]) -> Vec { + [self.tombstone_prefix.as_bytes(), key].concat() } /// Moves value to `dest_key`. @@ -67,7 +79,7 @@ impl TombstoneManager { (txn, TxnOpGetResponseSet::filter(src_key)) } - async fn move_values_inner(&self, keys: &[Vec], dest_keys: &[Vec]) -> Result<()> { + async fn move_values_inner(&self, keys: &[Vec], dest_keys: &[Vec]) -> Result { ensure!( keys.len() == dest_keys.len(), error::UnexpectedSnafu { @@ -102,7 +114,7 @@ impl TombstoneManager { .unzip(); let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?; if resp.succeeded { - return Ok(()); + return Ok(keys.len()); } let mut set = TxnOpGetResponseSet::from(&mut resp.responses); // Updates results. @@ -125,7 +137,9 @@ impl TombstoneManager { } /// Moves values to `dest_key`. - async fn move_values(&self, keys: Vec>, dest_keys: Vec>) -> Result<()> { + /// + /// Returns the number of keys that were moved. + async fn move_values(&self, keys: Vec>, dest_keys: Vec>) -> Result { let chunk_size = self.kv_backend.max_txn_ops() / 2; if keys.len() > chunk_size { let keys_chunks = keys.chunks(chunk_size).collect::>(); @@ -134,7 +148,7 @@ impl TombstoneManager { self.move_values_inner(keys, dest_keys).await?; } - Ok(()) + Ok(keys.len()) } else { self.move_values_inner(&keys, &dest_keys).await } @@ -145,11 +159,13 @@ impl TombstoneManager { /// Preforms to: /// - deletes origin values. /// - stores tombstone values. - pub(crate) async fn create(&self, keys: Vec>) -> Result<()> { + /// + /// Returns the number of keys that were moved. + pub async fn create(&self, keys: Vec>) -> Result { let (keys, dest_keys): (Vec<_>, Vec<_>) = keys .into_iter() .map(|key| { - let tombstone_key = to_tombstone(&key); + let tombstone_key = self.to_tombstone(&key); (key, tombstone_key) }) .unzip(); @@ -162,11 +178,13 @@ impl TombstoneManager { /// Preforms to: /// - restore origin value. /// - deletes tombstone values. - pub(crate) async fn restore(&self, keys: Vec>) -> Result<()> { + /// + /// Returns the number of keys that were restored. + pub async fn restore(&self, keys: Vec>) -> Result { let (keys, dest_keys): (Vec<_>, Vec<_>) = keys .into_iter() .map(|key| { - let tombstone_key = to_tombstone(&key); + let tombstone_key = self.to_tombstone(&key); (tombstone_key, key) }) .unzip(); @@ -175,16 +193,18 @@ impl TombstoneManager { } /// Deletes tombstones values for the specified `keys`. - pub(crate) async fn delete(&self, keys: Vec>) -> Result<()> { + /// + /// Returns the number of keys that were deleted. + pub async fn delete(&self, keys: Vec>) -> Result { let operations = keys .iter() - .map(|key| TxnOp::Delete(to_tombstone(key))) + .map(|key| TxnOp::Delete(self.to_tombstone(key))) .collect::>(); let txn = Txn::new().and_then(operations); // Always success. let _ = self.kv_backend.txn(txn).await?; - Ok(()) + Ok(keys.len()) } } @@ -194,7 +214,6 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; - use super::to_tombstone; use crate::error::Error; use crate::key::tombstone::TombstoneManager; use crate::kv_backend::memory::MemoryKvBackend; @@ -246,7 +265,7 @@ mod tests { assert!(!kv_backend.exists(b"foo").await.unwrap()); assert_eq!( kv_backend - .get(&to_tombstone(b"bar")) + .get(&tombstone_manager.to_tombstone(b"bar")) .await .unwrap() .unwrap() @@ -255,7 +274,7 @@ mod tests { ); assert_eq!( kv_backend - .get(&to_tombstone(b"foo")) + .get(&tombstone_manager.to_tombstone(b"foo")) .await .unwrap() .unwrap() @@ -287,7 +306,7 @@ mod tests { kv_backend.clone(), &[MoveValue { key: b"bar".to_vec(), - dest_key: to_tombstone(b"bar"), + dest_key: tombstone_manager.to_tombstone(b"bar"), value: b"baz".to_vec(), }], ) @@ -364,7 +383,7 @@ mod tests { .iter() .map(|(key, value)| MoveValue { key: key.clone(), - dest_key: to_tombstone(key), + dest_key: tombstone_manager.to_tombstone(key), value: value.clone(), }) .collect::>(); @@ -409,7 +428,7 @@ mod tests { .iter() .map(|(key, value)| MoveValue { key: key.clone(), - dest_key: to_tombstone(key), + dest_key: tombstone_manager.to_tombstone(key), value: value.clone(), }) .collect::>(); @@ -462,7 +481,7 @@ mod tests { .iter() .map(|(key, value)| MoveValue { key: key.clone(), - dest_key: to_tombstone(key), + dest_key: tombstone_manager.to_tombstone(key), value: value.clone(), }) .collect::>(); @@ -502,7 +521,7 @@ mod tests { .iter() .map(|(key, value)| MoveValue { key: key.clone(), - dest_key: to_tombstone(key), + dest_key: tombstone_manager.to_tombstone(key), value: value.clone(), }) .collect::>(); @@ -537,7 +556,7 @@ mod tests { .iter() .map(|(key, value)| MoveValue { key: key.clone(), - dest_key: to_tombstone(key), + dest_key: tombstone_manager.to_tombstone(key), value: value.clone(), }) .collect::>(); diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 1c4016e775..a913eef81d 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -54,20 +54,20 @@ impl MemoryKvBackend { kvs.clear(); } - #[cfg(test)] + #[cfg(any(test, feature = "testing"))] /// Returns true if the `kvs` is empty. pub fn is_empty(&self) -> bool { self.kvs.read().unwrap().is_empty() } - #[cfg(test)] + #[cfg(any(test, feature = "testing"))] /// Returns the `kvs`. pub fn dump(&self) -> BTreeMap, Vec> { let kvs = self.kvs.read().unwrap(); kvs.clone() } - #[cfg(test)] + #[cfg(any(test, feature = "testing"))] /// Returns the length of `kvs` pub fn len(&self) -> usize { self.kvs.read().unwrap().len()