mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 12:00:40 +00:00
refactor(meta): register operating region roles from region routes
Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -45,7 +45,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
|
||||
use crate::metrics;
|
||||
use crate::region_keeper::OperatingRegionGuard;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::rpc::router::{RegionRoute, operating_leader_regions};
|
||||
use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
|
||||
|
||||
pub struct CreateTableProcedure {
|
||||
pub context: DdlContext,
|
||||
@@ -256,17 +256,17 @@ impl CreateTableProcedure {
|
||||
context: &DdlContext,
|
||||
region_routes: &[RegionRoute],
|
||||
) -> Result<Vec<OperatingRegionGuard>> {
|
||||
let opening_regions = operating_leader_regions(region_routes);
|
||||
let opening_regions = operating_leader_region_roles(region_routes);
|
||||
if self.opening_regions.len() == opening_regions.len() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
|
||||
|
||||
for (region_id, datanode_id) in opening_regions {
|
||||
for (region_id, datanode_id, role) in opening_regions {
|
||||
let guard = context
|
||||
.memory_region_keeper
|
||||
.register(datanode_id, region_id)
|
||||
.register_with_role(datanode_id, region_id, role)
|
||||
.context(error::RegionOperatingRaceSnafu {
|
||||
region_id,
|
||||
peer_id: datanode_id,
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::ddl::utils::get_region_wal_options;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::region_keeper::OperatingRegionGuard;
|
||||
use crate::rpc::router::{RegionRoute, operating_leader_regions};
|
||||
use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct DropDatabaseExecutor {
|
||||
@@ -69,12 +69,12 @@ impl DropDatabaseExecutor {
|
||||
if !self.dropping_regions.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let dropping_regions = operating_leader_regions(&self.physical_region_routes);
|
||||
let dropping_regions = operating_leader_region_roles(&self.physical_region_routes);
|
||||
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
|
||||
for (region_id, datanode_id) in dropping_regions {
|
||||
for (region_id, datanode_id, role) in dropping_regions {
|
||||
let guard = ddl_ctx
|
||||
.memory_region_keeper
|
||||
.register(datanode_id, region_id)
|
||||
.register_with_role(datanode_id, region_id, role)
|
||||
.context(error::RegionOperatingRaceSnafu {
|
||||
region_id,
|
||||
peer_id: datanode_id,
|
||||
|
||||
@@ -43,7 +43,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
|
||||
use crate::metrics;
|
||||
use crate::region_keeper::OperatingRegionGuard;
|
||||
use crate::rpc::ddl::DropTableTask;
|
||||
use crate::rpc::router::{RegionRoute, operating_leader_regions};
|
||||
use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
|
||||
|
||||
pub struct DropTableProcedure {
|
||||
/// The context of procedure runtime.
|
||||
@@ -94,7 +94,7 @@ impl DropTableProcedure {
|
||||
|
||||
/// Register dropping regions if doesn't exist.
|
||||
fn register_dropping_regions(&mut self) -> Result<()> {
|
||||
let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
|
||||
let dropping_regions = operating_leader_region_roles(&self.data.physical_region_routes);
|
||||
|
||||
if !self.dropping_regions.is_empty() {
|
||||
return Ok(());
|
||||
@@ -102,11 +102,11 @@ impl DropTableProcedure {
|
||||
|
||||
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
|
||||
|
||||
for (region_id, datanode_id) in dropping_regions {
|
||||
for (region_id, datanode_id, role) in dropping_regions {
|
||||
let guard = self
|
||||
.context
|
||||
.memory_region_keeper
|
||||
.register(datanode_id, region_id)
|
||||
.register_with_role(datanode_id, region_id, role)
|
||||
.context(error::RegionOperatingRaceSnafu {
|
||||
region_id,
|
||||
peer_id: datanode_id,
|
||||
|
||||
@@ -23,6 +23,7 @@ use derive_builder::Builder;
|
||||
use serde::ser::SerializeSeq;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use snafu::OptionExt;
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use strum::AsRefStr;
|
||||
use table::table_name::TableName;
|
||||
@@ -99,6 +100,22 @@ pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId,
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Returns the operating leader regions with corresponding [DatanodeId] and [RegionRole].
|
||||
pub fn operating_leader_region_roles(
|
||||
region_routes: &[RegionRoute],
|
||||
) -> Vec<(RegionId, DatanodeId, RegionRole)> {
|
||||
region_routes
|
||||
.iter()
|
||||
.filter_map(|route| {
|
||||
route
|
||||
.leader_peer
|
||||
.as_ref()
|
||||
.zip(route.leader_region_role())
|
||||
.map(|(leader, role)| (route.region.id, leader.id, role))
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Returns the HashMap<[RegionNumber], &[Peer]>;
|
||||
///
|
||||
/// If the region doesn't have a leader peer, the [Region] will be omitted.
|
||||
@@ -342,6 +359,19 @@ impl RegionRoute {
|
||||
matches!(self.leader_state, Some(LeaderState::Staging))
|
||||
}
|
||||
|
||||
/// Returns the role of the leader region.
|
||||
pub fn leader_region_role(&self) -> Option<RegionRole> {
|
||||
self.leader_peer.as_ref().map(|_| {
|
||||
if self.is_leader_staging() {
|
||||
RegionRole::StagingLeader
|
||||
} else if self.is_leader_downgrading() {
|
||||
RegionRole::DowngradingLeader
|
||||
} else {
|
||||
RegionRole::Leader
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
|
||||
///
|
||||
/// We should downgrade a [`Region`] before deactivating it:
|
||||
|
||||
@@ -25,6 +25,7 @@ use common_telemetry::info;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::region_engine::RegionRole;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -129,7 +130,7 @@ impl OpenCandidateRegion {
|
||||
// Registers the opening region.
|
||||
let guard = ctx
|
||||
.opening_region_keeper
|
||||
.register(candidate.id, *region_id)
|
||||
.register_with_role(candidate.id, *region_id, RegionRole::Follower)
|
||||
.context(error::RegionOperatingRaceSnafu {
|
||||
peer_id: candidate.id,
|
||||
region_id: *region_id,
|
||||
|
||||
@@ -42,7 +42,7 @@ use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
|
||||
use common_meta::node_manager::NodeManagerRef;
|
||||
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
|
||||
use common_meta::region_registry::LeaderRegionRegistryRef;
|
||||
use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
|
||||
use common_meta::rpc::router::{RegionRoute, operating_leader_region_roles};
|
||||
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
|
||||
use common_procedure::{
|
||||
BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
@@ -417,9 +417,9 @@ impl Context {
|
||||
region_routes: &[RegionRoute],
|
||||
) -> Result<Vec<OperatingRegionGuard>> {
|
||||
let mut operating_guards = Vec::with_capacity(region_routes.len());
|
||||
for (region_id, datanode_id) in operating_leader_regions(region_routes) {
|
||||
for (region_id, datanode_id, role) in operating_leader_region_roles(region_routes) {
|
||||
let guard = memory_region_keeper
|
||||
.register(datanode_id, region_id)
|
||||
.register_with_role(datanode_id, region_id, role)
|
||||
.context(error::RegionOperatingRaceSnafu {
|
||||
peer_id: datanode_id,
|
||||
region_id,
|
||||
|
||||
Reference in New Issue
Block a user