diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index b4408db7ac..1c504387c7 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -48,6 +48,10 @@ use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; #[cfg(feature = "pg_kvbackend")] +use crate::election::postgres::PgElection; +#[cfg(feature = "pg_kvbackend")] +use crate::election::CANDIDATE_LEASE_SECS; +#[cfg(feature = "pg_kvbackend")] use crate::error::InvalidArgumentsSnafu; use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; @@ -229,7 +233,15 @@ pub async fn metasrv_builder( let kv_backend = PgStore::with_pg_client(pg_client) .await .context(error::KvBackendSnafu)?; - (kv_backend, None) + let election_client = create_postgres_client(opts).await?; + let election = PgElection::with_pg_client( + opts.server_addr.clone(), + election_client, + opts.store_key_prefix.clone(), + CANDIDATE_LEASE_SECS, + ) + .await?; + (kv_backend, Some(election)) } }; diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 232b1481a9..a414c0cd1c 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -19,7 +19,9 @@ pub mod postgres; use std::fmt::{self, Debug}; use std::sync::Arc; -use tokio::sync::broadcast::Receiver; +use common_telemetry::{info, warn}; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::{self, Receiver, Sender}; use crate::error::Result; use crate::metasrv::MetasrvNodeInfo; @@ -75,6 +77,37 @@ impl fmt::Display for LeaderChangeMessage { } } +fn listen_leader_change(leader_value: String) -> Sender { + let (tx, mut rx) = broadcast::channel(100); + let _handle = common_runtime::spawn_global(async move { + loop { + match rx.recv().await { + Ok(msg) => match msg { + LeaderChangeMessage::Elected(key) => { + info!( + "[{leader_value}] is elected as leader: {:?}, lease: {}", + String::from_utf8_lossy(key.name()), + key.lease_id() + ); + } + LeaderChangeMessage::StepDown(key) => { + warn!( + "[{leader_value}] is stepping down: {:?}, lease: {}", + String::from_utf8_lossy(key.name()), + key.lease_id() + ); + } + }, + Err(RecvError::Lagged(_)) => { + warn!("Log printing is too slow or leader changed too fast!"); + } + Err(RecvError::Closed) => break, + } + } + }); + tx +} + #[async_trait::async_trait] pub trait Election: Send + Sync { type Leader; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 5f2cf33420..49766cf4f1 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -23,13 +23,12 @@ use etcd_client::{ }; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; -use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::time::{timeout, MissedTickBehavior}; use crate::election::{ - Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, - KEEP_ALIVE_INTERVAL_SECS, + listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, + CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS, }; use crate::error; use crate::error::Result; @@ -88,36 +87,7 @@ impl EtcdElection { E: AsRef, { let leader_value: String = leader_value.as_ref().into(); - - let leader_ident = leader_value.clone(); - let (tx, mut rx) = broadcast::channel(100); - let _handle = common_runtime::spawn_global(async move { - loop { - match rx.recv().await { - Ok(msg) => match msg { - LeaderChangeMessage::Elected(key) => { - info!( - "[{leader_ident}] is elected as leader: {:?}, lease: {}", - String::from_utf8_lossy(key.name()), - key.lease_id() - ); - } - LeaderChangeMessage::StepDown(key) => { - warn!( - "[{leader_ident}] is stepping down: {:?}, lease: {}", - String::from_utf8_lossy(key.name()), - key.lease_id() - ); - } - }, - Err(RecvError::Lagged(_)) => { - warn!("Log printing is too slow or leader changed too fast!"); - } - Err(RecvError::Closed) => break, - } - } - }); - + let tx = listen_leader_change(leader_value.clone()); Ok(Arc::new(Self { leader_value, client, diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 805b9b8bd2..22bde22850 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -16,18 +16,32 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; +use common_telemetry::{error, warn}; use common_time::Timestamp; use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; +use tokio::time::MissedTickBehavior; use tokio_postgres::Client; -use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; +use crate::election::{ + listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY, +}; use crate::error::{ - DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, + DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, + UnexpectedSnafu, }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; +// TODO(CookiePie): The lock id should be configurable. +const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(28319)"; +const STEP_DOWN: &str = "SELECT pg_advisory_unlock(28319)"; +const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1"; +// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. +// Either the leader reconnects and step down or the session expires and the lock is released. +const IDLE_SESSION_TIMEOUT: &str = "10s"; + // Separator between value and expire time. const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; @@ -81,8 +95,33 @@ fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { Ok((value.to_string(), expire_time)) } +#[derive(Debug, Clone, Default)] +struct PgLeaderKey { + name: Vec, + key: Vec, + rev: i64, + lease: i64, +} + +impl LeaderKey for PgLeaderKey { + fn name(&self) -> &[u8] { + &self.name + } + + fn key(&self) -> &[u8] { + &self.key + } + + fn revision(&self) -> i64 { + self.rev + } + + fn lease_id(&self) -> i64 { + self.lease + } +} + /// PostgreSql implementation of Election. -/// TODO(CookiePie): Currently only support candidate registration. Add election logic. pub struct PgElection { leader_value: String, client: Client, @@ -100,7 +139,13 @@ impl PgElection { store_key_prefix: String, candidate_lease_ttl_secs: u64, ) -> Result { - let (tx, _) = broadcast::channel(100); + // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. + client + .execute(SET_IDLE_SESSION_TIMEOUT, &[&IDLE_SESSION_TIMEOUT]) + .await + .context(PostgresExecutionSnafu)?; + + let tx = listen_leader_change(leader_value.clone()); Ok(Arc::new(Self { leader_value, client, @@ -112,7 +157,7 @@ impl PgElection { })) } - fn _election_key(&self) -> String { + fn election_key(&self) -> String { format!("{}{}", self.store_key_prefix, ELECTION_KEY) } @@ -146,11 +191,14 @@ impl Election for PgElection { serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { input: format!("{node_info:?}"), })?; - let res = self.put_value_with_lease(&key, &node_info).await?; + let res = self + .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs) + .await?; // May registered before, just update the lease. if !res { self.delete_value(&key).await?; - self.put_value_with_lease(&key, &node_info).await?; + self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs) + .await?; } // Check if the current lease has expired and renew the lease. @@ -197,12 +245,65 @@ impl Election for PgElection { Ok(valid_candidates) } + /// Attempts to acquire leadership by executing a campaign. This function continuously checks + /// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database. + /// + /// The function operates in a loop, where it: + /// + /// 1. Waits for a predefined interval before attempting to acquire the lock again. + /// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock. + /// 3. Checks the result of the query: + /// - If the lock is successfully acquired (result is true), it calls the `leader_action` method + /// to perform actions as the leader. + /// - If the lock is not acquired (result is false), it calls the `follower_action` method + /// to perform actions as a follower. async fn campaign(&self) -> Result<()> { - todo!() + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); + keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + let res = self + .client + .query(CAMPAIGN, &[]) + .await + .context(PostgresExecutionSnafu)?; + if let Some(row) = res.first() { + match row.try_get(0) { + Ok(true) => self.leader_action().await?, + Ok(false) => self.follower_action().await?, + Err(_) => { + return UnexpectedSnafu { + violated: "Failed to get the result of acquiring advisory lock" + .to_string(), + } + .fail(); + } + } + } else { + return UnexpectedSnafu { + violated: "Failed to get the result of acquiring advisory lock".to_string(), + } + .fail(); + } + let _ = keep_alive_interval.tick().await; + } } async fn leader(&self) -> Result { - todo!() + if self.is_leader.load(Ordering::Relaxed) { + Ok(self.leader_value.as_bytes().into()) + } else { + let key = self.election_key(); + if let Some((leader, expire_time, current, _)) = + self.get_value_with_lease(&key, false).await? + { + ensure!(expire_time > current, NoLeaderSnafu); + Ok(leader.as_bytes().into()) + } else { + NoLeaderSnafu.fail() + } + } } async fn resign(&self) -> Result<()> { @@ -315,17 +416,17 @@ impl PgElection { } /// Returns `true` if the insertion is successful - async fn put_value_with_lease(&self, key: &str, value: &str) -> Result { + async fn put_value_with_lease( + &self, + key: &str, + value: &str, + lease_ttl_secs: u64, + ) -> Result { let res = self .client .query( PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME, - &[ - &key, - &value, - &LEASE_SEP, - &(self.candidate_lease_ttl_secs as f64), - ], + &[&key, &value, &LEASE_SEP, &(lease_ttl_secs as f64)], ) .await .context(PostgresExecutionSnafu)?; @@ -343,6 +444,177 @@ impl PgElection { Ok(res.len() == 1) } + + /// Handles the actions of a leader in the election process. + /// + /// This function performs the following checks and actions: + /// + /// - **Case 1**: If the current instance believes it is the leader from the previous term, + /// it attempts to renew the lease. It checks if the lease is still valid and either renews it + /// or steps down if it has expired. + /// + /// - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease + /// by updating the value associated with the election key. + /// - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning + /// and steps down, initiating a new campaign for leadership. + /// - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning + /// indicating that it still holds the lock and steps down to re-initiate the campaign. This may + /// happen if the leader has failed to renew the lease and the session has expired, and recovery + /// after a period of time during which other leaders have been elected and stepped down. + /// - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign. + /// + /// - **Case 2**: If the current instance is not leader previously, it calls the + /// `elected` method as a newly elected leader. + async fn leader_action(&self) -> Result<()> { + let key = self.election_key(); + // Case 1 + if self.is_leader() { + match self.get_value_with_lease(&key, true).await? { + Some((prev_leader, expire_time, current, prev)) => { + match (prev_leader == self.leader_value, expire_time > current) { + // Case 1.1 + (true, true) => { + // Safety: prev is Some since we are using `get_value_with_lease` with `true`. + let prev = prev.unwrap(); + self.update_value_with_lease(&key, &prev, &self.leader_value) + .await?; + } + // Case 1.2 + (true, false) => { + warn!("Leader lease expired, now stepping down."); + self.step_down().await?; + } + // Case 1.3 + (false, _) => { + warn!("Leader lease not found, but still hold the lock. Now stepping down."); + self.step_down().await?; + } + } + } + // Case 1.4 + None => { + warn!("Leader lease not found, but still hold the lock. Now stepping down."); + self.step_down().await?; + } + } + // Case 2 + } else { + self.elected().await?; + } + Ok(()) + } + + /// Handles the actions of a follower in the election process. + /// + /// This function performs the following checks and actions: + /// + /// - **Case 1**: If the current instance believes it is the leader from the previous term, + /// it steps down without deleting the key. + /// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error + /// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock + /// will be released. + /// - **Case 3**: If all checks pass, the function returns without performing any actions. + async fn follower_action(&self) -> Result<()> { + let key = self.election_key(); + // Case 1 + if self.is_leader() { + self.step_down_without_lock().await?; + } + let (_, expire_time, current, _) = self + .get_value_with_lease(&key, false) + .await? + .context(NoLeaderSnafu)?; + // Case 2 + ensure!(expire_time > current, NoLeaderSnafu); + // Case 3 + Ok(()) + } + + /// Step down the leader. The leader should delete the key and notify the leader watcher. + /// + /// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected. + /// + /// ## Caution: + /// Should only step down while holding the advisory lock. + async fn step_down(&self) -> Result<()> { + let key = self.election_key(); + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone().into_bytes(), + ..Default::default() + }; + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.delete_value(&key).await?; + self.client + .query(STEP_DOWN, &[]) + .await + .context(PostgresExecutionSnafu)?; + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + { + error!(e; "Failed to send leader change message"); + } + } + Ok(()) + } + + /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key. + async fn step_down_without_lock(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + ..Default::default() + }; + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + { + error!(e; "Failed to send leader change message"); + } + } + Ok(()) + } + + /// Elected as leader. The leader should put the key and notify the leader watcher. + /// Caution: Should only elected while holding the advisory lock. + async fn elected(&self) -> Result<()> { + let key = self.election_key(); + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone().into_bytes(), + ..Default::default() + }; + self.delete_value(&key).await?; + self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS) + .await?; + + if self + .is_leader + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.leader_infancy.store(true, Ordering::Relaxed); + + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader_key))) + { + error!(e; "Failed to send leader change message"); + } + } + Ok(()) + } } #[cfg(test)] @@ -390,7 +662,7 @@ mod tests { }; let res = pg_election - .put_value_with_lease(&key, &value) + .put_value_with_lease(&key, &value, 10) .await .unwrap(); assert!(res); @@ -418,7 +690,7 @@ mod tests { let key = format!("test_key_{}", i); let value = format!("test_value_{}", i); pg_election - .put_value_with_lease(&key, &value) + .put_value_with_lease(&key, &value, 10) .await .unwrap(); } @@ -478,7 +750,7 @@ mod tests { handles.push(handle); } // Wait for candidates to registrate themselves and renew their leases at least once. - tokio::time::sleep(Duration::from_secs(6)).await; + tokio::time::sleep(Duration::from_secs(3)).await; let client = create_postgres_client().await.unwrap(); @@ -516,4 +788,402 @@ mod tests { assert!(res); } } + + #[tokio::test] + async fn test_elected_and_step_down() { + let leader_value = "test_leader".to_string(); + let candidate_lease_ttl_secs = 5; + let client = create_postgres_client().await.unwrap(); + + let (tx, mut rx) = broadcast::channel(100); + let leader_pg_election = PgElection { + leader_value: leader_value.clone(), + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl_secs, + }; + + leader_pg_election.elected().await.unwrap(); + let (leader, expire_time, current, _) = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap() + .unwrap(); + assert!(leader == leader_value); + assert!(expire_time > current); + assert!(leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + leader_pg_election.step_down_without_lock().await.unwrap(); + let (leader, _, _, _) = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap() + .unwrap(); + assert!(leader == leader_value); + assert!(!leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + + leader_pg_election.elected().await.unwrap(); + let (leader, expire_time, current, _) = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap() + .unwrap(); + assert!(leader == leader_value); + assert!(expire_time > current); + assert!(leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + leader_pg_election.step_down().await.unwrap(); + let res = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap(); + assert!(res.is_none()); + assert!(!leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + } + + #[tokio::test] + async fn test_leader_action() { + let leader_value = "test_leader".to_string(); + let candidate_lease_ttl_secs = 5; + let client = create_postgres_client().await.unwrap(); + + let (tx, mut rx) = broadcast::channel(100); + let leader_pg_election = PgElection { + leader_value: leader_value.clone(), + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl_secs, + }; + + // Step 1: No leader exists, campaign and elected. + let res = leader_pg_election + .client + .query(CAMPAIGN, &[]) + .await + .unwrap(); + let res: bool = res[0].get(0); + assert!(res); + leader_pg_election.leader_action().await.unwrap(); + let (leader, expire_time, current, _) = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap() + .unwrap(); + assert!(leader == leader_value); + assert!(expire_time > current); + assert!(leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + // Step 2: As a leader, renew the lease. + let res = leader_pg_election + .client + .query(CAMPAIGN, &[]) + .await + .unwrap(); + let res: bool = res[0].get(0); + assert!(res); + leader_pg_election.leader_action().await.unwrap(); + let (leader, new_expire_time, current, _) = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap() + .unwrap(); + assert!(leader == leader_value); + assert!(new_expire_time > current && new_expire_time > expire_time); + assert!(leader_pg_election.is_leader()); + + // Step 3: Something wrong, the leader lease expired. + tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await; + + let res = leader_pg_election + .client + .query(CAMPAIGN, &[]) + .await + .unwrap(); + let res: bool = res[0].get(0); + assert!(res); + leader_pg_election.leader_action().await.unwrap(); + let res = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap(); + assert!(res.is_none()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + + // Step 4: Re-campaign and elected. + let res = leader_pg_election + .client + .query(CAMPAIGN, &[]) + .await + .unwrap(); + let res: bool = res[0].get(0); + assert!(res); + leader_pg_election.leader_action().await.unwrap(); + let (leader, expire_time, current, _) = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap() + .unwrap(); + assert!(leader == leader_value); + assert!(expire_time > current); + assert!(leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + // Step 5: Something wrong, the leader key is deleted by other followers. + leader_pg_election + .delete_value(&leader_pg_election.election_key()) + .await + .unwrap(); + leader_pg_election.leader_action().await.unwrap(); + let res = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap(); + assert!(res.is_none()); + assert!(!leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + + // Step 6: Re-campaign and elected. + let res = leader_pg_election + .client + .query(CAMPAIGN, &[]) + .await + .unwrap(); + let res: bool = res[0].get(0); + assert!(res); + leader_pg_election.leader_action().await.unwrap(); + let (leader, expire_time, current, _) = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap() + .unwrap(); + assert!(leader == leader_value); + assert!(expire_time > current); + assert!(leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + // Step 7: Something wrong, the leader key changed by others. + let res = leader_pg_election + .client + .query(CAMPAIGN, &[]) + .await + .unwrap(); + let res: bool = res[0].get(0); + assert!(res); + leader_pg_election + .delete_value(&leader_pg_election.election_key()) + .await + .unwrap(); + leader_pg_election + .put_value_with_lease(&leader_pg_election.election_key(), "test", 10) + .await + .unwrap(); + leader_pg_election.leader_action().await.unwrap(); + let res = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key(), false) + .await + .unwrap(); + assert!(res.is_none()); + assert!(!leader_pg_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + } + + #[tokio::test] + async fn test_follower_action() { + let candidate_lease_ttl_secs = 5; + + let follower_client = create_postgres_client().await.unwrap(); + let (tx, mut rx) = broadcast::channel(100); + let follower_pg_election = PgElection { + leader_value: "test_follower".to_string(), + client: follower_client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl_secs, + }; + + let leader_client = create_postgres_client().await.unwrap(); + let (tx, _) = broadcast::channel(100); + let leader_pg_election = PgElection { + leader_value: "test_leader".to_string(), + client: leader_client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl_secs, + }; + + leader_pg_election + .client + .query(CAMPAIGN, &[]) + .await + .unwrap(); + leader_pg_election.elected().await.unwrap(); + + // Step 1: As a follower, the leader exists and the lease is not expired. + follower_pg_election.follower_action().await.unwrap(); + + // Step 2: As a follower, the leader exists but the lease expired. + tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await; + assert!(follower_pg_election.follower_action().await.is_err()); + + // Step 3: As a follower, the leader does not exist. + leader_pg_election + .delete_value(&leader_pg_election.election_key()) + .await + .unwrap(); + assert!(follower_pg_election.follower_action().await.is_err()); + + // Step 4: Follower thinks it's the leader but failed to acquire the lock. + follower_pg_election + .is_leader + .store(true, Ordering::Relaxed); + assert!(follower_pg_election.follower_action().await.is_err()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), "test_follower"); + assert_eq!( + String::from_utf8_lossy(key.key()), + follower_pg_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + } }