feat: init PgElection with candidate registration (#5209)
* feat: init PgElection
  fix: release advisory lock
  fix: handle duplicate keys
  chore: update comments
  fix: unlock if acquired the lock
  chore: add TODO and avoid unwrap
  refactor: check both lock and expire time, add more comments
  chore: fmt
  fix: deal with multiple edge cases
  feat: init PgElection with candidate registration
  chore: fmt
  chore: remove
* test: add unit test for pg candidate registration
* test: add unit test for pg candidate registration
* chore: update pg env
* chore: make ci happy
* fix: spawn a background connection thread
* chore: typo
* fix: shadow the election client for now
* fix: fix ci
* chore: readability
* chore: follow review comments
* refactor: use kvbackend for pg election
* chore: rename
* chore: make clippy happy
* refactor: use pg server time instead of local ones
* chore: typo
* chore: rename infancy to leader_infancy for clarification
* chore: clean up
* chore: follow review comments
* chore: follow review comments
* ci: unit test should test all features
* ci: fix
* ci: just test pg
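For orientation, here is a minimal usage sketch of the API added by this commit (not part of the diff below): it wires a tokio-postgres client into PgElection::with_pg_client and registers the node as a candidate. The connection string, store key prefix, lease TTL, and node info values are illustrative assumptions, and the meta_srv module paths are assumed to be publicly reachable.

// Illustrative sketch only -- not part of this commit. Crate paths, the connection
// string, and the node info values below are assumptions for demonstration.
use meta_srv::election::postgres::PgElection;
use meta_srv::election::Election;
use meta_srv::metasrv::MetasrvNodeInfo;
use tokio_postgres::NoTls;

async fn run_candidate() -> Result<(), Box<dyn std::error::Error>> {
    let (client, connection) =
        tokio_postgres::connect("postgres://user:pass@127.0.0.1:5432/metadata", NoTls).await?;
    // Drive the connection in the background, mirroring what the commit does for PgStore.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    let election = PgElection::with_pg_client(
        "127.0.0.1:3002".to_string(),         // leader_value
        client,
        "__hypothetical_prefix/".to_string(), // store_key_prefix
        600,                                  // candidate_lease_ttl_secs
    )
    .await?;

    let node_info = MetasrvNodeInfo {
        addr: "127.0.0.1:3002".to_string(),
        version: "unknown".to_string(),
        git_commit: "unknown".to_string(),
        start_time_ms: 0,
    };
    // register_candidate keeps renewing the candidate lease in a loop, so spawn it.
    tokio::spawn(async move { election.register_candidate(&node_info).await });
    Ok(())
}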
.github/workflows/develop.yml (vendored, 2 lines changed)
@@ -697,7 +697,7 @@ jobs:
        working-directory: tests-integration/fixtures/postgres
        run: docker compose -f docker-compose-standalone.yml up -d --wait
      - name: Run nextest cases
        run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
        run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard -F pg_kvbackend
        env:
          CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
          RUST_BACKTRACE: 1
@@ -16,6 +16,7 @@ use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

use common_telemetry::error;
use snafu::ResultExt;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls};
@@ -97,7 +98,11 @@ impl PgStore {
        let (client, conn) = tokio_postgres::connect(url, NoTls)
            .await
            .context(ConnectPostgresSnafu)?;
        tokio::spawn(async move { conn.await.context(ConnectPostgresSnafu) });
        tokio::spawn(async move {
            if let Err(e) = conn.await {
                error!(e; "connection error");
            }
        });
        Self::with_pg_client(client).await
    }
@@ -6,7 +6,7 @@ license.workspace = true

[features]
mock = []
pg_kvbackend = ["dep:tokio-postgres"]
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]

[lints]
workspace = true
@@ -14,6 +14,7 @@ workspace = true
[dependencies]
api.workspace = true
async-trait = "0.1"
chrono.workspace = true
clap.workspace = true
client.workspace = true
common-base.workspace = true
@@ -55,7 +56,7 @@ snafu.workspace = true
store-api.workspace = true
table.workspace = true
tokio.workspace = true
tokio-postgres = { workspace = true, optional = true }
tokio-postgres = { workspace = true, optional = true, features = ["with-chrono-0_4"] }
tokio-stream = { workspace = true, features = ["net"] }
toml.workspace = true
tonic.workspace = true
@@ -26,6 +26,8 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::postgres::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info;
use etcd_client::Client;
use futures::future;
@@ -224,8 +226,9 @@ pub async fn metasrv_builder(
        #[cfg(feature = "pg_kvbackend")]
        (None, BackendImpl::PostgresStore) => {
            let pg_client = create_postgres_client(opts).await?;
            let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
            // TODO(jeremy, weny): implement election for postgres
            let kv_backend = PgStore::with_pg_client(pg_client)
                .await
                .context(error::KvBackendSnafu)?;
            (kv_backend, None)
        }
    };
@@ -275,8 +278,14 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres:
    let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu {
        err_msg: "empty store addrs",
    })?;
    let (client, _) = tokio_postgres::connect(postgres_url, NoTls)
    let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
        .await
        .context(error::ConnectPostgresSnafu)?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            error!(e; "connection error");
        }
    });
    Ok(client)
}
@@ -13,11 +13,12 @@
// limitations under the License.

pub mod etcd;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;

use std::fmt;
use std::fmt::{self, Debug};
use std::sync::Arc;

use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;

use crate::error::Result;
@@ -26,10 +27,31 @@ use crate::metasrv::MetasrvNodeInfo;
pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

/// Messages sent when the leader changes.
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
    Elected(Arc<LeaderKey>),
    StepDown(Arc<LeaderKey>),
    Elected(Arc<dyn LeaderKey>),
    StepDown(Arc<dyn LeaderKey>),
}

/// LeaderKey is a key that represents the leader of metasrv.
/// The structure is corresponding to [etcd_client::LeaderKey].
pub trait LeaderKey: Send + Sync + Debug {
    /// The name in byte. name is the election identifier that corresponds to the leadership key.
    fn name(&self) -> &[u8];

    /// The key in byte. key is an opaque key representing the ownership of the election. If the key
    /// is deleted, then leadership is lost.
    fn key(&self) -> &[u8];

    /// The creation revision of the key.
    fn revision(&self) -> i64;

    /// The lease ID of the election leader.
    fn lease_id(&self) -> i64;
}

impl fmt::Display for LeaderChangeMessage {
@@ -47,8 +69,8 @@ impl fmt::Display for LeaderChangeMessage {
        write!(f, "LeaderKey {{ ")?;
        write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
        write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
        write!(f, ", rev: {}", leader_key.rev())?;
        write!(f, ", lease: {}", leader_key.lease())?;
        write!(f, ", rev: {}", leader_key.revision())?;
        write!(f, ", lease: {}", leader_key.lease_id())?;
        write!(f, " }})")
    }
}
@@ -65,7 +87,7 @@ pub trait Election: Send + Sync {
    /// initialization operations can be performed.
    ///
    /// note: a new leader will only return true on the first call.
    fn in_infancy(&self) -> bool;
    fn in_leader_infancy(&self) -> bool;

    /// Registers a candidate for the election.
    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;
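As context for the in_infancy to in_leader_infancy rename above: the contract is that the flag is set when leadership is acquired and consumed by the first caller, so only one call per elected term observes true. A standalone sketch of that compare_exchange pattern (the Flag type is made up for illustration; it is not metasrv code):

// Standalone illustration of the "consume once" semantics used by both the etcd
// and the new Postgres election implementations.
use std::sync::atomic::{AtomicBool, Ordering};

struct Flag {
    leader_infancy: AtomicBool,
}

impl Flag {
    fn in_leader_infancy(&self) -> bool {
        // Succeeds only if the flag is currently `true`, and flips it to `false`.
        self.leader_infancy
            .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }
}

fn main() {
    let f = Flag { leader_infancy: AtomicBool::new(true) };
    assert!(f.in_leader_infancy());  // first call after election observes the flag
    assert!(!f.in_leader_infancy()); // later calls see it already consumed
}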
@@ -18,18 +18,41 @@ use std::time::Duration;

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions};
use etcd_client::{
    Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};

use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::election::{
    Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY,
    KEEP_ALIVE_INTERVAL_SECS,
};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

impl LeaderKey for EtcdLeaderKey {
    fn name(&self) -> &[u8] {
        self.name()
    }

    fn key(&self) -> &[u8] {
        self.key()
    }

    fn revision(&self) -> i64 {
        self.rev()
    }

    fn lease_id(&self) -> i64 {
        self.lease()
    }
}

pub struct EtcdElection {
    leader_value: String,
    client: Client,
@@ -75,15 +98,15 @@ impl EtcdElection {
            LeaderChangeMessage::Elected(key) => {
                info!(
                    "[{leader_ident}] is elected as leader: {:?}, lease: {}",
                    key.name_str(),
                    key.lease()
                    String::from_utf8_lossy(key.name()),
                    key.lease_id()
                );
            }
            LeaderChangeMessage::StepDown(key) => {
                warn!(
                    "[{leader_ident}] is stepping down: {:?}, lease: {}",
                    key.name_str(),
                    key.lease()
                    String::from_utf8_lossy(key.name()),
                    key.lease_id()
                );
            }
        },
@@ -126,16 +149,13 @@ impl Election for EtcdElection {
        self.is_leader.load(Ordering::Relaxed)
    }

    fn in_infancy(&self) -> bool {
    fn in_leader_infancy(&self) -> bool {
        self.infancy
            .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }

    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
        const CANDIDATE_LEASE_SECS: u64 = 600;
        const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

        let mut lease_client = self.client.lease_client();
        let res = lease_client
            .grant(CANDIDATE_LEASE_SECS as i64, None)
@@ -239,7 +259,7 @@ impl Election for EtcdElection {
            // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
            match timeout(
                keep_lease_duration,
                self.keep_alive(&mut keeper, &mut receiver, leader),
                self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
            )
            .await
            {
@@ -303,7 +323,7 @@ impl EtcdElection {
        &self,
        keeper: &mut LeaseKeeper,
        receiver: &mut LeaseKeepAliveStream,
        leader: &LeaderKey,
        leader: EtcdLeaderKey,
    ) -> Result<()> {
        keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
        if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
@@ -324,7 +344,7 @@ impl EtcdElection {

                if let Err(e) = self
                    .leader_watcher
                    .send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
                    .send(LeaderChangeMessage::Elected(Arc::new(leader)))
                {
                    error!(e; "Failed to send leader change message");
                }
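The etcd changes above route leader-change notifications through the trait-object based LeaderChangeMessage. A hedged sketch of how a consumer might watch that channel via the Election trait (the meta_srv paths are assumed to be importable; this is not part of the commit):

// Sketch only: module paths are assumptions, error handling is minimal.
use meta_srv::election::Election;
use meta_srv::metasrv::ElectionRef;
use tokio::sync::broadcast::error::RecvError;

async fn watch_leader_changes(election: ElectionRef) {
    // subscribe_leader_change() hands out a broadcast::Receiver<LeaderChangeMessage>.
    let mut rx = election.subscribe_leader_change();
    loop {
        match rx.recv().await {
            // The Display impl shown above prints the LeaderKey name/key/revision/lease_id.
            Ok(msg) => println!("leader change: {}", msg),
            Err(RecvError::Lagged(n)) => println!("lagged behind {} leader change messages", n),
            Err(RecvError::Closed) => break,
        }
    }
}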
src/meta-srv/src/election/postgres.rs (new file, 519 lines)
@@ -0,0 +1,519 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_time::Timestamp;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio_postgres::Client;

use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error::{
    DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;

// SQL to put a value with expire time. Parameters: key, value, LEASE_SEP, expire_time
const PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME: &str = r#"
WITH prev AS (
    SELECT k, v FROM greptime_metakv WHERE k = $1
), insert AS (
    INSERT INTO greptime_metakv
    VALUES($1, $2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'))
    ON CONFLICT (k) DO NOTHING
)

SELECT k, v FROM prev;
"#;

// SQL to update a value with expire time. Parameters: key, prev_value_with_lease, updated_value, LEASE_SEP, expire_time
const CAS_WITH_EXPIRE_TIME: &str = r#"
UPDATE greptime_metakv
SET k=$1,
    v=$3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS')
WHERE
    k=$1 AND v=$2
"#;

const GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k = $1"#;

const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k LIKE $1"#;

const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;";

/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
    let (value, expire_time) = value
        .split(LEASE_SEP)
        .collect_tuple()
        .context(UnexpectedSnafu {
            violated: format!(
                "Invalid value {}, expect node info || {} || expire time",
                value, LEASE_SEP
            ),
        })?;
    // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS'
    let expire_time = match Timestamp::from_str(expire_time, None) {
        Ok(ts) => ts,
        Err(_) => UnexpectedSnafu {
            violated: format!("Invalid timestamp: {}", expire_time),
        }
        .fail()?,
    };
    Ok((value.to_string(), expire_time))
}

/// PostgreSql implementation of Election.
/// TODO(CookiePie): Currently only support candidate registration. Add election logic.
pub struct PgElection {
    leader_value: String,
    client: Client,
    is_leader: AtomicBool,
    leader_infancy: AtomicBool,
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    store_key_prefix: String,
    candidate_lease_ttl_secs: u64,
}

impl PgElection {
    pub async fn with_pg_client(
        leader_value: String,
        client: Client,
        store_key_prefix: String,
        candidate_lease_ttl_secs: u64,
    ) -> Result<ElectionRef> {
        let (tx, _) = broadcast::channel(100);
        Ok(Arc::new(Self {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(false),
            leader_watcher: tx,
            store_key_prefix,
            candidate_lease_ttl_secs,
        }))
    }

    fn _election_key(&self) -> String {
        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
    }

    fn candidate_root(&self) -> String {
        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
    }

    fn candidate_key(&self) -> String {
        format!("{}{}", self.candidate_root(), self.leader_value)
    }
}

#[async_trait::async_trait]
impl Election for PgElection {
    type Leader = LeaderValue;

    fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::Relaxed)
    }

    fn in_leader_infancy(&self) -> bool {
        self.leader_infancy
            .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }

    /// TODO(CookiePie): Split the candidate registration and keep alive logic into separate methods, so that upper layers can call them separately.
    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
        let key = self.candidate_key();
        let node_info =
            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
                input: format!("{node_info:?}"),
            })?;
        let res = self.put_value_with_lease(&key, &node_info).await?;
        // May registered before, just update the lease.
        if !res {
            self.delete_value(&key).await?;
            self.put_value_with_lease(&key, &node_info).await?;
        }

        // Check if the current lease has expired and renew the lease.
        let mut keep_alive_interval =
            tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
        loop {
            let _ = keep_alive_interval.tick().await;

            let (_, prev_expire_time, current_time, origin) = self
                .get_value_with_lease(&key, true)
                .await?
                .unwrap_or_default();

            ensure!(
                prev_expire_time > current_time,
                UnexpectedSnafu {
                    violated: format!(
                        "Candidate lease expired, key: {:?}",
                        String::from_utf8_lossy(&key.into_bytes())
                    ),
                }
            );

            // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
            let origin = origin.unwrap();
            self.update_value_with_lease(&key, &origin, &node_info)
                .await?;
        }
    }

    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
        let key_prefix = self.candidate_root();
        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
        // Remove expired candidates
        candidates.retain(|c| c.1 > current);
        let mut valid_candidates = Vec::with_capacity(candidates.len());
        for (c, _) in candidates {
            let node_info: MetasrvNodeInfo =
                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
                    input: format!("{:?}", c),
                })?;
            valid_candidates.push(node_info);
        }
        Ok(valid_candidates)
    }

    async fn campaign(&self) -> Result<()> {
        todo!()
    }

    async fn leader(&self) -> Result<Self::Leader> {
        todo!()
    }

    async fn resign(&self) -> Result<()> {
        todo!()
    }

    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
        self.leader_watcher.subscribe()
    }
}

impl PgElection {
    /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
    async fn get_value_with_lease(
        &self,
        key: &String,
        with_origin: bool,
    ) -> Result<Option<(String, Timestamp, Timestamp, Option<String>)>> {
        let res = self
            .client
            .query(GET_WITH_CURRENT_TIMESTAMP, &[&key])
            .await
            .context(PostgresExecutionSnafu)?;

        if res.is_empty() {
            Ok(None)
        } else {
            // Safety: Checked if res is empty above.
            let current_time_str = res[0].get(1);
            let current_time = match Timestamp::from_str(current_time_str, None) {
                Ok(ts) => ts,
                Err(_) => UnexpectedSnafu {
                    violated: format!("Invalid timestamp: {}", current_time_str),
                }
                .fail()?,
            };
            // Safety: Checked if res is empty above.
            let value_and_expire_time = res[0].get(0);
            let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?;

            if with_origin {
                Ok(Some((
                    value,
                    expire_time,
                    current_time,
                    Some(value_and_expire_time.to_string()),
                )))
            } else {
                Ok(Some((value, expire_time, current_time, None)))
            }
        }
    }

    /// Returns all values and expire time with the given key prefix. Also returns the current time.
    async fn get_value_with_lease_by_prefix(
        &self,
        key_prefix: &str,
    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
        let key_prefix = format!("{}%", key_prefix);
        let res = self
            .client
            .query(PREFIX_GET_WITH_CURRENT_TIMESTAMP, &[&key_prefix])
            .await
            .context(PostgresExecutionSnafu)?;

        let mut values_with_leases = vec![];
        let mut current = Timestamp::default();
        for row in res {
            let current_time_str = row.get(1);
            current = match Timestamp::from_str(current_time_str, None) {
                Ok(ts) => ts,
                Err(_) => UnexpectedSnafu {
                    violated: format!("Invalid timestamp: {}", current_time_str),
                }
                .fail()?,
            };

            let value_and_expire_time = row.get(0);
            let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?;

            values_with_leases.push((value, expire_time));
        }
        Ok((values_with_leases, current))
    }

    async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> {
        let res = self
            .client
            .execute(
                CAS_WITH_EXPIRE_TIME,
                &[
                    &key,
                    &prev,
                    &updated,
                    &LEASE_SEP,
                    &(self.candidate_lease_ttl_secs as f64),
                ],
            )
            .await
            .context(PostgresExecutionSnafu)?;

        ensure!(
            res == 1,
            UnexpectedSnafu {
                violated: format!("Failed to update key: {}", key),
            }
        );

        Ok(())
    }

    /// Returns `true` if the insertion is successful
    async fn put_value_with_lease(&self, key: &str, value: &str) -> Result<bool> {
        let res = self
            .client
            .query(
                PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME,
                &[
                    &key,
                    &value,
                    &LEASE_SEP,
                    &(self.candidate_lease_ttl_secs as f64),
                ],
            )
            .await
            .context(PostgresExecutionSnafu)?;
        Ok(res.is_empty())
    }

    /// Returns `true` if the deletion is successful.
    /// Caution: Should only delete the key if the lease is expired.
    async fn delete_value(&self, key: &String) -> Result<bool> {
        let res = self
            .client
            .query(POINT_DELETE, &[&key])
            .await
            .context(PostgresExecutionSnafu)?;

        Ok(res.len() == 1)
    }
}

#[cfg(test)]
mod tests {
    use std::env;

    use tokio_postgres::{Client, NoTls};

    use super::*;
    use crate::error::PostgresExecutionSnafu;

    async fn create_postgres_client() -> Result<Client> {
        let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
        if endpoint.is_empty() {
            return UnexpectedSnafu {
                violated: "Postgres endpoint is empty".to_string(),
            }
            .fail();
        }
        let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
            .await
            .context(PostgresExecutionSnafu)?;
        tokio::spawn(async move {
            connection.await.context(PostgresExecutionSnafu).unwrap();
        });
        Ok(client)
    }

    #[tokio::test]
    async fn test_postgres_crud() {
        let client = create_postgres_client().await.unwrap();

        let key = "test_key".to_string();
        let value = "test_value".to_string();

        let (tx, _) = broadcast::channel(100);
        let pg_election = PgElection {
            leader_value: "test_leader".to_string(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: "test_prefix".to_string(),
            candidate_lease_ttl_secs: 10,
        };

        let res = pg_election
            .put_value_with_lease(&key, &value)
            .await
            .unwrap();
        assert!(res);

        let (value, _, _, prev) = pg_election
            .get_value_with_lease(&key, true)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(value, value);

        let prev = prev.unwrap();
        pg_election
            .update_value_with_lease(&key, &prev, &value)
            .await
            .unwrap();

        let res = pg_election.delete_value(&key).await.unwrap();
        assert!(res);

        let res = pg_election.get_value_with_lease(&key, false).await.unwrap();
        assert!(res.is_none());

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            pg_election
                .put_value_with_lease(&key, &value)
                .await
                .unwrap();
        }

        let key_prefix = "test_key".to_string();
        let (res, _) = pg_election
            .get_value_with_lease_by_prefix(&key_prefix)
            .await
            .unwrap();
        assert_eq!(res.len(), 10);

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let res = pg_election.delete_value(&key).await.unwrap();
            assert!(res);
        }

        let (res, current) = pg_election
            .get_value_with_lease_by_prefix(&key_prefix)
            .await
            .unwrap();
        assert!(res.is_empty());
        assert!(current == Timestamp::default());
    }

    async fn candidate(leader_value: String, candidate_lease_ttl_secs: u64) {
        let client = create_postgres_client().await.unwrap();

        let (tx, _) = broadcast::channel(100);
        let pg_election = PgElection {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: "test_prefix".to_string(),
            candidate_lease_ttl_secs,
        };

        let node_info = MetasrvNodeInfo {
            addr: "test_addr".to_string(),
            version: "test_version".to_string(),
            git_commit: "test_git_commit".to_string(),
            start_time_ms: 0,
        };
        pg_election.register_candidate(&node_info).await.unwrap();
    }

    #[tokio::test]
    async fn test_candidate_registration() {
        let leader_value_prefix = "test_leader".to_string();
        let candidate_lease_ttl_secs = 5;
        let mut handles = vec![];
        for i in 0..10 {
            let leader_value = format!("{}{}", leader_value_prefix, i);
            let handle = tokio::spawn(candidate(leader_value, candidate_lease_ttl_secs));
            handles.push(handle);
        }
        // Wait for candidates to registrate themselves and renew their leases at least once.
        tokio::time::sleep(Duration::from_secs(6)).await;

        let client = create_postgres_client().await.unwrap();

        let (tx, _) = broadcast::channel(100);
        let leader_value = "test_leader".to_string();
        let pg_election = PgElection {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: "test_prefix".to_string(),
            candidate_lease_ttl_secs,
        };

        let candidates = pg_election.all_candidates().await.unwrap();
        assert_eq!(candidates.len(), 10);

        for handle in handles {
            handle.abort();
        }

        // Wait for the candidate leases to expire.
        tokio::time::sleep(Duration::from_secs(5)).await;
        let candidates = pg_election.all_candidates().await.unwrap();
        assert!(candidates.is_empty());

        // Garbage collection
        for i in 0..10 {
            let key = format!(
                "{}{}{}{}",
                "test_prefix", CANDIDATES_ROOT, leader_value_prefix, i
            );
            let res = pg_election.delete_value(&key).await.unwrap();
            assert!(res);
        }
    }
}
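To make the lease encoding in postgres.rs above concrete: a stored candidate value is the node info JSON, LEASE_SEP, and an expire timestamp rendered by Postgres with TO_CHAR(..., 'YYYY-MM-DD HH24:MI:SS.MS'), concatenated into a single string that parse_value_and_expire_time later splits. A standalone sketch with made-up sample data:

// Standalone illustration of the "value || LEASE_SEP || expire_time" layout; the
// node info JSON and the timestamp below are fabricated sample data.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;

fn main() {
    let node_info = r#"{"addr":"127.0.0.1:3002"}"#;
    let expire_time = "2024-01-01 00:00:10.000";
    // This is the shape the SQL in PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME produces.
    let stored = format!("{}{}{}", node_info, LEASE_SEP, expire_time);

    let mut parts = stored.split(LEASE_SEP);
    let value = parts.next().expect("value part");
    let expire = parts.next().expect("expire time part");
    assert!(parts.next().is_none(), "exactly one separator expected");
    assert_eq!(value, node_info);
    assert_eq!(expire, expire_time);
    println!("value = {value}, expires at = {expire}");
}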
@@ -697,6 +697,8 @@ pub enum Error {
    #[cfg(feature = "pg_kvbackend")]
    #[snafu(display("Failed to execute via postgres"))]
    PostgresExecution {
        #[snafu(source)]
        error: tokio_postgres::Error,
        #[snafu(implicit)]
        location: Location,
    },
@@ -36,7 +36,7 @@ impl HeartbeatHandler for OnLeaderStartHandler {
            return Ok(HandleControl::Continue);
        };

        if election.in_infancy() {
        if election.in_leader_infancy() {
            ctx.is_infancy = true;
            // TODO(weny): Unifies the multiple leader state between Context and Metasrv.
            // we can't ensure the in-memory kv has already been reset in the outside loop.