Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-03 11:52:54 +00:00
fix: the dropping_regions guards should be dropped on procedure done (#3771)
* fix: the `dropping_regions` guards should be dropped on procedure done
* fix ci
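The gist of the fix: `DropTableProcedure` collects `OperatingRegionGuard`s in its `dropping_regions` field, and each guard keeps its region registered in the shared `memory_region_keeper` until the guard is dropped. Previously the guards lived as long as the procedure object itself; the patch clears the vector in the procedure's terminal step, so the registrations are released as soon as the procedure is done. Below is a minimal, self-contained sketch of the underlying RAII pattern; the `RegionKeeper` type, `register` helper, and plain `u64` keys are illustrative stand-ins, not GreptimeDB's actual API:

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Stand-in for the shared keeper that tracks regions with in-flight operations.
#[derive(Default)]
struct RegionKeeper {
    operating: Mutex<HashSet<(u64, u64)>>, // (datanode_id, region_id)
}

impl RegionKeeper {
    /// Registers a region and returns a guard that deregisters it on drop.
    fn register(keeper: &Arc<RegionKeeper>, datanode_id: u64, region_id: u64) -> OperatingRegionGuard {
        keeper.operating.lock().unwrap().insert((datanode_id, region_id));
        OperatingRegionGuard {
            keeper: Arc::clone(keeper),
            key: (datanode_id, region_id),
        }
    }

    fn contains(&self, datanode_id: u64, region_id: u64) -> bool {
        self.operating.lock().unwrap().contains(&(datanode_id, region_id))
    }
}

/// RAII guard: dropping it removes the region from the keeper.
struct OperatingRegionGuard {
    keeper: Arc<RegionKeeper>,
    key: (u64, u64),
}

impl Drop for OperatingRegionGuard {
    fn drop(&mut self) {
        self.keeper.operating.lock().unwrap().remove(&self.key);
    }
}

fn main() {
    let keeper = Arc::new(RegionKeeper::default());

    // A procedure collects guards while it operates on regions.
    let mut dropping_regions = vec![RegionKeeper::register(&keeper, 0, 42)];
    assert!(keeper.contains(0, 42));

    // The fix, in miniature: clear the guards in the procedure's final step
    // instead of holding them until the procedure object itself is dropped.
    dropping_regions.clear();
    assert!(!keeper.contains(0, 42));
}
```

Clearing the guard vector explicitly (rather than relying on the procedure being dropped) matters because a finished procedure object can be kept alive, for example by the procedure manager or a test harness, and as long as it holds the guards, other operations on the same regions are rejected.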
@@ -165,7 +165,7 @@ mod tests {
     async fn test_next_without_logical_tables() {
         let datanode_manager = Arc::new(MockDatanodeManager::new(()));
         let ddl_context = new_ddl_context(datanode_manager);
-        create_physical_table(ddl_context.clone(), 0, "phy").await;
+        create_physical_table(&ddl_context, 0, "phy").await;
         // It always starts from Logical
         let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
         let mut ctx = DropDatabaseContext {
@@ -199,7 +199,7 @@ mod tests {
     async fn test_next_with_logical_tables() {
         let datanode_manager = Arc::new(MockDatanodeManager::new(()));
         let ddl_context = new_ddl_context(datanode_manager);
-        let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
+        let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await;
         create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric_0").await;
         // It always starts from Logical
         let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);

@@ -161,7 +161,7 @@ mod tests {
     async fn test_next_with_physical_table() {
         let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
         let ddl_context = new_ddl_context(datanode_manager);
-        let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
+        let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await;
         let (_, table_route) = ddl_context
             .table_metadata_manager
             .table_route_manager()
@@ -211,7 +211,7 @@ mod tests {
     async fn test_next_logical_table() {
         let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
         let ddl_context = new_ddl_context(datanode_manager);
-        let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
+        let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await;
         create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric").await;
         let logical_table_id = physical_table_id + 1;
         let (_, table_route) = ddl_context
@@ -315,7 +315,7 @@ mod tests {
     async fn test_next_retryable_err() {
         let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
         let ddl_context = new_ddl_context(datanode_manager);
-        let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
+        let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await;
         let (_, table_route) = ddl_context
             .table_metadata_manager
             .table_route_manager()

@@ -46,7 +46,7 @@ pub struct DropTableProcedure {
     /// The serializable data.
     pub data: DropTableData,
     /// The guards of opening regions.
-    pub dropping_regions: Vec<OperatingRegionGuard>,
+    pub(crate) dropping_regions: Vec<OperatingRegionGuard>,
     /// The drop table executor.
     executor: DropTableExecutor,
 }
@@ -153,7 +153,7 @@ impl DropTableProcedure {
     }

     /// Deletes metadata tombstone.
-    async fn on_delete_metadata_tombstone(&self) -> Result<Status> {
+    async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> {
         let table_route_value = &TableRouteValue::new(
             self.data.task.table_id,
             // Safety: checked
@@ -163,6 +163,8 @@ impl DropTableProcedure {
         self.executor
             .on_delete_metadata_tombstone(&self.context, table_route_value)
             .await?;
+
+        self.dropping_regions.clear();
         Ok(Status::done())
     }
 }
@@ -266,7 +268,7 @@ impl DropTableData {
 }

 /// The state of drop table.
-#[derive(Debug, Serialize, Deserialize, AsRefStr)]
+#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
 pub enum DropTableState {
     /// Prepares to drop the table
     Prepare,

@@ -47,7 +47,7 @@ pub async fn create_physical_table_metadata(
 }

 pub async fn create_physical_table(
-    ddl_context: DdlContext,
+    ddl_context: &DdlContext,
     cluster_id: ClusterId,
     name: &str,
 ) -> TableId {
@@ -67,7 +67,7 @@ pub async fn create_physical_table(
         .unwrap();
     create_physical_table_task.set_table_id(table_id);
     create_physical_table_metadata(
-        &ddl_context,
+        ddl_context,
         create_physical_table_task.table_info.clone(),
         TableRouteValue::Physical(table_route),
     )
@@ -81,7 +81,7 @@ pub async fn create_logical_table(
     cluster_id: ClusterId,
     physical_table_id: TableId,
     table_name: &str,
-) {
+) -> TableId {
     use std::assert_matches::assert_matches;

     let tasks = vec![test_create_logical_table_task(table_name)];
@@ -91,6 +91,14 @@ pub async fn create_logical_table(
     assert_matches!(status, Status::Executing { persist: true });
     let status = procedure.on_create_metadata().await.unwrap();
     assert_matches!(status, Status::Done { .. });
+
+    let Status::Done {
+        output: Some(output),
+    } = status
+    else {
+        panic!("Unexpected status: {:?}", status);
+    };
+    output.downcast_ref::<Vec<u32>>().unwrap()[0]
 }

 pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {

@@ -128,9 +128,9 @@ async fn test_on_prepare_different_physical_table() {
     let datanode_manager = Arc::new(MockDatanodeManager::new(()));
     let ddl_context = new_ddl_context(datanode_manager);

-    let phy1_id = create_physical_table(ddl_context.clone(), cluster_id, "phy1").await;
+    let phy1_id = create_physical_table(&ddl_context, cluster_id, "phy1").await;
     create_logical_table(ddl_context.clone(), cluster_id, phy1_id, "table1").await;
-    let phy2_id = create_physical_table(ddl_context.clone(), cluster_id, "phy2").await;
+    let phy2_id = create_physical_table(&ddl_context, cluster_id, "phy2").await;
     create_logical_table(ddl_context.clone(), cluster_id, phy2_id, "table2").await;

     let tasks = vec![
@@ -150,7 +150,7 @@ async fn test_on_prepare_logical_table_not_exists() {
     let ddl_context = new_ddl_context(datanode_manager);

     // Creates physical table
-    let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
+    let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
     // Creates 3 logical tables
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;

@@ -172,7 +172,7 @@ async fn test_on_prepare() {
     let ddl_context = new_ddl_context(datanode_manager);

     // Creates physical table
-    let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
+    let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
     // Creates 3 logical tables
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;
@@ -196,7 +196,7 @@ async fn test_on_update_metadata() {
     let ddl_context = new_ddl_context(datanode_manager);

     // Creates physical table
-    let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
+    let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
     // Creates 3 logical tables
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;
@@ -233,7 +233,7 @@ async fn test_on_part_duplicate_alter_request() {
     let ddl_context = new_ddl_context(datanode_manager);

     // Creates physical table
-    let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
+    let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
     // Creates 3 logical tables
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;

@@ -42,7 +42,7 @@ async fn test_drop_database_with_logical_tables() {
         .await
         .unwrap();
     // Creates physical table
-    let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
+    let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
     // Creates 3 logical tables
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;
@@ -91,7 +91,7 @@ async fn test_drop_database_retryable_error() {
         .await
         .unwrap();
     // Creates physical table
-    let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
+    let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
     // Creates 3 logical tables
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
     create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;

@@ -19,17 +19,19 @@ use api::v1::region::{region_request, RegionRequest};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
-use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
-use common_procedure_test::MockContextProvider;
+use common_procedure::{Procedure, Status};
+use common_procedure_test::{execute_procedure_until_done, new_test_procedure_context};
 use store_api::storage::RegionId;
+use table::metadata::TableId;
 use tokio::sync::mpsc;

 use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
-use crate::ddl::drop_table::DropTableProcedure;
+use crate::ddl::drop_table::{DropTableProcedure, DropTableState};
 use crate::ddl::test_util::create_table::test_create_table_task;
 use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler};
 use crate::ddl::test_util::{
-    create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task,
+    create_logical_table, create_physical_table, create_physical_table_metadata,
+    test_create_logical_table_task, test_create_physical_table_task,
 };
 use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
 use crate::key::table_route::TableRouteValue;
@@ -58,14 +60,7 @@ async fn test_on_prepare_table_not_exists_err() {
         .await
         .unwrap();

-    let task = DropTableTask {
-        catalog: DEFAULT_CATALOG_NAME.to_string(),
-        schema: DEFAULT_SCHEMA_NAME.to_string(),
-        table: "bar".to_string(),
-        table_id,
-        drop_if_exists: false,
-    };
-
+    let task = new_drop_table_task("bar", table_id, false);
     let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
     let err = procedure.on_prepare().await.unwrap_err();
     assert_eq!(err.status_code(), StatusCode::TableNotFound);
@@ -90,26 +85,12 @@ async fn test_on_prepare_table() {
         .await
         .unwrap();

-    let task = DropTableTask {
-        catalog: DEFAULT_CATALOG_NAME.to_string(),
-        schema: DEFAULT_SCHEMA_NAME.to_string(),
-        table: "bar".to_string(),
-        table_id,
-        drop_if_exists: true,
-    };
-
+    let task = new_drop_table_task("bar", table_id, true);
     // Drop if exists
     let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
     procedure.on_prepare().await.unwrap();

-    let task = DropTableTask {
-        catalog: DEFAULT_CATALOG_NAME.to_string(),
-        schema: DEFAULT_SCHEMA_NAME.to_string(),
-        table: table_name.to_string(),
-        table_id,
-        drop_if_exists: false,
-    };
-
+    let task = new_drop_table_task(table_name, table_id, false);
     // Drop table
     let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
     procedure.on_prepare().await.unwrap();
@@ -158,13 +139,7 @@ async fn test_on_datanode_drop_regions() {
         .await
         .unwrap();

-    let task = DropTableTask {
-        catalog: DEFAULT_CATALOG_NAME.to_string(),
-        schema: DEFAULT_SCHEMA_NAME.to_string(),
-        table: table_name.to_string(),
-        table_id,
-        drop_if_exists: false,
-    };
+    let task = new_drop_table_task(table_name, table_id, false);
     // Drop table
     let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
     procedure.on_prepare().await.unwrap();
@@ -234,10 +209,7 @@ async fn test_on_rollback() {
         ddl_context.clone(),
     );
     procedure.on_prepare().await.unwrap();
-    let ctx = ProcedureContext {
-        procedure_id: ProcedureId::random(),
-        provider: Arc::new(MockContextProvider::default()),
-    };
+    let ctx = new_test_procedure_context();
    procedure.execute(&ctx).await.unwrap();
     // Triggers procedure to create table metadata
     let status = procedure.execute(&ctx).await.unwrap();
@@ -247,20 +219,10 @@ async fn test_on_rollback() {
     let expected_kvs = kv_backend.dump();
     // Drops the physical table
     {
-        let task = DropTableTask {
-            catalog: DEFAULT_CATALOG_NAME.to_string(),
-            schema: DEFAULT_SCHEMA_NAME.to_string(),
-            table: "phy_table".to_string(),
-            table_id: physical_table_id,
-            drop_if_exists: false,
-        };
+        let task = new_drop_table_task("phy_table", physical_table_id, false);
         let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
         procedure.on_prepare().await.unwrap();
         procedure.on_delete_metadata().await.unwrap();
-        let ctx = ProcedureContext {
-            procedure_id: ProcedureId::random(),
-            provider: Arc::new(MockContextProvider::default()),
-        };
         procedure.rollback(&ctx).await.unwrap();
         // Rollback again
         procedure.rollback(&ctx).await.unwrap();
@@ -269,23 +231,71 @@ async fn test_on_rollback() {
     }

     // Drops the logical table
-    let task = DropTableTask {
-        catalog: DEFAULT_CATALOG_NAME.to_string(),
-        schema: DEFAULT_SCHEMA_NAME.to_string(),
-        table: "foo".to_string(),
-        table_id: table_ids[0],
-        drop_if_exists: false,
-    };
+    let task = new_drop_table_task("foo", table_ids[0], false);
     let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
     procedure.on_prepare().await.unwrap();
     procedure.on_delete_metadata().await.unwrap();
-    let ctx = ProcedureContext {
-        procedure_id: ProcedureId::random(),
-        provider: Arc::new(MockContextProvider::default()),
-    };
     procedure.rollback(&ctx).await.unwrap();
     // Rollback again
     procedure.rollback(&ctx).await.unwrap();
     let kvs = kv_backend.dump();
     assert_eq!(kvs, expected_kvs);
 }
+
+fn new_drop_table_task(table_name: &str, table_id: TableId, drop_if_exists: bool) -> DropTableTask {
+    DropTableTask {
+        catalog: DEFAULT_CATALOG_NAME.to_string(),
+        schema: DEFAULT_SCHEMA_NAME.to_string(),
+        table: table_name.to_string(),
+        table_id,
+        drop_if_exists,
+    }
+}
+
+#[tokio::test]
+async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
+    let cluster_id = 1;
+
+    let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
+    let kv_backend = Arc::new(MemoryKvBackend::new());
+    let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend);
+
+    let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await;
+    let logical_table_id =
+        create_logical_table(ddl_context.clone(), cluster_id, physical_table_id, "s").await;
+
+    let inner_test = |task: DropTableTask| async {
+        let context = new_test_procedure_context();
+        let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
+        while !matches!(
+            procedure.execute(&context).await.unwrap(),
+            Status::Done { .. }
+        ) {
+            if procedure.data.state == DropTableState::InvalidateTableCache {
+                break;
+            }
+        }
+
+        // Ensure that after running to the state `InvalidateTableCache` (just past `DeleteMetadata`),
+        // the dropping regions should be recorded:
+        let guards = &procedure.dropping_regions;
+        assert_eq!(guards.len(), 1);
+        let (datanode_id, region_id) = (0, RegionId::new(physical_table_id, 0));
+        assert_eq!(guards[0].info(), (datanode_id, region_id));
+        assert!(ddl_context
+            .memory_region_keeper
+            .contains(datanode_id, region_id));
+
+        execute_procedure_until_done(&mut procedure).await;
+
+        // Ensure that when run to the end, the dropping regions should be cleared:
+        let guards = &procedure.dropping_regions;
+        assert!(guards.is_empty());
+        assert!(!ddl_context
+            .memory_region_keeper
+            .contains(datanode_id, region_id));
+    };
+
+    inner_test(new_drop_table_task("s", logical_table_id, false)).await;
+    inner_test(new_drop_table_task("t", physical_table_id, false)).await;
+}

@@ -258,7 +258,7 @@ pub enum Error {
         error: Utf8Error,
     },

-    #[snafu(display("Table nod found, table: {}", table_name))]
+    #[snafu(display("Table not found: '{}'", table_name))]
     TableNotFound {
         table_name: String,
         location: Location,

@@ -114,3 +114,10 @@ pub async fn execute_until_suspended_or_done(

     None
 }
+
+pub fn new_test_procedure_context() -> Context {
+    Context {
+        procedure_id: ProcedureId::random(),
+        provider: Arc::new(MockContextProvider::default()),
+    }
+}