mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 13:22:57 +00:00
fix: operating region guards should be dropped when procedure is done (#3775)
This commit is contained in:
@@ -271,7 +271,7 @@ impl CreateTableProcedure {
|
|||||||
///
|
///
|
||||||
/// Abort(not-retry):
|
/// Abort(not-retry):
|
||||||
/// - Failed to create table metadata.
|
/// - Failed to create table metadata.
|
||||||
async fn on_create_metadata(&self) -> Result<Status> {
|
async fn on_create_metadata(&mut self) -> Result<Status> {
|
||||||
let table_id = self.table_id();
|
let table_id = self.table_id();
|
||||||
let manager = &self.context.table_metadata_manager;
|
let manager = &self.context.table_metadata_manager;
|
||||||
|
|
||||||
@@ -285,6 +285,7 @@ impl CreateTableProcedure {
|
|||||||
.await?;
|
.await?;
|
||||||
info!("Created table metadata for table {table_id}");
|
info!("Created table metadata for table {table_id}");
|
||||||
|
|
||||||
|
self.creator.opening_regions.clear();
|
||||||
Ok(Status::done_with_output(table_id))
|
Ok(Status::done_with_output(table_id))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -385,7 +386,7 @@ impl TableCreator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
|
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
|
||||||
pub enum CreateTableState {
|
pub enum CreateTableState {
|
||||||
/// Prepares to create the table
|
/// Prepares to create the table
|
||||||
Prepare,
|
Prepare,
|
||||||
|
|||||||
@@ -21,9 +21,12 @@ use api::v1::{ColumnDataType, SemanticType};
|
|||||||
use common_error::ext::ErrorExt;
|
use common_error::ext::ErrorExt;
|
||||||
use common_error::status_code::StatusCode;
|
use common_error::status_code::StatusCode;
|
||||||
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
|
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
|
||||||
use common_procedure_test::MockContextProvider;
|
use common_procedure_test::{
|
||||||
|
execute_procedure_until, execute_procedure_until_done, MockContextProvider,
|
||||||
|
};
|
||||||
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
use crate::ddl::create_table::CreateTableProcedure;
|
use crate::ddl::create_table::{CreateTableProcedure, CreateTableState};
|
||||||
use crate::ddl::test_util::columns::TestColumnDefBuilder;
|
use crate::ddl::test_util::columns::TestColumnDefBuilder;
|
||||||
use crate::ddl::test_util::create_table::{
|
use crate::ddl::test_util::create_table::{
|
||||||
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
|
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
|
||||||
@@ -33,8 +36,9 @@ use crate::ddl::test_util::datanode_handler::{
|
|||||||
};
|
};
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::key::table_route::TableRouteValue;
|
use crate::key::table_route::TableRouteValue;
|
||||||
|
use crate::kv_backend::memory::MemoryKvBackend;
|
||||||
use crate::rpc::ddl::CreateTableTask;
|
use crate::rpc::ddl::CreateTableTask;
|
||||||
use crate::test_util::{new_ddl_context, MockDatanodeManager};
|
use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDatanodeManager};
|
||||||
|
|
||||||
fn test_create_table_task(name: &str) -> CreateTableTask {
|
fn test_create_table_task(name: &str) -> CreateTableTask {
|
||||||
let create_table = TestCreateTableExprBuilder::default()
|
let create_table = TestCreateTableExprBuilder::default()
|
||||||
@@ -244,3 +248,39 @@ async fn test_on_create_metadata() {
|
|||||||
let table_id = status.downcast_output_ref::<u32>().unwrap();
|
let table_id = status.downcast_output_ref::<u32>().unwrap();
|
||||||
assert_eq!(*table_id, 1024);
|
assert_eq!(*table_id, 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
|
||||||
|
let cluster_id = 1;
|
||||||
|
|
||||||
|
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||||
|
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||||
|
let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend);
|
||||||
|
|
||||||
|
let task = test_create_table_task("foo");
|
||||||
|
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context.clone());
|
||||||
|
|
||||||
|
execute_procedure_until(&mut procedure, |p| {
|
||||||
|
p.creator.data.state == CreateTableState::CreateMetadata
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Ensure that after running to the state `CreateMetadata`(just past `DatanodeCreateRegions`),
|
||||||
|
// the opening regions should be recorded:
|
||||||
|
let guards = &procedure.creator.opening_regions;
|
||||||
|
assert_eq!(guards.len(), 1);
|
||||||
|
let (datanode_id, region_id) = (0, RegionId::new(procedure.table_id(), 0));
|
||||||
|
assert_eq!(guards[0].info(), (datanode_id, region_id));
|
||||||
|
assert!(ddl_context
|
||||||
|
.memory_region_keeper
|
||||||
|
.contains(datanode_id, region_id));
|
||||||
|
|
||||||
|
execute_procedure_until_done(&mut procedure).await;
|
||||||
|
|
||||||
|
// Ensure that when run to the end, the opening regions should be cleared:
|
||||||
|
let guards = &procedure.creator.opening_regions;
|
||||||
|
assert!(guards.is_empty());
|
||||||
|
assert!(!ddl_context
|
||||||
|
.memory_region_keeper
|
||||||
|
.contains(datanode_id, region_id));
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,8 +19,10 @@ use api::v1::region::{region_request, RegionRequest};
|
|||||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||||
use common_error::ext::ErrorExt;
|
use common_error::ext::ErrorExt;
|
||||||
use common_error::status_code::StatusCode;
|
use common_error::status_code::StatusCode;
|
||||||
use common_procedure::{Procedure, Status};
|
use common_procedure::Procedure;
|
||||||
use common_procedure_test::{execute_procedure_until_done, new_test_procedure_context};
|
use common_procedure_test::{
|
||||||
|
execute_procedure_until, execute_procedure_until_done, new_test_procedure_context,
|
||||||
|
};
|
||||||
use store_api::storage::RegionId;
|
use store_api::storage::RegionId;
|
||||||
use table::metadata::TableId;
|
use table::metadata::TableId;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
@@ -265,16 +267,11 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
|
|||||||
create_logical_table(ddl_context.clone(), cluster_id, physical_table_id, "s").await;
|
create_logical_table(ddl_context.clone(), cluster_id, physical_table_id, "s").await;
|
||||||
|
|
||||||
let inner_test = |task: DropTableTask| async {
|
let inner_test = |task: DropTableTask| async {
|
||||||
let context = new_test_procedure_context();
|
|
||||||
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
|
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
|
||||||
while !matches!(
|
execute_procedure_until(&mut procedure, |p| {
|
||||||
procedure.execute(&context).await.unwrap(),
|
p.data.state == DropTableState::InvalidateTableCache
|
||||||
Status::Done { .. }
|
})
|
||||||
) {
|
.await;
|
||||||
if procedure.data.state == DropTableState::InvalidateTableCache {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure that after running to the state `InvalidateTableCache`(just past `DeleteMetadata`),
|
// Ensure that after running to the state `InvalidateTableCache`(just past `DeleteMetadata`),
|
||||||
// the dropping regions should be recorded:
|
// the dropping regions should be recorded:
|
||||||
|
|||||||
@@ -121,3 +121,22 @@ pub fn new_test_procedure_context() -> Context {
|
|||||||
provider: Arc::new(MockContextProvider::default()),
|
provider: Arc::new(MockContextProvider::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn execute_procedure_until<P: Procedure>(procedure: &mut P, until: impl Fn(&P) -> bool) {
|
||||||
|
let mut reached = false;
|
||||||
|
let context = new_test_procedure_context();
|
||||||
|
while !matches!(
|
||||||
|
procedure.execute(&context).await.unwrap(),
|
||||||
|
Status::Done { .. }
|
||||||
|
) {
|
||||||
|
if until(procedure) {
|
||||||
|
reached = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!(
|
||||||
|
reached,
|
||||||
|
"procedure '{}' did not reach the expected state",
|
||||||
|
procedure.type_name()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user