feat: add election logic for PgElection (#5249)

* feat: init PgElection

* fix: release advisory lock

* fix: handle duplicate keys

* chore: update comments

* fix: unlock if acquired the lock

* chore: add TODO and avoid unwrap

* refactor: check both lock and expire time, add more comments

* test: add unit test for pg election

* chore: fmt

* chore: typo

* fix: add feature gate

* chore: visibility

* chore: follow review comments
This commit is contained in:
Yohan Wal
2024-12-30 17:45:04 +08:00
committed by GitHub
parent 13ed10556a
commit 89f2e15ffb
4 changed files with 739 additions and 54 deletions

View File

@@ -48,6 +48,10 @@ use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::CANDIDATE_LEASE_SECS;
#[cfg(feature = "pg_kvbackend")]
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::metasrv::builder::MetasrvBuilder;
@@ -229,7 +233,15 @@ pub async fn metasrv_builder(
let kv_backend = PgStore::with_pg_client(pg_client)
.await
.context(error::KvBackendSnafu)?;
(kv_backend, None)
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
)
.await?;
(kv_backend, Some(election))
}
};

View File

@@ -19,7 +19,9 @@ pub mod postgres;
use std::fmt::{self, Debug};
use std::sync::Arc;
use tokio::sync::broadcast::Receiver;
use common_telemetry::{info, warn};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{self, Receiver, Sender};
use crate::error::Result;
use crate::metasrv::MetasrvNodeInfo;
@@ -75,6 +77,37 @@ impl fmt::Display for LeaderChangeMessage {
}
}
fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
let (tx, mut rx) = broadcast::channel(100);
let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_value}] is elected as leader: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_value}] is stepping down: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
},
Err(RecvError::Lagged(_)) => {
warn!("Log printing is too slow or leader changed too fast!");
}
Err(RecvError::Closed) => break,
}
}
});
tx
}
#[async_trait::async_trait]
pub trait Election: Send + Sync {
type Leader;

View File

@@ -23,13 +23,12 @@ use etcd_client::{
};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};
use crate::election::{
Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY,
KEEP_ALIVE_INTERVAL_SECS,
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT,
CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
};
use crate::error;
use crate::error::Result;
@@ -88,36 +87,7 @@ impl EtcdElection {
E: AsRef<str>,
{
let leader_value: String = leader_value.as_ref().into();
let leader_ident = leader_value.clone();
let (tx, mut rx) = broadcast::channel(100);
let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_ident}] is elected as leader: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_ident}] is stepping down: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
},
Err(RecvError::Lagged(_)) => {
warn!("Log printing is too slow or leader changed too fast!");
}
Err(RecvError::Closed) => break,
}
}
});
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
client,

View File

