Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-05-16 04:50:38 +00:00)
fix(meta): renew operating region leases from keeper roles (#7971)
This commit squashes the following changes:

* refactor(meta): store operating region roles in memory keeper
* refactor(meta): register operating region roles from region routes
* refactor(meta): require explicit operating region roles
* fix(meta): renew operating region leases from keeper roles
* test(common-meta): cover region route role helpers
* test(meta): cover operating region role propagation
* chore: apply suggestions
* chore: apply suggestions
* fix(meta): prefer metadata roles for region lease renewal

Signed-off-by: WenyXu <wenymedia@gmail.com>
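At a glance, the change swaps the meta server's in-memory region keeper from a bare `HashSet<(DatanodeId, RegionId)>` to a `HashMap<(DatanodeId, RegionId), RegionRole>`: procedures now declare the role under which they operate a region, and lease renewal reads that role back instead of trusting whatever role the datanode reports. A self-contained toy restatement of the keeper's new contract (plain `u64` ids and a local `Role` enum stand in for GreptimeDB's types; this is a sketch, not the project's code):

```rust
use std::collections::HashMap;
use std::collections::hash_map::Entry;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Role {
    Leader,
    DowngradingLeader,
    Follower,
}

// Toy keeper: maps (datanode, region) to the role the operating procedure declared.
#[derive(Default)]
struct Keeper {
    inner: HashMap<(u64, u64), Role>,
}

impl Keeper {
    // Mirrors register_with_role: the first registration wins, duplicates are rejected.
    fn register_with_role(&mut self, datanode: u64, region: u64, role: Role) -> bool {
        match self.inner.entry((datanode, region)) {
            Entry::Occupied(_) => false,
            Entry::Vacant(vacant) => {
                vacant.insert(role);
                true
            }
        }
    }

    // Mirrors extract_operating_region_roles: look up roles without consuming the input.
    fn roles_for(&self, datanode: u64, regions: &[u64]) -> HashMap<u64, Role> {
        regions
            .iter()
            .filter_map(|r| self.inner.get(&(datanode, *r)).map(|role| (*r, *role)))
            .collect()
    }
}

fn main() {
    let mut keeper = Keeper::default();
    assert!(keeper.register_with_role(1, 100, Role::DowngradingLeader));
    assert!(!keeper.register_with_role(1, 100, Role::Leader)); // duplicate rejected
    let roles = keeper.roles_for(1, &[100, 101]);
    assert_eq!(roles.get(&100), Some(&Role::DowngradingLeader));
    assert_eq!(roles.get(&101), None); // never registered
}
```

The hunks below thread this role through every `register` call site and through lease renewal.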
```diff
@@ -45,7 +45,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
 use crate::metrics;
 use crate::region_keeper::OperatingRegionGuard;
 use crate::rpc::ddl::CreateTableTask;
-use crate::rpc::router::{RegionRoute, operating_leader_regions};
+use crate::rpc::router::{RegionRoute, operating_leader_region_roles};

 pub struct CreateTableProcedure {
     pub context: DdlContext,
@@ -256,17 +256,17 @@ impl CreateTableProcedure {
         context: &DdlContext,
         region_routes: &[RegionRoute],
     ) -> Result<Vec<OperatingRegionGuard>> {
-        let opening_regions = operating_leader_regions(region_routes);
+        let opening_regions = operating_leader_region_roles(region_routes);
         if self.opening_regions.len() == opening_regions.len() {
             return Ok(vec![]);
         }

         let mut opening_region_guards = Vec::with_capacity(opening_regions.len());

-        for (region_id, datanode_id) in opening_regions {
+        for (region_id, datanode_id, role) in opening_regions {
             let guard = context
                 .memory_region_keeper
-                .register(datanode_id, region_id)
+                .register_with_role(datanode_id, region_id, role)
                 .context(error::RegionOperatingRaceSnafu {
                     region_id,
                     peer_id: datanode_id,
```
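The consumption pattern above repeats in the drop-database, drop-table, and procedure-context hunks below: derive `(region_id, datanode_id, role)` triples from the routes, then register each triple with its role, treating an already-registered region as a race. A minimal sketch of that loop shape (toy `u64` ids again, with the race surfaced as an error the way `RegionOperatingRaceSnafu` does upstream):

```rust
use std::collections::HashMap;
use std::collections::hash_map::Entry;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Role {
    Leader,
    DowngradingLeader,
}

// Toy stand-in for the keeper: (datanode, region) -> role.
type Keeper = HashMap<(u64, u64), Role>;

fn register_with_role(keeper: &mut Keeper, datanode: u64, region: u64, role: Role) -> Option<()> {
    match keeper.entry((datanode, region)) {
        Entry::Occupied(_) => None, // someone else is already operating this region
        Entry::Vacant(vacant) => {
            vacant.insert(role);
            Some(())
        }
    }
}

// Shape of the register loop in the hunk above: each triple carries its role,
// and a duplicate registration becomes an error instead of being silently kept.
fn register_all(keeper: &mut Keeper, triples: &[(u64, u64, Role)]) -> Result<(), String> {
    for &(region_id, datanode_id, role) in triples {
        register_with_role(keeper, datanode_id, region_id, role)
            .ok_or_else(|| format!("region {region_id} already operating on {datanode_id}"))?;
    }
    Ok(())
}

fn main() {
    let mut keeper = Keeper::new();
    let triples = [(100, 1, Role::Leader), (101, 2, Role::DowngradingLeader)];
    assert!(register_all(&mut keeper, &triples).is_ok());
    // A second attempt races with the first and fails.
    assert!(register_all(&mut keeper, &triples).is_err());
}
```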
```diff
@@ -29,7 +29,7 @@ use crate::ddl::utils::get_region_wal_options;
 use crate::error::{self, Result};
 use crate::key::table_route::TableRouteValue;
 use crate::region_keeper::OperatingRegionGuard;
-use crate::rpc::router::{RegionRoute, operating_leader_regions};
+use crate::rpc::router::{RegionRoute, operating_leader_region_roles};

 #[derive(Debug, Serialize, Deserialize)]
 pub(crate) struct DropDatabaseExecutor {
@@ -69,12 +69,12 @@ impl DropDatabaseExecutor {
         if !self.dropping_regions.is_empty() {
             return Ok(());
         }
-        let dropping_regions = operating_leader_regions(&self.physical_region_routes);
+        let dropping_regions = operating_leader_region_roles(&self.physical_region_routes);
         let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
-        for (region_id, datanode_id) in dropping_regions {
+        for (region_id, datanode_id, role) in dropping_regions {
             let guard = ddl_ctx
                 .memory_region_keeper
-                .register(datanode_id, region_id)
+                .register_with_role(datanode_id, region_id, role)
                 .context(error::RegionOperatingRaceSnafu {
                     region_id,
                     peer_id: datanode_id,
@@ -159,6 +159,7 @@ impl State for DropDatabaseExecutor {

 #[cfg(test)]
 mod tests {
+    use std::collections::HashSet;
     use std::sync::Arc;

     use api::region::RegionResponse;
@@ -167,6 +168,8 @@ mod tests {
     use common_error::ext::BoxedError;
     use common_query::request::QueryRequest;
     use common_recordbatch::SendableRecordBatchStream;
+    use store_api::region_engine::RegionRole;
+    use store_api::storage::RegionId;
     use table::table_name::TableName;

     use crate::ddl::drop_database::cursor::DropDatabaseCursor;
@@ -179,7 +182,7 @@ mod tests {
     use crate::error::{self, Error, Result};
     use crate::key::datanode_table::DatanodeTableKey;
     use crate::peer::Peer;
-    use crate::rpc::router::region_distribution;
+    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
     use crate::test_util::{MockDatanodeHandler, MockDatanodeManager, new_ddl_context};

     #[derive(Clone)]
@@ -423,6 +426,34 @@ mod tests {
         }
     }

+    #[tokio::test]
+    async fn test_recover_registers_region_role_from_routes() {
+        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
+        let ddl_context = new_ddl_context(node_manager);
+        let region_id = RegionId::new(1024, 1);
+        let mut state = DropDatabaseExecutor::new(
+            1024,
+            1024,
+            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
+            vec![RegionRoute {
+                region: Region::new_test(region_id),
+                leader_peer: Some(Peer::empty(7)),
+                follower_peers: vec![],
+                leader_state: Some(LeaderState::Downgrading),
+                leader_down_since: None,
+                write_route_policy: None,
+            }],
+            DropTableTarget::Physical,
+        );
+
+        state.recover(&ddl_context).unwrap();
+
+        let roles = ddl_context
+            .memory_region_keeper
+            .extract_operating_region_roles(7, &HashSet::from([region_id]));
+        assert_eq!(roles.get(&region_id), Some(&RegionRole::DowngradingLeader));
+    }
+
     #[tokio::test]
     async fn test_next_remaps_addresses_when_retrying() {
         let (tx, mut rx) = tokio::sync::mpsc::channel(8);
```
```diff
@@ -43,7 +43,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
 use crate::metrics;
 use crate::region_keeper::OperatingRegionGuard;
 use crate::rpc::ddl::DropTableTask;
-use crate::rpc::router::{RegionRoute, operating_leader_regions};
+use crate::rpc::router::{RegionRoute, operating_leader_region_roles};

 pub struct DropTableProcedure {
     /// The context of procedure runtime.
@@ -94,7 +94,7 @@ impl DropTableProcedure {

     /// Register dropping regions if doesn't exist.
     fn register_dropping_regions(&mut self) -> Result<()> {
-        let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
+        let dropping_regions = operating_leader_region_roles(&self.data.physical_region_routes);

         if !self.dropping_regions.is_empty() {
             return Ok(());
@@ -102,11 +102,11 @@ impl DropTableProcedure {

         let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());

-        for (region_id, datanode_id) in dropping_regions {
+        for (region_id, datanode_id, role) in dropping_regions {
             let guard = self
                 .context
                 .memory_region_keeper
-                .register(datanode_id, region_id)
+                .register_with_role(datanode_id, region_id, role)
                 .context(error::RegionOperatingRaceSnafu {
                     region_id,
                     peer_id: datanode_id,
```
```diff
@@ -13,7 +13,7 @@
 // limitations under the License.

 use std::assert_matches;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;

 use api::region::RegionResponse;
@@ -30,6 +30,7 @@ use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use store_api::metadata::ColumnMetadata;
 use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
+use store_api::region_engine::RegionRole;
 use store_api::storage::RegionId;
 use tokio::sync::mpsc;

@@ -351,6 +352,10 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
         .memory_region_keeper
         .contains(datanode_id, region_id)
     );
+    let roles = ddl_context
+        .memory_region_keeper
+        .extract_operating_region_roles(datanode_id, &HashSet::from([region_id]));
+    assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));

     execute_procedure_until_done(&mut procedure).await;
```
```diff
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;

 use api::v1::region::{RegionRequest, region_request};
@@ -23,6 +23,7 @@ use common_procedure::Procedure;
 use common_procedure_test::{
     execute_procedure_until, execute_procedure_until_done, new_test_procedure_context,
 };
+use store_api::region_engine::RegionRole;
 use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tokio::sync::mpsc;
@@ -328,6 +329,10 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
         .memory_region_keeper
         .contains(datanode_id, region_id)
     );
+    let roles = ddl_context
+        .memory_region_keeper
+        .extract_operating_region_roles(datanode_id, &HashSet::from([region_id]));
+    assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));

     execute_procedure_until_done(&mut procedure).await;
```
```diff
@@ -12,9 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::HashSet;
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};

+use store_api::region_engine::RegionRole;
 use store_api::storage::RegionId;

 use crate::DatanodeId;
@@ -24,7 +26,7 @@ use crate::DatanodeId;
 pub struct OperatingRegionGuard {
     datanode_id: DatanodeId,
     region_id: RegionId,
-    inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
+    inner: Arc<RwLock<HashMap<(DatanodeId, RegionId), RegionRole>>>,
 }

 impl Drop for OperatingRegionGuard {
@@ -50,7 +52,7 @@ pub type MemoryRegionKeeperRef = Arc<MemoryRegionKeeper>;
 /// - Tracks the deleting regions after the corresponding metadata is deleted.
 #[derive(Debug, Clone, Default)]
 pub struct MemoryRegionKeeper {
-    inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
+    inner: Arc<RwLock<HashMap<(DatanodeId, RegionId), RegionRole>>>,
 }

 impl MemoryRegionKeeper {
@@ -59,40 +61,48 @@ impl MemoryRegionKeeper {
     }

     /// Returns [OperatingRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist.
-    pub fn register(
+    pub fn register_with_role(
         &self,
         datanode_id: DatanodeId,
         region_id: RegionId,
+        role: RegionRole,
     ) -> Option<OperatingRegionGuard> {
         let mut inner = self.inner.write().unwrap();

-        if inner.insert((datanode_id, region_id)) {
-            Some(OperatingRegionGuard {
-                datanode_id,
-                region_id,
-                inner: self.inner.clone(),
-            })
-        } else {
-            None
+        match inner.entry((datanode_id, region_id)) {
+            Entry::Occupied(_) => None,
+            Entry::Vacant(vacant_entry) => {
+                vacant_entry.insert(role);
+                Some(OperatingRegionGuard {
+                    datanode_id,
+                    region_id,
+                    inner: self.inner.clone(),
+                })
+            }
         }
     }

     /// Returns true if the keeper contains a (`datanode_id`, `region_id`) tuple.
     pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool {
         let inner = self.inner.read().unwrap();
-        inner.contains(&(datanode_id, region_id))
+        inner.contains_key(&(datanode_id, region_id))
     }

-    /// Extracts all operating regions from `region_ids` and returns operating regions.
-    pub fn extract_operating_regions(
+    /// Extracts all operating regions with roles from `region_ids`.
+    pub fn extract_operating_region_roles(
         &self,
         datanode_id: DatanodeId,
-        region_ids: &mut HashSet<RegionId>,
-    ) -> HashSet<RegionId> {
+        region_ids: &HashSet<RegionId>,
+    ) -> HashMap<RegionId, RegionRole> {
         let inner = self.inner.read().unwrap();
         region_ids
-            .extract_if(|region_id| inner.contains(&(datanode_id, *region_id)))
-            .collect::<HashSet<_>>()
+            .iter()
+            .filter_map(|region_id| {
+                inner
+                    .get(&(datanode_id, *region_id))
+                    .map(|role| (*region_id, *role))
+            })
+            .collect()
     }

     /// Returns the number of elements in the tracking set.
@@ -115,8 +125,9 @@ impl MemoryRegionKeeper {

 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
+    use std::collections::{HashMap, HashSet};

+    use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;

     use crate::region_keeper::MemoryRegionKeeper;
@@ -125,20 +136,43 @@ mod tests {
     fn test_opening_region_keeper() {
         let keeper = MemoryRegionKeeper::new();

-        let guard = keeper.register(1, RegionId::from_u64(1)).unwrap();
-        assert!(keeper.register(1, RegionId::from_u64(1)).is_none());
-        let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap();
+        let guard = keeper
+            .register_with_role(1, RegionId::from_u64(1), RegionRole::Leader)
+            .unwrap();
+        assert!(
+            keeper
+                .register_with_role(1, RegionId::from_u64(1), RegionRole::Leader)
+                .is_none()
+        );
+        let guard2 = keeper
+            .register_with_role(1, RegionId::from_u64(2), RegionRole::Follower)
+            .unwrap();

-        let mut regions = HashSet::from([
+        let regions = HashSet::from([
             RegionId::from_u64(1),
             RegionId::from_u64(2),
             RegionId::from_u64(3),
         ]);
-        let output = keeper.extract_operating_regions(1, &mut regions);
+        let output = keeper.extract_operating_region_roles(1, &regions);
         assert_eq!(output.len(), 2);

-        assert!(output.contains(&RegionId::from_u64(1)));
-        assert!(output.contains(&RegionId::from_u64(2)));
+        assert!(output.contains_key(&RegionId::from_u64(1)));
+        assert!(output.contains_key(&RegionId::from_u64(2)));
         assert_eq!(keeper.len(), 2);

+        let regions = HashSet::from([
+            RegionId::from_u64(1),
+            RegionId::from_u64(2),
+            RegionId::from_u64(3),
+        ]);
+        let output = keeper.extract_operating_region_roles(1, &regions);
+        assert_eq!(
+            output,
+            HashMap::from([
+                (RegionId::from_u64(1), RegionRole::Leader),
+                (RegionId::from_u64(2), RegionRole::Follower),
+            ])
+        );
+        assert_eq!(keeper.len(), 2);

         drop(guard);
```
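One property worth noting before the router changes: `OperatingRegionGuard` holds an `Arc` to the same map, and its `Drop` impl (unchanged by this commit) is what un-registers the entry, so switching the container from set to map preserves the RAII lifecycle. A toy illustration of that guard pattern (not the project's code; it assumes, as the struct layout above suggests, that dropping the guard removes its own entry):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Shared map of (datanode, region) -> role, as in MemoryRegionKeeper::inner.
type Inner = Arc<RwLock<HashMap<(u64, u64), &'static str>>>;

// Toy guard mirroring OperatingRegionGuard: the entry lives exactly as long
// as the guard, so a finished or aborted procedure releases the region.
struct Guard {
    key: (u64, u64),
    inner: Inner,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.inner.write().unwrap().remove(&self.key);
    }
}

fn register(inner: &Inner, datanode: u64, region: u64, role: &'static str) -> Option<Guard> {
    let mut map = inner.write().unwrap();
    if map.contains_key(&(datanode, region)) {
        return None; // already operating
    }
    map.insert((datanode, region), role);
    Some(Guard {
        key: (datanode, region),
        inner: inner.clone(),
    })
}

fn main() {
    let inner: Inner = Arc::new(RwLock::new(HashMap::new()));
    let guard = register(&inner, 1, 100, "Leader").unwrap();
    assert!(inner.read().unwrap().contains_key(&(1, 100)));
    drop(guard); // procedure done: the entry disappears with the guard
    assert!(inner.read().unwrap().is_empty());
}
```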
```diff
@@ -23,6 +23,7 @@ use derive_builder::Builder;
 use serde::ser::SerializeSeq;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use snafu::OptionExt;
+use store_api::region_engine::RegionRole;
 use store_api::storage::{RegionId, RegionNumber};
 use strum::AsRefStr;
 use table::table_name::TableName;
@@ -99,6 +100,20 @@ pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId,
         .collect::<Vec<_>>()
 }

+/// Returns the operating leader regions with corresponding [DatanodeId] and [RegionRole].
+pub fn operating_leader_region_roles(
+    region_routes: &[RegionRoute],
+) -> Vec<(RegionId, DatanodeId, RegionRole)> {
+    region_routes
+        .iter()
+        .filter_map(|route| {
+            let role = route.leader_region_role()?;
+            let leader = route.leader_peer.as_ref()?;
+            Some((route.region.id, leader.id, role))
+        })
+        .collect()
+}
+
 /// Returns the HashMap<[RegionNumber], &[Peer]>;
 ///
 /// If the region doesn't have a leader peer, the [Region] will be omitted.
@@ -342,6 +357,19 @@ impl RegionRoute {
         matches!(self.leader_state, Some(LeaderState::Staging))
     }

+    /// Returns the role of the leader region.
+    pub fn leader_region_role(&self) -> Option<RegionRole> {
+        self.leader_peer.as_ref().map(|_| {
+            if self.is_leader_staging() {
+                RegionRole::StagingLeader
+            } else if self.is_leader_downgrading() {
+                RegionRole::DowngradingLeader
+            } else {
+                RegionRole::Leader
+            }
+        })
+    }
+
     /// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
     ///
     /// We should downgrade a [`Region`] before deactivating it:
@@ -577,6 +605,17 @@ mod tests {
     use super::*;
     use crate::key::RegionRoleSet;

+    fn new_test_region_route(region_id: RegionId) -> RegionRoute {
+        RegionRoute {
+            region: Region::new_test(region_id),
+            leader_peer: Some(Peer::new(1, "a1")),
+            follower_peers: vec![Peer::new(2, "a2")],
+            leader_state: None,
+            leader_down_since: None,
+            write_route_policy: None,
+        }
+    }
+
     #[test]
     fn test_leader_is_downgraded() {
         let mut region_route = RegionRoute {
@@ -740,6 +779,65 @@ mod tests {
         assert!(!region_route.is_ignore_all_writes());
     }

+    #[test]
+    fn test_leader_region_role_without_leader_peer_returns_none() {
+        let region_route = RegionRoute {
+            leader_peer: None,
+            ..new_test_region_route(RegionId::new(1, 1))
+        };
+
+        assert_eq!(region_route.leader_region_role(), None);
+    }
+
+    #[test]
+    fn test_leader_region_role_variants() {
+        let normal = new_test_region_route(RegionId::new(1, 1));
+        let mut downgrading = new_test_region_route(RegionId::new(1, 2));
+        downgrading.leader_state = Some(LeaderState::Downgrading);
+        let mut staging = new_test_region_route(RegionId::new(1, 3));
+        staging.leader_state = Some(LeaderState::Staging);
+
+        assert_eq!(normal.leader_region_role(), Some(RegionRole::Leader));
+        assert_eq!(
+            downgrading.leader_region_role(),
+            Some(RegionRole::DowngradingLeader)
+        );
+        assert_eq!(
+            staging.leader_region_role(),
+            Some(RegionRole::StagingLeader)
+        );
+    }
+
+    #[test]
+    fn test_operating_leader_region_roles_returns_expected_roles() {
+        let no_leader_region = RegionRoute {
+            leader_peer: None,
+            ..new_test_region_route(RegionId::new(1, 4))
+        };
+        let mut downgrading = new_test_region_route(RegionId::new(1, 2));
+        downgrading.leader_peer = Some(Peer::new(2, "a2"));
+        downgrading.leader_state = Some(LeaderState::Downgrading);
+        let mut staging = new_test_region_route(RegionId::new(1, 3));
+        staging.leader_peer = Some(Peer::new(3, "a3"));
+        staging.leader_state = Some(LeaderState::Staging);
+
+        let roles = operating_leader_region_roles(&[
+            new_test_region_route(RegionId::new(1, 1)),
+            downgrading,
+            staging,
+            no_leader_region,
+        ]);
+
+        assert_eq!(
+            roles,
+            vec![
+                (RegionId::new(1, 1), 1, RegionRole::Leader),
+                (RegionId::new(1, 2), 2, RegionRole::DowngradingLeader),
+                (RegionId::new(1, 3), 3, RegionRole::StagingLeader),
+            ]
+        );
+    }
+
     #[test]
     fn test_region_distribution() {
         let region_routes = vec![
```
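The role derivation in `leader_region_role` is a straight mapping from `leader_state`; the same precedence (leaderless route yields `None`, then Staging, then Downgrading, then plain Leader) can be restated as a standalone function with toy types (a sketch, not the project's code):

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LeaderState {
    Downgrading,
    Staging,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RegionRole {
    Leader,
    DowngradingLeader,
    StagingLeader,
}

// Same precedence as RegionRoute::leader_region_role: no leader peer means no role;
// otherwise Staging is checked before Downgrading, and everything else is Leader.
fn leader_region_role(leader_peer: Option<u64>, state: Option<LeaderState>) -> Option<RegionRole> {
    leader_peer.map(|_| match state {
        Some(LeaderState::Staging) => RegionRole::StagingLeader,
        Some(LeaderState::Downgrading) => RegionRole::DowngradingLeader,
        None => RegionRole::Leader,
    })
}

fn main() {
    assert_eq!(leader_region_role(None, None), None);
    assert_eq!(leader_region_role(Some(1), None), Some(RegionRole::Leader));
    assert_eq!(
        leader_region_role(Some(1), Some(LeaderState::Downgrading)),
        Some(RegionRole::DowngradingLeader)
    );
}
```

This is also the mapping the lease-renewal hunks further down rely on when they replace the inlined if/else chain with a call to `leader_region_role`.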
```diff
@@ -281,7 +281,7 @@ mod test {

         let opening_region_id = RegionId::new(table_id, region_number + 2);
         let _guard = opening_region_keeper
-            .register(follower_peer.id, opening_region_id)
+            .register_with_role(follower_peer.id, opening_region_id, RegionRole::Follower)
             .unwrap();

         let acc = &mut HeartbeatAccumulator::default();
```
```diff
@@ -25,6 +25,7 @@ use common_telemetry::info;
 use common_telemetry::tracing_context::TracingContext;
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
+use store_api::region_engine::RegionRole;
 use tokio::time::Instant;

 use crate::error::{self, Result};
@@ -129,7 +130,7 @@ impl OpenCandidateRegion {
         // Registers the opening region.
         let guard = ctx
             .opening_region_keeper
-            .register(candidate.id, *region_id)
+            .register_with_role(candidate.id, *region_id, RegionRole::Follower)
             .context(error::RegionOperatingRaceSnafu {
                 peer_id: candidate.id,
                 region_id: *region_id,
@@ -296,7 +297,7 @@ mod tests {
         let mut ctx = env.context_factory().new_context(persistent_context);
         let opening_region_keeper = env.opening_region_keeper();
         let _guard = opening_region_keeper
-            .register(to_peer_id, region_id)
+            .register_with_role(to_peer_id, region_id, RegionRole::Follower)
             .unwrap();

         let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
```
```diff
@@ -231,6 +231,7 @@ mod tests {
     use common_meta::region_keeper::MemoryRegionKeeper;
     use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
     use common_time::util::current_time_millis;
+    use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;

     use crate::error::Error;
@@ -467,7 +468,7 @@ mod tests {
         }];

         let guard = opening_keeper
-            .register(2, RegionId::new(table_id, 1))
+            .register_with_role(2, RegionId::new(table_id, 1), RegionRole::Follower)
             .unwrap();
         ctx.volatile_ctx.opening_region_guards.push(guard);
```
```diff
@@ -42,7 +42,7 @@ use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
 use common_meta::node_manager::NodeManagerRef;
 use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
 use common_meta::region_registry::LeaderRegionRegistryRef;
-use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
+use common_meta::rpc::router::{RegionRoute, operating_leader_region_roles};
 use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
 use common_procedure::{
     BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
@@ -417,9 +417,9 @@ impl Context {
         region_routes: &[RegionRoute],
     ) -> Result<Vec<OperatingRegionGuard>> {
         let mut operating_guards = Vec::with_capacity(region_routes.len());
-        for (region_id, datanode_id) in operating_leader_regions(region_routes) {
+        for (region_id, datanode_id, role) in operating_leader_region_roles(region_routes) {
             let guard = memory_region_keeper
-                .register(datanode_id, region_id)
+                .register_with_role(datanode_id, region_id, role)
                 .context(error::RegionOperatingRaceSnafu {
                     peer_id: datanode_id,
                     region_id,
@@ -837,9 +837,11 @@ mod tests {
     };
     use common_meta::error;
     use common_meta::peer::Peer;
-    use common_meta::rpc::router::{Region, RegionRoute};
+    use common_meta::region_keeper::MemoryRegionKeeper;
+    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
     use common_meta::test_util::MockDatanodeManager;
     use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
+    use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;
     use table::table_name::TableName;
     use tokio::sync::mpsc;
@@ -993,6 +995,59 @@ mod tests {
         assert!(!procedure.should_rollback_allocated_regions());
     }

+    #[test]
+    fn test_register_operating_regions_preserves_route_roles() {
+        let keeper = Arc::new(MemoryRegionKeeper::new());
+        let region_routes = vec![
+            RegionRoute {
+                region: Region::new_test(RegionId::new(1024, 1)),
+                leader_peer: Some(Peer::empty(1)),
+                follower_peers: vec![],
+                leader_state: None,
+                leader_down_since: None,
+                write_route_policy: None,
+            },
+            RegionRoute {
+                region: Region::new_test(RegionId::new(1024, 2)),
+                leader_peer: Some(Peer::empty(2)),
+                follower_peers: vec![],
+                leader_state: Some(LeaderState::Staging),
+                leader_down_since: None,
+                write_route_policy: None,
+            },
+            RegionRoute {
+                region: Region::new_test(RegionId::new(1024, 3)),
+                leader_peer: Some(Peer::empty(3)),
+                follower_peers: vec![],
+                leader_state: Some(LeaderState::Downgrading),
+                leader_down_since: None,
+                write_route_policy: None,
+            },
+        ];
+
+        let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap();
+
+        let leader_roles =
+            keeper.extract_operating_region_roles(1, &HashSet::from([RegionId::new(1024, 1)]));
+        let staging_roles =
+            keeper.extract_operating_region_roles(2, &HashSet::from([RegionId::new(1024, 2)]));
+        let downgrading_roles =
+            keeper.extract_operating_region_roles(3, &HashSet::from([RegionId::new(1024, 3)]));
+
+        assert_eq!(
+            leader_roles.get(&RegionId::new(1024, 1)),
+            Some(&RegionRole::Leader)
+        );
+        assert_eq!(
+            staging_roles.get(&RegionId::new(1024, 2)),
+            Some(&RegionRole::StagingLeader)
+        );
+        assert_eq!(
+            downgrading_roles.get(&RegionId::new(1024, 3)),
+            Some(&RegionRole::DowngradingLeader)
+        );
+    }
+
     #[tokio::test]
     async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
         let env = TestingEnv::new();
```
```diff
@@ -20,7 +20,7 @@ use common_meta::key::TableMetadataManagerRef;
 use common_meta::key::table_route::TableRouteValue;
 use common_meta::region_keeper::MemoryRegionKeeperRef;
 use common_meta::rpc::router::RegionRoute;
-use common_telemetry::warn;
+use common_telemetry::{info, warn};
 use snafu::ResultExt;
 use store_api::region_engine::RegionRole;
 use store_api::storage::{RegionId, TableId};
@@ -63,15 +63,9 @@ fn renew_region_lease_via_region_route(
     if let Some(leader) = &region_route.leader_peer
         && leader.id == datanode_id
     {
-        let region_role = if region_route.is_leader_staging() {
-            RegionRole::StagingLeader
-        } else if region_route.is_leader_downgrading() {
-            RegionRole::DowngradingLeader
-        } else {
-            RegionRole::Leader
-        };
-
-        return Some((region_id, region_role));
+        return region_route
+            .leader_region_role()
+            .map(|region_role| (region_id, region_role));
     }

     // If it's a follower region on this datanode.
@@ -83,11 +77,28 @@ fn renew_region_lease_via_region_route(
         return Some((region_id, RegionRole::Follower));
     }

-    warn!(
-        "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region_routes: {:?}",
-        region_route
-    );
+    // The region doesn't belong to this datanode.
     None
 }

+fn renew_region_lease_via_operating_regions(
+    operating_regions: &HashMap<RegionId, RegionRole>,
+    datanode_id: DatanodeId,
+    region_id: RegionId,
+    reported_role: RegionRole,
+) -> Option<RegionLeaseInfo> {
+    // `operating_regions` is filtered by the current datanode in `collect_metadata`,
+    // so looking up by `region_id` is sufficient here.
+    if let Some(role) = operating_regions.get(&region_id) {
+        let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
+        if *role != reported_role {
+            info!(
+                "The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
+                region_id, datanode_id, role, reported_role
+            );
+        }
+        return Some(region_lease_info);
+    }
+
+    None
+}
```
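Note the asymmetry the new helper introduces: the keeper's recorded role is what gets leased, and a datanode reporting a different role only produces a log line. A toy restatement of that rule (local types; the mismatch is surfaced here via the return value rather than a log):

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Role {
    Leader,
    DowngradingLeader,
    StagingLeader,
}

// Toy version of renew_region_lease_via_operating_regions: the keeper's role is
// authoritative; a mismatching reported role is flagged but never adopted.
fn renew_via_operating(keeper_role: Option<Role>, reported: Role) -> Option<(Role, bool)> {
    keeper_role.map(|role| (role, role != reported))
}

fn main() {
    // Keeper says DowngradingLeader, datanode reports StagingLeader:
    // the lease is renewed as DowngradingLeader and the mismatch is flagged.
    assert_eq!(
        renew_via_operating(Some(Role::DowngradingLeader), Role::StagingLeader),
        Some((Role::DowngradingLeader, true))
    );
    // Region not registered in the keeper: no lease from this path.
    assert_eq!(renew_via_operating(None, Role::Leader), None);
}
```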
```diff
@@ -149,49 +160,51 @@ impl RegionLeaseKeeper {
     }

     /// Returns [None] if:
-    /// - The region doesn't belong to the datanode.
+    /// - The region doesn't belong to the datanode in metadata or operating regions.
     /// - The region belongs to a logical table.
     fn renew_region_lease(
         &self,
         table_metadata: &HashMap<TableId, TableRouteValue>,
-        operating_regions: &HashSet<RegionId>,
+        operating_regions: &HashMap<RegionId, RegionRole>,
         datanode_id: DatanodeId,
         region_id: RegionId,
-        role: RegionRole,
+        reported_role: RegionRole,
     ) -> Option<RegionLeaseInfo> {
-        if operating_regions.contains(&region_id) {
-            let region_lease_info = RegionLeaseInfo::operating(region_id, role);
+        // First try to renew via region route
+        if let Some(table_route) = table_metadata.get(&region_id.table_id())
+            && let Ok(Some(region_route)) = table_route.region_route(region_id)
+            && let Some(region_lease) =
+                renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
+        {
+            return Some(RegionLeaseInfo::from(region_lease));
+        }
+        // Then try to renew via operating regions, which covers the opening region without region route in metadata.
+        if let Some(region_lease_info) = renew_region_lease_via_operating_regions(
+            operating_regions,
+            datanode_id,
+            region_id,
+            reported_role,
+        ) {
             return Some(region_lease_info);
         }

-        if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
-            if let Ok(Some(region_route)) = table_route.region_route(region_id) {
-                return renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
-                    .map(RegionLeaseInfo::from);
-            } else {
-                warn!(
-                    "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region route is not found in table({})",
-                    region_id.table_id()
-                );
-            }
-        } else {
-            warn!(
-                "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, table({}) is not found",
-                region_id.table_id()
-            );
-        }
+        warn!(
+            "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, no matching metadata or operating region found",
+        );
         None
     }

     async fn collect_metadata(
         &self,
         datanode_id: DatanodeId,
-        mut region_ids: HashSet<RegionId>,
-    ) -> Result<(HashMap<TableId, TableRouteValue>, HashSet<RegionId>)> {
-        // Filters out operating region first, improves the cache hit rate(reduce expensive remote fetches).
+        region_ids: HashSet<RegionId>,
+    ) -> Result<(
+        HashMap<TableId, TableRouteValue>,
+        HashMap<RegionId, RegionRole>,
+    )> {
         let operating_regions = self
             .memory_region_keeper
-            .extract_operating_regions(datanode_id, &mut region_ids);
+            .extract_operating_region_roles(datanode_id, &region_ids);
         let table_ids = region_ids
             .into_iter()
             .map(|region_id| region_id.table_id())
```
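The fix in the renamed `renew_region_lease` is an ordering change: a role recorded in table metadata (the region route) now beats whatever the keeper holds, and the keeper only covers regions that have no route yet, such as regions still opening. That precedence can be read as a small pure function (a toy restatement, not the project's code):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Role {
    Leader,
    Follower,
}

// Toy restatement of renew_region_lease's ordering: metadata wins,
// keeper roles are the fallback, and a region known nowhere is denied.
fn renew(
    route_role: Option<Role>,          // from table metadata, if a route exists
    keeper_roles: &HashMap<u64, Role>, // from the in-memory keeper
    region: u64,
) -> Option<Role> {
    route_role.or_else(|| keeper_roles.get(&region).copied())
}

fn main() {
    let keeper = HashMap::from([(1, Role::Follower)]);
    // Metadata says Leader, keeper says Follower: metadata wins.
    assert_eq!(renew(Some(Role::Leader), &keeper, 1), Some(Role::Leader));
    // No route in metadata: fall back to the keeper's role.
    assert_eq!(renew(None, &keeper, 1), Some(Role::Follower));
    // Known nowhere: the lease is denied.
    assert_eq!(renew(None, &keeper, 2), None);
}
```

The tests in the next hunks pin down exactly these three cases.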
```diff
@@ -224,13 +237,13 @@ impl RegionLeaseKeeper {
         let mut renewed = HashMap::new();
         let mut non_exists = HashSet::new();

-        for &(region, role) in regions {
+        for &(region, reported_role) in regions {
             match self.renew_region_lease(
                 &table_metadata,
                 &operating_regions,
                 datanode_id,
                 region,
-                role,
+                reported_role,
             ) {
                 Some(region_lease_info) => {
                     renewed.insert(region_lease_info.region_id, region_lease_info);
@@ -376,12 +389,16 @@ mod tests {
         let opening_region_id = RegionId::new(1025, 1);
         let _guard = keeper
             .memory_region_keeper
-            .register(leader_peer_id, opening_region_id)
+            .register_with_role(leader_peer_id, opening_region_id, RegionRole::Leader)
             .unwrap();
         let another_opening_region_id = RegionId::new(1025, 2);
         let _guard2 = keeper
             .memory_region_keeper
-            .register(follower_peer_id, another_opening_region_id)
+            .register_with_role(
+                follower_peer_id,
+                another_opening_region_id,
+                RegionRole::Follower,
+            )
             .unwrap();

         let (metadata, regions) = keeper
@@ -395,8 +412,10 @@ mod tests {
             metadata.keys().cloned().collect::<Vec<_>>(),
             vec![region_id.table_id()]
         );
-        assert!(regions.contains(&opening_region_id));
-        assert_eq!(regions.len(), 1);
+        assert_eq!(
+            regions,
+            HashMap::from([(opening_region_id, RegionRole::Leader)])
+        );
     }

     #[tokio::test]
@@ -481,17 +500,17 @@ mod tests {
         let opening_region_id = RegionId::new(2048, 1);
         let _guard = keeper
             .memory_region_keeper
-            .register(leader_peer_id, opening_region_id)
+            .register_with_role(leader_peer_id, opening_region_id, RegionRole::Leader)
             .unwrap();

         // The opening region on the datanode.
         // NOTES: The procedure lock will ensure only one opening leader.
-        for role in [RegionRole::Leader, RegionRole::Follower] {
+        for reported_role in [RegionRole::Leader, RegionRole::Follower] {
             let RenewRegionLeasesResponse {
                 non_exists,
                 renewed,
             } = keeper
-                .renew_region_leases(leader_peer_id, &[(opening_region_id, role)])
+                .renew_region_leases(leader_peer_id, &[(opening_region_id, reported_role)])
                 .await
                 .unwrap();

@@ -500,7 +519,7 @@ mod tests {
                 renewed,
                 HashMap::from([(
                     opening_region_id,
-                    RegionLeaseInfo::operating(opening_region_id, role)
+                    RegionLeaseInfo::operating(opening_region_id, RegionRole::Leader)
                 )])
             );
         }
@@ -676,14 +695,80 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_renew_region_leases_operating_region_preserves_reported_role() {
-        let keeper = new_test_keeper();
+    async fn test_renew_region_leases_metadata_role_beats_keeper_role() {
+        let table_id = 2048;
+        let table_info: TableInfo = new_test_table_info(table_id);
+
         let datanode_id = 1024;
-        let region_id = RegionId::new(2048, 1);
+        let region_id = RegionId::new(table_id, 1);
+        let region_route = RegionRouteBuilder::default()
+            .region(Region::new_test(region_id))
+            .leader_peer(Peer::empty(datanode_id))
+            .build()
+            .unwrap();
+
+        let keeper = new_test_keeper();
+        let table_metadata_manager = keeper.table_metadata_manager();
+        table_metadata_manager
+            .create_table_metadata(
+                table_info,
+                TableRouteValue::physical(vec![region_route]),
+                HashMap::default(),
+            )
+            .await
+            .unwrap();

         let _guard = keeper
             .memory_region_keeper
-            .register(datanode_id, region_id)
+            .register_with_role(datanode_id, region_id, RegionRole::Follower)
             .unwrap();

+        let RenewRegionLeasesResponse {
+            non_exists,
+            renewed,
+        } = keeper
+            .renew_region_leases(datanode_id, &[(region_id, RegionRole::Follower)])
+            .await
+            .unwrap();
+
+        assert!(non_exists.is_empty());
+        assert_eq!(
+            renewed,
+            HashMap::from([(
+                region_id,
+                RegionLeaseInfo::from((region_id, RegionRole::Leader))
+            )])
+        );
+    }
+
+    #[tokio::test]
+    async fn test_renew_region_leases_missing_route_falls_back_to_keeper_role() {
+        let table_id = 2048;
+        let table_info: TableInfo = new_test_table_info(table_id);
+
+        let datanode_id = 1024;
+        let region_id = RegionId::new(table_id, 1);
+        let another_region_id = RegionId::new(table_id, 2);
+        let region_route = RegionRouteBuilder::default()
+            .region(Region::new_test(another_region_id))
+            .leader_peer(Peer::empty(datanode_id))
+            .build()
+            .unwrap();
+
+        let keeper = new_test_keeper();
+        let table_metadata_manager = keeper.table_metadata_manager();
+        table_metadata_manager
+            .create_table_metadata(
+                table_info,
+                TableRouteValue::physical(vec![region_route]),
+                HashMap::default(),
+            )
+            .await
+            .unwrap();
+
+        let _guard = keeper
+            .memory_region_keeper
+            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
+            .unwrap();
+
         let RenewRegionLeasesResponse {
@@ -699,7 +784,36 @@ mod tests {
             renewed,
             HashMap::from([(
                 region_id,
-                RegionLeaseInfo::operating(region_id, RegionRole::StagingLeader)
+                RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
             )])
         );
     }

+    #[tokio::test]
+    async fn test_renew_region_leases_operating_region_uses_keeper_role() {
+        let keeper = new_test_keeper();
+        let datanode_id = 1024;
+        let region_id = RegionId::new(2048, 1);
+
+        let _guard = keeper
+            .memory_region_keeper
+            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
+            .unwrap();
+
+        let RenewRegionLeasesResponse {
+            non_exists,
+            renewed,
+        } = keeper
+            .renew_region_leases(datanode_id, &[(region_id, RegionRole::StagingLeader)])
+            .await
+            .unwrap();
+
+        assert!(non_exists.is_empty());
+        assert_eq!(
+            renewed,
+            HashMap::from([(
+                region_id,
+                RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
+            )])
+        );
+    }
```