From 083c22b90a2cd793384f15d52336de6e2adbaa45 Mon Sep 17 00:00:00 2001 From: Yuhan Wang <1035325592@qq.com> Date: Tue, 3 Jun 2025 19:31:30 +0800 Subject: [PATCH] refactor: extract some common functions and structs in election module (#6172) * refactor: extract some common functions and structs in election module * chore: add comments and modify a function name * chore: add comments and modify a function name * fix: missing 2 lines in license header * fix: acqrel * chore: apply comment suggestions * Update src/meta-srv/src/election.rs Co-authored-by: jeremyhi --------- Co-authored-by: jeremyhi --- src/meta-srv/src/bootstrap.rs | 4 +- src/meta-srv/src/election.rs | 31 ++- src/meta-srv/src/election/etcd.rs | 42 ++-- src/meta-srv/src/election/rds.rs | 90 +++++++ src/meta-srv/src/election/{ => rds}/mysql.rs | 112 ++------- .../src/election/{ => rds}/postgres.rs | 237 +++++++----------- 6 files changed, 239 insertions(+), 277 deletions(-) create mode 100644 src/meta-srv/src/election/rds.rs rename src/meta-srv/src/election/{ => rds}/mysql.rs (94%) rename src/meta-srv/src/election/{ => rds}/postgres.rs (89%) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 15e7d8d11f..717aae19ec 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -61,9 +61,9 @@ use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; #[cfg(feature = "mysql_kvbackend")] -use crate::election::mysql::MySqlElection; +use crate::election::rds::mysql::MySqlElection; #[cfg(feature = "pg_kvbackend")] -use crate::election::postgres::PgElection; +use crate::election::rds::postgres::PgElection; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use crate::election::CANDIDATE_LEASE_SECS; use crate::metasrv::builder::MetasrvBuilder; diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 235a0b25ce..e4b569b99e 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -13,15 +13,14 @@ // limitations under the License. pub mod etcd; -#[cfg(feature = "mysql_kvbackend")] -pub mod mysql; -#[cfg(feature = "pg_kvbackend")] -pub mod postgres; +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] +pub mod rds; use std::fmt::{self, Debug}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use common_telemetry::{info, warn}; +use common_telemetry::{error, info, warn}; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::{self, Receiver, Sender}; @@ -110,6 +109,28 @@ fn listen_leader_change(leader_value: String) -> Sender { tx } +/// Sends a leader change message to the channel and sets the `is_leader` flag. +/// If a leader is elected, it will also set the `leader_infancy` flag to true. +fn send_leader_change_and_set_flags( + is_leader: &AtomicBool, + leader_infancy: &AtomicBool, + tx: &Sender, + msg: LeaderChangeMessage, +) { + let is_elected = matches!(msg, LeaderChangeMessage::Elected(_)); + if is_leader + .compare_exchange(!is_elected, is_elected, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + if is_elected { + leader_infancy.store(true, Ordering::Release); + } + if let Err(e) = tx.send(msg) { + error!(e; "Failed to send leader change message"); + } + } +} + #[async_trait::async_trait] pub trait Election: Send + Sync { type Leader; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 003b8e6a44..bf4bfa049d 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -27,8 +27,8 @@ use tokio::sync::broadcast::Receiver; use tokio::time::{timeout, MissedTickBehavior}; use crate::election::{ - listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, - CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS, + listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage, + LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS, }; use crate::error; use crate::error::Result; @@ -247,18 +247,12 @@ impl Election for EtcdElection { } } - if self - .is_leader - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) - { - error!(e; "Failed to send leader change message"); - } - } + send_leader_change_and_set_flags( + &self.is_leader, + &self.infancy, + &self.leader_watcher, + LeaderChangeMessage::StepDown(Arc::new(leader.clone())), + ); } Ok(()) @@ -305,20 +299,12 @@ impl EtcdElection { ); // Only after a successful `keep_alive` is the leader considered official. - if self - .is_leader - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - self.infancy.store(true, Ordering::Release); - - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader))) - { - error!(e; "Failed to send leader change message"); - } - } + send_leader_change_and_set_flags( + &self.is_leader, + &self.infancy, + &self.leader_watcher, + LeaderChangeMessage::Elected(Arc::new(leader.clone())), + ); } Ok(()) diff --git a/src/meta-srv/src/election/rds.rs b/src/meta-srv/src/election/rds.rs new file mode 100644 index 0000000000..16e113415a --- /dev/null +++ b/src/meta-srv/src/election/rds.rs @@ -0,0 +1,90 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(feature = "mysql_kvbackend")] +pub mod mysql; +#[cfg(feature = "pg_kvbackend")] +pub mod postgres; + +use common_time::Timestamp; +use itertools::Itertools; +use snafu::OptionExt; + +use crate::election::LeaderKey; +use crate::error::{Result, UnexpectedSnafu}; + +// Separator between value and expire time in the lease string. +// A lease is put into rds election in the format: +// || __metadata_lease_sep || +const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; + +/// Parses the value and expire time from the given string retrieved from rds. +fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { + let (value, expire_time) = + value + .split(LEASE_SEP) + .collect_tuple() + .with_context(|| UnexpectedSnafu { + violated: format!( + "Invalid value {}, expect node info || {} || expire time", + value, LEASE_SEP + ), + })?; + // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' + let expire_time = match Timestamp::from_str(expire_time, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", expire_time), + } + .fail()?, + }; + Ok((value.to_string(), expire_time)) +} + +/// LeaderKey used for [LeaderChangeMessage] in rds election components. +#[derive(Debug, Clone, Default)] +struct RdsLeaderKey { + name: Vec, + key: Vec, + rev: i64, + lease: i64, +} + +impl LeaderKey for RdsLeaderKey { + fn name(&self) -> &[u8] { + &self.name + } + + fn key(&self) -> &[u8] { + &self.key + } + + fn revision(&self) -> i64 { + self.rev + } + + fn lease_id(&self) -> i64 { + self.lease + } +} + +/// Lease information for rds election. +#[derive(Default, Clone, Debug)] +struct Lease { + leader_value: String, + expire_time: Timestamp, + current: Timestamp, + // `origin` is the original value of the lease, used for CAS. + origin: String, +} diff --git a/src/meta-srv/src/election/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs similarity index 94% rename from src/meta-srv/src/election/mysql.rs rename to src/meta-srv/src/election/rds/mysql.rs index 24905228c8..6971302f4e 100644 --- a/src/meta-srv/src/election/mysql.rs +++ b/src/meta-srv/src/election/rds/mysql.rs @@ -16,9 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use common_telemetry::{error, warn}; +use common_telemetry::warn; use common_time::Timestamp; -use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; use sqlx::mysql::{MySqlArguments, MySqlRow}; use sqlx::query::Query; @@ -26,8 +25,10 @@ use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row}; use tokio::sync::{broadcast, Mutex, MutexGuard}; use tokio::time::MissedTickBehavior; +use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP}; use crate::election::{ - listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY, + listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage, + CANDIDATES_ROOT, ELECTION_KEY, }; use crate::error::{ DeserializeFromJsonSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, SerializeToJsonSnafu, @@ -35,20 +36,6 @@ use crate::error::{ }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; -// Separator between value and expire time. -const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; - -/// Lease information. -/// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module. -#[derive(Default, Clone, Debug)] -struct Lease { - leader_value: String, - expire_time: Timestamp, - current: Timestamp, - // origin is the origin value of the lease, used for CAS. - origin: String, -} - struct ElectionSqlFactory<'a> { table_name: &'a str, meta_lease_ttl_secs: u64, @@ -204,55 +191,6 @@ impl<'a> ElectionSqlFactory<'a> { } } -/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time". -fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { - let (value, expire_time) = - value - .split(LEASE_SEP) - .collect_tuple() - .with_context(|| UnexpectedSnafu { - violated: format!( - "Invalid value {}, expect node info || {} || expire time", - value, LEASE_SEP - ), - })?; - // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' - let expire_time = match Timestamp::from_str(expire_time, None) { - Ok(ts) => ts, - Err(_) => UnexpectedSnafu { - violated: format!("Invalid timestamp: {}", expire_time), - } - .fail()?, - }; - Ok((value.to_string(), expire_time)) -} - -#[derive(Debug, Clone, Default)] -struct MySqlLeaderKey { - name: Vec, - key: Vec, - rev: i64, - lease: i64, -} - -impl LeaderKey for MySqlLeaderKey { - fn name(&self) -> &[u8] { - &self.name - } - - fn key(&self) -> &[u8] { - &self.key - } - - fn revision(&self) -> i64 { - self.rev - } - - fn lease_id(&self) -> i64 { - self.lease - } -} - enum Executor<'a> { Default(MutexGuard<'a, MySqlConnection>), Txn(MySqlTransaction<'a>), @@ -767,23 +705,17 @@ impl MySqlElection { /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key. async fn step_down_without_lock(&self) -> Result<()> { let key = self.election_key().into_bytes(); - let leader_key = MySqlLeaderKey { + let leader_key = RdsLeaderKey { name: self.leader_value.clone().into_bytes(), key: key.clone(), ..Default::default() }; - if self - .is_leader - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) - { - error!(e; "Failed to send leader change message"); - } - } + send_leader_change_and_set_flags( + &self.is_leader, + &self.leader_infancy, + &self.leader_watcher, + LeaderChangeMessage::StepDown(Arc::new(leader_key)), + ); Ok(()) } @@ -791,7 +723,7 @@ impl MySqlElection { /// Caution: Should only elected while holding the lock. async fn elected(&self, executor: &mut Executor<'_>) -> Result<()> { let key = self.election_key(); - let leader_key = MySqlLeaderKey { + let leader_key = RdsLeaderKey { name: self.leader_value.clone().into_bytes(), key: key.clone().into_bytes(), ..Default::default() @@ -800,20 +732,12 @@ impl MySqlElection { self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs, executor) .await?; - if self - .is_leader - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - self.leader_infancy.store(true, Ordering::Release); - - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader_key))) - { - error!(e; "Failed to send leader change message"); - } - } + send_leader_change_and_set_flags( + &self.is_leader, + &self.leader_infancy, + &self.leader_watcher, + LeaderChangeMessage::Elected(Arc::new(leader_key)), + ); Ok(()) } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs similarity index 89% rename from src/meta-srv/src/election/postgres.rs rename to src/meta-srv/src/election/rds/postgres.rs index d30e0ce57c..1e970c40df 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/rds/postgres.rs @@ -18,15 +18,16 @@ use std::time::Duration; use common_telemetry::{error, warn}; use common_time::Timestamp; -use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio::time::MissedTickBehavior; use tokio_postgres::types::ToSql; use tokio_postgres::Client; +use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP}; use crate::election::{ - listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY, + listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage, + CANDIDATES_ROOT, ELECTION_KEY, }; use crate::error::{ DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, @@ -34,9 +35,6 @@ use crate::error::{ }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; -// Separator between value and expire time. -const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; - struct ElectionSqlFactory<'a> { lock_id: u64, table_name: &'a str, @@ -173,54 +171,6 @@ impl<'a> ElectionSqlFactory<'a> { } } -/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time". -fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { - let (value, expire_time) = value - .split(LEASE_SEP) - .collect_tuple() - .context(UnexpectedSnafu { - violated: format!( - "Invalid value {}, expect node info || {} || expire time", - value, LEASE_SEP - ), - })?; - // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' - let expire_time = match Timestamp::from_str(expire_time, None) { - Ok(ts) => ts, - Err(_) => UnexpectedSnafu { - violated: format!("Invalid timestamp: {}", expire_time), - } - .fail()?, - }; - Ok((value.to_string(), expire_time)) -} - -#[derive(Debug, Clone, Default)] -struct PgLeaderKey { - name: Vec, - key: Vec, - rev: i64, - lease: i64, -} - -impl LeaderKey for PgLeaderKey { - fn name(&self) -> &[u8] { - &self.name - } - - fn key(&self) -> &[u8] { - &self.key - } - - fn revision(&self) -> i64 { - self.rev - } - - fn lease_id(&self) -> i64 { - self.lease - } -} - /// PostgreSql implementation of Election. pub struct PgElection { leader_value: String, @@ -314,27 +264,31 @@ impl Election for PgElection { loop { let _ = keep_alive_interval.tick().await; - let (_, prev_expire_time, current_time, origin) = self - .get_value_with_lease(&key, true) + let lease = self + .get_value_with_lease(&key) .await? - .unwrap_or_default(); + .context(UnexpectedSnafu { + violated: format!("Failed to get lease for key: {:?}", key), + })?; ensure!( - prev_expire_time > current_time, + lease.expire_time > lease.current, UnexpectedSnafu { violated: format!( "Candidate lease expired at {:?} (current time {:?}), key: {:?}", - prev_expire_time, - current_time, - String::from_utf8_lossy(&key.into_bytes()) + lease.expire_time, lease.current, key ), } ); // Safety: origin is Some since we are using `get_value_with_lease` with `true`. - let origin = origin.unwrap(); - self.update_value_with_lease(&key, &origin, &node_info, self.candidate_lease_ttl_secs) - .await?; + self.update_value_with_lease( + &key, + &lease.origin, + &node_info, + self.candidate_lease_ttl_secs, + ) + .await?; } } @@ -400,11 +354,9 @@ impl Election for PgElection { Ok(self.leader_value.as_bytes().into()) } else { let key = self.election_key(); - if let Some((leader, expire_time, current, _)) = - self.get_value_with_lease(&key, false).await? - { - ensure!(expire_time > current, NoLeaderSnafu); - Ok(leader.as_bytes().into()) + if let Some(lease) = self.get_value_with_lease(&key).await? { + ensure!(lease.expire_time > lease.current, NoLeaderSnafu); + Ok(lease.leader_value.as_bytes().into()) } else { NoLeaderSnafu.fail() } @@ -422,11 +374,7 @@ impl Election for PgElection { impl PgElection { /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned. - async fn get_value_with_lease( - &self, - key: &str, - with_origin: bool, - ) -> Result)>> { + async fn get_value_with_lease(&self, key: &str) -> Result> { let key = key.as_bytes(); let res = self .client @@ -451,16 +399,12 @@ impl PgElection { String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default()); let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?; - if with_origin { - Ok(Some(( - value, - expire_time, - current_time, - Some(value_and_expire_time.to_string()), - ))) - } else { - Ok(Some((value, expire_time, current_time, None))) - } + Ok(Some(Lease { + leader_value: value, + expire_time, + current: current_time, + origin: value_and_expire_time.to_string(), + })) } } @@ -579,16 +523,18 @@ impl PgElection { let key = self.election_key(); // Case 1 if self.is_leader() { - match self.get_value_with_lease(&key, true).await? { - Some((prev_leader, expire_time, current, prev)) => { - match (prev_leader == self.leader_value, expire_time > current) { + match self.get_value_with_lease(&key).await? { + Some(lease) => { + match ( + lease.leader_value == self.leader_value, + lease.expire_time > lease.current, + ) { // Case 1.1 (true, true) => { // Safety: prev is Some since we are using `get_value_with_lease` with `true`. - let prev = prev.unwrap(); self.update_value_with_lease( &key, - &prev, + &lease.origin, &self.leader_value, self.meta_lease_ttl_secs, ) @@ -635,12 +581,12 @@ impl PgElection { if self.is_leader() { self.step_down_without_lock().await?; } - let (_, expire_time, current, _) = self - .get_value_with_lease(&key, false) + let lease = self + .get_value_with_lease(&key) .await? .context(NoLeaderSnafu)?; // Case 2 - ensure!(expire_time > current, NoLeaderSnafu); + ensure!(lease.expire_time > lease.current, NoLeaderSnafu); // Case 3 Ok(()) } @@ -653,35 +599,29 @@ impl PgElection { /// Should only step down while holding the advisory lock. async fn step_down(&self) -> Result<()> { let key = self.election_key(); - let leader_key = PgLeaderKey { + let leader_key = RdsLeaderKey { name: self.leader_value.clone().into_bytes(), key: key.clone().into_bytes(), ..Default::default() }; - if self - .is_leader - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - self.delete_value(&key).await?; - self.client - .query(&self.sql_set.step_down, &[]) - .await - .context(PostgresExecutionSnafu)?; - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) - { - error!(e; "Failed to send leader change message"); - } - } + self.delete_value(&key).await?; + self.client + .query(&self.sql_set.step_down, &[]) + .await + .context(PostgresExecutionSnafu)?; + send_leader_change_and_set_flags( + &self.is_leader, + &self.leader_infancy, + &self.leader_watcher, + LeaderChangeMessage::StepDown(Arc::new(leader_key)), + ); Ok(()) } /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key. async fn step_down_without_lock(&self) -> Result<()> { let key = self.election_key().into_bytes(); - let leader_key = PgLeaderKey { + let leader_key = RdsLeaderKey { name: self.leader_value.clone().into_bytes(), key: key.clone(), ..Default::default() @@ -705,7 +645,7 @@ impl PgElection { /// Caution: Should only elected while holding the advisory lock. async fn elected(&self) -> Result<()> { let key = self.election_key(); - let leader_key = PgLeaderKey { + let leader_key = RdsLeaderKey { name: self.leader_value.clone().into_bytes(), key: key.clone().into_bytes(), ..Default::default() @@ -800,23 +740,22 @@ mod tests { .unwrap(); assert!(res); - let (value_get, _, _, prev) = pg_election - .get_value_with_lease(&key, true) + let lease = pg_election + .get_value_with_lease(&key) .await .unwrap() .unwrap(); - assert_eq!(value_get, value); + assert_eq!(lease.leader_value, value); - let prev = prev.unwrap(); pg_election - .update_value_with_lease(&key, &prev, &value, pg_election.meta_lease_ttl_secs) + .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs) .await .unwrap(); let res = pg_election.delete_value(&key).await.unwrap(); assert!(res); - let res = pg_election.get_value_with_lease(&key, false).await.unwrap(); + let res = pg_election.get_value_with_lease(&key).await.unwrap(); assert!(res.is_none()); for i in 0..10 { @@ -963,13 +902,13 @@ mod tests { }; leader_pg_election.elected().await.unwrap(); - let (leader, expire_time, current, _) = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + let lease = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap() .unwrap(); - assert!(leader == leader_value); - assert!(expire_time > current); + assert!(lease.leader_value == leader_value); + assert!(lease.expire_time > lease.current); assert!(leader_pg_election.is_leader()); match rx.recv().await { @@ -986,12 +925,12 @@ mod tests { } leader_pg_election.step_down_without_lock().await.unwrap(); - let (leader, _, _, _) = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + let lease = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap() .unwrap(); - assert!(leader == leader_value); + assert!(lease.leader_value == leader_value); assert!(!leader_pg_election.is_leader()); match rx.recv().await { @@ -1008,13 +947,13 @@ mod tests { } leader_pg_election.elected().await.unwrap(); - let (leader, expire_time, current, _) = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + let lease = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap() .unwrap(); - assert!(leader == leader_value); - assert!(expire_time > current); + assert!(lease.leader_value == leader_value); + assert!(lease.expire_time > lease.current); assert!(leader_pg_election.is_leader()); match rx.recv().await { @@ -1032,7 +971,7 @@ mod tests { leader_pg_election.step_down().await.unwrap(); let res = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap(); assert!(res.is_none()); @@ -1085,13 +1024,13 @@ mod tests { let res: bool = res[0].get(0); assert!(res); leader_pg_election.leader_action().await.unwrap(); - let (leader, expire_time, current, _) = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + let lease = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap() .unwrap(); - assert!(leader == leader_value); - assert!(expire_time > current); + assert!(lease.leader_value == leader_value); + assert!(lease.expire_time > lease.current); assert!(leader_pg_election.is_leader()); match rx.recv().await { @@ -1116,13 +1055,15 @@ mod tests { let res: bool = res[0].get(0); assert!(res); leader_pg_election.leader_action().await.unwrap(); - let (leader, new_expire_time, current, _) = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + let new_lease = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap() .unwrap(); - assert!(leader == leader_value); - assert!(new_expire_time > current && new_expire_time > expire_time); + assert!(new_lease.leader_value == leader_value); + assert!( + new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time + ); assert!(leader_pg_election.is_leader()); // Step 3: Something wrong, the leader lease expired. @@ -1137,7 +1078,7 @@ mod tests { assert!(res); leader_pg_election.leader_action().await.unwrap(); let res = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap(); assert!(res.is_none()); @@ -1164,13 +1105,13 @@ mod tests { let res: bool = res[0].get(0); assert!(res); leader_pg_election.leader_action().await.unwrap(); - let (leader, expire_time, current, _) = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + let lease = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap() .unwrap(); - assert!(leader == leader_value); - assert!(expire_time > current); + assert!(lease.leader_value == leader_value); + assert!(lease.expire_time > lease.current); assert!(leader_pg_election.is_leader()); match rx.recv().await { @@ -1193,7 +1134,7 @@ mod tests { .unwrap(); leader_pg_election.leader_action().await.unwrap(); let res = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap(); assert!(res.is_none()); @@ -1221,13 +1162,13 @@ mod tests { let res: bool = res[0].get(0); assert!(res); leader_pg_election.leader_action().await.unwrap(); - let (leader, expire_time, current, _) = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + let lease = leader_pg_election + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap() .unwrap(); - assert!(leader == leader_value); - assert!(expire_time > current); + assert!(lease.leader_value == leader_value); + assert!(lease.expire_time > lease.current); assert!(leader_pg_election.is_leader()); match rx.recv().await { @@ -1261,7 +1202,7 @@ mod tests { .unwrap(); leader_pg_election.leader_action().await.unwrap(); let res = leader_pg_election - .get_value_with_lease(&leader_pg_election.election_key(), false) + .get_value_with_lease(&leader_pg_election.election_key()) .await .unwrap(); assert!(res.is_none());