chore: add tests for election reset and region lease failure handling (#7266)

Signed-off-by: WenyXu <wenymedia@gmail.com>
Author: Weny Xu
Date: 2025-11-20 20:06:51 +08:00
Committed by: GitHub
parent 7f1da17150
commit 01ea7e1468
4 changed files with 188 additions and 6 deletions


@@ -1651,6 +1651,41 @@ mod tests {
        drop_table(&leader_mysql_election.client, table_name).await;
    }

    #[tokio::test]
    async fn test_reset_campaign() {
        maybe_skip_mysql_integration_test!();
        common_telemetry::init_default_ut_logging();
        let leader_value = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_reset_campaign_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(5);
        let meta_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();
        let (tx, _) = broadcast::channel(100);
        let leader_mysql_election = MySqlElection {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };
        // Force the leader flag on, then verify reset_campaign() clears it.
        leader_mysql_election
            .is_leader
            .store(true, Ordering::Relaxed);
        leader_mysql_election.reset_campaign().await;
        assert!(!leader_mysql_election.is_leader());
        drop_table(&leader_mysql_election.client, table_name).await;
    }
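The test pins down only the observable contract of reset_campaign: once it returns, the node no longer considers itself leader. A minimal sketch of that contract, with the caveat that the real method may additionally clean up lease state or notify watchers:

// Hypothetical sketch; the actual implementation lives in the election module.
async fn reset_campaign(&self) {
    // Step down locally; the lease record in the backing store is left to expire.
    self.is_leader.store(false, Ordering::Relaxed);
}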
    #[tokio::test]
    async fn test_follower_action() {
        maybe_skip_mysql_integration_test!();


@@ -1582,6 +1582,44 @@ mod tests {
        drop_table(&follower_pg_election, table_name).await;
    }

    #[tokio::test]
    async fn test_reset_campaign() {
        maybe_skip_postgres_integration_test!();
        let leader_value = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_reset_campaign_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(5);
        let execution_timeout = Duration::from_secs(10);
        let statement_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_postgres_client(
            Some(table_name),
            execution_timeout,
            idle_session_timeout,
            statement_timeout,
        )
        .await
        .unwrap();
        let (tx, _) = broadcast::channel(100);
        let leader_pg_election = PgElection {
            leader_value,
            pg_client: RwLock::new(client),
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
        };
        // Same contract as the MySQL variant: force the leader flag on, then
        // verify reset_campaign() clears it.
        leader_pg_election.is_leader.store(true, Ordering::Relaxed);
        leader_pg_election.reset_campaign().await;
        assert!(!leader_pg_election.is_leader());
        drop_table(&leader_pg_election, table_name).await;
    }
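Both election tests are gated behind a maybe_skip_*_integration_test! macro so they only run against a live database. A plausible shape for such a gate, with the environment variable name being an assumption (the real macro lives in the crate's test utilities):

// Hypothetical sketch of an integration-test gate.
macro_rules! maybe_skip_postgres_integration_test {
    () => {
        if std::env::var("GT_POSTGRES_ENDPOINTS").is_err() {
            common_telemetry::warn!("Postgres endpoints are not set, skipping the test");
            return;
        }
    };
}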
    #[tokio::test]
    async fn test_idle_session_timeout() {
        maybe_skip_postgres_integration_test!();


@@ -129,18 +129,27 @@ impl HeartbeatHandler for RegionLeaseHandler {
#[cfg(test)]
mod test {
    use std::any::Any;
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use common_meta::distributed_time_constants;
    use common_meta::error::Result as MetaResult;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::txn::{Txn, TxnResponse};
    use common_meta::kv_backend::{KvBackend, TxnService};
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_meta::rpc::store::{
        BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
        BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
        PutResponse, RangeRequest, RangeResponse,
    };
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
@@ -415,4 +424,102 @@ mod test {
        assert_eq!(granted, expected);
    }

    /// A KV backend whose `batch_get` always fails. Every other method is
    /// `unimplemented!()`, so reaching one of them would panic the test.
    struct MockKvBackend;

    #[async_trait::async_trait]
    impl TxnService for MockKvBackend {
        type Error = common_meta::error::Error;

        async fn txn(&self, _txn: Txn) -> MetaResult<TxnResponse> {
            unimplemented!()
        }

        fn max_txn_ops(&self) -> usize {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl KvBackend for MockKvBackend {
        fn name(&self) -> &str {
            "mock_kv_backend"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        async fn range(&self, _req: RangeRequest) -> MetaResult<RangeResponse> {
            unimplemented!()
        }

        async fn put(&self, _req: PutRequest) -> MetaResult<PutResponse> {
            unimplemented!()
        }

        async fn batch_put(&self, _req: BatchPutRequest) -> MetaResult<BatchPutResponse> {
            unimplemented!()
        }

        async fn batch_get(&self, _req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
            // Simulate a metadata-store outage for every batch-get read.
            common_meta::error::UnexpectedSnafu {
                err_msg: "mock err",
            }
            .fail()
        }

        async fn delete_range(&self, _req: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
            unimplemented!()
        }

        async fn batch_delete(&self, _req: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
            unimplemented!()
        }
    }
    #[tokio::test]
    async fn test_handle_renew_region_lease_failure() {
        common_telemetry::init_default_ut_logging();
        let kvbackend = Arc::new(MockKvBackend);
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend));
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
        let peer = Peer::empty(datanode_id);
        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();
        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };
        let acc = &mut HeartbeatAccumulator::default();
        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Leader),
                new_empty_region_stat(another_region_id, RegionRole::Leader),
                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
            ],
            ..Default::default()
        });
        let handler = RegionLeaseHandler::new(
            distributed_time_constants::REGION_LEASE_SECS,
            table_metadata_manager.clone(),
            Default::default(),
            None,
        );
        // The metadata store is down (batch_get fails), so the handler must
        // complete without error while granting no lease and marking no
        // region inactive.
        handler.handle(&req, ctx, acc).await.unwrap();
        assert!(acc.region_lease.is_none());
        assert!(acc.inactive_region_ids.is_empty());
    }
}
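The new test pins down how the handler degrades: a metadata-store failure during lease renewal is absorbed rather than propagated, since handle() returns Ok while granting no lease and marking no region inactive, presumably so a transient outage does not kill the heartbeat stream. A hedged sketch of the shape this implies inside the handler (names are illustrative; only the asserted behavior comes from the test):

// Hypothetical sketch of the failure-tolerant renewal path.
match self.renew_region_leases(&stat).await {
    Ok(lease) => acc.region_lease = Some(lease),
    // On error, log and leave the accumulator untouched; the asserts above
    // require both region_lease and inactive_region_ids to stay empty.
    Err(err) => warn!(err; "Failed to renew region leases"),
}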


@@ -131,7 +131,7 @@ pub struct RegionSupervisorTicker {
    tick_handle: Mutex<Option<JoinHandle<()>>>,
    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
-   initialization_handler: Mutex<Option<JoinHandle<()>>>,
+   initialization_handle: Mutex<Option<JoinHandle<()>>>,
    /// The interval of tick.
    tick_interval: Duration,
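Aside from the tests, this file renames initialization_handler to initialization_handle, which matches what the field stores: a JoinHandle, not a handler. The Mutex<Option<JoinHandle<()>>> shape enables the take-and-abort pattern visible in the stop path below; a self-contained sketch of that pattern (the helper name is illustrative):

use std::sync::Mutex;
use tokio::task::JoinHandle;

// take() empties the slot, so a second stop/drop is a harmless no-op.
fn abort_background_task(slot: &Mutex<Option<JoinHandle<()>>>) {
    if let Some(handle) = slot.lock().unwrap().take() {
        handle.abort();
    }
}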
@@ -176,7 +176,7 @@ impl RegionSupervisorTicker {
        );
        Self {
            tick_handle: Mutex::new(None),
-           initialization_handler: Mutex::new(None),
+           initialization_handle: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
@@ -213,7 +213,7 @@ impl RegionSupervisorTicker {
                }
            }
        });
-       *self.initialization_handler.lock().unwrap() = Some(initialization_handler);
+       *self.initialization_handle.lock().unwrap() = Some(initialization_handler);

        let sender = self.sender.clone();
        let ticker_loop = tokio::spawn(async move {
@@ -243,7 +243,7 @@ impl RegionSupervisorTicker {
            handle.abort();
            info!("The tick loop is stopped.");
        }
-       let initialization_handler = self.initialization_handler.lock().unwrap().take();
+       let initialization_handler = self.initialization_handle.lock().unwrap().take();
        if let Some(initialization_handler) = initialization_handler {
            initialization_handler.abort();
            info!("The initialization loop is stopped.");
@@ -929,7 +929,7 @@ pub(crate) mod tests {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
-           initialization_handler: Mutex::new(None),
+           initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
@@ -947,6 +947,8 @@ pub(crate) mod tests {
                Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
            );
        }
+       assert!(ticker.initialization_handle.lock().unwrap().is_none());
+       assert!(ticker.tick_handle.lock().unwrap().is_none());
    }
}
@@ -956,7 +958,7 @@ pub(crate) mod tests {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
-           initialization_handler: Mutex::new(None),
+           initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),