@@ -16,18 +16,32 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior;
use tokio_postgres::Client;
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::error::{
DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu,
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
// TODO(CookiePie): The lock id should be configurable.
const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(28319)";
const STEP_DOWN: &str = "SELECT pg_advisory_unlock(28319)";
const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1";
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and step down or the session expires and the lock is released.
const IDLE_SESSION_TIMEOUT: &str = "10s";
// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
@@ -81,8 +95,33 @@ fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
Ok((value.to_string(), expire_time))
}
#[derive(Debug, Clone, Default)]
struct PgLeaderKey {
name: Vec<u8>,
key: Vec<u8>,
rev: i64,
lease: i64,
}
impl LeaderKey for PgLeaderKey {
fn name(&self) -> &[u8] {
&self.name
}
fn key(&self) -> &[u8] {
&self.key
}
fn revision(&self) -> i64 {
self.rev
}
fn lease_id(&self) -> i64 {
self.lease
}
}
/// PostgreSql implementation of Election.
/// TODO(CookiePie): Currently only support candidate registration. Add election logic.
pub struct PgElection {
leader_value: String,
client: Client,
@@ -100,7 +139,13 @@ impl PgElection {
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
) -> Result<ElectionRef> {
let (tx, _) = broadcast::channel(100);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(SET_IDLE_SESSION_TIMEOUT, &[&IDLE_SESSION_TIMEOUT])
.await
.context(PostgresExecutionSnafu)?;
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
client,
@@ -112,7 +157,7 @@ impl PgElection {
}))
}
fn _election_key(&self) -> String {
fn election_key(&self) -> String {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
@@ -146,11 +191,14 @@ impl Election for PgElection {
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
input: format!("{node_info:?}"),
})?;
let res = self.put_value_with_lease(&key, &node_info).await?;
let res = self
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
// May registered before, just update the lease.
if !res {
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
}
// Check if the current lease has expired and renew the lease.
@@ -197,12 +245,65 @@ impl Election for PgElection {
Ok(valid_candidates)
}
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
/// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
///
/// The function operates in a loop, where it:
///
/// 1. Waits for a predefined interval before attempting to acquire the lock again.
/// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
/// 3. Checks the result of the query:
/// - If the lock is successfully acquired (result is true), it calls the `leader_action` method
/// to perform actions as the leader.
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
/// to perform actions as a follower.
async fn campaign(&self) -> Result<()> {
todo!()
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let res = self
.client
.query(CAMPAIGN, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Some(row) = res.first() {
match row.try_get(0) {
Ok(true) => self.leader_action().await?,
Ok(false) => self.follower_action().await?,
Err(_) => {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock"
.to_string(),
}
.fail();
}
}
} else {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock".to_string(),
}
.fail();
}
let _ = keep_alive_interval.tick().await;
}
}
async fn leader(&self) -> Result<Self::Leader> {
todo!()
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
} else {
let key = self.election_key();
if let Some((leader, expire_time, current, _)) =
self.get_value_with_lease(&key, false).await?
{
ensure!(expire_time > current, NoLeaderSnafu);
Ok(leader.as_bytes().into())
} else {
NoLeaderSnafu.fail()
}
}
}
async fn resign(&self) -> Result<()> {
@@ -315,17 +416,17 @@ impl PgElection {
}
/// Returns `true` if the insertion is successful
async fn put_value_with_lease(&self, key: &str, value: &str) -> Result<bool> {
async fn put_value_with_lease(
&self,
key: &str,
value: &str,
lease_ttl_secs: u64,
) -> Result<bool> {
let res = self
.client
.query(
PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME,
&[
&key,
&value,
&LEASE_SEP,
&(self.candidate_lease_ttl_secs as f64),
],
&[&key, &value, &LEASE_SEP, &(lease_ttl_secs as f64)],
)
.await
.context(PostgresExecutionSnafu)?;
@@ -343,6 +444,177 @@ impl PgElection {
Ok(res.len() == 1)
}
/// Handles the actions of a leader in the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the current instance believes it is the leader from the previous term,
/// it attempts to renew the lease. It checks if the lease is still valid and either renews it
/// or steps down if it has expired.
///
/// - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
/// by updating the value associated with the election key.
/// - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
/// and steps down, initiating a new campaign for leadership.
/// - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
/// indicating that it still holds the lock and steps down to re-initiate the campaign. This may
/// happen if the leader has failed to renew the lease and the session has expired, and recovery
/// after a period of time during which other leaders have been elected and stepped down.
/// - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
///
/// - **Case 2**: If the current instance is not leader previously, it calls the
/// `elected` method as a newly elected leader.
async fn leader_action(&self) -> Result<()> {
let key = self.election_key();
// Case 1
if self.is_leader() {
match self.get_value_with_lease(&key, true).await? {
Some((prev_leader, expire_time, current, prev)) => {
match (prev_leader == self.leader_value, expire_time > current) {
// Case 1.1
(true, true) => {
// Safety: prev is Some since we are using `get_value_with_lease` with `true`.
let prev = prev.unwrap();
self.update_value_with_lease(&key, &prev, &self.leader_value)
.await?;
}
// Case 1.2
(true, false) => {
warn!("Leader lease expired, now stepping down.");
self.step_down().await?;
}
// Case 1.3
(false, _) => {
warn!("Leader lease not found, but still hold the lock. Now stepping down.");
self.step_down().await?;
}
}
}
// Case 1.4
None => {
warn!("Leader lease not found, but still hold the lock. Now stepping down.");
self.step_down().await?;
}
}
// Case 2
} else {
self.elected().await?;
}
Ok(())
}
/// Handles the actions of a follower in the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the current instance believes it is the leader from the previous term,
/// it steps down without deleting the key.
/// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
/// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
/// will be released.
/// - **Case 3**: If all checks pass, the function returns without performing any actions.
async fn follower_action(&self) -> Result<()> {
let key = self.election_key();
// Case 1
if self.is_leader() {
self.step_down_without_lock().await?;
}
let (_, expire_time, current, _) = self
.get_value_with_lease(&key, false)
.await?
.context(NoLeaderSnafu)?;
// Case 2
ensure!(expire_time > current, NoLeaderSnafu);
// Case 3
Ok(())
}
/// Step down the leader. The leader should delete the key and notify the leader watcher.
///
/// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
///
/// ## Caution:
/// Should only step down while holding the advisory lock.
async fn step_down(&self) -> Result<()> {
let key = self.election_key();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.delete_value(&key).await?;
self.client
.query(STEP_DOWN, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
async fn step_down_without_lock(&self) -> Result<()> {
let key = self.election_key().into_bytes();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Elected as leader. The leader should put the key and notify the leader watcher.
/// Caution: Should only elected while holding the advisory lock.
async fn elected(&self) -> Result<()> {
let key = self.election_key();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS)
.await?;
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.leader_infancy.store(true, Ordering::Relaxed);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
}
#[cfg(test)]
@@ -390,7 +662,7 @@ mod tests {
};
let res = pg_election
.put_value_with_lease(&key, &value)
.put_value_with_lease(&key, &value, 10)
.await
.unwrap();
assert!(res);
@@ -418,7 +690,7 @@ mod tests {
let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i);
pg_election
.put_value_with_lease(&key, &value)
.put_value_with_lease(&key, &value, 10)
.await
.unwrap();
}
@@ -478,7 +750,7 @@ mod tests {
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
tokio::time::sleep(Duration::from_secs(6)).await;
tokio::time::sleep(Duration::from_secs(3)).await;
let client = create_postgres_client().await.unwrap();
@@ -516,4 +788,402 @@ mod tests {
assert!(res);
}
}
#[tokio::test]
async fn test_elected_and_step_down() {
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
leader_pg_election.elected().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
leader_pg_election.step_down_without_lock().await.unwrap();
let (leader, _, _, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
leader_pg_election.elected().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
leader_pg_election.step_down().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
}
#[tokio::test]
async fn test_leader_action() {
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, new_expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(new_expire_time > current && new_expire_time > expire_time);
assert!(leader_pg_election.is_leader());
// Step 3: Something wrong, the leader lease expired.
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 5: Something wrong, the leader key is deleted by other followers.
leader_pg_election
.delete_value(&leader_pg_election.election_key())
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election
.delete_value(&leader_pg_election.election_key())
.await
.unwrap();
leader_pg_election
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
}
#[tokio::test]
async fn test_follower_action() {
let candidate_lease_ttl_secs = 5;
let follower_client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let follower_pg_election = PgElection {
leader_value: "test_follower".to_string(),
client: follower_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
let leader_client = create_postgres_client().await.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: "test_leader".to_string(),
client: leader_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
leader_pg_election.elected().await.unwrap();
// Step 1: As a follower, the leader exists and the lease is not expired.
follower_pg_election.follower_action().await.unwrap();
// Step 2: As a follower, the leader exists but the lease expired.
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
assert!(follower_pg_election.follower_action().await.is_err());
// Step 3: As a follower, the leader does not exist.
leader_pg_election
.delete_value(&leader_pg_election.election_key())
.await
.unwrap();
assert!(follower_pg_election.follower_action().await.is_err());
// Step 4: Follower thinks it's the leader but failed to acquire the lock.
follower_pg_election
.is_leader
.store(true, Ordering::Relaxed);
assert!(follower_pg_election.follower_action().await.is_err());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
assert_eq!(
String::from_utf8_lossy(key.key()),
follower_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
}
}