feat(cli): add metadata del commands (#6339)

* feat: introduce cli for deleting metadata

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(cli): flatten metadata command structure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add alias

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(meta): implement logical deletion for CLI tool

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-06-19 14:48:00 +08:00
committed by GitHub
parent 12c3a3205b
commit d9faa5c801
16 changed files with 632 additions and 88 deletions
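
A rough sketch of the resulting command-line surface, inferred only from the clap derives in this diff; the top-level binary path (shown here as <greptime-cli>), the assumption that MetadataCommand is mounted as a `metadata` subcommand, and the connection flags contributed by StoreConfig are not part of this diff. The former `metadata control get` path is flattened into `metadata get`, and `metadata del` is new:

# Logically delete key-value pairs; the values are moved under "__cli_tombstone/".
<greptime-cli> metadata del key --key <key> [--prefix] <store flags>
# Logically delete table metadata, addressed by id or by name.
<greptime-cli> metadata del table --table-id <id> <store flags>
<greptime-cli> metadata del table --table-name <name> [--schema-name <schema>] [--catalog-name <catalog>] <store flags>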

View File

@@ -67,6 +67,7 @@ tokio.workspace = true
tracing-appender.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-version.workspace = true
serde.workspace = true
tempfile.workspace = true

View File

@@ -20,6 +20,7 @@ use common_macro::stack_trace_debug;
use common_meta::peer::Peer;
use object_store::Error as ObjectStoreError;
use snafu::{Location, Snafu};
use store_api::storage::TableId;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -237,6 +238,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Table not found: {table_id}"))]
TableNotFound {
table_id: TableId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("OpenDAL operator failed"))]
OpenDal {
#[snafu(implicit)]
@@ -356,6 +364,7 @@ impl ErrorExt for Error {
Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound,
}
}

View File

@@ -21,7 +21,7 @@ mod utils;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::metadata::control::ControlCommand;
use crate::metadata::control::{DelCommand, GetCommand};
use crate::metadata::repair::RepairLogicalTablesCommand;
use crate::metadata::snapshot::SnapshotCommand;
use crate::Tool;
@@ -34,7 +34,9 @@ pub enum MetadataCommand {
#[clap(subcommand)]
Snapshot(SnapshotCommand),
#[clap(subcommand)]
Control(ControlCommand),
Get(GetCommand),
#[clap(subcommand)]
Del(DelCommand),
RepairLogicalTables(RepairLogicalTablesCommand),
}
@@ -42,8 +44,9 @@ impl MetadataCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetadataCommand::Snapshot(cmd) => cmd.build().await,
MetadataCommand::Control(cmd) => cmd.build().await,
MetadataCommand::RepairLogicalTables(cmd) => cmd.build().await,
MetadataCommand::Get(cmd) => cmd.build().await,
MetadataCommand::Del(cmd) => cmd.build().await,
}
}
}

View File

@@ -12,26 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod del;
mod get;
#[cfg(test)]
mod test_utils;
mod utils;
use clap::Subcommand;
use common_error::ext::BoxedError;
use get::GetCommand;
use crate::Tool;
/// Subcommand for metadata control, including getting metadata from metadata store
#[derive(Subcommand)]
pub enum ControlCommand {
#[clap(subcommand)]
Get(GetCommand),
}
impl ControlCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
ControlCommand::Get(cmd) => cmd.build().await,
}
}
}
pub(crate) use del::DelCommand;
pub(crate) use get::GetCommand;

View File

@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod key;
mod table;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::metadata::control::del::key::DelKeyCommand;
use crate::metadata::control::del::table::DelTableCommand;
use crate::Tool;
/// The tombstone key prefix used by CLI deletions (distinct from the default `__tombstone/` prefix).
pub(crate) const CLI_TOMBSTONE_PREFIX: &str = "__cli_tombstone/";
/// Subcommand for deleting metadata from the metadata store.
#[derive(Subcommand)]
pub enum DelCommand {
Key(DelKeyCommand),
Table(DelTableCommand),
}
impl DelCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
DelCommand::Key(cmd) => cmd.build().await,
DelCommand::Table(cmd) => cmd.build().await,
}
}
}

View File

@@ -0,0 +1,133 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::key::tombstone::TombstoneManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::RangeRequest;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::Tool;
/// Delete key-value pairs logically from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct DelKeyCommand {
/// The key to delete from the metadata store.
#[clap(long)]
key: String,
/// Delete key-value pairs with the given prefix.
#[clap(long)]
prefix: bool,
#[clap(flatten)]
store: StoreConfig,
}
impl DelKeyCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kv_backend = self.store.build().await?;
Ok(Box::new(DelKeyTool {
key: self.key.to_string(),
prefix: self.prefix,
key_deleter: KeyDeleter::new(kv_backend),
}))
}
}
struct KeyDeleter {
kv_backend: KvBackendRef,
tombstone_manager: TombstoneManager,
}
impl KeyDeleter {
fn new(kv_backend: KvBackendRef) -> Self {
Self {
kv_backend: kv_backend.clone(),
tombstone_manager: TombstoneManager::new_with_prefix(kv_backend, CLI_TOMBSTONE_PREFIX),
}
}
async fn delete(&self, key: &str, prefix: bool) -> Result<usize, BoxedError> {
let mut req = RangeRequest::default().with_keys_only();
if prefix {
req = req.with_prefix(key.as_bytes());
} else {
req = req.with_key(key.as_bytes());
}
let resp = self.kv_backend.range(req).await.map_err(BoxedError::new)?;
let keys = resp.kvs.iter().map(|kv| kv.key.clone()).collect::<Vec<_>>();
self.tombstone_manager
.create(keys)
.await
.map_err(BoxedError::new)
}
}
struct DelKeyTool {
key: String,
prefix: bool,
key_deleter: KeyDeleter,
}
#[async_trait]
impl Tool for DelKeyTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let deleted = self.key_deleter.delete(&self.key, self.prefix).await?;
// Print the number of deleted keys.
println!("{}", deleted);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::RangeRequest;
use crate::metadata::control::del::key::KeyDeleter;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::test_utils::put_key;
#[tokio::test]
async fn test_delete_keys() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let key_deleter = KeyDeleter::new(kv_backend.clone());
put_key(&kv_backend, "foo", "bar").await;
put_key(&kv_backend, "foo/bar", "baz").await;
put_key(&kv_backend, "foo/baz", "qux").await;
let deleted = key_deleter.delete("foo", true).await.unwrap();
assert_eq!(deleted, 3);
let deleted = key_deleter.delete("foo/bar", false).await.unwrap();
assert_eq!(deleted, 0);
let chroot = ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend);
let req = RangeRequest::default().with_prefix(b"foo");
let resp = chroot.range(req).await.unwrap();
assert_eq!(resp.kvs.len(), 3);
assert_eq!(resp.kvs[0].key, b"foo");
assert_eq!(resp.kvs[0].value, b"bar");
assert_eq!(resp.kvs[1].key, b"foo/bar");
assert_eq!(resp.kvs[1].value, b"baz");
assert_eq!(resp.kvs[2].key, b"foo/baz");
assert_eq!(resp.kvs[2].value, b"qux");
}
}
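
Because `del key` only creates tombstones, a mistaken deletion remains recoverable. A minimal sketch (not part of this commit; the helper name `undo_cli_delete` is hypothetical) of undoing a CLI deletion through the same `TombstoneManager` API:

use common_meta::error::Result;
use common_meta::key::tombstone::TombstoneManager;
use common_meta::kv_backend::KvBackendRef;

// Mirrors the constant added in metadata/control/del/mod.rs.
const CLI_TOMBSTONE_PREFIX: &str = "__cli_tombstone/";

/// Hypothetical helper: moves "__cli_tombstone/<key>" back to "<key>".
async fn undo_cli_delete(kv_backend: KvBackendRef, key: &[u8]) -> Result<usize> {
    let tombstone_manager = TombstoneManager::new_with_prefix(kv_backend, CLI_TOMBSTONE_PREFIX);
    // `restore` returns the number of keys moved back out of the tombstone namespace;
    // `delete` would instead remove the tombstones permanently.
    tombstone_manager.restore(vec![key.to_vec()]).await
}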

View File

@@ -0,0 +1,235 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use clap::Parser;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::utils::get_region_wal_options;
use common_meta::key::table_name::TableNameManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use store_api::storage::TableId;
use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
use crate::metadata::common::StoreConfig;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::utils::get_table_id_by_name;
use crate::Tool;
/// Delete table metadata logically from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct DelTableCommand {
/// The table id to delete from the metadata store.
#[clap(long)]
table_id: Option<u32>,
/// The table name to delete from the metadata store.
#[clap(long)]
table_name: Option<String>,
/// The schema name of the table.
#[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
schema_name: String,
/// The catalog name of the table.
#[clap(long, default_value = DEFAULT_CATALOG_NAME)]
catalog_name: String,
#[clap(flatten)]
store: StoreConfig,
}
impl DelTableCommand {
fn validate(&self) -> Result<(), BoxedError> {
if matches!(
(&self.table_id, &self.table_name),
(Some(_), Some(_)) | (None, None)
) {
return Err(BoxedError::new(
InvalidArgumentsSnafu {
msg: "You must specify either --table-id or --table-name.",
}
.build(),
));
}
Ok(())
}
}
impl DelTableCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
self.validate()?;
let kv_backend = self.store.build().await?;
Ok(Box::new(DelTableTool {
table_id: self.table_id,
table_name: self.table_name.clone(),
schema_name: self.schema_name.clone(),
catalog_name: self.catalog_name.clone(),
table_name_manager: TableNameManager::new(kv_backend.clone()),
table_metadata_deleter: TableMetadataDeleter::new(kv_backend),
}))
}
}
struct DelTableTool {
table_id: Option<u32>,
table_name: Option<String>,
schema_name: String,
catalog_name: String,
table_name_manager: TableNameManager,
table_metadata_deleter: TableMetadataDeleter,
}
#[async_trait]
impl Tool for DelTableTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let table_id = if let Some(table_name) = &self.table_name {
let catalog_name = &self.catalog_name;
let schema_name = &self.schema_name;
let Some(table_id) = get_table_id_by_name(
&self.table_name_manager,
catalog_name,
schema_name,
table_name,
)
.await?
else {
println!(
"Table({}) not found",
format_full_table_name(catalog_name, schema_name, table_name)
);
return Ok(());
};
table_id
} else {
// Safety: we have validated that table_id or table_name is not None
self.table_id.unwrap()
};
self.table_metadata_deleter.delete(table_id).await?;
println!("Table({}) deleted", table_id);
Ok(())
}
}
struct TableMetadataDeleter {
table_metadata_manager: TableMetadataManager,
}
impl TableMetadataDeleter {
fn new(kv_backend: KvBackendRef) -> Self {
Self {
table_metadata_manager: TableMetadataManager::new_with_custom_tombstone_prefix(
kv_backend,
CLI_TOMBSTONE_PREFIX,
),
}
}
async fn delete(&self, table_id: TableId) -> Result<(), BoxedError> {
let (table_info, table_route) = self
.table_metadata_manager
.get_full_table_info(table_id)
.await
.map_err(BoxedError::new)?;
let Some(table_info) = table_info else {
return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
};
let Some(table_route) = table_route else {
return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
};
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.map_err(BoxedError::new)?;
let table_name = table_info.table_name();
let region_wal_options = get_region_wal_options(
&self.table_metadata_manager,
&table_route,
physical_table_id,
)
.await
.map_err(BoxedError::new)?;
self.table_metadata_manager
.delete_table_metadata(table_id, &table_name, &table_route, &region_wal_options)
.await
.map_err(BoxedError::new)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::RangeRequest;
use crate::metadata::control::del::table::TableMetadataDeleter;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::test_utils::prepare_physical_table_metadata;
#[tokio::test]
async fn test_delete_table_not_found() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let table_metadata_deleter = TableMetadataDeleter::new(kv_backend);
let table_id = 1;
let err = table_metadata_deleter.delete(table_id).await.unwrap_err();
assert_eq!(err.status_code(), StatusCode::TableNotFound);
}
#[tokio::test]
async fn test_delete_table_metadata() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
let table_id = 1024;
let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::Physical(table_route),
HashMap::new(),
)
.await
.unwrap();
let total_keys = kv_backend.len();
assert!(total_keys > 0);
let table_metadata_deleter = TableMetadataDeleter::new(kv_backend.clone());
table_metadata_deleter.delete(table_id).await.unwrap();
// Check that all table metadata keys were moved under the CLI tombstone prefix
let chroot =
ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend.clone());
let req = RangeRequest::default().with_range(vec![0], vec![0]);
let resp = chroot.range(req).await.unwrap();
assert_eq!(resp.kvs.len(), total_keys);
}
}

View File

@@ -20,7 +20,6 @@ use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
@@ -30,7 +29,7 @@ use futures::TryStreamExt;
use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, json_fromatter};
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
use crate::Tool;
/// Getting metadata from metadata store.
@@ -130,8 +129,12 @@ pub struct GetTableCommand {
table_name: Option<String>,
/// The schema name of the table.
#[clap(long)]
schema_name: Option<String>,
#[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
schema_name: String,
/// The catalog name of the table.
#[clap(long, default_value = DEFAULT_CATALOG_NAME)]
catalog_name: String,
/// Pretty print the output.
#[clap(long, default_value = "false")]
@@ -143,7 +146,10 @@ pub struct GetTableCommand {
impl GetTableCommand {
pub fn validate(&self) -> Result<(), BoxedError> {
if self.table_id.is_none() && self.table_name.is_none() {
if matches!(
(&self.table_id, &self.table_name),
(Some(_), Some(_)) | (None, None)
) {
return Err(BoxedError::new(
InvalidArgumentsSnafu {
msg: "You must specify either --table-id or --table-name.",
@@ -159,7 +165,8 @@ struct GetTableTool {
kvbackend: KvBackendRef,
table_id: Option<u32>,
table_name: Option<String>,
schema_name: Option<String>,
schema_name: String,
catalog_name: String,
pretty: bool,
}
@@ -172,23 +179,20 @@ impl Tool for GetTableTool {
let table_route_manager = table_metadata_manager.table_route_manager();
let table_id = if let Some(table_name) = &self.table_name {
let catalog = DEFAULT_CATALOG_NAME.to_string();
let schema_name = self
.schema_name
.clone()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let key = TableNameKey::new(&catalog, &schema_name, table_name);
let catalog_name = &self.catalog_name;
let schema_name = &self.schema_name;
let Some(table_name) = table_name_manager.get(key).await.map_err(BoxedError::new)?
let Some(table_id) =
get_table_id_by_name(table_name_manager, catalog_name, schema_name, table_name)
.await?
else {
println!(
"Table({}) not found",
format_full_table_name(&catalog, &schema_name, table_name)
format_full_table_name(catalog_name, schema_name, table_name)
);
return Ok(());
};
table_name.table_id()
table_id
} else {
// Safety: we have validated that table_id or table_name is not None
self.table_id.unwrap()
@@ -236,6 +240,7 @@ impl GetTableCommand {
table_id: self.table_id,
table_name: self.table_name.clone(),
schema_name: self.schema_name.clone(),
catalog_name: self.catalog_name.clone(),
pretty: self.pretty,
}))
}

View File

@@ -0,0 +1,51 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::ddl::test_util::test_create_physical_table_task;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::rpc::store::PutRequest;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
/// Puts a key-value pair into the kv backend.
pub async fn put_key(kv_backend: &KvBackendRef, key: &str, value: &str) {
let put_req = PutRequest::new()
.with_key(key.as_bytes())
.with_value(value.as_bytes());
kv_backend.put(put_req).await.unwrap();
}
/// Prepares the physical table metadata for testing.
///
/// Returns the table info and the table route.
pub async fn prepare_physical_table_metadata(
table_name: &str,
table_id: TableId,
) -> (RawTableInfo, PhysicalTableRouteValue) {
let mut create_physical_table_task = test_create_physical_table_task(table_name);
let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}]);
create_physical_table_task.set_table_id(table_id);
(create_physical_table_task.table_info, table_route)
}

View File

@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_meta::error::Result as CommonMetaResult;
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use common_meta::rpc::KeyValue;
use serde::Serialize;
use store_api::storage::TableId;
/// Decodes a key-value pair into a string.
pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
@@ -34,3 +37,21 @@ where
serde_json::to_string(value).unwrap()
}
}
/// Gets the table id by table name.
pub async fn get_table_id_by_name(
table_name_manager: &TableNameManager,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> Result<Option<TableId>, BoxedError> {
let table_name_key = TableNameKey::new(catalog_name, schema_name, table_name);
let Some(table_name_value) = table_name_manager
.get(table_name_key)
.await
.map_err(BoxedError::new)?
else {
return Ok(None);
};
Ok(Some(table_name_value.table_id()))
}

View File

@@ -48,11 +48,11 @@ use crate::Tool;
#[derive(Debug, Default, Parser)]
pub struct RepairLogicalTablesCommand {
/// The names of the tables to repair.
#[clap(long, value_delimiter = ',')]
#[clap(long, value_delimiter = ',', alias = "table-name")]
table_names: Vec<String>,
/// The id of the table to repair.
#[clap(long, value_delimiter = ',')]
#[clap(long, value_delimiter = ',', alias = "table-id")]
table_ids: Vec<TableId>,
/// The schema of the tables to repair.

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use common_procedure::Status;
use common_telemetry::info;
@@ -25,7 +24,7 @@ use table::table_name::TableName;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::drop_table::executor::DropTableExecutor;
use crate::ddl::utils::extract_region_wal_options;
use crate::ddl::utils::get_region_wal_options;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
@@ -109,17 +108,12 @@ impl State for DropDatabaseExecutor {
);
// Deletes topic-region mapping if dropping physical table
let region_wal_options =
if let TableRouteValue::Physical(table_route_value) = &table_route_value {
let datanode_table_values = ddl_ctx
.table_metadata_manager
.datanode_table_manager()
.regions(self.physical_table_id, table_route_value)
.await?;
extract_region_wal_options(&datanode_table_values)?
} else {
HashMap::new()
};
let region_wal_options = get_region_wal_options(
&ddl_ctx.table_metadata_manager,
&table_route_value,
self.physical_table_id,
)
.await?;
executor
.on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)

View File

@@ -42,7 +42,8 @@ use crate::error::{
};
use crate::key::datanode_table::DatanodeTableValue;
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::key::table_route::TableRouteValue;
use crate::key::{TableMetadataManager, TableMetadataManagerRef};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
@@ -187,6 +188,25 @@ pub fn parse_region_wal_options(
Ok(region_wal_options)
}
/// Gets the wal options for a table.
pub async fn get_region_wal_options(
table_metadata_manager: &TableMetadataManager,
table_route_value: &TableRouteValue,
physical_table_id: TableId,
) -> Result<HashMap<RegionNumber, WalOptions>> {
let region_wal_options =
if let TableRouteValue::Physical(table_route_value) = &table_route_value {
let datanode_table_values = table_metadata_manager
.datanode_table_manager()
.regions(physical_table_id, table_route_value)
.await?;
extract_region_wal_options(&datanode_table_values)?
} else {
HashMap::new()
};
Ok(region_wal_options)
}
/// Extracts region wal options from [DatanodeTableValue]s.
pub fn extract_region_wal_options(
datanode_table_values: &Vec<DatanodeTableValue>,

View File

@@ -109,7 +109,7 @@ pub mod table_name;
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
mod tombstone;
pub mod tombstone;
pub mod topic_name;
pub mod topic_region;
pub mod txn_helper;
@@ -535,6 +535,29 @@ impl TableMetadataManager {
}
}
/// Creates a new `TableMetadataManager` with a custom tombstone prefix.
pub fn new_with_custom_tombstone_prefix(
kv_backend: KvBackendRef,
tombstone_prefix: &str,
) -> Self {
Self {
table_name_manager: TableNameManager::new(kv_backend.clone()),
table_info_manager: TableInfoManager::new(kv_backend.clone()),
view_info_manager: ViewInfoManager::new(kv_backend.clone()),
datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
catalog_manager: CatalogManager::new(kv_backend.clone()),
schema_manager: SchemaManager::new(kv_backend.clone()),
table_route_manager: TableRouteManager::new(kv_backend.clone()),
tombstone_manager: TombstoneManager::new_with_prefix(
kv_backend.clone(),
tombstone_prefix,
),
topic_name_manager: TopicNameManager::new(kv_backend.clone()),
topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
kv_backend,
}
}
pub async fn init(&self) -> Result<()> {
let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
@@ -925,7 +948,7 @@ impl TableMetadataManager {
) -> Result<()> {
let keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.create(keys).await
self.tombstone_manager.create(keys).await.map(|_| ())
}
/// Deletes metadata tombstone for table **permanently**.
@@ -939,7 +962,10 @@ impl TableMetadataManager {
) -> Result<()> {
let table_metadata_keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.delete(table_metadata_keys).await
self.tombstone_manager
.delete(table_metadata_keys)
.await
.map(|_| ())
}
/// Restores metadata for table.
@@ -953,7 +979,7 @@ impl TableMetadataManager {
) -> Result<()> {
let keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.restore(keys).await
self.tombstone_manager.restore(keys).await.map(|_| ())
}
/// Deletes metadata for table **permanently**.

View File

@@ -25,20 +25,32 @@ use crate::rpc::store::BatchGetRequest;
/// [TombstoneManager] provides the ability to:
/// - logically delete values
/// - restore the deleted values
pub(crate) struct TombstoneManager {
pub struct TombstoneManager {
kv_backend: KvBackendRef,
tombstone_prefix: String,
}
const TOMBSTONE_PREFIX: &str = "__tombstone/";
fn to_tombstone(key: &[u8]) -> Vec<u8> {
[TOMBSTONE_PREFIX.as_bytes(), key].concat()
}
impl TombstoneManager {
/// Returns [TombstoneManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
Self {
kv_backend,
tombstone_prefix: TOMBSTONE_PREFIX.to_string(),
}
}
/// Returns [TombstoneManager] with a custom tombstone prefix.
pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self {
Self {
kv_backend,
tombstone_prefix: prefix.to_string(),
}
}
pub fn to_tombstone(&self, key: &[u8]) -> Vec<u8> {
[self.tombstone_prefix.as_bytes(), key].concat()
}
/// Moves value to `dest_key`.
@@ -67,7 +79,7 @@ impl TombstoneManager {
(txn, TxnOpGetResponseSet::filter(src_key))
}
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
@@ -102,7 +114,7 @@ impl TombstoneManager {
.unzip();
let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
if resp.succeeded {
return Ok(());
return Ok(keys.len());
}
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Updates results.
@@ -125,7 +137,9 @@ impl TombstoneManager {
}
/// Moves values to `dest_key`.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
///
/// Returns the number of keys that were moved.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
let chunk_size = self.kv_backend.max_txn_ops() / 2;
if keys.len() > chunk_size {
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
@@ -134,7 +148,7 @@ impl TombstoneManager {
self.move_values_inner(keys, dest_keys).await?;
}
Ok(())
Ok(keys.len())
} else {
self.move_values_inner(&keys, &dest_keys).await
}
@@ -145,11 +159,13 @@ impl TombstoneManager {
/// Performs the following:
/// - deletes origin values.
/// - stores tombstone values.
pub(crate) async fn create(&self, keys: Vec<Vec<u8>>) -> Result<()> {
///
/// Returns the number of keys that were moved.
pub async fn create(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
.into_iter()
.map(|key| {
let tombstone_key = to_tombstone(&key);
let tombstone_key = self.to_tombstone(&key);
(key, tombstone_key)
})
.unzip();
@@ -162,11 +178,13 @@ impl TombstoneManager {
/// Performs the following:
/// - restores origin values.
/// - deletes tombstone values.
pub(crate) async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<()> {
///
/// Returns the number of keys that were restored.
pub async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
.into_iter()
.map(|key| {
let tombstone_key = to_tombstone(&key);
let tombstone_key = self.to_tombstone(&key);
(tombstone_key, key)
})
.unzip();
@@ -175,16 +193,18 @@ impl TombstoneManager {
}
/// Deletes tombstone values for the specified `keys`.
pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
///
/// Returns the number of keys that were deleted.
pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let operations = keys
.iter()
.map(|key| TxnOp::Delete(to_tombstone(key)))
.map(|key| TxnOp::Delete(self.to_tombstone(key)))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(operations);
// Always succeeds.
let _ = self.kv_backend.txn(txn).await?;
Ok(())
Ok(keys.len())
}
}
@@ -194,7 +214,6 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use super::to_tombstone;
use crate::error::Error;
use crate::key::tombstone::TombstoneManager;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -246,7 +265,7 @@ mod tests {
assert!(!kv_backend.exists(b"foo").await.unwrap());
assert_eq!(
kv_backend
.get(&to_tombstone(b"bar"))
.get(&tombstone_manager.to_tombstone(b"bar"))
.await
.unwrap()
.unwrap()
@@ -255,7 +274,7 @@ mod tests {
);
assert_eq!(
kv_backend
.get(&to_tombstone(b"foo"))
.get(&tombstone_manager.to_tombstone(b"foo"))
.await
.unwrap()
.unwrap()
@@ -287,7 +306,7 @@ mod tests {
kv_backend.clone(),
&[MoveValue {
key: b"bar".to_vec(),
dest_key: to_tombstone(b"bar"),
dest_key: tombstone_manager.to_tombstone(b"bar"),
value: b"baz".to_vec(),
}],
)
@@ -364,7 +383,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
@@ -409,7 +428,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
@@ -462,7 +481,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
@@ -502,7 +521,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
@@ -537,7 +556,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
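
A minimal sketch of how the now-configurable prefix namespaces tombstone keys (the in-memory backend is used purely for illustration): the CLI tool's tombstones live under "__cli_tombstone/" and never collide with the default "__tombstone/" namespace.

use std::sync::Arc;

use common_meta::key::tombstone::TombstoneManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;

fn main() {
    let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;

    // Default prefix, as used by `TombstoneManager::new`.
    let default_manager = TombstoneManager::new(kv_backend.clone());
    assert_eq!(default_manager.to_tombstone(b"foo"), b"__tombstone/foo".to_vec());

    // Custom prefix, as used by the CLI `del` commands.
    let cli_manager = TombstoneManager::new_with_prefix(kv_backend, "__cli_tombstone/");
    assert_eq!(cli_manager.to_tombstone(b"foo"), b"__cli_tombstone/foo".to_vec());
}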

View File

@@ -54,20 +54,20 @@ impl<T> MemoryKvBackend<T> {
kvs.clear();
}
#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
/// Returns true if the `kvs` is empty.
pub fn is_empty(&self) -> bool {
self.kvs.read().unwrap().is_empty()
}
#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
/// Returns the `kvs`.
pub fn dump(&self) -> BTreeMap<Vec<u8>, Vec<u8>> {
let kvs = self.kvs.read().unwrap();
kvs.clone()
}
#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
/// Returns the length of `kvs`
pub fn len(&self) -> usize {
self.kvs.read().unwrap().len()