chore: pick #7199 and #7266 to release/v0.15 (#7267)

* fix: correct leader state reset and region migration locking consistency (#7199)

* fix(meta): remove table route cache in region migration ctx

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fix clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix campaign reset not clearing leader state

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: gracefully handle region lease renewal errors

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add tests for election reset and region lease failure handling (#7266)

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-11-21 11:25:44 +08:00
committed by GitHub
parent 50eaa3c80a
commit 40bd6ef79e
11 changed files with 299 additions and 324 deletions

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use common_time::Timestamp;
use snafu::{ensure, OptionExt, ResultExt};
use sqlx::mysql::{MySqlArguments, MySqlRow};
@@ -645,6 +645,13 @@ impl Election for MySqlElection {
}
async fn reset_campaign(&self) {
info!("Resetting campaign");
if self.is_leader.load(Ordering::Relaxed) {
if let Err(err) = self.step_down_without_lock().await {
error!(err; "Failed to step down without lock");
}
info!("Step down without lock successfully, due to reset campaign");
}
if let Err(err) = self.client.lock().await.reset_client().await {
error!(err; "Failed to reset client");
}
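
For reference, a minimal self-contained sketch of the step-down-on-reset pattern introduced above. All types here (`Election`, `LeaderChange`) are simplified stand-ins, not the real `MySqlElection`; the point is only the ordering: clear the local leader flag and notify watchers before resetting the backend client.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

use tokio::sync::broadcast;

#[derive(Clone, Debug)]
enum LeaderChange {
    StepDown,
}

struct Election {
    is_leader: AtomicBool,
    leader_watcher: broadcast::Sender<LeaderChange>,
}

impl Election {
    // Stand-in for `step_down_without_lock`: only local state is touched,
    // no SQL statement or distributed lock is involved.
    fn step_down_without_lock(&self) {
        if self
            .is_leader
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            let _ = self.leader_watcher.send(LeaderChange::StepDown);
        }
    }

    async fn reset_campaign(&self) {
        // Step down locally first, so watchers learn about the lost
        // leadership even if resetting the client fails afterwards.
        if self.is_leader.load(Ordering::Relaxed) {
            self.step_down_without_lock();
        }
        // ...reset the backend client here...
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(8);
    let election = Election {
        is_leader: AtomicBool::new(true),
        leader_watcher: tx,
    };
    election.reset_campaign().await;
    assert!(!election.is_leader.load(Ordering::Relaxed));
    assert!(matches!(rx.try_recv(), Ok(LeaderChange::StepDown)));
}
```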
@@ -1639,6 +1646,41 @@ mod tests {
drop_table(&leader_mysql_election.client, table_name).await;
}
#[tokio::test]
async fn test_reset_campaign() {
maybe_skip_mysql_integration_test!();
common_telemetry::init_default_ut_logging();
let leader_value = "test_leader".to_string();
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_reset_campaign_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let meta_lease_ttl = Duration::from_secs(2);
let execution_timeout = Duration::from_secs(10);
let idle_session_timeout = Duration::from_secs(0);
let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
.await
.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_mysql_election = MySqlElection {
leader_value,
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(table_name).build(),
};
leader_mysql_election
.is_leader
.store(true, Ordering::Relaxed);
leader_mysql_election.reset_campaign().await;
assert!(!leader_mysql_election.is_leader());
drop_table(&leader_mysql_election.client, table_name).await;
}
#[tokio::test]
async fn test_follower_action() {
maybe_skip_mysql_integration_test!();

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use common_time::Timestamp;
use deadpool_postgres::{Manager, Pool};
use snafu::{ensure, OptionExt, ResultExt};
@@ -454,6 +454,13 @@ impl Election for PgElection {
}
async fn reset_campaign(&self) {
info!("Resetting campaign");
if self.is_leader.load(Ordering::Relaxed) {
if let Err(err) = self.step_down_without_lock().await {
error!(err; "Failed to step down without lock");
}
info!("Step down without lock successfully, due to reset campaign");
}
if let Err(err) = self.pg_client.write().await.reset_client().await {
error!(err; "Failed to reset client");
}
@@ -749,18 +756,12 @@ impl PgElection {
key: key.clone(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
send_leader_change_and_set_flags(
&self.is_leader,
&self.leader_infancy,
&self.leader_watcher,
LeaderChangeMessage::StepDown(Arc::new(leader_key)),
);
Ok(())
}
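
`send_leader_change_and_set_flags` itself is not shown in this diff. The sketch below is a guess at what such a helper does, reconstructed from the inline code it replaces (a compare-and-swap on `is_leader`, then notifying the watcher); the real signature, leader-key type, and infancy handling in GreptimeDB may differ. `Arc<String>` stands in for the leader key.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use tokio::sync::broadcast;

// `Arc<String>` is a stand-in for the real leader-key type.
#[derive(Clone, Debug)]
enum LeaderChangeMessage {
    Elected(Arc<String>),
    StepDown(Arc<String>),
}

fn send_leader_change_and_set_flags(
    is_leader: &AtomicBool,
    leader_infancy: &AtomicBool,
    leader_watcher: &broadcast::Sender<LeaderChangeMessage>,
    msg: LeaderChangeMessage,
) {
    // Expected flag transition for the message being sent.
    let (expect, target) = match &msg {
        LeaderChangeMessage::Elected(_) => (false, true),
        LeaderChangeMessage::StepDown(_) => (true, false),
    };
    if is_leader
        .compare_exchange(expect, target, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
    {
        if target {
            // A freshly elected leader starts in its "infancy" phase.
            leader_infancy.store(true, Ordering::Relaxed);
        }
        if leader_watcher.send(msg).is_err() {
            eprintln!("no active leader watcher");
        }
    }
}

fn main() {
    let is_leader = AtomicBool::new(true);
    let leader_infancy = AtomicBool::new(false);
    let (tx, mut rx) = broadcast::channel(8);
    send_leader_change_and_set_flags(
        &is_leader,
        &leader_infancy,
        &tx,
        LeaderChangeMessage::StepDown(Arc::new("test_leader".to_string())),
    );
    assert!(!is_leader.load(Ordering::Relaxed));
    assert!(matches!(rx.try_recv(), Ok(LeaderChangeMessage::StepDown(_))));
}
```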
@@ -1551,6 +1552,44 @@ mod tests {
drop_table(&follower_pg_election, table_name).await;
}
#[tokio::test]
async fn test_reset_campaign() {
maybe_skip_postgres_integration_test!();
let leader_value = "test_leader".to_string();
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_reset_campaign_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value,
pg_client: RwLock::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28321, table_name).build(),
};
leader_pg_election.is_leader.store(true, Ordering::Relaxed);
leader_pg_election.reset_campaign().await;
assert!(!leader_pg_election.is_leader());
drop_table(&leader_pg_election, table_name).await;
}
#[tokio::test]
async fn test_idle_session_timeout() {
maybe_skip_postgres_integration_test!();

View File

@@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_telemetry::error;
use store_api::region_engine::GrantedRegion;
use store_api::storage::RegionId;
@@ -83,36 +84,44 @@ impl HeartbeatHandler for RegionLeaseHandler {
let regions = stat.regions();
let datanode_id = stat.id;
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = self
match self
.region_lease_keeper
.renew_region_leases(datanode_id, &regions)
.await?;
.await
{
Ok(RenewRegionLeasesResponse {
non_exists,
renewed,
}) => {
let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
renewer
.renew(ctx, renewed)
.into_iter()
.map(|region| region.into())
.collect()
} else {
renewed
.into_iter()
.map(|(region_id, region_lease_info)| {
GrantedRegion::new(region_id, region_lease_info.role).into()
})
.collect::<Vec<_>>()
};
let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
renewer
.renew(ctx, renewed)
.into_iter()
.map(|region| region.into())
.collect()
} else {
renewed
.into_iter()
.map(|(region_id, region_lease_info)| {
GrantedRegion::new(region_id, region_lease_info.role).into()
})
.collect::<Vec<_>>()
};
acc.region_lease = Some(RegionLease {
regions: renewed,
duration_since_epoch: req.duration_since_epoch,
lease_seconds: self.region_lease_seconds,
closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
});
acc.inactive_region_ids = non_exists;
acc.region_lease = Some(RegionLease {
regions: renewed,
duration_since_epoch: req.duration_since_epoch,
lease_seconds: self.region_lease_seconds,
closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
});
acc.inactive_region_ids = non_exists;
}
Err(e) => {
error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
// If we throw error here, the datanode will be marked as failure by region failure handler.
// So we only log the error and continue.
}
}
Ok(HandleControl::Continue)
}
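
A stripped-down illustration of the new error handling, using hypothetical types rather than the real handler traits: a failed lease renewal is logged and the handler chain continues (no lease is granted for that heartbeat), instead of returning an error that would let the region failure handler mark the datanode as dead.

```rust
// Hypothetical, simplified stand-ins for the handler types above.
#[derive(Debug)]
struct RenewError(String);

#[derive(Debug, PartialEq)]
enum HandleControl {
    Continue,
}

#[derive(Debug, Default)]
struct Accumulator {
    region_lease: Option<Vec<u64>>, // granted region ids, simplified
}

fn renew_region_leases(datanode_id: u64, regions: &[u64]) -> Result<Vec<u64>, RenewError> {
    // Pretend the metadata backend is unavailable.
    let _ = (datanode_id, regions);
    Err(RenewError("mock kv backend error".to_string()))
}

fn handle(
    datanode_id: u64,
    regions: &[u64],
    acc: &mut Accumulator,
) -> Result<HandleControl, RenewError> {
    match renew_region_leases(datanode_id, regions) {
        Ok(renewed) => acc.region_lease = Some(renewed),
        Err(e) => {
            // Log and continue: returning Err here would make the whole
            // heartbeat fail and the datanode look dead.
            eprintln!("Failed to renew region leases for datanode {datanode_id}: {e:?}");
        }
    }
    Ok(HandleControl::Continue)
}

fn main() {
    let mut acc = Accumulator::default();
    let control = handle(1, &[1, 2, 3], &mut acc).unwrap();
    assert_eq!(control, HandleControl::Continue);
    // No lease is granted for this heartbeat, but the handler chain continues.
    assert!(acc.region_lease.is_none());
}
```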
@@ -120,18 +129,27 @@ impl HeartbeatHandler for RegionLeaseHandler {
#[cfg(test)]
mod test {
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::distributed_time_constants;
use common_meta::error::Result as MetaResult;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
@@ -404,4 +422,102 @@ mod test {
assert_eq!(granted, expected);
}
struct MockKvBackend;
#[async_trait::async_trait]
impl TxnService for MockKvBackend {
type Error = common_meta::error::Error;
async fn txn(&self, _txn: Txn) -> MetaResult<TxnResponse> {
unimplemented!()
}
fn max_txn_ops(&self) -> usize {
unimplemented!()
}
}
#[async_trait::async_trait]
impl KvBackend for MockKvBackend {
fn name(&self) -> &str {
"mock_kv_backend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, _req: RangeRequest) -> MetaResult<RangeResponse> {
unimplemented!()
}
async fn put(&self, _req: PutRequest) -> MetaResult<PutResponse> {
unimplemented!()
}
async fn batch_put(&self, _req: BatchPutRequest) -> MetaResult<BatchPutResponse> {
unimplemented!()
}
async fn batch_get(&self, _req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
unimplemented!()
}
async fn batch_delete(&self, _req: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
unimplemented!()
}
}
#[tokio::test]
async fn test_handle_renew_region_lease_failure() {
common_telemetry::init_default_ut_logging();
let kvbackend = Arc::new(MockKvBackend);
let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend));
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let another_region_id = RegionId::new(table_id, region_number + 1);
let no_exist_region_id = RegionId::new(table_id, region_number + 2);
let peer = Peer::empty(datanode_id);
let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let ctx = &mut metasrv.new_ctx();
let req = HeartbeatRequest {
duration_since_epoch: 1234,
..Default::default()
};
let acc = &mut HeartbeatAccumulator::default();
acc.stat = Some(Stat {
id: peer.id,
region_stats: vec![
new_empty_region_stat(region_id, RegionRole::Leader),
new_empty_region_stat(another_region_id, RegionRole::Leader),
new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
],
..Default::default()
});
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
Default::default(),
None,
);
handler.handle(&req, ctx, acc).await.unwrap();
assert!(acc.region_lease.is_none());
assert!(acc.inactive_region_ids.is_empty());
}
}

View File

@@ -345,7 +345,8 @@ impl MetasrvBuilder {
region_migration_manager.clone(),
runtime_switch_manager.clone(),
peer_lookup_service.clone(),
);
)
.with_state(state.clone());
Some(RegionFailureHandler::new(
region_supervisor,

View File

@@ -38,7 +38,7 @@ use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_procedure::error::{
@@ -215,8 +215,6 @@ pub struct VolatileContext {
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
opening_region_guard: Option<OperatingRegionGuard>,
/// `table_route` is stored via previous steps for future use.
table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
/// `datanode_table` is stored via previous steps for future use.
from_peer_datanode_table: Option<DatanodeTableValue>,
/// `table_info` is stored via previous steps for future use.
@@ -383,29 +381,23 @@ impl Context {
/// Retry:
/// - Failed to retrieve the metadata of table.
pub async fn get_table_route_value(
&mut self,
) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
let table_route_value = &mut self.volatile_ctx.table_route;
&self,
) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
let table_id = self.persistent_ctx.region_id.table_id();
let table_route = self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;
if table_route_value.is_none() {
let table_id = self.persistent_ctx.region_id.table_id();
let table_route = self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;
*table_route_value = Some(table_route);
}
Ok(table_route_value.as_ref().unwrap())
Ok(table_route)
}
/// Notifies the RegionSupervisor to register failure detectors of failed region.
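
One way to read the refactor above, illustrated with toy types rather than GreptimeDB's storage API: table-route updates are conditional on the version/bytes that were read, so a snapshot cached in the volatile context can go stale across retries and make every later compare-and-swap fail. Fetching a fresh copy on each call keeps the update based on the current version.

```rust
use std::collections::HashMap;

/// Toy versioned KV store standing in for the table-route storage.
#[derive(Default)]
struct RouteStore {
    routes: HashMap<u32, (u64, String)>, // table_id -> (version, value)
}

impl RouteStore {
    fn get_with_version(&self, table_id: u32) -> Option<(u64, String)> {
        self.routes.get(&table_id).cloned()
    }

    /// Compare-and-swap style update: succeeds only if the caller's version
    /// matches the currently stored one.
    fn update(&mut self, table_id: u32, expected_version: u64, new_value: String) -> Result<(), String> {
        match self.routes.get_mut(&table_id) {
            Some((version, value)) if *version == expected_version => {
                *version += 1;
                *value = new_value;
                Ok(())
            }
            _ => Err(format!("version mismatch for table {table_id}")),
        }
    }
}

fn main() {
    let mut store = RouteStore::default();
    store.routes.insert(1024, (0, "route-v0".to_string()));

    // A snapshot cached early, as the removed `volatile_ctx.table_route` did.
    let cached = store.get_with_version(1024).unwrap();

    // Someone else updates the route in the meantime.
    store.update(1024, 0, "route-v1".to_string()).unwrap();

    // Retrying with the stale cached version keeps failing ...
    assert!(store.update(1024, cached.0, "route-v2".to_string()).is_err());

    // ... while re-fetching before each attempt (the new behavior) succeeds.
    let fresh = store.get_with_version(1024).unwrap();
    assert!(store.update(1024, fresh.0, "route-v2".to_string()).is_ok());
}
```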
@@ -447,12 +439,6 @@ impl Context {
.await;
}
/// Removes the `table_route` of [VolatileContext], returns true if any.
pub fn remove_table_route_value(&mut self) -> bool {
let value = self.volatile_ctx.table_route.take();
value.is_some()
}
/// Returns the `table_info` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
@@ -627,14 +613,13 @@ impl RegionMigrationProcedure {
})
}
async fn rollback_inner(&mut self) -> Result<()> {
async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&["rollback"])
.start_timer();
let table_id = self.context.region_id().table_id();
let region_id = self.context.region_id();
self.context.remove_table_route_value();
let table_metadata_manager = self.context.table_metadata_manager.clone();
let table_route = self.context.get_table_route_value().await?;
@@ -647,9 +632,11 @@ impl RegionMigrationProcedure {
.any(|route| route.is_leader_downgrading());
if downgraded {
let table_lock = TableLock::Write(region_id.table_id()).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
info!("Rollbacking downgraded region leader table route, region: {region_id}");
table_metadata_manager
.update_leader_region_status(table_id, table_route, |route| {
.update_leader_region_status(table_id, &table_route, |route| {
if route.region.id == region_id {
Some(None)
} else {
@@ -676,8 +663,8 @@ impl Procedure for RegionMigrationProcedure {
Self::TYPE_NAME
}
async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner()
async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner(ctx)
.await
.map_err(ProcedureError::external)
}
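
The rollback path now takes the table write lock from the procedure's lock provider before updating the table route. Below is a simplified sketch of that guard pattern with a toy per-table async lock; the `LockProvider` type is hypothetical and only mirrors the `TableLock::Write` + `acquire_lock` usage shown in the hunk above.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{Mutex, RwLock};

/// Toy lock provider: one async RwLock per table id.
#[derive(Default)]
struct LockProvider {
    locks: Mutex<HashMap<u32, Arc<RwLock<()>>>>,
}

impl LockProvider {
    async fn table_lock(&self, table_id: u32) -> Arc<RwLock<()>> {
        self.locks
            .lock()
            .await
            .entry(table_id)
            .or_insert_with(|| Arc::new(RwLock::new(())))
            .clone()
    }
}

async fn rollback_downgraded_leader(provider: &LockProvider, table_id: u32) {
    // Hold the table write lock for the whole metadata update, so a
    // concurrent procedure on the same table cannot interleave with the
    // rollback (this mirrors `TableLock::Write` + `acquire_lock` above).
    let lock = provider.table_lock(table_id).await;
    let _guard = lock.write().await;

    // ...re-read the table route and clear the downgrading leader state here...
    println!("rolled back leader state for table {table_id} under the table lock");
}

#[tokio::main]
async fn main() {
    let provider = LockProvider::default();
    rollback_downgraded_leader(&provider, 1024).await;
}
```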

View File

@@ -46,7 +46,7 @@ impl UpdateMetadata {
// TODO(weny): ensures the leader region peer is the `from_peer`.
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, current_table_route_value, |route| {
.update_leader_region_status(table_id, &current_table_route_value, |route| {
if route.region.id == region_id
&& route
.leader_peer
@@ -61,7 +61,6 @@ impl UpdateMetadata {
.await
.context(error::TableMetadataManagerSnafu)
{
ctx.remove_table_route_value();
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}"
@@ -69,8 +68,6 @@ impl UpdateMetadata {
});
}
ctx.remove_table_route_value();
Ok(())
}
}
@@ -81,7 +78,7 @@ mod tests {
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use crate::error::Error;
@@ -115,63 +112,6 @@ mod tests {
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_failed_to_update_table_route_error() {
let state = UpdateMetadata::Downgrade;
let persistent_context = new_persistent_context();
let from_peer = persistent_context.from_peer.clone();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.region_id().table_id();
let table_info = new_test_table_info(1024, vec![1, 2]).into();
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(from_peer.clone()),
..Default::default()
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(4)),
..Default::default()
},
];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let original_table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();
// modifies the table route.
table_metadata_manager
.update_leader_region_status(table_id, &original_table_route, |route| {
if route.region.id == RegionId::new(1024, 2) {
Some(Some(LeaderState::Downgrading))
} else {
None
}
})
.await
.unwrap();
// sets the old table route.
ctx.volatile_ctx.table_route = Some(original_table_route);
let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(err.is_retryable());
assert!(format!("{err:?}").contains("Failed to update the table route"));
}
#[tokio::test]
async fn test_only_downgrade_from_peer() {
let mut state = Box::new(UpdateMetadata::Downgrade);
@@ -212,7 +152,6 @@ mod tests {
// It should remain unchanged.
assert_eq!(latest_table_route.version().unwrap(), 0);
assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
assert!(ctx.volatile_ctx.table_route.is_none());
}
#[tokio::test]
@@ -254,6 +193,5 @@ mod tests {
.unwrap();
assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
assert!(ctx.volatile_ctx.table_route.is_none());
}
}

View File

@@ -35,7 +35,7 @@ impl UpdateMetadata {
let current_table_route_value = ctx.get_table_route_value().await?;
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, current_table_route_value, |route| {
.update_leader_region_status(table_id, &current_table_route_value, |route| {
if route.region.id == region_id {
Some(None)
} else {
@@ -45,14 +45,12 @@ impl UpdateMetadata {
.await
.context(error::TableMetadataManagerSnafu)
{
ctx.remove_table_route_value();
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
});
}
ctx.register_failure_detectors().await;
ctx.remove_table_route_value();
Ok(())
}
@@ -61,7 +59,6 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -73,7 +70,6 @@ mod tests {
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
use crate::region::supervisor::RegionFailureDetectorControl;
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
@@ -93,101 +89,6 @@ mod tests {
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_update_table_route_with_retry() {
let state = UpdateMetadata::Rollback;
let persistent_context = new_persistent_context();
let from_peer = persistent_context.from_peer.clone();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx));
let table_id = ctx.region_id().table_id();
let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(from_peer.clone()),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(4)),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 3)),
leader_peer: Some(Peer::empty(5)),
..Default::default()
},
];
let expected_region_routes = {
let mut region_routes = region_routes.clone();
region_routes[0].leader_state = None;
region_routes[1].leader_state = None;
region_routes
};
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let old_table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();
// modifies the table route.
table_metadata_manager
.update_leader_region_status(table_id, &old_table_route, |route| {
if route.region.id == RegionId::new(1024, 2) {
Some(None)
} else {
None
}
})
.await
.unwrap();
ctx.volatile_ctx.table_route = Some(old_table_route);
let err = state
.rollback_downgraded_region(&mut ctx)
.await
.unwrap_err();
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(err.is_retryable());
assert!(format!("{err:?}").contains("Failed to update the table route"));
assert_eq!(rx.len(), 0);
state.rollback_downgraded_region(&mut ctx).await.unwrap();
let event = rx.try_recv().unwrap();
let detecting_regions = event.into_region_failure_detectors();
assert_eq!(
detecting_regions,
vec![(from_peer.id, ctx.persistent_ctx.region_id)]
);
let table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.unwrap()
.unwrap();
assert_eq!(
&expected_region_routes,
table_route.region_routes().unwrap()
);
}
#[tokio::test]
async fn test_next_migration_end_state() {
let mut state = Box::new(UpdateMetadata::Rollback);
@@ -238,8 +139,6 @@ mod tests {
.downcast_ref::<RegionMigrationAbort>()
.unwrap();
assert!(ctx.volatile_ctx.table_route.is_none());
let table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()

View File

@@ -160,7 +160,7 @@ impl UpdateMetadata {
region_options: region_options.clone(),
region_wal_options: region_wal_options.clone(),
},
table_route_value,
&table_route_value,
region_routes,
&region_options,
&region_wal_options,
@@ -168,13 +168,11 @@ impl UpdateMetadata {
.await
.context(error::TableMetadataManagerSnafu)
{
ctx.remove_table_route_value();
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"),
});
};
ctx.remove_table_route_value();
ctx.deregister_failure_detectors().await;
// Consumes the guard.
ctx.volatile_ctx.opening_region_guard.take();
@@ -304,71 +302,6 @@ mod tests {
assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
}
#[tokio::test]
async fn test_failed_to_update_table_route_error() {
let state = UpdateMetadata::Upgrade;
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let opening_keeper = MemoryRegionKeeper::default();
let table_id = 1024;
let table_info = new_test_table_info(table_id, vec![1]).into();
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5), Peer::empty(3)],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: Some(current_time_millis()),
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(4)),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let original_table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();
// modifies the table route.
table_metadata_manager
.update_leader_region_status(table_id, &original_table_route, |route| {
if route.region.id == RegionId::new(1024, 2) {
// Removes the status.
Some(None)
} else {
None
}
})
.await
.unwrap();
// sets the old table route.
ctx.volatile_ctx.table_route = Some(original_table_route);
let guard = opening_keeper
.register(2, RegionId::new(table_id, 1))
.unwrap();
ctx.volatile_ctx.opening_region_guard = Some(guard);
let err = state.upgrade_candidate_region(&mut ctx).await.unwrap_err();
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(ctx.volatile_ctx.opening_region_guard.is_some());
assert!(err.is_retryable());
assert!(format!("{err:?}").contains("Failed to update the table route"));
}
#[tokio::test]
async fn test_check_metadata() {
let state = UpdateMetadata::Upgrade;
@@ -486,7 +419,6 @@ mod tests {
.unwrap();
let region_routes = table_route.region_routes().unwrap();
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(ctx.volatile_ctx.opening_region_guard.is_none());
assert_eq!(region_routes.len(), 1);
assert!(!region_routes[0].is_leader_downgrading());

View File

@@ -42,6 +42,7 @@ use crate::procedure::region_migration::{
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;
use crate::state::StateRef;
/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes identifiers for the cluster and datanode, a list of regions being monitored,
@@ -86,16 +87,6 @@ pub(crate) enum Event {
Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}
#[cfg(test)]
impl Event {
pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
match self {
Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
_ => unreachable!(),
}
}
}
impl Debug for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@@ -228,6 +219,8 @@ pub struct RegionSupervisor {
runtime_switch_manager: RuntimeSwitchManagerRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
/// The meta state, used to check if the current metasrv is the leader.
state: Option<StateRef>,
}
/// Controller for managing failure detectors for regions.
@@ -308,12 +301,29 @@ impl RegionSupervisor {
region_migration_manager,
runtime_switch_manager,
peer_lookup,
state: None,
}
}
/// Sets the meta state.
pub(crate) fn with_state(mut self, state: StateRef) -> Self {
self.state = Some(state);
self
}
/// Runs the main loop.
pub(crate) async fn run(&mut self) {
while let Some(event) = self.receiver.recv().await {
if let Some(state) = self.state.as_ref()
&& !state.read().unwrap().is_leader()
{
warn!(
"The current metasrv is not the leader, ignore {:?} event",
event
);
continue;
}
match event {
Event::Tick => {
let regions = self.detect_region_failure();
@@ -326,7 +336,10 @@ impl RegionSupervisor {
self.deregister_failure_detectors(detecting_regions).await
}
Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
Event::Clear => self.clear(),
Event::Clear => {
self.clear();
info!("Region supervisor is initialized.");
}
#[cfg(test)]
Event::Dump(sender) => {
let _ = sender.send(self.failure_detector.dump());
@@ -759,6 +772,7 @@ pub(crate) mod tests {
while let Ok(event) = rx.try_recv() {
assert_matches!(event, Event::Tick | Event::Clear);
}
assert!(ticker.tick_handle.lock().unwrap().is_none());
}
}
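
Combined with the `State::is_leader` helper added at the end of this change set, the supervisor's loop now drops events on a non-leader metasrv, so a follower never triggers failure detection or failover. A minimal stand-alone sketch of that gate, with toy `State` and `Event` types in place of the real `StateRef` and supervisor events:

```rust
use std::sync::{Arc, RwLock};

use tokio::sync::mpsc;

/// Toy stand-in for the metasrv state; the real `State` is an enum with an
/// `is_leader()` helper.
#[derive(Default)]
struct State {
    leader: bool,
}

impl State {
    fn is_leader(&self) -> bool {
        self.leader
    }
}

#[derive(Debug)]
enum Event {
    Tick,
    Clear,
}

async fn run(mut receiver: mpsc::Receiver<Event>, state: Option<Arc<RwLock<State>>>) {
    while let Some(event) = receiver.recv().await {
        // Ignore supervision events entirely when this metasrv is not the leader.
        if let Some(state) = state.as_ref() {
            if !state.read().unwrap().is_leader() {
                eprintln!("The current metasrv is not the leader, ignore {event:?} event");
                continue;
            }
        }
        match event {
            Event::Tick => { /* detect region failures ... */ }
            Event::Clear => println!("Region supervisor is initialized."),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let state = Arc::new(RwLock::new(State::default())); // follower
    tx.send(Event::Tick).await.unwrap();
    tx.send(Event::Clear).await.unwrap();
    drop(tx); // close the channel so `run` exits
    run(rx, Some(state)).await;
}
```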

View File

@@ -79,6 +79,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
let res = handler_group
.handle(req, ctx.clone())
.await
.inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", ))
.map_err(|e| e.into());
is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());

View File

@@ -75,6 +75,12 @@ impl State {
})
}
/// Returns true if the current state is a leader.
pub fn is_leader(&self) -> bool {
matches!(self, State::Leader(_))
}
/// Returns true if the leader cache is enabled.
pub fn enable_leader_cache(&self) -> bool {
match &self {
State::Leader(leader) => leader.enable_leader_cache,