From 06bf581cc7b44c00c1740fc41cce29758f47fa4e Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 15 Apr 2026 08:52:29 +0000 Subject: [PATCH] refactor(meta): register operating region roles from region routes Signed-off-by: WenyXu --- src/common/meta/src/ddl/create_table.rs | 8 ++--- .../meta/src/ddl/drop_database/executor.rs | 8 ++--- src/common/meta/src/ddl/drop_table.rs | 8 ++--- src/common/meta/src/rpc/router.rs | 30 +++++++++++++++++++ .../region_migration/open_candidate_region.rs | 3 +- src/meta-srv/src/procedure/repartition.rs | 6 ++-- 6 files changed, 47 insertions(+), 16 deletions(-) diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index b377a60406..650a706296 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -45,7 +45,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock}; use crate::metrics; use crate::region_keeper::OperatingRegionGuard; use crate::rpc::ddl::CreateTableTask; -use crate::rpc::router::{RegionRoute, operating_leader_regions}; +use crate::rpc::router::{RegionRoute, operating_leader_region_roles}; pub struct CreateTableProcedure { pub context: DdlContext, @@ -256,17 +256,17 @@ impl CreateTableProcedure { context: &DdlContext, region_routes: &[RegionRoute], ) -> Result> { - let opening_regions = operating_leader_regions(region_routes); + let opening_regions = operating_leader_region_roles(region_routes); if self.opening_regions.len() == opening_regions.len() { return Ok(vec![]); } let mut opening_region_guards = Vec::with_capacity(opening_regions.len()); - for (region_id, datanode_id) in opening_regions { + for (region_id, datanode_id, role) in opening_regions { let guard = context .memory_region_keeper - .register(datanode_id, region_id) + .register_with_role(datanode_id, region_id, role) .context(error::RegionOperatingRaceSnafu { region_id, peer_id: datanode_id, diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 80c730f30a..aa102a0f25 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -29,7 +29,7 @@ use crate::ddl::utils::get_region_wal_options; use crate::error::{self, Result}; use crate::key::table_route::TableRouteValue; use crate::region_keeper::OperatingRegionGuard; -use crate::rpc::router::{RegionRoute, operating_leader_regions}; +use crate::rpc::router::{RegionRoute, operating_leader_region_roles}; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct DropDatabaseExecutor { @@ -69,12 +69,12 @@ impl DropDatabaseExecutor { if !self.dropping_regions.is_empty() { return Ok(()); } - let dropping_regions = operating_leader_regions(&self.physical_region_routes); + let dropping_regions = operating_leader_region_roles(&self.physical_region_routes); let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len()); - for (region_id, datanode_id) in dropping_regions { + for (region_id, datanode_id, role) in dropping_regions { let guard = ddl_ctx .memory_region_keeper - .register(datanode_id, region_id) + .register_with_role(datanode_id, region_id, role) .context(error::RegionOperatingRaceSnafu { region_id, peer_id: datanode_id, diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 8bd7c7155c..4f4e2d6053 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -43,7 +43,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::metrics; use crate::region_keeper::OperatingRegionGuard; use crate::rpc::ddl::DropTableTask; -use crate::rpc::router::{RegionRoute, operating_leader_regions}; +use crate::rpc::router::{RegionRoute, operating_leader_region_roles}; pub struct DropTableProcedure { /// The context of procedure runtime. @@ -94,7 +94,7 @@ impl DropTableProcedure { /// Register dropping regions if doesn't exist. fn register_dropping_regions(&mut self) -> Result<()> { - let dropping_regions = operating_leader_regions(&self.data.physical_region_routes); + let dropping_regions = operating_leader_region_roles(&self.data.physical_region_routes); if !self.dropping_regions.is_empty() { return Ok(()); @@ -102,11 +102,11 @@ impl DropTableProcedure { let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len()); - for (region_id, datanode_id) in dropping_regions { + for (region_id, datanode_id, role) in dropping_regions { let guard = self .context .memory_region_keeper - .register(datanode_id, region_id) + .register_with_role(datanode_id, region_id, role) .context(error::RegionOperatingRaceSnafu { region_id, peer_id: datanode_id, diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 92627c1d8f..41e5bb4128 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -23,6 +23,7 @@ use derive_builder::Builder; use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use snafu::OptionExt; +use store_api::region_engine::RegionRole; use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::table_name::TableName; @@ -99,6 +100,22 @@ pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, .collect::>() } +/// Returns the operating leader regions with corresponding [DatanodeId] and [RegionRole]. +pub fn operating_leader_region_roles( + region_routes: &[RegionRoute], +) -> Vec<(RegionId, DatanodeId, RegionRole)> { + region_routes + .iter() + .filter_map(|route| { + route + .leader_peer + .as_ref() + .zip(route.leader_region_role()) + .map(|(leader, role)| (route.region.id, leader.id, role)) + }) + .collect::>() +} + /// Returns the HashMap<[RegionNumber], &[Peer]>; /// /// If the region doesn't have a leader peer, the [Region] will be omitted. @@ -342,6 +359,19 @@ impl RegionRoute { matches!(self.leader_state, Some(LeaderState::Staging)) } + /// Returns the role of the leader region. + pub fn leader_region_role(&self) -> Option { + self.leader_peer.as_ref().map(|_| { + if self.is_leader_staging() { + RegionRole::StagingLeader + } else if self.is_leader_downgrading() { + RegionRole::DowngradingLeader + } else { + RegionRole::Leader + } + }) + } + /// Marks the Leader [`Region`] as [`RegionState::Downgrading`]. /// /// We should downgrade a [`Region`] before deactivating it: diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 189ba89449..1d02986b23 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -25,6 +25,7 @@ use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use store_api::region_engine::RegionRole; use tokio::time::Instant; use crate::error::{self, Result}; @@ -129,7 +130,7 @@ impl OpenCandidateRegion { // Registers the opening region. let guard = ctx .opening_region_keeper - .register(candidate.id, *region_id) + .register_with_role(candidate.id, *region_id, RegionRole::Follower) .context(error::RegionOperatingRaceSnafu { peer_id: candidate.id, region_id: *region_id, diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index fc1ece15e2..bbf2ba71aa 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -42,7 +42,7 @@ use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use common_meta::node_manager::NodeManagerRef; use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; use common_meta::region_registry::LeaderRegionRegistryRef; -use common_meta::rpc::router::{RegionRoute, operating_leader_regions}; +use common_meta::rpc::router::{RegionRoute, operating_leader_region_roles}; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, @@ -417,9 +417,9 @@ impl Context { region_routes: &[RegionRoute], ) -> Result> { let mut operating_guards = Vec::with_capacity(region_routes.len()); - for (region_id, datanode_id) in operating_leader_regions(region_routes) { + for (region_id, datanode_id, role) in operating_leader_region_roles(region_routes) { let guard = memory_region_keeper - .register(datanode_id, region_id) + .register_with_role(datanode_id, region_id, role) .context(error::RegionOperatingRaceSnafu { peer_id: datanode_id, region_id,