mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-22 07:50:38 +00:00
refactor: extract some common functions and structs in election module (#6172)
* refactor: extract some common functions and structs in election module * chore: add comments and modify a function name * chore: add comments and modify a function name * fix: missing 2 lines in license header * fix: acqrel * chore: apply comment suggestions * Update src/meta-srv/src/election.rs Co-authored-by: jeremyhi <jiachun_feng@proton.me> --------- Co-authored-by: jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
@@ -61,9 +61,9 @@ use tonic::transport::server::{Router, TcpIncoming};
|
||||
|
||||
use crate::election::etcd::EtcdElection;
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
use crate::election::mysql::MySqlElection;
|
||||
use crate::election::rds::mysql::MySqlElection;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use crate::election::postgres::PgElection;
|
||||
use crate::election::rds::postgres::PgElection;
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
use crate::election::CANDIDATE_LEASE_SECS;
|
||||
use crate::metasrv::builder::MetasrvBuilder;
|
||||
|
||||
@@ -13,15 +13,14 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod etcd;
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
pub mod mysql;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
pub mod postgres;
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
pub mod rds;
|
||||
|
||||
use std::fmt::{self, Debug};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::{info, warn};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::broadcast::{self, Receiver, Sender};
|
||||
|
||||
@@ -110,6 +109,28 @@ fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
|
||||
tx
|
||||
}
|
||||
|
||||
/// Sends a leader change message to the channel and sets the `is_leader` flag.
|
||||
/// If a leader is elected, it will also set the `leader_infancy` flag to true.
|
||||
fn send_leader_change_and_set_flags(
|
||||
is_leader: &AtomicBool,
|
||||
leader_infancy: &AtomicBool,
|
||||
tx: &Sender<LeaderChangeMessage>,
|
||||
msg: LeaderChangeMessage,
|
||||
) {
|
||||
let is_elected = matches!(msg, LeaderChangeMessage::Elected(_));
|
||||
if is_leader
|
||||
.compare_exchange(!is_elected, is_elected, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_ok()
|
||||
{
|
||||
if is_elected {
|
||||
leader_infancy.store(true, Ordering::Release);
|
||||
}
|
||||
if let Err(e) = tx.send(msg) {
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Election: Send + Sync {
|
||||
type Leader;
|
||||
|
||||
@@ -27,8 +27,8 @@ use tokio::sync::broadcast::Receiver;
|
||||
use tokio::time::{timeout, MissedTickBehavior};
|
||||
|
||||
use crate::election::{
|
||||
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT,
|
||||
CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
|
||||
listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
|
||||
LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
|
||||
};
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
@@ -247,18 +247,12 @@ impl Election for EtcdElection {
|
||||
}
|
||||
}
|
||||
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_ok()
|
||||
{
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
send_leader_change_and_set_flags(
|
||||
&self.is_leader,
|
||||
&self.infancy,
|
||||
&self.leader_watcher,
|
||||
LeaderChangeMessage::StepDown(Arc::new(leader.clone())),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -305,20 +299,12 @@ impl EtcdElection {
|
||||
);
|
||||
|
||||
// Only after a successful `keep_alive` is the leader considered official.
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_ok()
|
||||
{
|
||||
self.infancy.store(true, Ordering::Release);
|
||||
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::Elected(Arc::new(leader)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
send_leader_change_and_set_flags(
|
||||
&self.is_leader,
|
||||
&self.infancy,
|
||||
&self.leader_watcher,
|
||||
LeaderChangeMessage::Elected(Arc::new(leader.clone())),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
90
src/meta-srv/src/election/rds.rs
Normal file
90
src/meta-srv/src/election/rds.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
pub mod mysql;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
pub mod postgres;
|
||||
|
||||
use common_time::Timestamp;
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::election::LeaderKey;
|
||||
use crate::error::{Result, UnexpectedSnafu};
|
||||
|
||||
// Separator between value and expire time in the lease string.
|
||||
// A lease is put into rds election in the format:
|
||||
// <node_info> || __metadata_lease_sep || <expire_time>
|
||||
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
|
||||
|
||||
/// Parses the value and expire time from the given string retrieved from rds.
|
||||
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
|
||||
let (value, expire_time) =
|
||||
value
|
||||
.split(LEASE_SEP)
|
||||
.collect_tuple()
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Invalid value {}, expect node info || {} || expire time",
|
||||
value, LEASE_SEP
|
||||
),
|
||||
})?;
|
||||
// Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS'
|
||||
let expire_time = match Timestamp::from_str(expire_time, None) {
|
||||
Ok(ts) => ts,
|
||||
Err(_) => UnexpectedSnafu {
|
||||
violated: format!("Invalid timestamp: {}", expire_time),
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
Ok((value.to_string(), expire_time))
|
||||
}
|
||||
|
||||
/// LeaderKey used for [LeaderChangeMessage] in rds election components.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct RdsLeaderKey {
|
||||
name: Vec<u8>,
|
||||
key: Vec<u8>,
|
||||
rev: i64,
|
||||
lease: i64,
|
||||
}
|
||||
|
||||
impl LeaderKey for RdsLeaderKey {
|
||||
fn name(&self) -> &[u8] {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn key(&self) -> &[u8] {
|
||||
&self.key
|
||||
}
|
||||
|
||||
fn revision(&self) -> i64 {
|
||||
self.rev
|
||||
}
|
||||
|
||||
fn lease_id(&self) -> i64 {
|
||||
self.lease
|
||||
}
|
||||
}
|
||||
|
||||
/// Lease information for rds election.
|
||||
#[derive(Default, Clone, Debug)]
|
||||
struct Lease {
|
||||
leader_value: String,
|
||||
expire_time: Timestamp,
|
||||
current: Timestamp,
|
||||
// `origin` is the original value of the lease, used for CAS.
|
||||
origin: String,
|
||||
}
|
||||
@@ -16,9 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_telemetry::{error, warn};
|
||||
use common_telemetry::warn;
|
||||
use common_time::Timestamp;
|
||||
use itertools::Itertools;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sqlx::mysql::{MySqlArguments, MySqlRow};
|
||||
use sqlx::query::Query;
|
||||
@@ -26,8 +25,10 @@ use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row};
|
||||
use tokio::sync::{broadcast, Mutex, MutexGuard};
|
||||
use tokio::time::MissedTickBehavior;
|
||||
|
||||
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
|
||||
use crate::election::{
|
||||
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
|
||||
listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
|
||||
CANDIDATES_ROOT, ELECTION_KEY,
|
||||
};
|
||||
use crate::error::{
|
||||
DeserializeFromJsonSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, SerializeToJsonSnafu,
|
||||
@@ -35,20 +36,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
|
||||
|
||||
// Separator between value and expire time.
|
||||
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
|
||||
|
||||
/// Lease information.
|
||||
/// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module.
|
||||
#[derive(Default, Clone, Debug)]
|
||||
struct Lease {
|
||||
leader_value: String,
|
||||
expire_time: Timestamp,
|
||||
current: Timestamp,
|
||||
// origin is the origin value of the lease, used for CAS.
|
||||
origin: String,
|
||||
}
|
||||
|
||||
struct ElectionSqlFactory<'a> {
|
||||
table_name: &'a str,
|
||||
meta_lease_ttl_secs: u64,
|
||||
@@ -204,55 +191,6 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
|
||||
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
|
||||
let (value, expire_time) =
|
||||
value
|
||||
.split(LEASE_SEP)
|
||||
.collect_tuple()
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Invalid value {}, expect node info || {} || expire time",
|
||||
value, LEASE_SEP
|
||||
),
|
||||
})?;
|
||||
// Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS'
|
||||
let expire_time = match Timestamp::from_str(expire_time, None) {
|
||||
Ok(ts) => ts,
|
||||
Err(_) => UnexpectedSnafu {
|
||||
violated: format!("Invalid timestamp: {}", expire_time),
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
Ok((value.to_string(), expire_time))
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct MySqlLeaderKey {
|
||||
name: Vec<u8>,
|
||||
key: Vec<u8>,
|
||||
rev: i64,
|
||||
lease: i64,
|
||||
}
|
||||
|
||||
impl LeaderKey for MySqlLeaderKey {
|
||||
fn name(&self) -> &[u8] {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn key(&self) -> &[u8] {
|
||||
&self.key
|
||||
}
|
||||
|
||||
fn revision(&self) -> i64 {
|
||||
self.rev
|
||||
}
|
||||
|
||||
fn lease_id(&self) -> i64 {
|
||||
self.lease
|
||||
}
|
||||
}
|
||||
|
||||
enum Executor<'a> {
|
||||
Default(MutexGuard<'a, MySqlConnection>),
|
||||
Txn(MySqlTransaction<'a>),
|
||||
@@ -767,23 +705,17 @@ impl MySqlElection {
|
||||
/// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
|
||||
async fn step_down_without_lock(&self) -> Result<()> {
|
||||
let key = self.election_key().into_bytes();
|
||||
let leader_key = MySqlLeaderKey {
|
||||
let leader_key = RdsLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_ok()
|
||||
{
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
send_leader_change_and_set_flags(
|
||||
&self.is_leader,
|
||||
&self.leader_infancy,
|
||||
&self.leader_watcher,
|
||||
LeaderChangeMessage::StepDown(Arc::new(leader_key)),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -791,7 +723,7 @@ impl MySqlElection {
|
||||
/// Caution: Should only elected while holding the lock.
|
||||
async fn elected(&self, executor: &mut Executor<'_>) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
let leader_key = MySqlLeaderKey {
|
||||
let leader_key = RdsLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone().into_bytes(),
|
||||
..Default::default()
|
||||
@@ -800,20 +732,12 @@ impl MySqlElection {
|
||||
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs, executor)
|
||||
.await?;
|
||||
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_ok()
|
||||
{
|
||||
self.leader_infancy.store(true, Ordering::Release);
|
||||
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
send_leader_change_and_set_flags(
|
||||
&self.is_leader,
|
||||
&self.leader_infancy,
|
||||
&self.leader_watcher,
|
||||
LeaderChangeMessage::Elected(Arc::new(leader_key)),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -18,15 +18,16 @@ use std::time::Duration;
|
||||
|
||||
use common_telemetry::{error, warn};
|
||||
use common_time::Timestamp;
|
||||
use itertools::Itertools;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tokio_postgres::Client;
|
||||
|
||||
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
|
||||
use crate::election::{
|
||||
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
|
||||
listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
|
||||
CANDIDATES_ROOT, ELECTION_KEY,
|
||||
};
|
||||
use crate::error::{
|
||||
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
|
||||
@@ -34,9 +35,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
|
||||
|
||||
// Separator between value and expire time.
|
||||
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
|
||||
|
||||
struct ElectionSqlFactory<'a> {
|
||||
lock_id: u64,
|
||||
table_name: &'a str,
|
||||
@@ -173,54 +171,6 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
|
||||
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
|
||||
let (value, expire_time) = value
|
||||
.split(LEASE_SEP)
|
||||
.collect_tuple()
|
||||
.context(UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Invalid value {}, expect node info || {} || expire time",
|
||||
value, LEASE_SEP
|
||||
),
|
||||
})?;
|
||||
// Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS'
|
||||
let expire_time = match Timestamp::from_str(expire_time, None) {
|
||||
Ok(ts) => ts,
|
||||
Err(_) => UnexpectedSnafu {
|
||||
violated: format!("Invalid timestamp: {}", expire_time),
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
Ok((value.to_string(), expire_time))
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct PgLeaderKey {
|
||||
name: Vec<u8>,
|
||||
key: Vec<u8>,
|
||||
rev: i64,
|
||||
lease: i64,
|
||||
}
|
||||
|
||||
impl LeaderKey for PgLeaderKey {
|
||||
fn name(&self) -> &[u8] {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn key(&self) -> &[u8] {
|
||||
&self.key
|
||||
}
|
||||
|
||||
fn revision(&self) -> i64 {
|
||||
self.rev
|
||||
}
|
||||
|
||||
fn lease_id(&self) -> i64 {
|
||||
self.lease
|
||||
}
|
||||
}
|
||||
|
||||
/// PostgreSql implementation of Election.
|
||||
pub struct PgElection {
|
||||
leader_value: String,
|
||||
@@ -314,27 +264,31 @@ impl Election for PgElection {
|
||||
loop {
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
|
||||
let (_, prev_expire_time, current_time, origin) = self
|
||||
.get_value_with_lease(&key, true)
|
||||
let lease = self
|
||||
.get_value_with_lease(&key)
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
.context(UnexpectedSnafu {
|
||||
violated: format!("Failed to get lease for key: {:?}", key),
|
||||
})?;
|
||||
|
||||
ensure!(
|
||||
prev_expire_time > current_time,
|
||||
lease.expire_time > lease.current,
|
||||
UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Candidate lease expired at {:?} (current time {:?}), key: {:?}",
|
||||
prev_expire_time,
|
||||
current_time,
|
||||
String::from_utf8_lossy(&key.into_bytes())
|
||||
lease.expire_time, lease.current, key
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
|
||||
let origin = origin.unwrap();
|
||||
self.update_value_with_lease(&key, &origin, &node_info, self.candidate_lease_ttl_secs)
|
||||
.await?;
|
||||
self.update_value_with_lease(
|
||||
&key,
|
||||
&lease.origin,
|
||||
&node_info,
|
||||
self.candidate_lease_ttl_secs,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -400,11 +354,9 @@ impl Election for PgElection {
|
||||
Ok(self.leader_value.as_bytes().into())
|
||||
} else {
|
||||
let key = self.election_key();
|
||||
if let Some((leader, expire_time, current, _)) =
|
||||
self.get_value_with_lease(&key, false).await?
|
||||
{
|
||||
ensure!(expire_time > current, NoLeaderSnafu);
|
||||
Ok(leader.as_bytes().into())
|
||||
if let Some(lease) = self.get_value_with_lease(&key).await? {
|
||||
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
|
||||
Ok(lease.leader_value.as_bytes().into())
|
||||
} else {
|
||||
NoLeaderSnafu.fail()
|
||||
}
|
||||
@@ -422,11 +374,7 @@ impl Election for PgElection {
|
||||
|
||||
impl PgElection {
|
||||
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
|
||||
async fn get_value_with_lease(
|
||||
&self,
|
||||
key: &str,
|
||||
with_origin: bool,
|
||||
) -> Result<Option<(String, Timestamp, Timestamp, Option<String>)>> {
|
||||
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
|
||||
let key = key.as_bytes();
|
||||
let res = self
|
||||
.client
|
||||
@@ -451,16 +399,12 @@ impl PgElection {
|
||||
String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
|
||||
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
|
||||
|
||||
if with_origin {
|
||||
Ok(Some((
|
||||
value,
|
||||
expire_time,
|
||||
current_time,
|
||||
Some(value_and_expire_time.to_string()),
|
||||
)))
|
||||
} else {
|
||||
Ok(Some((value, expire_time, current_time, None)))
|
||||
}
|
||||
Ok(Some(Lease {
|
||||
leader_value: value,
|
||||
expire_time,
|
||||
current: current_time,
|
||||
origin: value_and_expire_time.to_string(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -579,16 +523,18 @@ impl PgElection {
|
||||
let key = self.election_key();
|
||||
// Case 1
|
||||
if self.is_leader() {
|
||||
match self.get_value_with_lease(&key, true).await? {
|
||||
Some((prev_leader, expire_time, current, prev)) => {
|
||||
match (prev_leader == self.leader_value, expire_time > current) {
|
||||
match self.get_value_with_lease(&key).await? {
|
||||
Some(lease) => {
|
||||
match (
|
||||
lease.leader_value == self.leader_value,
|
||||
lease.expire_time > lease.current,
|
||||
) {
|
||||
// Case 1.1
|
||||
(true, true) => {
|
||||
// Safety: prev is Some since we are using `get_value_with_lease` with `true`.
|
||||
let prev = prev.unwrap();
|
||||
self.update_value_with_lease(
|
||||
&key,
|
||||
&prev,
|
||||
&lease.origin,
|
||||
&self.leader_value,
|
||||
self.meta_lease_ttl_secs,
|
||||
)
|
||||
@@ -635,12 +581,12 @@ impl PgElection {
|
||||
if self.is_leader() {
|
||||
self.step_down_without_lock().await?;
|
||||
}
|
||||
let (_, expire_time, current, _) = self
|
||||
.get_value_with_lease(&key, false)
|
||||
let lease = self
|
||||
.get_value_with_lease(&key)
|
||||
.await?
|
||||
.context(NoLeaderSnafu)?;
|
||||
// Case 2
|
||||
ensure!(expire_time > current, NoLeaderSnafu);
|
||||
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
|
||||
// Case 3
|
||||
Ok(())
|
||||
}
|
||||
@@ -653,35 +599,29 @@ impl PgElection {
|
||||
/// Should only step down while holding the advisory lock.
|
||||
async fn step_down(&self) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
let leader_key = PgLeaderKey {
|
||||
let leader_key = RdsLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone().into_bytes(),
|
||||
..Default::default()
|
||||
};
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_ok()
|
||||
{
|
||||
self.delete_value(&key).await?;
|
||||
self.client
|
||||
.query(&self.sql_set.step_down, &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
self.delete_value(&key).await?;
|
||||
self.client
|
||||
.query(&self.sql_set.step_down, &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
send_leader_change_and_set_flags(
|
||||
&self.is_leader,
|
||||
&self.leader_infancy,
|
||||
&self.leader_watcher,
|
||||
LeaderChangeMessage::StepDown(Arc::new(leader_key)),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
|
||||
async fn step_down_without_lock(&self) -> Result<()> {
|
||||
let key = self.election_key().into_bytes();
|
||||
let leader_key = PgLeaderKey {
|
||||
let leader_key = RdsLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone(),
|
||||
..Default::default()
|
||||
@@ -705,7 +645,7 @@ impl PgElection {
|
||||
/// Caution: Should only elected while holding the advisory lock.
|
||||
async fn elected(&self) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
let leader_key = PgLeaderKey {
|
||||
let leader_key = RdsLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone().into_bytes(),
|
||||
..Default::default()
|
||||
@@ -800,23 +740,22 @@ mod tests {
|
||||
.unwrap();
|
||||
assert!(res);
|
||||
|
||||
let (value_get, _, _, prev) = pg_election
|
||||
.get_value_with_lease(&key, true)
|
||||
let lease = pg_election
|
||||
.get_value_with_lease(&key)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(value_get, value);
|
||||
assert_eq!(lease.leader_value, value);
|
||||
|
||||
let prev = prev.unwrap();
|
||||
pg_election
|
||||
.update_value_with_lease(&key, &prev, &value, pg_election.meta_lease_ttl_secs)
|
||||
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let res = pg_election.delete_value(&key).await.unwrap();
|
||||
assert!(res);
|
||||
|
||||
let res = pg_election.get_value_with_lease(&key, false).await.unwrap();
|
||||
let res = pg_election.get_value_with_lease(&key).await.unwrap();
|
||||
assert!(res.is_none());
|
||||
|
||||
for i in 0..10 {
|
||||
@@ -963,13 +902,13 @@ mod tests {
|
||||
};
|
||||
|
||||
leader_pg_election.elected().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
let lease = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(lease.leader_value == leader_value);
|
||||
assert!(lease.expire_time > lease.current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
@@ -986,12 +925,12 @@ mod tests {
|
||||
}
|
||||
|
||||
leader_pg_election.step_down_without_lock().await.unwrap();
|
||||
let (leader, _, _, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
let lease = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(lease.leader_value == leader_value);
|
||||
assert!(!leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
@@ -1008,13 +947,13 @@ mod tests {
|
||||
}
|
||||
|
||||
leader_pg_election.elected().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
let lease = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(lease.leader_value == leader_value);
|
||||
assert!(lease.expire_time > lease.current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
@@ -1032,7 +971,7 @@ mod tests {
|
||||
|
||||
leader_pg_election.step_down().await.unwrap();
|
||||
let res = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res.is_none());
|
||||
@@ -1085,13 +1024,13 @@ mod tests {
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
let lease = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(lease.leader_value == leader_value);
|
||||
assert!(lease.expire_time > lease.current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
@@ -1116,13 +1055,15 @@ mod tests {
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let (leader, new_expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
let new_lease = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(new_expire_time > current && new_expire_time > expire_time);
|
||||
assert!(new_lease.leader_value == leader_value);
|
||||
assert!(
|
||||
new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
|
||||
);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
// Step 3: Something wrong, the leader lease expired.
|
||||
@@ -1137,7 +1078,7 @@ mod tests {
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let res = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res.is_none());
|
||||
@@ -1164,13 +1105,13 @@ mod tests {
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
let lease = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(lease.leader_value == leader_value);
|
||||
assert!(lease.expire_time > lease.current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
@@ -1193,7 +1134,7 @@ mod tests {
|
||||
.unwrap();
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let res = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res.is_none());
|
||||
@@ -1221,13 +1162,13 @@ mod tests {
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
let lease = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(lease.leader_value == leader_value);
|
||||
assert!(lease.expire_time > lease.current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
@@ -1261,7 +1202,7 @@ mod tests {
|
||||
.unwrap();
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let res = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.get_value_with_lease(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res.is_none());
|
||||
Reference in New Issue
Block a user