mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-03 12:30:40 +00:00
refactor: simplify UndropTableTask to use table_id only
Remove catalog, schema, and table name fields from `UndropTableTask` since the table name can be derived from the dropped table metadata in the procedure itself. This eliminates redundant fields and the associated name-validation test. Simplify locking in `UndropTableProcedure` to only use `TableLock`. Update `greptime-proto` dependency revision. - `Cargo.toml`, `Cargo.lock` - `src/common/meta/src/rpc/ddl.rs` - `src/common/meta/src/ddl/undrop_table.rs` - `src/common/meta/src/ddl/tests/drop_table.rs` Signed-off-by: Lei, HUANG <ratuthomm@gmail.com>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5938,7 +5938,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d21d344eb357fd250e0d71b354c8d1b38a077b14#d21d344eb357fd250e0d71b354c8d1b38a077b14"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=013b3c4e6ad96a6acd2d2df80fa619a3a0ea655b#013b3c4e6ad96a6acd2d2df80fa619a3a0ea655b"
|
||||
dependencies = [
|
||||
"prost 0.14.1",
|
||||
"prost-types 0.14.1",
|
||||
|
||||
@@ -158,7 +158,7 @@ fs2 = "0.4"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d21d344eb357fd250e0d71b354c8d1b38a077b14" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "013b3c4e6ad96a6acd2d2df80fa619a3a0ea655b" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -546,10 +546,8 @@ async fn test_undrop_table_restores_metadata_and_reopens_regions() {
|
||||
|
||||
while rx.try_recv().is_ok() {}
|
||||
|
||||
let mut procedure = UndropTableProcedure::new(
|
||||
new_undrop_table_task(table_name, table_id),
|
||||
ddl_context.clone(),
|
||||
);
|
||||
let mut procedure =
|
||||
UndropTableProcedure::new(new_undrop_table_task(table_id), ddl_context.clone());
|
||||
execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
let live_table = ddl_context
|
||||
@@ -641,10 +639,8 @@ async fn test_undrop_logical_table_skips_datanode_open() {
|
||||
|
||||
while rx.try_recv().is_ok() {}
|
||||
|
||||
let mut procedure = UndropTableProcedure::new(
|
||||
new_undrop_table_task(table_name, logical_table_id),
|
||||
ddl_context.clone(),
|
||||
);
|
||||
let mut procedure =
|
||||
UndropTableProcedure::new(new_undrop_table_task(logical_table_id), ddl_context.clone());
|
||||
execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
let live_table = ddl_context
|
||||
@@ -689,7 +685,7 @@ async fn test_undrop_metric_logical_table_fails() {
|
||||
create_metric_logical_table_tombstone(&ddl_context, physical_table_id, "foo").await;
|
||||
|
||||
let mut procedure =
|
||||
UndropTableProcedure::new(new_undrop_table_task("foo", logical_table_id), ddl_context);
|
||||
UndropTableProcedure::new(new_undrop_table_task(logical_table_id), ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
|
||||
assert_eq!(err.status_code(), StatusCode::Unsupported);
|
||||
@@ -748,46 +744,13 @@ async fn test_undrop_table_fails_when_live_name_exists() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut procedure = UndropTableProcedure::new(
|
||||
new_undrop_table_task(table_name, dropped_table_id),
|
||||
ddl_context,
|
||||
);
|
||||
let mut procedure =
|
||||
UndropTableProcedure::new(new_undrop_table_task(dropped_table_id), ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
|
||||
assert_matches!(err, Error::TableAlreadyExists { .. });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_undrop_table_fails_when_task_name_mismatches_table_id() {
|
||||
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let mut ddl_context = new_ddl_context(node_manager);
|
||||
ddl_context.soft_drop_enabled = true;
|
||||
let table_id = 1024;
|
||||
let task = test_create_table_task("foo", table_id);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut drop_procedure = DropTableProcedure::new(
|
||||
new_drop_table_task("foo", table_id, false),
|
||||
ddl_context.clone(),
|
||||
);
|
||||
execute_procedure_until_done(&mut drop_procedure).await;
|
||||
|
||||
let mut procedure = UndropTableProcedure::new(
|
||||
new_undrop_table_task("bar", table_id),
|
||||
ddl_context,
|
||||
);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
|
||||
assert_eq!(err.status_code(), StatusCode::TableNotFound);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_purge_dropped_table_drops_regions_and_deletes_tombstone() {
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
@@ -1002,13 +965,8 @@ fn new_drop_table_task(table_name: &str, table_id: TableId, drop_if_exists: bool
|
||||
}
|
||||
}
|
||||
|
||||
fn new_undrop_table_task(table_name: &str, table_id: TableId) -> UndropTableTask {
|
||||
UndropTableTask {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table: table_name.to_string(),
|
||||
table_id,
|
||||
}
|
||||
fn new_undrop_table_task(table_id: TableId) -> UndropTableTask {
|
||||
UndropTableTask { table_id }
|
||||
}
|
||||
|
||||
fn new_purge_dropped_table_task(
|
||||
|
||||
@@ -31,7 +31,6 @@ use store_api::storage::{RegionId, RegionNumber};
|
||||
use strum::AsRefStr;
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::ddl::utils::{
|
||||
add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
|
||||
@@ -42,7 +41,7 @@ use crate::error::{self, Result};
|
||||
use crate::instruction::CacheIdent;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
|
||||
use crate::lock_key::TableLock;
|
||||
use crate::rpc::ddl::UndropTableTask;
|
||||
use crate::rpc::router::{
|
||||
RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders,
|
||||
@@ -69,35 +68,24 @@ impl UndropTableProcedure {
|
||||
}
|
||||
|
||||
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
|
||||
let table_ref = self.data.table_ref();
|
||||
ensure!(
|
||||
!self
|
||||
.context
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.exists(TableNameKey::new(
|
||||
table_ref.catalog,
|
||||
table_ref.schema,
|
||||
table_ref.table
|
||||
))
|
||||
.await?,
|
||||
error::TableAlreadyExistsSnafu {
|
||||
table_name: table_ref.to_string()
|
||||
}
|
||||
);
|
||||
|
||||
let dropped_table = self
|
||||
.context
|
||||
.table_metadata_manager
|
||||
.get_dropped_table_by_id(self.data.task.table_id)
|
||||
.await?
|
||||
.with_context(|| error::TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
table_name: self.data.task.table_id.to_string(),
|
||||
})?;
|
||||
let table_name = &dropped_table.table_name;
|
||||
ensure!(
|
||||
dropped_table.table_name == self.data.task.table_name(),
|
||||
error::TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string()
|
||||
!self
|
||||
.context
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.exists(TableNameKey::from(table_name))
|
||||
.await?,
|
||||
error::TableAlreadyExistsSnafu {
|
||||
table_name: table_name.to_string()
|
||||
}
|
||||
);
|
||||
self.data.table_name = Some(dropped_table.table_name.clone());
|
||||
@@ -198,13 +186,7 @@ impl Procedure for UndropTableProcedure {
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
let table_ref = self.data.table_ref();
|
||||
LockKey::new(vec![
|
||||
CatalogLock::Read(table_ref.catalog).into(),
|
||||
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
|
||||
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
|
||||
TableLock::Write(self.data.task.table_id).into(),
|
||||
])
|
||||
LockKey::new(vec![TableLock::Write(self.data.task.table_id).into()])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,10 +276,6 @@ impl UndropTableData {
|
||||
}
|
||||
}
|
||||
|
||||
fn table_ref(&self) -> TableReference<'_> {
|
||||
self.task.table_ref()
|
||||
}
|
||||
|
||||
fn table_name(&self) -> &TableName {
|
||||
self.table_name.as_ref().unwrap()
|
||||
}
|
||||
|
||||
@@ -156,18 +156,8 @@ impl DdlTask {
|
||||
}
|
||||
|
||||
/// Creates a [`DdlTask`] to undrop a table.
|
||||
pub fn new_undrop_table(
|
||||
catalog: String,
|
||||
schema: String,
|
||||
table: String,
|
||||
table_id: TableId,
|
||||
) -> Self {
|
||||
DdlTask::UndropTable(UndropTableTask {
|
||||
catalog,
|
||||
schema,
|
||||
table,
|
||||
table_id,
|
||||
})
|
||||
pub fn new_undrop_table(table_id: TableId) -> Self {
|
||||
DdlTask::UndropTable(UndropTableTask { table_id })
|
||||
}
|
||||
|
||||
/// Creates a [`DdlTask`] to purge a dropped table.
|
||||
@@ -639,30 +629,9 @@ pub struct DropTableTask {
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
|
||||
pub struct UndropTableTask {
|
||||
pub catalog: String,
|
||||
pub schema: String,
|
||||
pub table: String,
|
||||
pub table_id: TableId,
|
||||
}
|
||||
|
||||
impl UndropTableTask {
|
||||
pub fn table_ref(&self) -> TableReference<'_> {
|
||||
TableReference {
|
||||
catalog: &self.catalog,
|
||||
schema: &self.schema,
|
||||
table: &self.table,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn table_name(&self) -> TableName {
|
||||
TableName {
|
||||
catalog_name: self.catalog.clone(),
|
||||
schema_name: self.schema.clone(),
|
||||
table_name: self.table.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
|
||||
pub struct PurgeDroppedTableTask {
|
||||
pub catalog: String,
|
||||
@@ -694,9 +663,6 @@ impl TryFrom<PbUndropTableTask> for UndropTableTask {
|
||||
|
||||
fn try_from(pb: PbUndropTableTask) -> Result<Self> {
|
||||
Ok(Self {
|
||||
catalog: pb.catalog_name,
|
||||
schema: pb.schema_name,
|
||||
table: pb.table_name,
|
||||
table_id: pb
|
||||
.table_id
|
||||
.context(error::InvalidProtoMsgSnafu {
|
||||
@@ -710,9 +676,6 @@ impl TryFrom<PbUndropTableTask> for UndropTableTask {
|
||||
impl From<UndropTableTask> for PbUndropTableTask {
|
||||
fn from(task: UndropTableTask) -> Self {
|
||||
Self {
|
||||
catalog_name: task.catalog,
|
||||
schema_name: task.schema,
|
||||
table_name: task.table,
|
||||
table_id: Some(api::v1::TableId { id: task.table_id }),
|
||||
}
|
||||
}
|
||||
@@ -1910,12 +1873,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_undrop_table_task_pb_roundtrip() {
|
||||
let expected = UndropTableTask {
|
||||
catalog: "greptime".to_string(),
|
||||
schema: "public".to_string(),
|
||||
table: "foo".to_string(),
|
||||
table_id: 1024,
|
||||
};
|
||||
let expected = UndropTableTask { table_id: 1024 };
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
QueryContext::default(),
|
||||
DdlTask::UndropTable(expected.clone()),
|
||||
@@ -1950,12 +1908,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_undrop_table_task_json_roundtrip() {
|
||||
let task = UndropTableTask {
|
||||
catalog: "greptime".to_string(),
|
||||
schema: "public".to_string(),
|
||||
table: "foo".to_string(),
|
||||
table_id: 1024,
|
||||
};
|
||||
let task = UndropTableTask { table_id: 1024 };
|
||||
|
||||
let output = serde_json::to_vec(&task).unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user