diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index aa102a0f25..a14d23deb5 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -159,6 +159,7 @@ impl State for DropDatabaseExecutor { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::sync::Arc; use api::region::RegionResponse; @@ -167,6 +168,8 @@ mod tests { use common_error::ext::BoxedError; use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; use table::table_name::TableName; use crate::ddl::drop_database::cursor::DropDatabaseCursor; @@ -179,7 +182,7 @@ mod tests { use crate::error::{self, Error, Result}; use crate::key::datanode_table::DatanodeTableKey; use crate::peer::Peer; - use crate::rpc::router::region_distribution; + use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution}; use crate::test_util::{MockDatanodeHandler, MockDatanodeManager, new_ddl_context}; #[derive(Clone)] @@ -423,6 +426,34 @@ mod tests { } } + #[tokio::test] + async fn test_recover_registers_region_role_from_routes() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let region_id = RegionId::new(1024, 1); + let mut state = DropDatabaseExecutor::new( + 1024, + 1024, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(7)), + follower_peers: vec![], + leader_state: Some(LeaderState::Downgrading), + leader_down_since: None, + write_route_policy: None, + }], + DropTableTarget::Physical, + ); + + state.recover(&ddl_context).unwrap(); + + let roles = ddl_context + .memory_region_keeper + .extract_operating_region_roles(7, &mut HashSet::from([region_id])); + 
assert_eq!(roles.get(&region_id), Some(&RegionRole::DowngradingLeader)); + } + #[tokio::test] async fn test_next_remaps_addresses_when_retrying() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index 7f4a6bd716..29dd120b9f 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::assert_matches; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::region::RegionResponse; @@ -30,6 +30,7 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; +use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use tokio::sync::mpsc; @@ -351,6 +352,10 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { .memory_region_keeper .contains(datanode_id, region_id) ); + let roles = ddl_context + .memory_region_keeper + .extract_operating_region_roles(datanode_id, &mut HashSet::from([region_id])); + assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader)); execute_procedure_until_done(&mut procedure).await; diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index ae81bd7f52..ae08d8b689 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::v1::region::{RegionRequest, region_request}; @@ -23,6 +23,7 @@ use common_procedure::Procedure; use common_procedure_test::{ execute_procedure_until, execute_procedure_until_done, new_test_procedure_context, }; +use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use table::metadata::TableId; use tokio::sync::mpsc; @@ -328,6 +329,10 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { .memory_region_keeper .contains(datanode_id, region_id) ); + let roles = ddl_context + .memory_region_keeper + .extract_operating_region_roles(datanode_id, &mut HashSet::from([region_id])); + assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader)); execute_procedure_until_done(&mut procedure).await; diff --git a/src/common/meta/src/region_keeper.rs b/src/common/meta/src/region_keeper.rs index 6e14c61aae..55116b15e9 100644 --- a/src/common/meta/src/region_keeper.rs +++ b/src/common/meta/src/region_keeper.rs @@ -85,18 +85,6 @@ impl MemoryRegionKeeper { inner.contains_key(&(datanode_id, region_id)) } - /// Extracts all operating regions from `region_ids` and returns operating regions. - pub fn extract_operating_regions( - &self, - datanode_id: DatanodeId, - region_ids: &mut HashSet<RegionId>, - ) -> HashSet<RegionId> { - let inner = self.inner.read().unwrap(); - region_ids - .extract_if(|region_id| inner.contains_key(&(datanode_id, *region_id))) - .collect() - } - /// Extracts all operating regions with roles from `region_ids`. 
pub fn extract_operating_region_roles( &self, @@ -104,12 +92,8 @@ impl MemoryRegionKeeper { region_ids: &mut HashSet<RegionId>, ) -> HashMap<RegionId, RegionRole> { let inner = self.inner.read().unwrap(); - let operating_regions = region_ids + region_ids .extract_if(|region_id| inner.contains_key(&(datanode_id, *region_id))) - .collect::<HashSet<_>>(); - - operating_regions - .into_iter() .map(|region_id| { let role = *inner .get(&(datanode_id, region_id)) @@ -167,11 +151,11 @@ mod tests { RegionId::from_u64(2), RegionId::from_u64(3), ]); - let output = keeper.extract_operating_regions(1, &mut regions); + let output = keeper.extract_operating_region_roles(1, &mut regions); assert_eq!(output.len(), 2); - assert!(output.contains(&RegionId::from_u64(1))); - assert!(output.contains(&RegionId::from_u64(2))); + assert!(output.contains_key(&RegionId::from_u64(1))); + assert!(output.contains_key(&RegionId::from_u64(2))); assert_eq!(regions, HashSet::from([RegionId::from_u64(3)])); assert_eq!(keeper.len(), 2); diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index bbf2ba71aa..7ee9137f5a 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -837,9 +837,11 @@ mod tests { }; use common_meta::error; use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; + use common_meta::region_keeper::MemoryRegionKeeper; + use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; use common_meta::test_util::MockDatanodeManager; use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState}; + use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use table::table_name::TableName; use tokio::sync::mpsc; @@ -993,6 +995,59 @@ mod tests { assert!(!procedure.should_rollback_allocated_regions()); } + #[test] + fn test_register_operating_regions_preserves_route_roles() { + let keeper = Arc::new(MemoryRegionKeeper::new()); + let region_routes = vec![ + 
RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(2)), + follower_peers: vec![], + leader_state: Some(LeaderState::Staging), + leader_down_since: None, + write_route_policy: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 3)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![], + leader_state: Some(LeaderState::Downgrading), + leader_down_since: None, + write_route_policy: None, + }, + ]; + + let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap(); + + let leader_roles = + keeper.extract_operating_region_roles(1, &mut HashSet::from([RegionId::new(1024, 1)])); + let staging_roles = + keeper.extract_operating_region_roles(2, &mut HashSet::from([RegionId::new(1024, 2)])); + let downgrading_roles = + keeper.extract_operating_region_roles(3, &mut HashSet::from([RegionId::new(1024, 3)])); + + assert_eq!( + leader_roles.get(&RegionId::new(1024, 1)), + Some(&RegionRole::Leader) + ); + assert_eq!( + staging_roles.get(&RegionId::new(1024, 2)), + Some(&RegionRole::StagingLeader) + ); + assert_eq!( + downgrading_roles.get(&RegionId::new(1024, 3)), + Some(&RegionRole::DowngradingLeader) + ); + } + #[tokio::test] async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() { let env = TestingEnv::new();