fix(meta): enhance postgres election client with timeouts and reconnection (#6276)

* fix(meta): enhance postgres election client with timeouts and reconnection

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-06-09 21:51:13 +08:00
committed by GitHub
parent 74222c3070
commit 2979aa048e
6 changed files with 470 additions and 178 deletions

View File

@@ -35,6 +35,9 @@ pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;
/// The keep-alive interval of the Postgres connection.
pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

View File

@@ -31,8 +31,6 @@ use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
@@ -270,22 +268,41 @@ pub async fn metasrv_builder(
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(&opts.store_addrs).await?;
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.
let election_client = create_postgres_client(opts).await?;
use std::time::Duration;
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
use crate::election::rds::postgres::ElectionPgClient;
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let mut cfg = Config::new();
cfg.keepalives = Some(true);
cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
let election_client =
ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
META_LEASE_SECS,
candidate_lease_ttl,
meta_lease_ttl,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
let pool = create_postgres_pool(&opts.store_addrs).await?;
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
(kv_backend, Some(election))
}
#[cfg(feature = "mysql_kvbackend")]
@@ -372,31 +389,24 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
}
#[cfg(feature = "pg_kvbackend")]
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
let postgres_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!(e; "connection error");
}
});
Ok(client)
/// Creates a pool for the Postgres backend.
///
/// It only uses the first store addr to create the pool.
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
create_postgres_pool_with(store_addrs, Config::new()).await
}
#[cfg(feature = "pg_kvbackend")]
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
/// Creates a pool for the Postgres backend.
///
/// It only uses the first store addr, and builds the pool from the given config.
pub async fn create_postgres_pool_with(
store_addrs: &[String],
mut cfg: Config,
) -> Result<deadpool_postgres::Pool> {
let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let mut cfg = Config::new();
cfg.url = Some(postgres_url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)

View File

@@ -157,6 +157,11 @@ pub trait Election: Send + Sync {
/// but only one can be the leader at a time.
async fn campaign(&self) -> Result<()>;
/// Resets the campaign.
///
/// Reset the client and the leader flag if needed.
///
/// The default implementation is a no-op.
async fn reset_campaign(&self) {}
/// Returns the leader value for the current election.
async fn leader(&self) -> Result<Self::Leader>;

View File

@@ -18,11 +18,12 @@ use std::time::Duration;
use common_telemetry::{error, warn};
use common_time::Timestamp;
use deadpool_postgres::{Manager, Pool};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, RwLock};
use tokio::time::MissedTickBehavior;
use tokio_postgres::types::ToSql;
use tokio_postgres::Client;
use tokio_postgres::Row;
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
use crate::election::{
@@ -30,15 +31,14 @@ use crate::election::{
CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::error::{
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
struct ElectionSqlFactory<'a> {
lock_id: u64,
table_name: &'a str,
meta_lease_ttl_secs: u64,
}
struct ElectionSqlSet {
@@ -88,11 +88,10 @@ struct ElectionSqlSet {
}
impl<'a> ElectionSqlFactory<'a> {
fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
fn new(lock_id: u64, table_name: &'a str) -> Self {
Self {
lock_id,
table_name,
meta_lease_ttl_secs,
}
}
@@ -108,15 +107,6 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.meta_lease_ttl_secs + 1
)
}
/// Builds the `pg_try_advisory_lock` query for this factory's lock id.
fn campaign_sql(&self) -> String {
    let lock_id = self.lock_id;
    format!("SELECT pg_try_advisory_lock({lock_id})")
}
@@ -171,46 +161,165 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
/// PgClient for election.
pub struct ElectionPgClient {
current: Option<deadpool::managed::Object<Manager>>,
pool: Pool,
/// The client-side timeout for statement execution.
///
/// This timeout is enforced by the client application and is independent of any server-side timeouts.
/// If a statement takes longer than this duration to execute, the client will abort the operation.
execution_timeout: Duration,
/// The idle session timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a session remains idle for longer than this duration, the server will terminate it.
idle_session_timeout: Duration,
/// The statement timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a statement takes longer than this duration to execute, the server will abort it.
statement_timeout: Duration,
}
impl ElectionPgClient {
pub fn new(
pool: Pool,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
Ok(ElectionPgClient {
current: None,
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
})
}
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.idle_session_timeout.as_secs()
)
}
fn set_statement_timeout_sql(&self) -> String {
format!(
"SET statement_timeout = '{}s';",
self.statement_timeout.as_secs()
)
}
async fn reset_client(&mut self) -> Result<()> {
self.current = None;
self.maybe_init_client().await
}
async fn maybe_init_client(&mut self) -> Result<()> {
if self.current.is_none() {
let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
self.current = Some(client);
// Set idle session timeout and statement timeout.
let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
self.execute(&idle_session_timeout_sql, &[]).await?;
let statement_timeout_sql = self.set_statement_timeout_sql();
self.execute(&statement_timeout_sql, &[]).await?;
}
Ok(())
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().execute(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().query(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
}
/// PostgreSql implementation of Election.
pub struct PgElection {
leader_value: String,
client: Client,
pg_client: RwLock<ElectionPgClient>,
is_leader: AtomicBool,
leader_infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
sql_set: ElectionSqlSet,
}
impl PgElection {
/// Initializes the election client if it holds no connection yet.
///
/// The check is done under the read lock; the write lock is only taken when
/// initialization is actually needed.
async fn maybe_init_client(&self) -> Result<()> {
    // The read guard is released at the end of this statement, before the write lock is taken.
    let already_initialized = self.pg_client.read().await.current.is_some();
    if already_initialized {
        return Ok(());
    }

    self.pg_client.write().await.maybe_init_client().await
}
pub async fn with_pg_client(
leader_value: String,
client: Client,
pg_client: ElectionPgClient,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
table_name: &str,
lock_id: u64,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
.await
.context(PostgresExecutionSnafu)?;
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
client,
pg_client: RwLock::new(pg_client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: sql_factory.build(),
}))
}
@@ -249,18 +358,17 @@ impl Election for PgElection {
input: format!("{node_info:?}"),
})?;
let res = self
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
.await?;
// May have been registered before; just update the lease.
if !res {
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
.await?;
}
// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
loop {
let _ = keep_alive_interval.tick().await;
@@ -282,13 +390,8 @@ impl Election for PgElection {
);
// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
self.update_value_with_lease(
&key,
&lease.origin,
&node_info,
self.candidate_lease_ttl_secs,
)
.await?;
self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
.await?;
}
}
@@ -321,16 +424,17 @@ impl Election for PgElection {
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
/// to perform actions as a follower.
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.maybe_init_client().await?;
loop {
let res = self
.client
.query(&self.sql_set.campaign, &[])
.pg_client
.read()
.await
.context(PostgresExecutionSnafu)?;
.query(&self.sql_set.campaign, &[])
.await?;
let row = res.first().context(UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock",
})?;
@@ -349,6 +453,12 @@ impl Election for PgElection {
}
}
/// Replaces the election client's connection with a freshly initialized one.
///
/// A failure is only logged; nothing is returned to the caller.
async fn reset_campaign(&self) {
    let mut pg_client = self.pg_client.write().await;
    match pg_client.reset_client().await {
        Ok(()) => {}
        Err(err) => {
            error!(err; "Failed to reset client");
        }
    }
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
@@ -376,11 +486,13 @@ impl PgElection {
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self
.client
.query(&self.sql_set.get_value_with_lease, &[&key])
.pg_client
.read()
.await
.context(PostgresExecutionSnafu)?;
.query(&self.sql_set.get_value_with_lease, &[&key])
.await?;
if res.is_empty() {
Ok(None)
@@ -414,11 +526,13 @@ impl PgElection {
key_prefix: &str,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
self.maybe_init_client().await?;
let res = self
.client
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
.pg_client
.read()
.await
.context(PostgresExecutionSnafu)?;
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
.await?;
let mut values_with_leases = vec![];
let mut current = Timestamp::default();
@@ -445,18 +559,21 @@ impl PgElection {
key: &str,
prev: &str,
updated: &str,
lease_ttl: u64,
lease_ttl: Duration,
) -> Result<()> {
let key = key.as_bytes();
let prev = prev.as_bytes();
self.maybe_init_client().await?;
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let res = self
.client
.pg_client
.read()
.await
.execute(
&self.sql_set.update_value_with_lease,
&[&key, &prev, &updated, &(lease_ttl as f64)],
&[&key, &prev, &updated, &lease_ttl_secs],
)
.await
.context(PostgresExecutionSnafu)?;
.await?;
ensure!(
res == 1,
@@ -473,16 +590,18 @@ impl PgElection {
&self,
key: &str,
value: &str,
lease_ttl_secs: u64,
lease_ttl: Duration,
) -> Result<bool> {
let key = key.as_bytes();
let lease_ttl_secs = lease_ttl_secs as f64;
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
self.maybe_init_client().await?;
let res = self
.client
.query(&self.sql_set.put_value_with_lease, &params)
.pg_client
.read()
.await
.context(PostgresExecutionSnafu)?;
.query(&self.sql_set.put_value_with_lease, &params)
.await?;
Ok(res.is_empty())
}
@@ -490,11 +609,13 @@ impl PgElection {
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str) -> Result<bool> {
let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self
.client
.query(&self.sql_set.delete_value, &[&key])
.pg_client
.read()
.await
.context(PostgresExecutionSnafu)?;
.query(&self.sql_set.delete_value, &[&key])
.await?;
Ok(res.len() == 1)
}
@@ -536,7 +657,7 @@ impl PgElection {
&key,
&lease.origin,
&self.leader_value,
self.meta_lease_ttl_secs,
self.meta_lease_ttl,
)
.await?;
}
@@ -605,10 +726,12 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.client
.query(&self.sql_set.step_down, &[])
self.maybe_init_client().await?;
self.pg_client
.read()
.await
.context(PostgresExecutionSnafu)?;
.query(&self.sql_set.step_down, &[])
.await?;
send_leader_change_and_set_flags(
&self.is_leader,
&self.leader_infancy,
@@ -651,7 +774,7 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
.await?;
if self
@@ -674,15 +797,21 @@ impl PgElection {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use common_meta::maybe_skip_postgres_integration_test;
use tokio_postgres::{Client, NoTls};
use super::*;
use crate::error::PostgresExecutionSnafu;
use crate::bootstrap::create_postgres_pool;
use crate::error;
async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
async fn create_postgres_client(
table_name: Option<&str>,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
if endpoint.is_empty() {
return UnexpectedSnafu {
@@ -690,25 +819,34 @@ mod tests {
}
.fail();
}
let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
.await
.context(PostgresExecutionSnafu)?;
tokio::spawn(async move {
connection.await.context(PostgresExecutionSnafu).unwrap();
});
let pool = create_postgres_pool(&[endpoint]).await.unwrap();
let mut pg_client = ElectionPgClient::new(
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.unwrap();
pg_client.maybe_init_client().await?;
if let Some(table_name) = table_name {
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
table_name
);
client.execute(&create_table_sql, &[]).await.unwrap();
pg_client.execute(&create_table_sql, &[]).await?;
}
Ok(client)
Ok(pg_client)
}
async fn drop_table(client: &Client, table_name: &str) {
async fn drop_table(pg_election: &PgElection, table_name: &str) {
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
client.execute(&sql, &[]).await.unwrap();
pg_election
.pg_client
.read()
.await
.execute(&sql, &[])
.await
.unwrap();
}
#[tokio::test]
@@ -719,23 +857,35 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_postgres_crud_greptime_metakv";
let client = create_postgres_client(Some(table_name)).await.unwrap();
let candidate_lease_ttl = Duration::from_secs(10);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
leader_value: "test_leader".to_string(),
client,
pg_client: RwLock::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs: 10,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
};
let res = pg_election
.put_value_with_lease(&key, &value, 10)
.put_value_with_lease(&key, &value, candidate_lease_ttl)
.await
.unwrap();
assert!(res);
@@ -748,7 +898,7 @@ mod tests {
assert_eq!(lease.leader_value, value);
pg_election
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
.await
.unwrap();
@@ -762,7 +912,7 @@ mod tests {
let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i);
pg_election
.put_value_with_lease(&key, &value, 10)
.put_value_with_lease(&key, &value, candidate_lease_ttl)
.await
.unwrap();
}
@@ -787,28 +937,39 @@ mod tests {
assert!(res.is_empty());
assert!(current == Timestamp::default());
drop_table(&pg_election.client, table_name).await;
drop_table(&pg_election, table_name).await;
}
async fn candidate(
leader_value: String,
candidate_lease_ttl_secs: u64,
candidate_lease_ttl: Duration,
store_key_prefix: String,
table_name: String,
) {
let client = create_postgres_client(None).await.unwrap();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
leader_value,
client,
pg_client: RwLock::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
};
let node_info = MetasrvNodeInfo {
@@ -824,17 +985,28 @@ mod tests {
async fn test_candidate_registration() {
maybe_skip_postgres_integration_test!();
let leader_value_prefix = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_candidate_registration_greptime_metakv";
let mut handles = vec![];
let client = create_postgres_client(Some(table_name)).await.unwrap();
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i);
let handle = tokio::spawn(candidate(
leader_value,
candidate_lease_ttl_secs,
candidate_lease_ttl,
uuid.clone(),
table_name.to_string(),
));
@@ -847,14 +1019,14 @@ mod tests {
let leader_value = "test_leader".to_string();
let pg_election = PgElection {
leader_value,
client,
pg_client: RwLock::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
};
let candidates = pg_election.all_candidates().await.unwrap();
@@ -876,29 +1048,40 @@ mod tests {
assert!(res);
}
drop_table(&pg_election.client, table_name).await;
drop_table(&pg_election, table_name).await;
}
#[tokio::test]
async fn test_elected_and_step_down() {
maybe_skip_postgres_integration_test!();
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_elected_and_step_down_greptime_metakv";
let client = create_postgres_client(Some(table_name)).await.unwrap();
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
client,
pg_client: RwLock::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28320, table_name).build(),
};
leader_pg_election.elected().await.unwrap();
@@ -990,7 +1173,7 @@ mod tests {
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
drop_table(&leader_pg_election.client, table_name).await;
drop_table(&leader_pg_election, table_name).await;
}
#[tokio::test]
@@ -999,25 +1182,38 @@ mod tests {
let leader_value = "test_leader".to_string();
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_leader_action_greptime_metakv";
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client(Some(table_name)).await.unwrap();
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
client,
pg_client: RwLock::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28321, table_name).build(),
};
// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1048,7 +1244,9 @@ mod tests {
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1070,7 +1268,9 @@ mod tests {
tokio::time::sleep(Duration::from_secs(2)).await;
let res = leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1098,7 +1298,9 @@ mod tests {
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1155,7 +1357,9 @@ mod tests {
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1186,7 +1390,9 @@ mod tests {
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1197,7 +1403,11 @@ mod tests {
.await
.unwrap();
leader_pg_election
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
.put_value_with_lease(
&leader_pg_election.election_key(),
"test",
Duration::from_secs(10),
)
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
@@ -1223,52 +1433,74 @@ mod tests {
// Clean up
leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
drop_table(&leader_pg_election.client, table_name).await;
drop_table(&leader_pg_election, table_name).await;
}
#[tokio::test]
async fn test_follower_action() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_follower_action_greptime_metakv";
let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let follower_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let follower_pg_election = PgElection {
leader_value: "test_follower".to_string(),
client: follower_client,
pg_client: RwLock::new(follower_client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
};
let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
let leader_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: "test_leader".to_string(),
client: leader_client,
pg_client: RwLock::new(leader_client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
};
leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1309,11 +1541,41 @@ mod tests {
// Clean up
leader_pg_election
.client
.pg_client
.read()
.await
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
drop_table(&follower_pg_election.client, table_name).await;
drop_table(&follower_pg_election, table_name).await;
}
#[tokio::test]
async fn test_idle_session_timeout() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let idle_session_timeout = Duration::from_secs(1);
let mut client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1100)).await;
// Wait for the idle session timeout.
let err = client.query("SELECT 1", &[]).await.unwrap_err();
assert_matches!(err, error::Error::PostgresExecution { .. });
let error::Error::PostgresExecution { error, .. } = err else {
panic!("Expected PostgresExecution error");
};
assert!(error.is_closed());
// Reset the client and try again.
client.reset_client().await.unwrap();
let _ = client.query("SELECT 1", &[]).await.unwrap();
}
}

View File

@@ -748,21 +748,31 @@ pub enum Error {
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to execute via postgres"))]
#[snafu(display("Failed to execute via postgres, sql: {}", sql))]
PostgresExecution {
#[snafu(source)]
error: tokio_postgres::Error,
sql: String,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to connect to Postgres"))]
ConnectPostgres {
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(display("Failed to get Postgres client"))]
GetPostgresClient {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: deadpool::managed::PoolError<tokio_postgres::Error>,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
SqlExecutionTimeout {
#[snafu(implicit)]
location: Location,
sql: String,
duration: std::time::Duration,
},
#[cfg(feature = "pg_kvbackend")]
@@ -1005,9 +1015,10 @@ impl ErrorExt for Error {
Error::LookupPeer { source, .. } => source.status_code(),
#[cfg(feature = "pg_kvbackend")]
Error::CreatePostgresPool { .. }
| Error::GetPostgresClient { .. }
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::ConnectPostgres { .. } => StatusCode::Internal,
| Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. }

View File

@@ -582,6 +582,7 @@ impl Metasrv {
if let Err(e) = res {
warn!(e; "Metasrv election error");
}
election.reset_campaign().await;
info!("Metasrv re-initiate election");
}
info!("Metasrv stopped");