chore: ut and some fix (#5752)

* chore: ut and some fix

* fix: remove NOWAIT

* refactor: use param for meta lease ttl

* chore: feature gate

* chore: add comments

* chore: apply comments

* fix: advice by claude 3.7 sonnet

* chore: apply comments
This commit is contained in:
Yuhan Wang
2025-03-24 17:05:06 +08:00
committed by GitHub
parent c4ac242c69
commit a36901a653
3 changed files with 738 additions and 89 deletions

View File

@@ -20,6 +20,8 @@ use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_config::Configurable;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use common_meta::distributed_time_constants::META_LEASE_SECS;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -249,6 +251,7 @@ pub async fn metasrv_builder(
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
META_LEASE_SECS,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
@@ -270,6 +273,7 @@ pub async fn metasrv_builder(
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
META_LEASE_SECS,
&election_table_name,
)
.await?;

View File

@@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use itertools::Itertools;
@@ -25,7 +24,7 @@ use sqlx::mysql::{MySqlArguments, MySqlRow};
use sqlx::query::Query;
use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row};
use tokio::sync::{broadcast, Mutex, MutexGuard};
use tokio::time::{Interval, MissedTickBehavior};
use tokio::time::MissedTickBehavior;
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
@@ -41,7 +40,7 @@ const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
/// Lease information.
/// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module.
#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
struct Lease {
leader_value: String,
expire_time: Timestamp,
@@ -52,6 +51,7 @@ struct Lease {
struct ElectionSqlFactory<'a> {
table_name: &'a str,
meta_lease_ttl_secs: u64,
}
struct ElectionSqlSet {
@@ -99,8 +99,11 @@ struct ElectionSqlSet {
}
impl<'a> ElectionSqlFactory<'a> {
fn new(table_name: &'a str) -> Self {
Self { table_name }
fn new(table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
Self {
table_name,
meta_lease_ttl_secs,
}
}
fn build(self) -> ElectionSqlSet {
@@ -117,7 +120,10 @@ impl<'a> ElectionSqlFactory<'a> {
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!("SET SESSION wait_timeout = {};", META_LEASE_SECS + 1)
format!(
"SET SESSION wait_timeout = {};",
self.meta_lease_ttl_secs + 1
)
}
fn set_lock_wait_timeout_sql(&self) -> &str {
@@ -147,6 +153,8 @@ impl<'a> ElectionSqlFactory<'a> {
"SELECT @@version;"
}
/// Use `SELECT FOR UPDATE` to lock for compatibility with other MySQL-compatible databases
/// instead of directly using `GET_LOCK`.
fn campaign_sql(&self) -> String {
format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
}
@@ -315,6 +323,7 @@ pub struct MySqlElection {
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
sql_set: ElectionSqlSet,
}
@@ -324,9 +333,10 @@ impl MySqlElection {
mut client: sqlx::MySqlConnection,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
table_name: &str,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(table_name);
let sql_factory = ElectionSqlFactory::new(table_name, meta_lease_ttl_secs);
sqlx::query(&sql_factory.create_table_sql())
.execute(&mut client)
.await
@@ -365,6 +375,7 @@ impl MySqlElection {
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
sql_set: sql_factory.build(),
}))
}
@@ -452,8 +463,14 @@ impl Election for MySqlElection {
}
);
self.update_value_with_lease(&key, &lease.origin, &node_info, &mut executor)
.await?;
self.update_value_with_lease(
&key,
&lease.origin,
&node_info,
self.candidate_lease_ttl_secs,
&mut executor,
)
.await?;
std::mem::drop(executor);
}
}
@@ -480,10 +497,11 @@ impl Election for MySqlElection {
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let _ = self.do_campaign(&mut keep_alive_interval).await;
let _ = self.do_campaign().await;
keep_alive_interval.tick().await;
}
}
@@ -514,7 +532,7 @@ impl Election for MySqlElection {
}
impl MySqlElection {
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
/// Returns value, expire time and current time.
async fn get_value_with_lease(
&self,
key: &str,
@@ -587,6 +605,7 @@ impl MySqlElection {
key: &str,
prev: &str,
updated: &str,
lease_ttl: u64,
executor: &mut Executor<'_>,
) -> Result<()> {
let key = key.as_bytes();
@@ -595,7 +614,7 @@ impl MySqlElection {
let query = sqlx::query(&self.sql_set.update_value_with_lease)
.bind(updated)
.bind(self.candidate_lease_ttl_secs as f64)
.bind(lease_ttl as f64)
.bind(key)
.bind(prev);
let res = executor
@@ -627,9 +646,9 @@ impl MySqlElection {
.bind(value)
.bind(lease_ttl_secs);
let res = executor
.query(query, &self.sql_set.put_value_with_lease)
.execute(query, &self.sql_set.put_value_with_lease)
.await?;
Ok(res.is_empty())
Ok(res == 1)
}
/// Returns `true` if the deletion is successful.
@@ -644,62 +663,82 @@ impl MySqlElection {
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
/// if the current lease is still valid.
async fn do_campaign(&self, interval: &mut Interval) -> Result<()> {
async fn do_campaign(&self) -> Result<()> {
// Need to restrict the scope of the client to avoid ambiguous overloads.
use sqlx::Acquire;
loop {
let client = self.client.lock().await;
let executor = Executor::Default(client);
let mut lease = Lease::default();
match (
self.lease_check(executor, &mut lease).await,
self.is_leader(),
) {
// If the leader lease is valid and I'm the leader, renew the lease.
(Ok(_), true) => {
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.renew_lease(executor, lease).await?;
}
// If the leader lease expires and I'm the leader, notify the leader watcher and step down.
// Another instance should be elected as the leader in this case.
(Err(_), true) => {
warn!("Leader lease expired, re-initiate the campaign");
self.step_down_without_lock().await?;
}
// If the leader lease expires and I'm not the leader, elect myself.
(Err(_), false) => {
warn!("Leader lease expired, re-initiate the campaign");
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.elected(&mut executor).await?;
executor.commit().await?;
}
// If the leader lease is valid and I'm not the leader, do nothing.
(Ok(_), false) => {}
let client = self.client.lock().await;
let executor = Executor::Default(client);
let mut lease = Lease::default();
match (
self.lease_check(executor, &mut lease).await,
self.is_leader(),
self.leader_value == lease.leader_value,
) {
// If the leader lease is valid and I'm the leader, renew the lease.
(Ok(_), true, true) => {
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.renew_lease(executor, lease).await?;
}
interval.tick().await;
// If the leader lease expires and I'm the leader, notify the leader watcher and step down.
// Another instance should be elected as the leader in this case.
(Err(_), true, _) | (Ok(_), true, false) => {
warn!("Leader lease expired, step down...");
self.step_down_without_lock().await?;
}
// If the leader lease expires and I'm not the leader, elect myself.
(Err(_), false, _) => {
warn!("Leader lease expired, elected.");
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.elected(&mut executor).await?;
executor.commit().await?;
}
// If the leader lease is valid and I'm the leader, but I don't think I'm the leader.
// Just re-elect myself.
(Ok(_), false, true) => {
warn!("I should be the leader, but I don't think so. Something went wrong.");
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.elected(&mut executor).await?;
executor.commit().await?;
}
// If the leader lease is valid and I'm not the leader, do nothing.
(Ok(_), false, false) => {}
}
Ok(())
}
/// Renew the lease
async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
let key = self.election_key();
self.update_value_with_lease(&key, &lease.origin, &self.leader_value, &mut executor)
.await?;
self.update_value_with_lease(
&key,
&lease.origin,
&self.leader_value,
self.meta_lease_ttl_secs,
&mut executor,
)
.await?;
executor.commit().await?;
Ok(())
}
@@ -758,7 +797,7 @@ impl MySqlElection {
..Default::default()
};
self.delete_value(&key, executor).await?;
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS, executor)
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs, executor)
.await?;
if self
@@ -784,7 +823,7 @@ impl MySqlElection {
match query.fetch_one(client).await {
Ok(row) => {
let version: String = row.try_get(0).unwrap();
if !version.starts_with("8.0") || !version.starts_with("5.7") {
if !version.starts_with("8.0") && !version.starts_with("5.7") {
warn!(
"Unsupported MySQL version: {}, expected: [5.7, 8.0]",
version
@@ -798,3 +837,589 @@ impl MySqlElection {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::env;
use common_telemetry::init_default_ut_logging;
use sqlx::Connection;
use super::*;
use crate::error::MySqlExecutionSnafu;
async fn create_mysql_client(table_name: Option<&str>) -> Result<Mutex<MySqlConnection>> {
init_default_ut_logging();
let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
if endpoint.is_empty() {
return UnexpectedSnafu {
violated: "MySQL endpoint is empty".to_string(),
}
.fail();
}
let mut client = MySqlConnection::connect(&endpoint).await.unwrap();
if let Some(table_name) = table_name {
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS {}(k VARCHAR(255) PRIMARY KEY, v BLOB);",
table_name
);
sqlx::query(&create_table_sql)
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: create_table_sql,
})?;
}
Ok(Mutex::new(client))
}
async fn drop_table(client: &Mutex<MySqlConnection>, table_name: &str) {
let mut client = client.lock().await;
let sql = format!("DROP TABLE IF EXISTS {};", table_name);
sqlx::query(&sql)
.execute(&mut *client)
.await
.context(MySqlExecutionSnafu { sql })
.unwrap();
}
#[tokio::test]
async fn test_mysql_crud() {
let key = "test_key".to_string();
let value = "test_value".to_string();
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_mysql_crud_greptime_metakv";
let client = create_mysql_client(Some(table_name)).await.unwrap();
{
let mut a = client.lock().await;
let txn = a.begin().await.unwrap();
let mut executor = Executor::Txn(txn);
let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
let query = sqlx::query(&raw_query);
let _ = executor.query(query, &raw_query).await.unwrap();
}
let (tx, _) = broadcast::channel(100);
let mysql_election = MySqlElection {
leader_value: "test_leader".to_string(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs: 10,
meta_lease_ttl_secs: 1,
sql_set: ElectionSqlFactory::new(table_name, 1).build(),
};
let client = mysql_election.client.lock().await;
let mut executor = Executor::Default(client);
let res = mysql_election
.put_value_with_lease(&key, &value, 10, &mut executor)
.await
.unwrap();
assert!(res);
let lease = mysql_election
.get_value_with_lease(&key, &mut executor)
.await
.unwrap()
.unwrap();
assert_eq!(lease.leader_value, value);
mysql_election
.update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor)
.await
.unwrap();
let res = mysql_election
.delete_value(&key, &mut executor)
.await
.unwrap();
assert!(res);
let res = mysql_election
.get_value_with_lease(&key, &mut executor)
.await
.unwrap();
assert!(res.is_none());
for i in 0..10 {
let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i);
mysql_election
.put_value_with_lease(&key, &value, 10, &mut executor)
.await
.unwrap();
}
let key_prefix = "test_key".to_string();
let (res, _) = mysql_election
.get_value_with_lease_by_prefix(&key_prefix, &mut executor)
.await
.unwrap();
assert_eq!(res.len(), 10);
for i in 0..10 {
let key = format!("test_key_{}", i);
let res = mysql_election
.delete_value(&key, &mut executor)
.await
.unwrap();
assert!(res);
}
let (res, current) = mysql_election
.get_value_with_lease_by_prefix(&key_prefix, &mut executor)
.await
.unwrap();
assert!(res.is_empty());
assert!(current == Timestamp::default());
// Should drop manually.
std::mem::drop(executor);
drop_table(&mysql_election.client, table_name).await;
}
async fn candidate(
leader_value: String,
candidate_lease_ttl_secs: u64,
store_key_prefix: String,
table_name: String,
) {
let client = create_mysql_client(None).await.unwrap();
let (tx, _) = broadcast::channel(100);
let mysql_election = MySqlElection {
leader_value,
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 1,
sql_set: ElectionSqlFactory::new(&table_name, 1).build(),
};
let node_info = MetasrvNodeInfo {
addr: "test_addr".to_string(),
version: "test_version".to_string(),
git_commit: "test_git_commit".to_string(),
start_time_ms: 0,
};
mysql_election.register_candidate(&node_info).await.unwrap();
}
#[tokio::test]
async fn test_candidate_registration() {
let leader_value_prefix = "test_leader".to_string();
let candidate_lease_ttl_secs = 2;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_candidate_registration_greptime_metakv";
let mut handles = vec![];
let client = create_mysql_client(Some(table_name)).await.unwrap();
for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i);
let handle = tokio::spawn(candidate(
leader_value,
candidate_lease_ttl_secs,
uuid.clone(),
table_name.to_string(),
));
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
tokio::time::sleep(Duration::from_secs(candidate_lease_ttl_secs / 2 + 1)).await;
let (tx, _) = broadcast::channel(100);
let leader_value = "test_leader".to_string();
let mysql_election = MySqlElection {
leader_value,
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 1,
sql_set: ElectionSqlFactory::new(table_name, 1).build(),
};
let candidates = mysql_election.all_candidates().await.unwrap();
assert_eq!(candidates.len(), 10);
for handle in handles {
handle.abort();
}
// Wait for the candidate leases to expire.
tokio::time::sleep(Duration::from_secs(candidate_lease_ttl_secs + 1)).await;
let candidates = mysql_election.all_candidates().await.unwrap();
assert!(candidates.is_empty());
// Garbage collection
let client = mysql_election.client.lock().await;
let mut executor = Executor::Default(client);
for i in 0..10 {
let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
let res = mysql_election
.delete_value(&key, &mut executor)
.await
.unwrap();
assert!(res);
}
// Should drop manually.
std::mem::drop(executor);
drop_table(&mysql_election.client, table_name).await;
}
async fn elected(election: &MySqlElection, table_name: &str) {
let mut client = election.client.lock().await;
let txn = client.begin().await.unwrap();
let mut executor = Executor::Txn(txn);
let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
let query = sqlx::query(&raw_query);
let _ = executor.query(query, &raw_query).await.unwrap();
election.elected(&mut executor).await.unwrap();
executor.commit().await.unwrap();
}
async fn get_lease(election: &MySqlElection) -> Option<Lease> {
let client = election.client.lock().await;
let mut executor = Executor::Default(client);
election
.get_value_with_lease(&election.election_key(), &mut executor)
.await
.unwrap()
}
#[tokio::test]
async fn test_elected_and_step_down() {
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 1;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_elected_and_step_down_greptime_metakv";
let client = create_mysql_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_mysql_election = MySqlElection {
leader_value: leader_value.clone(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 1,
sql_set: ElectionSqlFactory::new(table_name, 1).build(),
};
elected(&leader_mysql_election, table_name).await;
let lease = get_lease(&leader_mysql_election).await.unwrap();
assert_eq!(lease.leader_value, leader_value);
assert!(lease.expire_time > lease.current);
assert!(leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
leader_mysql_election
.step_down_without_lock()
.await
.unwrap();
let lease = get_lease(&leader_mysql_election).await.unwrap();
assert_eq!(lease.leader_value, leader_value);
assert!(!leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
elected(&leader_mysql_election, table_name).await;
let lease = get_lease(&leader_mysql_election).await.unwrap();
assert_eq!(lease.leader_value, leader_value);
assert!(lease.expire_time > lease.current);
assert!(leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
drop_table(&leader_mysql_election.client, table_name).await;
}
#[tokio::test]
async fn test_campaign() {
let leader_value = "test_leader".to_string();
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_leader_action_greptime_metakv";
let candidate_lease_ttl_secs = 5;
let meta_lease_ttl_secs = 1;
let client = create_mysql_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_mysql_election = MySqlElection {
leader_value: leader_value.clone(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(table_name, 1).build(),
};
// Step 1: No leader exists, campaign and elected.
leader_mysql_election.do_campaign().await.unwrap();
let lease = get_lease(&leader_mysql_election).await.unwrap();
assert_eq!(lease.leader_value, leader_value);
assert!(lease.expire_time > lease.current);
assert!(leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 2: As a leader, renew the lease.
leader_mysql_election.do_campaign().await.unwrap();
let new_lease = get_lease(&leader_mysql_election).await.unwrap();
assert_eq!(lease.leader_value, leader_value);
// The lease should be renewed.
assert!(new_lease.expire_time > lease.expire_time);
assert!(new_lease.expire_time > new_lease.current);
assert!(leader_mysql_election.is_leader());
// Step 3: Something wrong, the leader lease expired.
tokio::time::sleep(Duration::from_secs(meta_lease_ttl_secs + 1)).await;
leader_mysql_election.do_campaign().await.unwrap();
let lease = get_lease(&leader_mysql_election).await.unwrap();
assert_eq!(lease.leader_value, leader_value);
assert!(lease.expire_time <= lease.current);
assert!(!leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Step 4: Re-elect itself.
leader_mysql_election.do_campaign().await.unwrap();
let lease = get_lease(&leader_mysql_election).await.unwrap();
assert_eq!(lease.leader_value, leader_value);
assert!(lease.expire_time > lease.current);
assert!(leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 5: Something wrong, the leader key is deleted by other followers.
{
let client = leader_mysql_election.client.lock().await;
let mut executor = Executor::Default(client);
leader_mysql_election
.delete_value(&leader_mysql_election.election_key(), &mut executor)
.await
.unwrap();
}
leader_mysql_election.do_campaign().await.unwrap();
let res = get_lease(&leader_mysql_election).await;
assert!(res.is_none());
assert!(!leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Step 6: Re-elect itself.
leader_mysql_election.do_campaign().await.unwrap();
let lease = get_lease(&leader_mysql_election).await.unwrap();
assert_eq!(lease.leader_value, leader_value);
assert!(lease.expire_time > lease.current);
assert!(leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 7: Something wrong, the leader key changed by others.
let another_leader_key = "another_leader";
{
let client = leader_mysql_election.client.lock().await;
let mut executor = Executor::Default(client);
leader_mysql_election
.delete_value(&leader_mysql_election.election_key(), &mut executor)
.await
.unwrap();
leader_mysql_election
.put_value_with_lease(
&leader_mysql_election.election_key(),
another_leader_key,
10,
&mut executor,
)
.await
.unwrap();
}
leader_mysql_election.do_campaign().await.unwrap();
let lease = get_lease(&leader_mysql_election).await.unwrap();
// Different from pg, mysql will not delete the key, just step down.
assert_eq!(lease.leader_value, another_leader_key);
assert!(lease.expire_time > lease.current);
assert!(!leader_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
drop_table(&leader_mysql_election.client, table_name).await;
}
#[tokio::test]
async fn test_follower_action() {
common_telemetry::init_default_ut_logging();
let candidate_lease_ttl_secs = 5;
let meta_lease_ttl_secs = 1;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_follower_action_greptime_metakv";
let follower_client = create_mysql_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let follower_mysql_election = MySqlElection {
leader_value: "test_follower".to_string(),
client: follower_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(table_name, 1).build(),
};
let leader_client = create_mysql_client(Some(table_name)).await.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_mysql_election = MySqlElection {
leader_value: "test_leader".to_string(),
client: leader_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(table_name, 1).build(),
};
leader_mysql_election.do_campaign().await.unwrap();
// Step 1: As a follower, the leader exists and the lease is not expired. Do nothing.
follower_mysql_election.do_campaign().await.unwrap();
// Step 2: As a follower, the leader exists but the lease expired. Re-elect itself.
tokio::time::sleep(Duration::from_secs(meta_lease_ttl_secs + 1)).await;
follower_mysql_election.do_campaign().await.unwrap();
assert!(follower_mysql_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
assert_eq!(
String::from_utf8_lossy(key.key()),
follower_mysql_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
drop_table(&follower_mysql_election.client, table_name).await;
}
}

View File

@@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use itertools::Itertools;
@@ -41,6 +40,7 @@ const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
struct ElectionSqlFactory<'a> {
lock_id: u64,
table_name: &'a str,
meta_lease_ttl_secs: u64,
}
struct ElectionSqlSet {
@@ -90,10 +90,11 @@ struct ElectionSqlSet {
}
impl<'a> ElectionSqlFactory<'a> {
fn new(lock_id: u64, table_name: &'a str) -> Self {
fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
Self {
lock_id,
table_name,
meta_lease_ttl_secs,
}
}
@@ -112,7 +113,10 @@ impl<'a> ElectionSqlFactory<'a> {
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!("SET idle_session_timeout = '{}s';", META_LEASE_SECS + 1)
format!(
"SET idle_session_timeout = '{}s';",
self.meta_lease_ttl_secs + 1
)
}
fn campaign_sql(&self) -> String {
@@ -226,6 +230,7 @@ pub struct PgElection {
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
sql_set: ElectionSqlSet,
}
@@ -235,10 +240,11 @@ impl PgElection {
client: Client,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
table_name: &str,
lock_id: u64,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
@@ -254,6 +260,7 @@ impl PgElection {
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
sql_set: sql_factory.build(),
}))
}
@@ -326,7 +333,7 @@ impl Election for PgElection {
// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
let origin = origin.unwrap();
self.update_value_with_lease(&key, &origin, &node_info)
self.update_value_with_lease(&key, &origin, &node_info, self.candidate_lease_ttl_secs)
.await?;
}
}
@@ -361,7 +368,7 @@ impl Election for PgElection {
/// to perform actions as a follower.
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
@@ -489,19 +496,20 @@ impl PgElection {
Ok((values_with_leases, current))
}
async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> {
async fn update_value_with_lease(
&self,
key: &str,
prev: &str,
updated: &str,
lease_ttl: u64,
) -> Result<()> {
let key = key.as_bytes();
let prev = prev.as_bytes();
let res = self
.client
.execute(
&self.sql_set.update_value_with_lease,
&[
&key,
&prev,
&updated,
&(self.candidate_lease_ttl_secs as f64),
],
&[&key, &prev, &updated, &(lease_ttl as f64)],
)
.await
.context(PostgresExecutionSnafu)?;
@@ -578,8 +586,13 @@ impl PgElection {
(true, true) => {
// Safety: prev is Some since we are using `get_value_with_lease` with `true`.
let prev = prev.unwrap();
self.update_value_with_lease(&key, &prev, &self.leader_value)
.await?;
self.update_value_with_lease(
&key,
&prev,
&self.leader_value,
self.meta_lease_ttl_secs,
)
.await?;
}
// Case 1.2
(true, false) => {
@@ -698,7 +711,7 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS)
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
.await?;
if self
@@ -775,7 +788,8 @@ mod tests {
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs: 10,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
};
let res = pg_election
@@ -793,7 +807,7 @@ mod tests {
let prev = prev.unwrap();
pg_election
.update_value_with_lease(&key, &prev, &value)
.update_value_with_lease(&key, &prev, &value, pg_election.meta_lease_ttl_secs)
.await
.unwrap();
@@ -852,7 +866,8 @@ mod tests {
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
};
let node_info = MetasrvNodeInfo {
@@ -896,7 +911,8 @@ mod tests {
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
};
let candidates = pg_election.all_candidates().await.unwrap();
@@ -938,7 +954,8 @@ mod tests {
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(28320, table_name).build(),
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
};
leader_pg_election.elected().await.unwrap();
@@ -1050,7 +1067,8 @@ mod tests {
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(28321, table_name).build(),
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
};
// Step 1: No leader exists, campaign and elected.
@@ -1103,7 +1121,7 @@ mod tests {
assert!(leader_pg_election.is_leader());
// Step 3: Something wrong, the leader lease expired.
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
let res = leader_pg_election
.client
@@ -1284,7 +1302,8 @@ mod tests {
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
@@ -1297,7 +1316,8 @@ mod tests {
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
leader_pg_election
@@ -1311,7 +1331,7 @@ mod tests {
follower_pg_election.follower_action().await.unwrap();
// Step 2: As a follower, the leader exists but the lease expired.
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(follower_pg_election.follower_action().await.is_err());
// Step 3: As a follower, the leader does not exist.