diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index b377a60406..650a706296 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -45,7 +45,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
 use crate::metrics;
 use crate::region_keeper::OperatingRegionGuard;
 use crate::rpc::ddl::CreateTableTask;
-use crate::rpc::router::{RegionRoute, operating_leader_regions};
+use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
 
 pub struct CreateTableProcedure {
     pub context: DdlContext,
@@ -256,17 +256,17 @@ impl CreateTableProcedure {
         context: &DdlContext,
         region_routes: &[RegionRoute],
     ) -> Result<Vec<OperatingRegionGuard>> {
-        let opening_regions = operating_leader_regions(region_routes);
+        let opening_regions = operating_leader_region_roles(region_routes);
 
         if self.opening_regions.len() == opening_regions.len() {
            return Ok(vec![]);
         }
 
         let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
 
-        for (region_id, datanode_id) in opening_regions {
+        for (region_id, datanode_id, role) in opening_regions {
             let guard = context
                 .memory_region_keeper
-                .register(datanode_id, region_id)
+                .register_with_role(datanode_id, region_id, role)
                 .context(error::RegionOperatingRaceSnafu {
                     region_id,
                     peer_id: datanode_id,
diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs
index 80c730f30a..1c01060c5e 100644
--- a/src/common/meta/src/ddl/drop_database/executor.rs
+++ b/src/common/meta/src/ddl/drop_database/executor.rs
@@ -29,7 +29,7 @@ use crate::ddl::utils::get_region_wal_options;
 use crate::error::{self, Result};
 use crate::key::table_route::TableRouteValue;
 use crate::region_keeper::OperatingRegionGuard;
-use crate::rpc::router::{RegionRoute, operating_leader_regions};
+use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
 
 #[derive(Debug, Serialize, Deserialize)]
 pub(crate) struct DropDatabaseExecutor {
@@ -69,12 +69,12 @@ impl DropDatabaseExecutor {
         if !self.dropping_regions.is_empty() {
             return Ok(());
         }
-        let dropping_regions = operating_leader_regions(&self.physical_region_routes);
+        let dropping_regions = operating_leader_region_roles(&self.physical_region_routes);
         let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
-        for (region_id, datanode_id) in dropping_regions {
+        for (region_id, datanode_id, role) in dropping_regions {
             let guard = ddl_ctx
                 .memory_region_keeper
-                .register(datanode_id, region_id)
+                .register_with_role(datanode_id, region_id, role)
                 .context(error::RegionOperatingRaceSnafu {
                     region_id,
                     peer_id: datanode_id,
@@ -159,6 +159,7 @@ impl State for DropDatabaseExecutor {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashSet;
     use std::sync::Arc;
 
     use api::region::RegionResponse;
@@ -167,6 +168,8 @@ mod tests {
     use common_error::ext::BoxedError;
     use common_query::request::QueryRequest;
     use common_recordbatch::SendableRecordBatchStream;
+    use store_api::region_engine::RegionRole;
+    use store_api::storage::RegionId;
     use table::table_name::TableName;
 
     use crate::ddl::drop_database::cursor::DropDatabaseCursor;
@@ -179,7 +182,7 @@ mod tests {
     use crate::error::{self, Error, Result};
     use crate::key::datanode_table::DatanodeTableKey;
     use crate::peer::Peer;
-    use crate::rpc::router::region_distribution;
+    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
     use crate::test_util::{MockDatanodeHandler, MockDatanodeManager, new_ddl_context};
 
     #[derive(Clone)]
@@ -423,6 +426,34 @@ mod tests {
         }
     }
 
+    #[tokio::test]
+    async fn test_recover_registers_region_role_from_routes() {
+        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
+        let ddl_context = new_ddl_context(node_manager);
+        let region_id = RegionId::new(1024, 1);
+        let mut state = DropDatabaseExecutor::new(
+            1024,
+            1024,
+            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
+            vec![RegionRoute {
+                region: Region::new_test(region_id),
+                leader_peer: Some(Peer::empty(7)),
+                follower_peers: vec![],
+                leader_state: Some(LeaderState::Downgrading),
+                leader_down_since: None,
+                write_route_policy: None,
+            }],
+            DropTableTarget::Physical,
+        );
+
+        state.recover(&ddl_context).unwrap();
+
+        let roles = ddl_context
+            .memory_region_keeper
+            .extract_operating_region_roles(7, &HashSet::from([region_id]));
+        assert_eq!(roles.get(&region_id), Some(&RegionRole::DowngradingLeader));
+    }
+
     #[tokio::test]
     async fn test_next_remaps_addresses_when_retrying() {
         let (tx, mut rx) = tokio::sync::mpsc::channel(8);
diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs
index 8bd7c7155c..4f4e2d6053 100644
--- a/src/common/meta/src/ddl/drop_table.rs
+++ b/src/common/meta/src/ddl/drop_table.rs
@@ -43,7 +43,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
 use crate::metrics;
 use crate::region_keeper::OperatingRegionGuard;
 use crate::rpc::ddl::DropTableTask;
-use crate::rpc::router::{RegionRoute, operating_leader_regions};
+use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
 
 pub struct DropTableProcedure {
     /// The context of procedure runtime.
@@ -94,7 +94,7 @@ impl DropTableProcedure {
 
     /// Register dropping regions if they don't exist.
     fn register_dropping_regions(&mut self) -> Result<()> {
-        let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
+        let dropping_regions = operating_leader_region_roles(&self.data.physical_region_routes);
 
         if !self.dropping_regions.is_empty() {
             return Ok(());
@@ -102,11 +102,11 @@ impl DropTableProcedure {
 
         let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
 
-        for (region_id, datanode_id) in dropping_regions {
+        for (region_id, datanode_id, role) in dropping_regions {
             let guard = self
                 .context
                 .memory_region_keeper
-                .register(datanode_id, region_id)
+                .register_with_role(datanode_id, region_id, role)
                 .context(error::RegionOperatingRaceSnafu {
                     region_id,
                     peer_id: datanode_id,
diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs
index 7f4a6bd716..da91dedcaf 100644
--- a/src/common/meta/src/ddl/tests/create_table.rs
+++ b/src/common/meta/src/ddl/tests/create_table.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::assert_matches;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use api::region::RegionResponse;
@@ -30,6 +30,7 @@ use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use store_api::metadata::ColumnMetadata;
 use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
+use store_api::region_engine::RegionRole;
 use store_api::storage::RegionId;
 use tokio::sync::mpsc;
 
@@ -351,6 +352,10 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
             .memory_region_keeper
             .contains(datanode_id, region_id)
     );
+    let roles = ddl_context
+        .memory_region_keeper
+        .extract_operating_region_roles(datanode_id, &HashSet::from([region_id]));
+    assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));
 
     execute_procedure_until_done(&mut procedure).await;
diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs
index ae81bd7f52..c518bb36a2 100644
--- a/src/common/meta/src/ddl/tests/drop_table.rs
+++ b/src/common/meta/src/ddl/tests/drop_table.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use api::v1::region::{RegionRequest, region_request};
@@ -23,6 +23,7 @@ use common_procedure::Procedure;
 use common_procedure_test::{
     execute_procedure_until, execute_procedure_until_done, new_test_procedure_context,
 };
+use store_api::region_engine::RegionRole;
 use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tokio::sync::mpsc;
@@ -328,6 +329,10 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
             .memory_region_keeper
             .contains(datanode_id, region_id)
     );
+    let roles = ddl_context
+        .memory_region_keeper
+        .extract_operating_region_roles(datanode_id, &HashSet::from([region_id]));
+    assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));
 
     execute_procedure_until_done(&mut procedure).await;
diff --git a/src/common/meta/src/region_keeper.rs b/src/common/meta/src/region_keeper.rs
index 605aaba893..aca0b95838 100644
--- a/src/common/meta/src/region_keeper.rs
+++ b/src/common/meta/src/region_keeper.rs
@@ -12,9 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
 
+use store_api::region_engine::RegionRole;
 use store_api::storage::RegionId;
 
 use crate::DatanodeId;
@@ -24,7 +26,7 @@ use crate::DatanodeId;
 pub struct OperatingRegionGuard {
     datanode_id: DatanodeId,
     region_id: RegionId,
-    inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
+    inner: Arc<RwLock<HashMap<(DatanodeId, RegionId), RegionRole>>>,
 }
 
 impl Drop for OperatingRegionGuard {
@@ -50,7 +52,7 @@ pub type MemoryRegionKeeperRef = Arc<MemoryRegionKeeper>;
 /// - Tracks the deleting regions after the corresponding metadata is deleted.
 #[derive(Debug, Clone, Default)]
 pub struct MemoryRegionKeeper {
-    inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
+    inner: Arc<RwLock<HashMap<(DatanodeId, RegionId), RegionRole>>>,
 }
 
 impl MemoryRegionKeeper {
@@ -59,40 +61,48 @@ impl MemoryRegionKeeper {
     }
 
     /// Returns [OperatingRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist.
-    pub fn register(
+    pub fn register_with_role(
         &self,
         datanode_id: DatanodeId,
         region_id: RegionId,
+        role: RegionRole,
     ) -> Option<OperatingRegionGuard> {
         let mut inner = self.inner.write().unwrap();
 
-        if inner.insert((datanode_id, region_id)) {
-            Some(OperatingRegionGuard {
-                datanode_id,
-                region_id,
-                inner: self.inner.clone(),
-            })
-        } else {
-            None
+        match inner.entry((datanode_id, region_id)) {
+            Entry::Occupied(_) => None,
+            Entry::Vacant(vacant_entry) => {
+                vacant_entry.insert(role);
+                Some(OperatingRegionGuard {
+                    datanode_id,
+                    region_id,
+                    inner: self.inner.clone(),
+                })
+            }
         }
     }
 
     /// Returns true if the keeper contains a (`datanode_id`, `region_id`) tuple.
     pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool {
         let inner = self.inner.read().unwrap();
-        inner.contains(&(datanode_id, region_id))
+        inner.contains_key(&(datanode_id, region_id))
     }
 
-    /// Extracts all operating regions from `region_ids` and returns operating regions.
-    pub fn extract_operating_regions(
+    /// Extracts all operating regions with roles from `region_ids`.
+    pub fn extract_operating_region_roles(
         &self,
         datanode_id: DatanodeId,
-        region_ids: &mut HashSet<RegionId>,
-    ) -> HashSet<RegionId> {
+        region_ids: &HashSet<RegionId>,
+    ) -> HashMap<RegionId, RegionRole> {
         let inner = self.inner.read().unwrap();
         region_ids
-            .extract_if(|region_id| inner.contains(&(datanode_id, *region_id)))
-            .collect::<HashSet<_>>()
+            .iter()
+            .filter_map(|region_id| {
+                inner
+                    .get(&(datanode_id, *region_id))
+                    .map(|role| (*region_id, *role))
+            })
+            .collect()
     }
 
     /// Returns the number of elements in the tracking set.
@@ -115,8 +125,9 @@ impl MemoryRegionKeeper {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
+    use std::collections::{HashMap, HashSet};
 
+    use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;
 
     use crate::region_keeper::MemoryRegionKeeper;
@@ -125,20 +136,43 @@ mod tests {
     fn test_opening_region_keeper() {
         let keeper = MemoryRegionKeeper::new();
 
-        let guard = keeper.register(1, RegionId::from_u64(1)).unwrap();
-        assert!(keeper.register(1, RegionId::from_u64(1)).is_none());
-        let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap();
+        let guard = keeper
+            .register_with_role(1, RegionId::from_u64(1), RegionRole::Leader)
+            .unwrap();
+        assert!(
+            keeper
+                .register_with_role(1, RegionId::from_u64(1), RegionRole::Leader)
+                .is_none()
+        );
+        let guard2 = keeper
+            .register_with_role(1, RegionId::from_u64(2), RegionRole::Follower)
+            .unwrap();
 
-        let mut regions = HashSet::from([
+        let regions = HashSet::from([
             RegionId::from_u64(1),
             RegionId::from_u64(2),
             RegionId::from_u64(3),
         ]);
 
-        let output = keeper.extract_operating_regions(1, &mut regions);
+        let output = keeper.extract_operating_region_roles(1, &regions);
         assert_eq!(output.len(), 2);
-        assert!(output.contains(&RegionId::from_u64(1)));
-        assert!(output.contains(&RegionId::from_u64(2)));
+        assert!(output.contains_key(&RegionId::from_u64(1)));
+        assert!(output.contains_key(&RegionId::from_u64(2)));
+        assert_eq!(keeper.len(), 2);
+
+        let regions = HashSet::from([
+            RegionId::from_u64(1),
+            RegionId::from_u64(2),
+            RegionId::from_u64(3),
+        ]);
+        let output = keeper.extract_operating_region_roles(1, &regions);
+        assert_eq!(
+            output,
+            HashMap::from([
+                (RegionId::from_u64(1), RegionRole::Leader),
+                (RegionId::from_u64(2), RegionRole::Follower),
+            ])
+        );
         assert_eq!(keeper.len(), 2);
 
         drop(guard);
diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs
index 92627c1d8f..519b12e081 100644
--- a/src/common/meta/src/rpc/router.rs
+++ b/src/common/meta/src/rpc/router.rs
@@ -23,6 +23,7 @@ use derive_builder::Builder;
 use serde::ser::SerializeSeq;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use snafu::OptionExt;
+use store_api::region_engine::RegionRole;
 use store_api::storage::{RegionId, RegionNumber};
 use strum::AsRefStr;
 use table::table_name::TableName;
@@ -99,6 +100,20 @@ pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId,
         .collect::<Vec<_>>()
 }
 
+/// Returns the operating leader regions with corresponding [DatanodeId] and [RegionRole].
+pub fn operating_leader_region_roles(
+    region_routes: &[RegionRoute],
+) -> Vec<(RegionId, DatanodeId, RegionRole)> {
+    region_routes
+        .iter()
+        .filter_map(|route| {
+            let role = route.leader_region_role()?;
+            let leader = route.leader_peer.as_ref()?;
+            Some((route.region.id, leader.id, role))
+        })
+        .collect()
+}
+
 /// Returns the HashMap<[RegionNumber], &[Peer]>;
 ///
 /// If the region doesn't have a leader peer, the [Region] will be omitted.
@@ -342,6 +357,19 @@ impl RegionRoute {
         matches!(self.leader_state, Some(LeaderState::Staging))
     }
 
+    /// Returns the role of the leader region.
+    pub fn leader_region_role(&self) -> Option<RegionRole> {
+        self.leader_peer.as_ref().map(|_| {
+            if self.is_leader_staging() {
+                RegionRole::StagingLeader
+            } else if self.is_leader_downgrading() {
+                RegionRole::DowngradingLeader
+            } else {
+                RegionRole::Leader
+            }
+        })
+    }
+
     /// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
     ///
     /// We should downgrade a [`Region`] before deactivating it:
@@ -577,6 +605,17 @@ mod tests {
     use super::*;
     use crate::key::RegionRoleSet;
 
+    fn new_test_region_route(region_id: RegionId) -> RegionRoute {
+        RegionRoute {
+            region: Region::new_test(region_id),
+            leader_peer: Some(Peer::new(1, "a1")),
+            follower_peers: vec![Peer::new(2, "a2")],
+            leader_state: None,
+            leader_down_since: None,
+            write_route_policy: None,
+        }
+    }
+
     #[test]
     fn test_leader_is_downgraded() {
         let mut region_route = RegionRoute {
@@ -740,6 +779,65 @@ mod tests {
         assert!(!region_route.is_ignore_all_writes());
     }
 
+    #[test]
+    fn test_leader_region_role_without_leader_peer_returns_none() {
+        let region_route = RegionRoute {
+            leader_peer: None,
+            ..new_test_region_route(RegionId::new(1, 1))
+        };
+
+        assert_eq!(region_route.leader_region_role(), None);
+    }
+
+    #[test]
+    fn test_leader_region_role_variants() {
+        let normal = new_test_region_route(RegionId::new(1, 1));
+        let mut downgrading = new_test_region_route(RegionId::new(1, 2));
+        downgrading.leader_state = Some(LeaderState::Downgrading);
+        let mut staging = new_test_region_route(RegionId::new(1, 3));
+        staging.leader_state = Some(LeaderState::Staging);
+
+        assert_eq!(normal.leader_region_role(), Some(RegionRole::Leader));
+        assert_eq!(
+            downgrading.leader_region_role(),
+            Some(RegionRole::DowngradingLeader)
+        );
+        assert_eq!(
+            staging.leader_region_role(),
+            Some(RegionRole::StagingLeader)
+        );
+    }
+
+    #[test]
+    fn test_operating_leader_region_roles_returns_expected_roles() {
+        let no_leader_region = RegionRoute {
+            leader_peer: None,
+            ..new_test_region_route(RegionId::new(1, 4))
+        };
+        let mut downgrading = new_test_region_route(RegionId::new(1, 2));
+        downgrading.leader_peer = Some(Peer::new(2, "a2"));
+        downgrading.leader_state = Some(LeaderState::Downgrading);
+        let mut staging = new_test_region_route(RegionId::new(1, 3));
+        staging.leader_peer = Some(Peer::new(3, "a3"));
+        staging.leader_state = Some(LeaderState::Staging);
+
+        let roles = operating_leader_region_roles(&[
+            new_test_region_route(RegionId::new(1, 1)),
+            downgrading,
+            staging,
+            no_leader_region,
+        ]);
+
+        assert_eq!(
+            roles,
+            vec![
+                (RegionId::new(1, 1), 1, RegionRole::Leader),
+                (RegionId::new(1, 2), 2, RegionRole::DowngradingLeader),
+                (RegionId::new(1, 3), 3, RegionRole::StagingLeader),
+            ]
+        );
+    }
+
     #[test]
     fn test_region_distribution() {
         let region_routes = vec![
diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs
index c6c1d44521..aa26a037e3 100644
--- a/src/meta-srv/src/handler/region_lease_handler.rs
+++ b/src/meta-srv/src/handler/region_lease_handler.rs
@@ -281,7 +281,7 @@ mod test {
 
         let opening_region_id = RegionId::new(table_id, region_number + 2);
         let _guard = opening_region_keeper
-            .register(follower_peer.id, opening_region_id)
+            .register_with_role(follower_peer.id, opening_region_id, RegionRole::Follower)
             .unwrap();
 
         let acc = &mut HeartbeatAccumulator::default();
diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
index 189ba89449..0c0e5de5d7 100644
--- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
@@ -25,6 +25,7 @@ use common_telemetry::info;
 use common_telemetry::tracing_context::TracingContext;
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
+use store_api::region_engine::RegionRole;
 use tokio::time::Instant;
 
 use crate::error::{self, Result};
@@ -129,7 +130,7 @@ impl OpenCandidateRegion {
         // Registers the opening region.
         let guard = ctx
             .opening_region_keeper
-            .register(candidate.id, *region_id)
+            .register_with_role(candidate.id, *region_id, RegionRole::Follower)
             .context(error::RegionOperatingRaceSnafu {
                 peer_id: candidate.id,
                 region_id: *region_id,
@@ -296,7 +297,7 @@ mod tests {
         let mut ctx = env.context_factory().new_context(persistent_context);
         let opening_region_keeper = env.opening_region_keeper();
         let _guard = opening_region_keeper
-            .register(to_peer_id, region_id)
+            .register_with_role(to_peer_id, region_id, RegionRole::Follower)
             .unwrap();
 
         let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
index d5aa8699ec..93df5e6e2e 100644
--- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
@@ -231,6 +231,7 @@ mod tests {
     use common_meta::region_keeper::MemoryRegionKeeper;
     use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
     use common_time::util::current_time_millis;
+    use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;
 
     use crate::error::Error;
@@ -467,7 +468,7 @@ mod tests {
         }];
 
         let guard = opening_keeper
-            .register(2, RegionId::new(table_id, 1))
+            .register_with_role(2, RegionId::new(table_id, 1), RegionRole::Follower)
             .unwrap();
         ctx.volatile_ctx.opening_region_guards.push(guard);
diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs
index fc1ece15e2..f314a40080 100644
--- a/src/meta-srv/src/procedure/repartition.rs
+++ b/src/meta-srv/src/procedure/repartition.rs
@@ -42,7 +42,7 @@ use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
 use common_meta::node_manager::NodeManagerRef;
 use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
 use common_meta::region_registry::LeaderRegionRegistryRef;
-use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
+use common_meta::rpc::router::{RegionRoute, operating_leader_region_roles};
 use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
 use common_procedure::{
     BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
@@ -417,9 +417,9 @@ impl Context {
         region_routes: &[RegionRoute],
     ) -> Result<Vec<OperatingRegionGuard>> {
         let mut operating_guards = Vec::with_capacity(region_routes.len());
-        for (region_id, datanode_id) in operating_leader_regions(region_routes) {
+        for (region_id, datanode_id, role) in operating_leader_region_roles(region_routes) {
             let guard = memory_region_keeper
-                .register(datanode_id, region_id)
+                .register_with_role(datanode_id, region_id, role)
                 .context(error::RegionOperatingRaceSnafu {
                     peer_id: datanode_id,
                     region_id,
@@ -837,9 +837,11 @@ mod tests {
     };
     use common_meta::error;
     use common_meta::peer::Peer;
-    use common_meta::rpc::router::{Region, RegionRoute};
+    use common_meta::region_keeper::MemoryRegionKeeper;
+    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
     use common_meta::test_util::MockDatanodeManager;
     use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
+    use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;
     use table::table_name::TableName;
     use tokio::sync::mpsc;
@@ -993,6 +995,59 @@ mod tests {
         assert!(!procedure.should_rollback_allocated_regions());
     }
 
+    #[test]
+    fn test_register_operating_regions_preserves_route_roles() {
+        let keeper = Arc::new(MemoryRegionKeeper::new());
+        let region_routes = vec![
+            RegionRoute {
+                region: Region::new_test(RegionId::new(1024, 1)),
+                leader_peer: Some(Peer::empty(1)),
+                follower_peers: vec![],
+                leader_state: None,
+                leader_down_since: None,
+                write_route_policy: None,
+            },
+            RegionRoute {
+                region: Region::new_test(RegionId::new(1024, 2)),
+                leader_peer: Some(Peer::empty(2)),
+                follower_peers: vec![],
+                leader_state: Some(LeaderState::Staging),
+                leader_down_since: None,
+                write_route_policy: None,
+            },
+            RegionRoute {
+                region: Region::new_test(RegionId::new(1024, 3)),
+                leader_peer: Some(Peer::empty(3)),
+                follower_peers: vec![],
+                leader_state: Some(LeaderState::Downgrading),
+                leader_down_since: None,
+                write_route_policy: None,
+            },
+        ];
+
+        let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap();
+
+        let leader_roles =
+            keeper.extract_operating_region_roles(1, &HashSet::from([RegionId::new(1024, 1)]));
+        let staging_roles =
+            keeper.extract_operating_region_roles(2, &HashSet::from([RegionId::new(1024, 2)]));
+        let downgrading_roles =
+            keeper.extract_operating_region_roles(3, &HashSet::from([RegionId::new(1024, 3)]));
+
+        assert_eq!(
+            leader_roles.get(&RegionId::new(1024, 1)),
+            Some(&RegionRole::Leader)
+        );
+        assert_eq!(
+            staging_roles.get(&RegionId::new(1024, 2)),
+            Some(&RegionRole::StagingLeader)
+        );
+        assert_eq!(
+            downgrading_roles.get(&RegionId::new(1024, 3)),
+            Some(&RegionRole::DowngradingLeader)
+        );
+    }
+
     #[tokio::test]
     async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
         let env = TestingEnv::new();
diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs
index ac9f7d71b9..d4a899d002 100644
--- a/src/meta-srv/src/region/lease_keeper.rs
+++ b/src/meta-srv/src/region/lease_keeper.rs
@@ -20,7 +20,7 @@ use common_meta::key::TableMetadataManagerRef;
 use common_meta::key::table_route::TableRouteValue;
 use common_meta::region_keeper::MemoryRegionKeeperRef;
 use common_meta::rpc::router::RegionRoute;
-use common_telemetry::warn;
+use common_telemetry::{info, warn};
 use snafu::ResultExt;
 use store_api::region_engine::RegionRole;
 use store_api::storage::{RegionId, TableId};
@@ -63,15 +63,9 @@ fn renew_region_lease_via_region_route(
     if let Some(leader) = &region_route.leader_peer
         && leader.id == datanode_id
     {
-        let region_role = if region_route.is_leader_staging() {
-            RegionRole::StagingLeader
-        } else if region_route.is_leader_downgrading() {
-            RegionRole::DowngradingLeader
-        } else {
-            RegionRole::Leader
-        };
-
-        return Some((region_id, region_role));
+        return region_route
+            .leader_region_role()
+            .map(|region_role| (region_id, region_role));
     }
 
     // If it's a follower region on this datanode.
@@ -83,11 +77,28 @@ fn renew_region_lease_via_region_route(
         return Some((region_id, RegionRole::Follower));
     }
 
-    warn!(
-        "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region_routes: {:?}",
-        region_route
-    );
-    // The region doesn't belong to this datanode.
+    None
+}
+
+fn renew_region_lease_via_operating_regions(
+    operating_regions: &HashMap<RegionId, RegionRole>,
+    datanode_id: DatanodeId,
+    region_id: RegionId,
+    reported_role: RegionRole,
+) -> Option<RegionLeaseInfo> {
+    // `operating_regions` is filtered by the current datanode in `collect_metadata`,
+    // so looking up by `region_id` is sufficient here.
+    if let Some(role) = operating_regions.get(&region_id) {
+        let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
+        if *role != reported_role {
+            info!(
+                "The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
+                region_id, datanode_id, role, reported_role
+            );
+        }
+        return Some(region_lease_info);
+    }
+
     None
 }
@@ -149,49 +160,51 @@ impl RegionLeaseKeeper {
     }
 
     /// Returns [None] if:
-    /// - The region doesn't belong to the datanode.
+    /// - The region doesn't belong to the datanode in metadata or operating regions.
    /// - The region belongs to a logical table.
     fn renew_region_lease(
         &self,
         table_metadata: &HashMap<TableId, TableRouteValue>,
-        operating_regions: &HashSet<RegionId>,
+        operating_regions: &HashMap<RegionId, RegionRole>,
         datanode_id: DatanodeId,
         region_id: RegionId,
-        role: RegionRole,
+        reported_role: RegionRole,
     ) -> Option<RegionLeaseInfo> {
-        if operating_regions.contains(&region_id) {
-            let region_lease_info = RegionLeaseInfo::operating(region_id, role);
+        // First try to renew via region route
+        if let Some(table_route) = table_metadata.get(&region_id.table_id())
+            && let Ok(Some(region_route)) = table_route.region_route(region_id)
+            && let Some(region_lease) =
+                renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
+        {
+            return Some(RegionLeaseInfo::from(region_lease));
+        }
+
+        // Then try to renew via operating regions, which covers the opening region without region route in metadata.
+        if let Some(region_lease_info) = renew_region_lease_via_operating_regions(
+            operating_regions,
+            datanode_id,
+            region_id,
+            reported_role,
+        ) {
             return Some(region_lease_info);
         }
 
-        if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
-            if let Ok(Some(region_route)) = table_route.region_route(region_id) {
-                return renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
-                    .map(RegionLeaseInfo::from);
-            } else {
-                warn!(
-                    "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region route is not found in table({})",
-                    region_id.table_id()
-                );
-            }
-        } else {
-            warn!(
-                "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, table({}) is not found",
-                region_id.table_id()
-            );
-        }
+        warn!(
+            "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, no matching metadata or operating region found",
+        );
 
         None
     }
 
     async fn collect_metadata(
         &self,
         datanode_id: DatanodeId,
-        mut region_ids: HashSet<RegionId>,
-    ) -> Result<(HashMap<TableId, TableRouteValue>, HashSet<RegionId>)> {
-        // Filters out operating region first, improves the cache hit rate(reduce expensive remote fetches).
+        region_ids: HashSet<RegionId>,
+    ) -> Result<(
+        HashMap<TableId, TableRouteValue>,
+        HashMap<RegionId, RegionRole>,
+    )> {
         let operating_regions = self
             .memory_region_keeper
-            .extract_operating_regions(datanode_id, &mut region_ids);
+            .extract_operating_region_roles(datanode_id, &region_ids);
         let table_ids = region_ids
             .into_iter()
             .map(|region_id| region_id.table_id())
@@ -224,13 +237,13 @@ impl RegionLeaseKeeper {
         let mut renewed = HashMap::new();
         let mut non_exists = HashSet::new();
 
-        for &(region, role) in regions {
+        for &(region, reported_role) in regions {
             match self.renew_region_lease(
                 &table_metadata,
                 &operating_regions,
                 datanode_id,
                 region,
-                role,
+                reported_role,
             ) {
                 Some(region_lease_info) => {
                     renewed.insert(region_lease_info.region_id, region_lease_info);
@@ -376,12 +389,16 @@ mod tests {
         let opening_region_id = RegionId::new(1025, 1);
         let _guard = keeper
             .memory_region_keeper
-            .register(leader_peer_id, opening_region_id)
+            .register_with_role(leader_peer_id, opening_region_id, RegionRole::Leader)
             .unwrap();
         let another_opening_region_id = RegionId::new(1025, 2);
         let _guard2 = keeper
             .memory_region_keeper
-            .register(follower_peer_id, another_opening_region_id)
+            .register_with_role(
+                follower_peer_id,
+                another_opening_region_id,
+                RegionRole::Follower,
+            )
             .unwrap();
 
         let (metadata, regions) = keeper
@@ -395,8 +412,10 @@ mod tests {
             metadata.keys().cloned().collect::<Vec<_>>(),
             vec![region_id.table_id()]
         );
-        assert!(regions.contains(&opening_region_id));
-        assert_eq!(regions.len(), 1);
+        assert_eq!(
+            regions,
+            HashMap::from([(opening_region_id, RegionRole::Leader)])
+        );
     }
 
     #[tokio::test]
@@ -481,17 +500,17 @@ mod tests {
         let opening_region_id = RegionId::new(2048, 1);
         let _guard = keeper
             .memory_region_keeper
-            .register(leader_peer_id, opening_region_id)
+            .register_with_role(leader_peer_id, opening_region_id, RegionRole::Leader)
             .unwrap();
 
         // The opening region on the datanode.
         // NOTES: The procedure lock will ensure only one opening leader.
-        for role in [RegionRole::Leader, RegionRole::Follower] {
+        for reported_role in [RegionRole::Leader, RegionRole::Follower] {
             let RenewRegionLeasesResponse {
                 non_exists,
                 renewed,
             } = keeper
-                .renew_region_leases(leader_peer_id, &[(opening_region_id, role)])
+                .renew_region_leases(leader_peer_id, &[(opening_region_id, reported_role)])
                 .await
                 .unwrap();
 
@@ -500,7 +519,7 @@ mod tests {
                 renewed,
                 HashMap::from([(
                     opening_region_id,
-                    RegionLeaseInfo::operating(opening_region_id, role)
+                    RegionLeaseInfo::operating(opening_region_id, RegionRole::Leader)
                 )])
             );
         }
@@ -676,14 +695,80 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_renew_region_leases_operating_region_preserves_reported_role() {
-        let keeper = new_test_keeper();
+    async fn test_renew_region_leases_metadata_role_beats_keeper_role() {
+        let table_id = 2048;
+        let table_info: TableInfo = new_test_table_info(table_id);
+
         let datanode_id = 1024;
-        let region_id = RegionId::new(2048, 1);
+        let region_id = RegionId::new(table_id, 1);
+        let region_route = RegionRouteBuilder::default()
+            .region(Region::new_test(region_id))
+            .leader_peer(Peer::empty(datanode_id))
+            .build()
+            .unwrap();
+
+        let keeper = new_test_keeper();
+        let table_metadata_manager = keeper.table_metadata_manager();
+        table_metadata_manager
+            .create_table_metadata(
+                table_info,
+                TableRouteValue::physical(vec![region_route]),
+                HashMap::default(),
+            )
+            .await
+            .unwrap();
 
         let _guard = keeper
             .memory_region_keeper
-            .register(datanode_id, region_id)
+            .register_with_role(datanode_id, region_id, RegionRole::Follower)
+            .unwrap();
+
+        let RenewRegionLeasesResponse {
+            non_exists,
+            renewed,
+        } = keeper
+            .renew_region_leases(datanode_id, &[(region_id, RegionRole::Follower)])
+            .await
+            .unwrap();
+
+        assert!(non_exists.is_empty());
+        assert_eq!(
+            renewed,
+            HashMap::from([(
+                region_id,
+                RegionLeaseInfo::from((region_id, RegionRole::Leader))
+            )])
+        );
+    }
+
+    #[tokio::test]
+    async fn test_renew_region_leases_missing_route_falls_back_to_keeper_role() {
+        let table_id = 2048;
+        let table_info: TableInfo = new_test_table_info(table_id);
+
+        let datanode_id = 1024;
+        let region_id = RegionId::new(table_id, 1);
+        let another_region_id = RegionId::new(table_id, 2);
+        let region_route = RegionRouteBuilder::default()
+            .region(Region::new_test(another_region_id))
+            .leader_peer(Peer::empty(datanode_id))
+            .build()
+            .unwrap();
+
+        let keeper = new_test_keeper();
+        let table_metadata_manager = keeper.table_metadata_manager();
+        table_metadata_manager
+            .create_table_metadata(
+                table_info,
+                TableRouteValue::physical(vec![region_route]),
+                HashMap::default(),
+            )
+            .await
+            .unwrap();
+
+        let _guard = keeper
+            .memory_region_keeper
+            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
             .unwrap();
 
         let RenewRegionLeasesResponse {
@@ -699,7 +784,36 @@ mod tests {
             renewed,
             HashMap::from([(
                 region_id,
-                RegionLeaseInfo::operating(region_id, RegionRole::StagingLeader)
+                RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
+            )])
+        );
+    }
+
+    #[tokio::test]
+    async fn test_renew_region_leases_operating_region_uses_keeper_role() {
+        let keeper = new_test_keeper();
+        let datanode_id = 1024;
+        let region_id = RegionId::new(2048, 1);
+
+        let _guard = keeper
+            .memory_region_keeper
+            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
+            .unwrap();
+
+        let RenewRegionLeasesResponse {
+            non_exists,
+            renewed,
+        } = keeper
+            .renew_region_leases(datanode_id, &[(region_id, RegionRole::StagingLeader)])
+            .await
+            .unwrap();
+
+        assert!(non_exists.is_empty());
+        assert_eq!(
+            renewed,
+            HashMap::from([(
+                region_id,
+                RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
             )])
         );
     }
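
The core of this patch is that `MemoryRegionKeeper` now remembers *which* role a region was registered with, instead of only the fact that it is operating. Below is a minimal standalone sketch of that shape; the plain `u64` aliases for `DatanodeId`/`RegionId`, the local `RegionRole` enum, and the `Option<()>` standing in for the real RAII `OperatingRegionGuard` are simplifications for illustration, not the GreptimeDB definitions.

```rust
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

type DatanodeId = u64;
type RegionId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RegionRole {
    Leader,
    Follower,
    DowngradingLeader,
    StagingLeader,
}

#[derive(Default)]
struct Keeper {
    // Conceptually was a HashSet<(DatanodeId, RegionId)>; the map value now
    // carries the role recorded at registration time.
    inner: Arc<RwLock<HashMap<(DatanodeId, RegionId), RegionRole>>>,
}

impl Keeper {
    /// Registers a region together with its role; `None` if already registered.
    /// (The real API returns a guard that deregisters the entry on drop.)
    fn register_with_role(
        &self,
        datanode: DatanodeId,
        region: RegionId,
        role: RegionRole,
    ) -> Option<()> {
        match self.inner.write().unwrap().entry((datanode, region)) {
            Entry::Occupied(_) => None,
            Entry::Vacant(vacant) => {
                vacant.insert(role);
                Some(())
            }
        }
    }

    /// Returns the recorded role for every requested region that is operating.
    fn extract_operating_region_roles(
        &self,
        datanode: DatanodeId,
        regions: &HashSet<RegionId>,
    ) -> HashMap<RegionId, RegionRole> {
        let inner = self.inner.read().unwrap();
        regions
            .iter()
            .filter_map(|r| inner.get(&(datanode, *r)).map(|role| (*r, *role)))
            .collect()
    }
}

fn main() {
    let keeper = Keeper::default();
    keeper
        .register_with_role(1, 100, RegionRole::DowngradingLeader)
        .unwrap();

    let roles = keeper.extract_operating_region_roles(1, &HashSet::from([100, 101]));
    // Only the registered region comes back, and it carries its role.
    assert_eq!(roles.get(&100), Some(&RegionRole::DowngradingLeader));
    assert_eq!(roles.get(&101), None);
}
```

Note that `extract_operating_region_roles` no longer drains the input set the way the old `extract_operating_regions(&mut ...)` did; callers keep the full `region_ids` for the subsequent table-route lookup.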
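The new `RegionRoute::leader_region_role` centralizes the state-to-role mapping that `renew_region_lease_via_region_route` previously open-coded. A condensed sketch of that mapping, using simplified stand-in enums rather than the real `LeaderState`/`RegionRole` types:

```rust
// Stand-in enums; the real LeaderState/RegionRole live in common_meta / store_api.
#[derive(Clone, Copy)]
enum LeaderState {
    Downgrading,
    Staging,
}

#[derive(Debug, PartialEq)]
enum RegionRole {
    Leader,
    DowngradingLeader,
    StagingLeader,
}

/// Mirrors RegionRoute::leader_region_role: no leader peer means no leader role
/// at all; otherwise the leader's state picks the variant.
fn leader_region_role(has_leader_peer: bool, state: Option<LeaderState>) -> Option<RegionRole> {
    if !has_leader_peer {
        return None;
    }
    Some(match state {
        Some(LeaderState::Staging) => RegionRole::StagingLeader,
        Some(LeaderState::Downgrading) => RegionRole::DowngradingLeader,
        None => RegionRole::Leader,
    })
}

fn main() {
    assert_eq!(leader_region_role(true, None), Some(RegionRole::Leader));
    assert_eq!(
        leader_region_role(true, Some(LeaderState::Staging)),
        Some(RegionRole::StagingLeader)
    );
    assert_eq!(leader_region_role(false, None), None);
}
```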
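Finally, the renewal order that `renew_region_lease` follows after this patch: a role derived from the table-route metadata wins; the keeper-recorded role is the fallback for regions that have no route yet (e.g. a region still opening); the role reported by the datanode heartbeat is logged on mismatch but does not decide the lease. A sketch of that precedence under assumed, simplified types:

```rust
use std::collections::HashMap;

type RegionId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RegionRole {
    Leader,
    Follower,
    DowngradingLeader,
}

fn renew_role(
    route_role: Option<RegionRole>,            // derived from table-route metadata
    operating: &HashMap<RegionId, RegionRole>, // recorded in the memory region keeper
    region_id: RegionId,
    reported_role: RegionRole,                 // what the datanode heartbeat claims
) -> Option<RegionRole> {
    // 1. A role derived from route metadata wins.
    if let Some(role) = route_role {
        return Some(role);
    }
    // 2. Fall back to the keeper's recorded role (covers regions without a route).
    if let Some(role) = operating.get(&region_id) {
        if *role != reported_role {
            // Mismatches are only logged; the reported role is never trusted.
            println!("region {region_id}: keeper role {role:?}, reported {reported_role:?}");
        }
        return Some(*role);
    }
    // 3. Otherwise the lease is denied.
    None
}

fn main() {
    let operating = HashMap::from([(7, RegionRole::DowngradingLeader)]);
    // Route role beats both the keeper and the reported role.
    assert_eq!(
        renew_role(Some(RegionRole::Leader), &operating, 7, RegionRole::Follower),
        Some(RegionRole::Leader)
    );
    // No route: the keeper's recorded role is used, not the reported one.
    assert_eq!(
        renew_role(None, &operating, 7, RegionRole::Follower),
        Some(RegionRole::DowngradingLeader)
    );
    // Unknown region: denied.
    assert_eq!(renew_role(None, &operating, 8, RegionRole::Leader), None);
}
```

This is what the three new lease-keeper tests above pin down: metadata beats the keeper, a missing route falls back to the keeper, and the reported role alone never wins.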