Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00
fix: correct leader state reset and region migration locking consistency (#7199)
* fix(meta): remove table route cache in region migration ctx
* fix: fix unit tests
* chore: fix clippy
* fix: fix campaign reset not clearing leader states
* feat: gracefully handle region lease renewal errors

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
@@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use common_time::Timestamp;
use snafu::{OptionExt, ResultExt, ensure};
use sqlx::mysql::{MySqlArguments, MySqlRow};
@@ -645,6 +645,13 @@ impl Election for MySqlElection {
    }

    async fn reset_campaign(&self) {
        info!("Resetting campaign");
        if self.is_leader.load(Ordering::Relaxed) {
            if let Err(err) = self.step_down_without_lock().await {
                error!(err; "Failed to step down without lock");
            }
            info!("Step down without lock successfully, due to reset campaign");
        }
        if let Err(err) = self.client.lock().await.reset_client().await {
            error!(err; "Failed to reset client");
        }
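The new `reset_campaign` follows a simple pattern: if this node currently believes it is the leader, step down first, then reset the backend client; every failure is logged rather than propagated, so the election loop can simply start a fresh campaign. A minimal self-contained sketch of that flow (the `ElectionState`, `step_down`, and `reset_client` names below are illustrative stand-ins, not the GreptimeDB API):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

use tokio::sync::Mutex;

/// Stand-in for the SQL client that backs the election.
#[derive(Default)]
struct BackendClient {
    resets: u32,
}

impl BackendClient {
    async fn reset_client(&mut self) -> Result<(), String> {
        self.resets += 1;
        Ok(())
    }
}

struct ElectionState {
    is_leader: AtomicBool,
    client: Mutex<BackendClient>,
}

impl ElectionState {
    async fn step_down(&self) -> Result<(), String> {
        self.is_leader.store(false, Ordering::Relaxed);
        Ok(())
    }

    /// Mirrors the shape of `reset_campaign`: step down if leader, then reset
    /// the client; errors are logged, never returned.
    async fn reset_campaign(&self) {
        if self.is_leader.load(Ordering::Relaxed) {
            if let Err(err) = self.step_down().await {
                eprintln!("Failed to step down: {err}");
            }
        }
        if let Err(err) = self.client.lock().await.reset_client().await {
            eprintln!("Failed to reset client: {err}");
        }
    }
}

#[tokio::main]
async fn main() {
    let state = ElectionState {
        is_leader: AtomicBool::new(true),
        client: Mutex::new(BackendClient::default()),
    };
    state.reset_campaign().await;
    assert!(!state.is_leader.load(Ordering::Relaxed));
    assert_eq!(state.client.lock().await.resets, 1);
}
```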
@@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use common_time::Timestamp;
use deadpool_postgres::{Manager, Pool};
use snafu::{OptionExt, ResultExt, ensure};
@@ -477,6 +477,13 @@ impl Election for PgElection {
    }

    async fn reset_campaign(&self) {
        info!("Resetting campaign");
        if self.is_leader.load(Ordering::Relaxed) {
            if let Err(err) = self.step_down_without_lock().await {
                error!(err; "Failed to step down without lock");
            }
            info!("Step down without lock successfully, due to reset campaign");
        }
        if let Err(err) = self.pg_client.write().await.reset_client().await {
            error!(err; "Failed to reset client");
        }
@@ -774,16 +781,12 @@ impl PgElection {
            key: key.clone(),
            ..Default::default()
        };
        if self
            .is_leader
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
            && let Err(e) = self
                .leader_watcher
                .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
        {
            error!(e; "Failed to send leader change message");
        }
        send_leader_change_and_set_flags(
            &self.is_leader,
            &self.leader_infancy,
            &self.leader_watcher,
            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
        );
        Ok(())
    }
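In the PG step-down path, the hand-rolled compare_exchange plus watcher notification is replaced by a single `send_leader_change_and_set_flags` call. Based only on the call site shown here, a helper of that shape might look roughly like the sketch below; the flag semantics (especially what happens to `leader_infancy`) and the message type are assumptions, not the actual implementation:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use tokio::sync::watch;

/// Stand-in for LeaderChangeMessage.
#[derive(Clone, Debug)]
enum LeaderChange {
    StepDown(Arc<String>),
}

/// One place that flips the leader flags and notifies watchers, so callers
/// such as a step-down routine cannot forget one of the two steps.
fn send_leader_change_and_set_flags(
    is_leader: &AtomicBool,
    leader_infancy: &AtomicBool,
    leader_watcher: &watch::Sender<Option<LeaderChange>>,
    msg: LeaderChange,
) {
    // Only the first transition out of the leader state notifies watchers.
    if is_leader
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
    {
        leader_infancy.store(false, Ordering::Relaxed);
        if let Err(e) = leader_watcher.send(Some(msg)) {
            eprintln!("Failed to send leader change message: {e}");
        }
    }
}

fn main() {
    let is_leader = AtomicBool::new(true);
    let leader_infancy = AtomicBool::new(true);
    let (tx, rx) = watch::channel(None);

    send_leader_change_and_set_flags(
        &is_leader,
        &leader_infancy,
        &tx,
        LeaderChange::StepDown(Arc::new("election-key".to_string())),
    );

    assert!(!is_leader.load(Ordering::Relaxed));
    assert!(rx.borrow().is_some());
}
```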
@@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_telemetry::error;
use store_api::region_engine::GrantedRegion;
use store_api::storage::RegionId;

@@ -83,36 +84,44 @@ impl HeartbeatHandler for RegionLeaseHandler {
        let regions = stat.regions();
        let datanode_id = stat.id;

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = self
        match self
            .region_lease_keeper
            .renew_region_leases(datanode_id, &regions)
            .await?;
            .await
        {
            Ok(RenewRegionLeasesResponse {
                non_exists,
                renewed,
            }) => {
        let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
            renewer
                .renew(ctx, renewed)
                .into_iter()
                .map(|region| region.into())
                .collect()
        } else {
            renewed
                .into_iter()
                .map(|(region_id, region_lease_info)| {
                    GrantedRegion::new(region_id, region_lease_info.role).into()
                })
                .collect::<Vec<_>>()
        };

                let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
                    renewer
                        .renew(ctx, renewed)
                        .into_iter()
                        .map(|region| region.into())
                        .collect()
                } else {
                    renewed
                        .into_iter()
                        .map(|(region_id, region_lease_info)| {
                            GrantedRegion::new(region_id, region_lease_info.role).into()
                        })
                        .collect::<Vec<_>>()
                };

        acc.region_lease = Some(RegionLease {
            regions: renewed,
            duration_since_epoch: req.duration_since_epoch,
            lease_seconds: self.region_lease_seconds,
            closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
        });
        acc.inactive_region_ids = non_exists;
                acc.region_lease = Some(RegionLease {
                    regions: renewed,
                    duration_since_epoch: req.duration_since_epoch,
                    lease_seconds: self.region_lease_seconds,
                    closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
                });
                acc.inactive_region_ids = non_exists;
            }
            Err(e) => {
                error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
                // If we throw error here, the datanode will be marked as failure by region failure handler.
                // So we only log the error and continue.
            }
        }

        Ok(HandleControl::Continue)
    }
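The handler change above swaps the `?` on `renew_region_leases` for a `match`: a renewal failure is logged and the heartbeat still returns `HandleControl::Continue`, because bubbling the error up would let the region failure handler treat the datanode itself as failed. A small self-contained sketch of that pattern (the types and function below are illustrative, not the metasrv API):

```rust
// A minimal sketch of the "log and continue" pattern used for lease renewal.
#[derive(Debug)]
struct RenewResponse {
    renewed: Vec<u64>,
    non_exists: Vec<u64>,
}

async fn renew_region_leases(datanode_id: u64, regions: &[u64]) -> Result<RenewResponse, String> {
    // Pretend the metadata backend is unavailable for this example.
    let _ = (datanode_id, regions);
    Err("kv backend timeout".to_string())
}

#[derive(Debug, Default)]
struct HeartbeatAcc {
    region_lease: Option<Vec<u64>>,
    inactive_region_ids: Vec<u64>,
}

async fn handle_heartbeat(datanode_id: u64, regions: Vec<u64>) -> Result<HeartbeatAcc, String> {
    let mut acc = HeartbeatAcc::default();
    match renew_region_leases(datanode_id, &regions).await {
        Ok(RenewResponse { renewed, non_exists }) => {
            acc.region_lease = Some(renewed);
            acc.inactive_region_ids = non_exists;
        }
        Err(e) => {
            // Propagating this error would mark the datanode as failed,
            // so only log it and let the next heartbeat retry.
            eprintln!("Failed to renew region leases for datanode {datanode_id}: {e}");
        }
    }
    Ok(acc)
}

#[tokio::main]
async fn main() {
    let acc = handle_heartbeat(1, vec![1024, 1025]).await.unwrap();
    // No lease is granted on failure, but the handler still succeeds.
    assert!(acc.region_lease.is_none());
    println!("{acc:?}");
}
```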
@@ -373,7 +373,8 @@ impl MetasrvBuilder {
                    runtime_switch_manager.clone(),
                    meta_peer_client.clone(),
                    leader_cached_kv_backend.clone(),
                );
                )
                .with_state(state.clone());

                Some(RegionFailureHandler::new(
                    region_supervisor,
@@ -41,7 +41,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_procedure::error::{
@@ -231,8 +231,6 @@ pub struct VolatileContext {
    /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
    /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
    opening_region_guard: Option<OperatingRegionGuard>,
    /// `table_route` is stored via previous steps for future use.
    table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
    /// `datanode_table` is stored via previous steps for future use.
    from_peer_datanode_table: Option<DatanodeTableValue>,
    /// `table_info` is stored via previous steps for future use.
@@ -399,29 +397,23 @@ impl Context {
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    pub async fn get_table_route_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = &mut self.volatile_ctx.table_route;
        &self,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_id = self.persistent_ctx.region_id.table_id();
        let table_route = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get TableRoute: {table_id}"),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        if table_route_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_route = self
                .table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .get_with_raw_bytes(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableRoute: {table_id}"),
                })?
                .context(error::TableRouteNotFoundSnafu { table_id })?;

            *table_route_value = Some(table_route);
        }

        Ok(table_route_value.as_ref().unwrap())
        Ok(table_route)
    }

    /// Notifies the RegionSupervisor to register failure detectors of failed region.
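With the `table_route` cache removed from `VolatileContext`, `get_table_route_value` now takes `&self` and always reads the latest `TableRouteValue` from the table route storage, so a stale copy cached across procedure steps can no longer be handed to `update_leader_region_status`. A toy illustration of that difference (the storage and context types below are stand-ins):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// A toy table-route store standing in for the real table route storage.
#[derive(Clone, Default)]
struct TableRouteStorage {
    routes: Arc<RwLock<HashMap<u32, String>>>,
}

impl TableRouteStorage {
    fn put(&self, table_id: u32, route: String) {
        self.routes.write().unwrap().insert(table_id, route);
    }

    async fn get(&self, table_id: u32) -> Option<String> {
        self.routes.read().unwrap().get(&table_id).cloned()
    }
}

struct Context {
    table_id: u32,
    storage: TableRouteStorage,
}

impl Context {
    /// Always reads the latest route from storage instead of caching it in the
    /// procedure's volatile state, so concurrent updates are never shadowed by
    /// a stale copy (a sketch of the idea, not the real Context API).
    async fn get_table_route_value(&self) -> Result<String, String> {
        self.storage
            .get(self.table_id)
            .await
            .ok_or_else(|| format!("TableRoute not found: {}", self.table_id))
    }
}

#[tokio::main]
async fn main() {
    let storage = TableRouteStorage::default();
    storage.put(1024, "v0".to_string());
    let ctx = Context { table_id: 1024, storage: storage.clone() };

    assert_eq!(ctx.get_table_route_value().await.unwrap(), "v0");
    // Another procedure updates the route; the next read observes it.
    storage.put(1024, "v1".to_string());
    assert_eq!(ctx.get_table_route_value().await.unwrap(), "v1");
}
```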
@@ -463,12 +455,6 @@ impl Context {
            .await;
    }

    /// Removes the `table_route` of [VolatileContext], returns true if any.
    pub fn remove_table_route_value(&mut self) -> bool {
        let value = self.volatile_ctx.table_route.take();
        value.is_some()
    }

    /// Returns the `table_info` of [VolatileContext] if any.
    /// Otherwise, returns the value retrieved from remote.
    ///
@@ -663,14 +649,13 @@ impl RegionMigrationProcedure {
        })
    }

    async fn rollback_inner(&mut self) -> Result<()> {
    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();

        let table_id = self.context.region_id().table_id();
        let region_id = self.context.region_id();
        self.context.remove_table_route_value();
        let table_metadata_manager = self.context.table_metadata_manager.clone();
        let table_route = self.context.get_table_route_value().await?;

@@ -683,9 +668,11 @@ impl RegionMigrationProcedure {
            .any(|route| route.is_leader_downgrading());

        if downgraded {
            let table_lock = TableLock::Write(region_id.table_id()).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
            info!("Rollbacking downgraded region leader table route, region: {region_id}");
            table_metadata_manager
                .update_leader_region_status(table_id, table_route, |route| {
                .update_leader_region_status(table_id, &table_route, |route| {
                    if route.region.id == region_id {
                        Some(None)
                    } else {
@@ -712,8 +699,8 @@ impl Procedure for RegionMigrationProcedure {
        Self::TYPE_NAME
    }

    async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner()
    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner(ctx)
            .await
            .map_err(ProcedureError::external)
    }
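`rollback_inner` now receives the `ProcedureContext` so it can take the table-level write lock before rewriting the leader region status, keeping the rollback consistent with the locking done by the forward migration steps. A rough sketch of the idea using a per-table async lock (the lock provider below is hypothetical; the real code goes through `procedure_ctx.provider.acquire_lock`):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{Mutex, OwnedMutexGuard};

/// A toy per-table lock provider standing in for the procedure lock provider.
#[derive(Default)]
struct TableLockProvider {
    locks: Mutex<HashMap<u32, Arc<Mutex<()>>>>,
}

impl TableLockProvider {
    /// Acquires the lock for one table and returns a guard that releases the
    /// lock when dropped.
    async fn acquire_table_lock(&self, table_id: u32) -> OwnedMutexGuard<()> {
        let lock = {
            let mut locks = self.locks.lock().await;
            locks.entry(table_id).or_default().clone()
        };
        lock.lock_owned().await
    }
}

async fn rollback_downgraded_leader(provider: &TableLockProvider, table_id: u32) {
    // Hold the table lock for the whole metadata update, mirroring how the
    // rollback path now wraps update_leader_region_status.
    let _guard = provider.acquire_table_lock(table_id).await;
    // ... update the leader region status for `table_id` here ...
    println!("rolled back leader state for table {table_id}");
}

#[tokio::main]
async fn main() {
    let provider = TableLockProvider::default();
    rollback_downgraded_leader(&provider, 1024).await;
}
```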
@@ -46,7 +46,7 @@ impl UpdateMetadata {

        // TODO(weny): ensures the leader region peer is the `from_peer`.
        if let Err(err) = table_metadata_manager
            .update_leader_region_status(table_id, current_table_route_value, |route| {
            .update_leader_region_status(table_id, &current_table_route_value, |route| {
                if route.region.id == region_id
                    && route
                        .leader_peer
@@ -61,7 +61,6 @@ impl UpdateMetadata {
            .await
            .context(error::TableMetadataManagerSnafu)
        {
            ctx.remove_table_route_value();
            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}"
@@ -69,8 +68,6 @@ impl UpdateMetadata {
            });
        }

        ctx.remove_table_route_value();

        Ok(())
    }
}
@@ -81,7 +78,7 @@ mod tests {

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use crate::error::Error;
@@ -115,63 +112,6 @@ mod tests {
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_failed_to_update_table_route_error() {
        let state = UpdateMetadata::Downgrade;
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_id = ctx.region_id().table_id();

        let table_info = new_test_table_info(1024, vec![1, 2]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 1)),
                leader_peer: Some(from_peer.clone()),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 2)),
                leader_peer: Some(Peer::empty(4)),
                ..Default::default()
            },
        ];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();
        let original_table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .unwrap()
            .unwrap();

        // modifies the table route.
        table_metadata_manager
            .update_leader_region_status(table_id, &original_table_route, |route| {
                if route.region.id == RegionId::new(1024, 2) {
                    Some(Some(LeaderState::Downgrading))
                } else {
                    None
                }
            })
            .await
            .unwrap();

        // sets the old table route.
        ctx.volatile_ctx.table_route = Some(original_table_route);

        let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("Failed to update the table route"));
    }

    #[tokio::test]
    async fn test_only_downgrade_from_peer() {
        let mut state = Box::new(UpdateMetadata::Downgrade);
@@ -212,7 +152,6 @@ mod tests {
        // It should remain unchanged.
        assert_eq!(latest_table_route.version().unwrap(), 0);
        assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
        assert!(ctx.volatile_ctx.table_route.is_none());
    }

    #[tokio::test]
@@ -254,6 +193,5 @@ mod tests {
            .unwrap();

        assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
        assert!(ctx.volatile_ctx.table_route.is_none());
    }
}
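Throughout these update-metadata steps, a failed table-route update is still wrapped as a retry-later error (the tests assert on `err.is_retryable()`), so the procedure framework retries transient metadata failures instead of aborting the migration. A condensed sketch of that classification, using a hypothetical error type rather than the real `error::Error`:

```rust
/// A hypothetical, condensed version of a procedure error that distinguishes
/// transient failures (retried later) from permanent ones.
#[derive(Debug)]
enum MigrationError {
    /// The table route update hit a transient problem; retry the step later.
    RetryLater { reason: String },
    /// The migration cannot proceed (for example, the leader peer changed).
    Abort { reason: String },
}

impl MigrationError {
    fn is_retryable(&self) -> bool {
        matches!(self, MigrationError::RetryLater { .. })
    }
}

fn update_table_route(succeed: bool) -> Result<(), MigrationError> {
    if succeed {
        Ok(())
    } else {
        // Mirrors wrapping a metadata-manager failure into a retry-later error.
        Err(MigrationError::RetryLater {
            reason: "Failed to update the table route during the downgrading leader region"
                .to_string(),
        })
    }
}

fn main() {
    let err = update_table_route(false).unwrap_err();
    assert!(err.is_retryable());
    assert!(format!("{err:?}").contains("Failed to update the table route"));

    let abort = MigrationError::Abort {
        reason: "leader peer mismatch".to_string(),
    };
    assert!(!abort.is_retryable());
}
```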
@@ -35,7 +35,7 @@ impl UpdateMetadata {
        let current_table_route_value = ctx.get_table_route_value().await?;

        if let Err(err) = table_metadata_manager
            .update_leader_region_status(table_id, current_table_route_value, |route| {
            .update_leader_region_status(table_id, &current_table_route_value, |route| {
                if route.region.id == region_id {
                    Some(None)
                } else {
@@ -45,14 +45,12 @@ impl UpdateMetadata {
            .await
            .context(error::TableMetadataManagerSnafu)
        {
            ctx.remove_table_route_value();
            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
            });
        }

        ctx.register_failure_detectors().await;
        ctx.remove_table_route_value();

        Ok(())
    }
@@ -61,7 +59,6 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
@@ -73,7 +70,6 @@ mod tests {
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
    use crate::region::supervisor::RegionFailureDetectorControl;

    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
@@ -93,101 +89,6 @@ mod tests {
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_update_table_route_with_retry() {
        let state = UpdateMetadata::Rollback;
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let (tx, mut rx) = tokio::sync::mpsc::channel(8);
        ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx));
        let table_id = ctx.region_id().table_id();

        let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 1)),
                leader_peer: Some(from_peer.clone()),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 2)),
                leader_peer: Some(Peer::empty(4)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 3)),
                leader_peer: Some(Peer::empty(5)),
                ..Default::default()
            },
        ];

        let expected_region_routes = {
            let mut region_routes = region_routes.clone();
            region_routes[0].leader_state = None;
            region_routes[1].leader_state = None;
            region_routes
        };

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();
        let old_table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .unwrap()
            .unwrap();

        // modifies the table route.
        table_metadata_manager
            .update_leader_region_status(table_id, &old_table_route, |route| {
                if route.region.id == RegionId::new(1024, 2) {
                    Some(None)
                } else {
                    None
                }
            })
            .await
            .unwrap();

        ctx.volatile_ctx.table_route = Some(old_table_route);

        let err = state
            .rollback_downgraded_region(&mut ctx)
            .await
            .unwrap_err();
        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("Failed to update the table route"));
        assert_eq!(rx.len(), 0);
        state.rollback_downgraded_region(&mut ctx).await.unwrap();
        let event = rx.try_recv().unwrap();
        let detecting_regions = event.into_region_failure_detectors();
        assert_eq!(
            detecting_regions,
            vec![(from_peer.id, ctx.persistent_ctx.region_id)]
        );

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            &expected_region_routes,
            table_route.region_routes().unwrap()
        );
    }

    #[tokio::test]
    async fn test_next_migration_end_state() {
        let mut state = Box::new(UpdateMetadata::Rollback);
@@ -238,8 +139,6 @@ mod tests {
            .downcast_ref::<RegionMigrationAbort>()
            .unwrap();

        assert!(ctx.volatile_ctx.table_route.is_none());

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
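After a successful rollback the context re-registers failure detectors for the region (`ctx.register_failure_detectors().await`), which the tests observe as a register event arriving on an mpsc channel. A stripped-down sketch of that control channel, with stand-in names for `RegionFailureDetectorControl` and its event type:

```rust
use tokio::sync::mpsc;

/// (datanode_id, region_id) pairs to watch, standing in for DetectingRegion.
type DetectingRegion = (u64, u64);

#[derive(Debug)]
enum SupervisorEvent {
    RegisterFailureDetectors(Vec<DetectingRegion>),
}

/// Sends register requests to the region supervisor without blocking the
/// migration procedure on the supervisor loop.
struct FailureDetectorControl {
    sender: mpsc::Sender<SupervisorEvent>,
}

impl FailureDetectorControl {
    async fn register_failure_detectors(&self, regions: Vec<DetectingRegion>) {
        if let Err(e) = self
            .sender
            .send(SupervisorEvent::RegisterFailureDetectors(regions))
            .await
        {
            eprintln!("Failed to notify the region supervisor: {e}");
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    let control = FailureDetectorControl { sender: tx };

    // After rolling back a downgraded leader, re-register its failure detector.
    control.register_failure_detectors(vec![(1, 1024)]).await;

    match rx.recv().await.unwrap() {
        SupervisorEvent::RegisterFailureDetectors(regions) => {
            assert_eq!(regions, vec![(1, 1024)]);
        }
    }
}
```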
@@ -166,7 +166,7 @@ impl UpdateMetadata {
                    region_options: region_options.clone(),
                    region_wal_options: region_wal_options.clone(),
                },
                table_route_value,
                &table_route_value,
                region_routes,
                &region_options,
                &region_wal_options,
@@ -174,13 +174,11 @@ impl UpdateMetadata {
            .await
            .context(error::TableMetadataManagerSnafu)
        {
            ctx.remove_table_route_value();
            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"),
            });
        };

        ctx.remove_table_route_value();
        ctx.deregister_failure_detectors().await;
        // Consumes the guard.
        ctx.volatile_ctx.opening_region_guard.take();
@@ -310,71 +308,6 @@ mod tests {
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    #[tokio::test]
    async fn test_failed_to_update_table_route_error() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 1)),
                leader_peer: Some(Peer::empty(1)),
                follower_peers: vec![Peer::empty(5), Peer::empty(3)],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(current_time_millis()),
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 2)),
                leader_peer: Some(Peer::empty(4)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
        ];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();
        let original_table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .unwrap()
            .unwrap();

        // modifies the table route.
        table_metadata_manager
            .update_leader_region_status(table_id, &original_table_route, |route| {
                if route.region.id == RegionId::new(1024, 2) {
                    // Removes the status.
                    Some(None)
                } else {
                    None
                }
            })
            .await
            .unwrap();

        // sets the old table route.
        ctx.volatile_ctx.table_route = Some(original_table_route);
        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guard = Some(guard);
        let err = state.upgrade_candidate_region(&mut ctx).await.unwrap_err();

        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_some());
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("Failed to update the table route"));
    }

    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
@@ -492,7 +425,6 @@ mod tests {
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_none());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
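Note how the upgrade step only consumes the `opening_region_guard` after the table route update succeeds; on a retryable failure the guard stays in the volatile context, so the candidate region remains registered as an operating region until the retry lands. A minimal sketch of that take-only-on-success pattern (the guard and keeper types are illustrative):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Stand-in for MemoryRegionKeeper: tracks regions that are being operated on.
#[derive(Clone, Default)]
struct RegionKeeper {
    operating: Arc<Mutex<HashSet<u64>>>,
}

/// Stand-in for OperatingRegionGuard: unregisters the region when dropped.
struct OperatingGuard {
    keeper: RegionKeeper,
    region_id: u64,
}

impl Drop for OperatingGuard {
    fn drop(&mut self) {
        self.keeper.operating.lock().unwrap().remove(&self.region_id);
    }
}

impl RegionKeeper {
    fn register(&self, region_id: u64) -> OperatingGuard {
        self.operating.lock().unwrap().insert(region_id);
        OperatingGuard { keeper: self.clone(), region_id }
    }
}

fn upgrade_candidate_region(
    guard_slot: &mut Option<OperatingGuard>,
    update_succeeded: bool,
) -> Result<(), String> {
    if !update_succeeded {
        // Keep the guard: the step will be retried and the region must stay
        // registered as "operating" until the metadata is actually updated.
        return Err("Failed to update the table route".to_string());
    }
    // Consumes the guard only after the new table route is persisted.
    guard_slot.take();
    Ok(())
}

fn main() {
    let keeper = RegionKeeper::default();
    let mut guard_slot = Some(keeper.register(1));

    assert!(upgrade_candidate_region(&mut guard_slot, false).is_err());
    assert!(guard_slot.is_some()); // still registered, the step can retry

    upgrade_candidate_region(&mut guard_slot, true).unwrap();
    assert!(guard_slot.is_none());
    assert!(keeper.operating.lock().unwrap().is_empty());
}
```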
@@ -52,6 +52,7 @@ use crate::procedure::region_migration::{
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;
use crate::state::StateRef;

/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes identifiers for the cluster and datanode, a list of regions being monitored,
@@ -100,16 +101,6 @@ pub(crate) enum Event {
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

#[cfg(test)]
impl Event {
    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
        match self {
            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
            _ => unreachable!(),
        }
    }
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
@@ -139,6 +130,9 @@ pub struct RegionSupervisorTicker {
    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
    initialization_handler: Mutex<Option<JoinHandle<()>>>,

    /// The interval of tick.
    tick_interval: Duration,

@@ -182,6 +176,7 @@ impl RegionSupervisorTicker {
        );
        Self {
            tick_handle: Mutex::new(None),
            initialization_handler: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
@@ -202,7 +197,7 @@ impl RegionSupervisorTicker {
            self.initialization_retry_period,
        );
        initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
        common_runtime::spawn_global(async move {
        let initialization_handler = common_runtime::spawn_global(async move {
            loop {
                initialization_interval.tick().await;
                let (tx, rx) = oneshot::channel();
@@ -218,6 +213,7 @@ impl RegionSupervisorTicker {
                }
            }
        });
        *self.initialization_handler.lock().unwrap() = Some(initialization_handler);

        let sender = self.sender.clone();
        let ticker_loop = tokio::spawn(async move {
@@ -247,6 +243,11 @@ impl RegionSupervisorTicker {
            handle.abort();
            info!("The tick loop is stopped.");
        }
        let initialization_handler = self.initialization_handler.lock().unwrap().take();
        if let Some(initialization_handler) = initialization_handler {
            initialization_handler.abort();
            info!("The initialization loop is stopped.");
        }
    }
}
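The ticker now keeps the JoinHandle of the spawned initialization loop (in addition to the tick loop), so stopping the ticker aborts both background tasks instead of leaking the initialization retry loop. A compact sketch of that store-then-abort pattern with plain tokio tasks; the field names mirror the diff but the surrounding type is simplified:

```rust
use std::sync::Mutex;
use std::time::Duration;

use tokio::task::JoinHandle;
use tokio::time::{MissedTickBehavior, interval};

#[derive(Default)]
struct Ticker {
    tick_handle: Mutex<Option<JoinHandle<()>>>,
    initialization_handler: Mutex<Option<JoinHandle<()>>>,
}

impl Ticker {
    fn start(&self) {
        let initialization_handler = tokio::spawn(async move {
            let mut init_interval = interval(Duration::from_millis(50));
            init_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            loop {
                init_interval.tick().await;
                // ... try to initialize all regions, break on success ...
            }
        });
        *self.initialization_handler.lock().unwrap() = Some(initialization_handler);

        let tick_handle = tokio::spawn(async move {
            let mut tick_interval = interval(Duration::from_millis(10));
            loop {
                tick_interval.tick().await;
                // ... send a tick event to the supervisor ...
            }
        });
        *self.tick_handle.lock().unwrap() = Some(tick_handle);
    }

    fn stop(&self) {
        // Abort both loops; without storing the initialization handle, the
        // retry loop would keep running after the ticker is stopped.
        if let Some(handle) = self.tick_handle.lock().unwrap().take() {
            handle.abort();
        }
        if let Some(handle) = self.initialization_handler.lock().unwrap().take() {
            handle.abort();
        }
    }
}

#[tokio::main]
async fn main() {
    let ticker = Ticker::default();
    ticker.start();
    tokio::time::sleep(Duration::from_millis(100)).await;
    ticker.stop();
}
```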
@@ -290,6 +291,8 @@ pub struct RegionSupervisor {
    peer_resolver: PeerResolverRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
    /// The meta state, used to check if the current metasrv is the leader.
    state: Option<StateRef>,
}

/// Controller for managing failure detectors for regions.
@@ -373,12 +376,29 @@ impl RegionSupervisor {
            runtime_switch_manager,
            peer_resolver,
            kv_backend,
            state: None,
        }
    }

    /// Sets the meta state.
    pub(crate) fn with_state(mut self, state: StateRef) -> Self {
        self.state = Some(state);
        self
    }

    /// Runs the main loop.
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            if let Some(state) = self.state.as_ref()
                && !state.read().unwrap().is_leader()
            {
                warn!(
                    "The current metasrv is not the leader, ignore {:?} event",
                    event
                );
                continue;
            }

            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
@@ -413,7 +433,10 @@ impl RegionSupervisor {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => self.clear(),
                Event::Clear => {
                    self.clear();
                    info!("Region supervisor is initialized.");
                }
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
@@ -906,6 +929,7 @@ pub(crate) mod tests {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handler: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
@@ -932,6 +956,7 @@ pub(crate) mod tests {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handler: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
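The supervisor's event loop now consults the shared metasrv state and drops events whenever this instance is not the leader, so a follower can no longer act on stale heartbeats or trigger failovers. A self-contained sketch of that guard; the `State` enum here is a simplified stand-in for the real `StateRef`:

```rust
use std::sync::{Arc, RwLock};
use std::time::Duration;

use tokio::sync::mpsc;

/// Simplified stand-in for the metasrv state.
enum State {
    Leader,
    Follower,
}

impl State {
    /// Returns true if the current state is a leader.
    fn is_leader(&self) -> bool {
        matches!(self, State::Leader)
    }
}

type StateRef = Arc<RwLock<State>>;

#[derive(Debug)]
enum Event {
    HeartbeatArrived(u64),
    Clear,
}

async fn run(state: Option<StateRef>, mut receiver: mpsc::Receiver<Event>) {
    while let Some(event) = receiver.recv().await {
        // Ignore everything while this metasrv is not the leader.
        if let Some(state) = state.as_ref() {
            if !state.read().unwrap().is_leader() {
                eprintln!("The current metasrv is not the leader, ignore {event:?} event");
                continue;
            }
        }
        match event {
            Event::HeartbeatArrived(datanode_id) => {
                println!("refresh failure detectors for datanode {datanode_id}");
            }
            Event::Clear => println!("clear failure detectors"),
        }
    }
}

#[tokio::main]
async fn main() {
    let state: StateRef = Arc::new(RwLock::new(State::Follower));
    let (tx, rx) = mpsc::channel(8);
    let supervisor = tokio::spawn(run(Some(state.clone()), rx));

    tx.send(Event::HeartbeatArrived(1)).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await; // let the follower drop it
    *state.write().unwrap() = State::Leader;
    tx.send(Event::Clear).await.unwrap(); // handled now that we are the leader

    drop(tx); // closing the channel ends the loop
    supervisor.await.unwrap();
}
```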
@@ -79,6 +79,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
                    let res = handler_group
                        .handle(req, ctx.clone())
                        .await
                        .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", ))
                        .map_err(|e| e.into());

                    is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
@@ -75,6 +75,12 @@ impl State {
        })
    }

    /// Returns true if the current state is a leader.
    pub fn is_leader(&self) -> bool {
        matches!(self, State::Leader(_))
    }

    /// Returns true if the leader cache is enabled.
    pub fn enable_leader_cache(&self) -> bool {
        match &self {
            State::Leader(leader) => leader.enable_leader_cache,