From 19a9035f4bb5a32c5f3dcd4eea4e09aa9065af76 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Tue, 23 Apr 2024 14:21:53 +0800 Subject: [PATCH] fix: operating region guards should be dropped when procedure is done (#3775) --- src/common/meta/src/ddl/create_table.rs | 5 +- src/common/meta/src/ddl/tests/create_table.rs | 46 +++++++++++++++++-- src/common/meta/src/ddl/tests/drop_table.rs | 19 ++++---- src/common/procedure-test/src/lib.rs | 19 ++++++++ 4 files changed, 73 insertions(+), 16 deletions(-) diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 0d494cd87b..6204f168a5 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -271,7 +271,7 @@ impl CreateTableProcedure { /// /// Abort(not-retry): /// - Failed to create table metadata. - async fn on_create_metadata(&self) -> Result { + async fn on_create_metadata(&mut self) -> Result { let table_id = self.table_id(); let manager = &self.context.table_metadata_manager; @@ -285,6 +285,7 @@ impl CreateTableProcedure { .await?; info!("Created table metadata for table {table_id}"); + self.creator.opening_regions.clear(); Ok(Status::done_with_output(table_id)) } } @@ -385,7 +386,7 @@ impl TableCreator { } } -#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] +#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] pub enum CreateTableState { /// Prepares to create the table Prepare, diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index f59be01d7d..33e5fd55d5 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -21,9 +21,12 @@ use api::v1::{ColumnDataType, SemanticType}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; -use common_procedure_test::MockContextProvider; +use common_procedure_test::{ + execute_procedure_until, execute_procedure_until_done, MockContextProvider, +}; +use store_api::storage::RegionId; -use crate::ddl::create_table::CreateTableProcedure; +use crate::ddl::create_table::{CreateTableProcedure, CreateTableState}; use crate::ddl::test_util::columns::TestColumnDefBuilder; use crate::ddl::test_util::create_table::{ build_raw_table_info_from_expr, TestCreateTableExprBuilder, @@ -33,8 +36,9 @@ use crate::ddl::test_util::datanode_handler::{ }; use crate::error::Error; use crate::key::table_route::TableRouteValue; +use crate::kv_backend::memory::MemoryKvBackend; use crate::rpc::ddl::CreateTableTask; -use crate::test_util::{new_ddl_context, MockDatanodeManager}; +use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDatanodeManager}; fn test_create_table_task(name: &str) -> CreateTableTask { let create_table = TestCreateTableExprBuilder::default() @@ -244,3 +248,39 @@ async fn test_on_create_metadata() { let table_id = status.downcast_output_ref::().unwrap(); assert_eq!(*table_id, 1024); } + +#[tokio::test] +async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { + let cluster_id = 1; + + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend); + + let task = test_create_table_task("foo"); + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context.clone()); + + execute_procedure_until(&mut procedure, |p| { + p.creator.data.state == CreateTableState::CreateMetadata + }) + .await; + + // Ensure that after running to the state `CreateMetadata`(just past `DatanodeCreateRegions`), + // the opening regions should be recorded: + let guards = &procedure.creator.opening_regions; + assert_eq!(guards.len(), 1); + let (datanode_id, region_id) = (0, RegionId::new(procedure.table_id(), 0)); + assert_eq!(guards[0].info(), (datanode_id, region_id)); + assert!(ddl_context + .memory_region_keeper + .contains(datanode_id, region_id)); + + execute_procedure_until_done(&mut procedure).await; + + // Ensure that when run to the end, the opening regions should be cleared: + let guards = &procedure.creator.opening_regions; + assert!(guards.is_empty()); + assert!(!ddl_context + .memory_region_keeper + .contains(datanode_id, region_id)); +} diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 5c86bb765c..26ad458033 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -19,8 +19,10 @@ use api::v1::region::{region_request, RegionRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_procedure::{Procedure, Status}; -use common_procedure_test::{execute_procedure_until_done, new_test_procedure_context}; +use common_procedure::Procedure; +use common_procedure_test::{ + execute_procedure_until, execute_procedure_until_done, new_test_procedure_context, +}; use store_api::storage::RegionId; use table::metadata::TableId; use tokio::sync::mpsc; @@ -265,16 +267,11 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { create_logical_table(ddl_context.clone(), cluster_id, physical_table_id, "s").await; let inner_test = |task: DropTableTask| async { - let context = new_test_procedure_context(); let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); - while !matches!( - procedure.execute(&context).await.unwrap(), - Status::Done { .. } - ) { - if procedure.data.state == DropTableState::InvalidateTableCache { - break; - } - } + execute_procedure_until(&mut procedure, |p| { + p.data.state == DropTableState::InvalidateTableCache + }) + .await; // Ensure that after running to the state `InvalidateTableCache`(just past `DeleteMetadata`), // the dropping regions should be recorded: diff --git a/src/common/procedure-test/src/lib.rs b/src/common/procedure-test/src/lib.rs index 7b8c2fb230..84d44fa273 100644 --- a/src/common/procedure-test/src/lib.rs +++ b/src/common/procedure-test/src/lib.rs @@ -121,3 +121,22 @@ pub fn new_test_procedure_context() -> Context { provider: Arc::new(MockContextProvider::default()), } } + +pub async fn execute_procedure_until(procedure: &mut P, until: impl Fn(&P) -> bool) { + let mut reached = false; + let context = new_test_procedure_context(); + while !matches!( + procedure.execute(&context).await.unwrap(), + Status::Done { .. } + ) { + if until(procedure) { + reached = true; + break; + } + } + assert!( + reached, + "procedure '{}' did not reach the expected state", + procedure.type_name() + ); +}