diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs
index 30e779c31f..99181dcd0c 100644
--- a/src/common/meta/src/distributed_time_constants.rs
+++ b/src/common/meta/src/distributed_time_constants.rs
@@ -35,6 +35,9 @@ pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
 /// The lease seconds of metasrv leader.
 pub const META_LEASE_SECS: u64 = 5;
 
+/// The keep-alive interval of the Postgres connection.
+pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
+
 /// In a lease, there are two opportunities for renewal.
 pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
 
diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs
index 717aae19ec..6ce27978f6 100644
--- a/src/meta-srv/src/bootstrap.rs
+++ b/src/meta-srv/src/bootstrap.rs
@@ -31,8 +31,6 @@ use common_meta::kv_backend::rds::MySqlStore;
 #[cfg(feature = "pg_kvbackend")]
 use common_meta::kv_backend::rds::PgStore;
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
-#[cfg(feature = "pg_kvbackend")]
-use common_telemetry::error;
 use common_telemetry::info;
 #[cfg(feature = "pg_kvbackend")]
 use deadpool_postgres::{Config, Runtime};
@@ -270,22 +268,41 @@ pub async fn metasrv_builder(
         }
         #[cfg(feature = "pg_kvbackend")]
         (None, BackendImpl::PostgresStore) => {
-            let pool = create_postgres_pool(&opts.store_addrs).await?;
-            let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
-                .await
-                .context(error::KvBackendSnafu)?;
-            // Client for election should be created separately since we need a different session keep-alive idle time.
-            let election_client = create_postgres_client(opts).await?;
+            use std::time::Duration;
+
+            use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
+
+            use crate::election::rds::postgres::ElectionPgClient;
+
+            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
+            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
+            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
+            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
+
+            let mut cfg = Config::new();
+            cfg.keepalives = Some(true);
+            cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
+            // We use a separate pool for election since we need a different session keep-alive idle time.
+            let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
+
+            let election_client =
+                ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
             let election = PgElection::with_pg_client(
                 opts.server_addr.clone(),
                 election_client,
                 opts.store_key_prefix.clone(),
-                CANDIDATE_LEASE_SECS,
-                META_LEASE_SECS,
+                candidate_lease_ttl,
+                meta_lease_ttl,
                 &opts.meta_table_name,
                 opts.meta_election_lock_id,
             )
             .await?;
+
+            let pool = create_postgres_pool(&opts.store_addrs).await?;
+            let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
+                .await
+                .context(error::KvBackendSnafu)?;
+
             (kv_backend, Some(election))
         }
         #[cfg(feature = "mysql_kvbackend")]
@@ -372,31 +389,24 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
 }
 
 #[cfg(feature = "pg_kvbackend")]
-async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
-    let postgres_url = opts
-        .store_addrs
-        .first()
-        .context(error::InvalidArgumentsSnafu {
-            err_msg: "empty store addrs",
-        })?;
-    let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
-        .await
-        .context(error::ConnectPostgresSnafu)?;
-
-    tokio::spawn(async move {
-        if let Err(e) = connection.await {
-            error!(e; "connection error");
-        }
-    });
-    Ok(client)
+/// Creates a pool for the Postgres backend.
+///
+/// It only uses the first store addr to create the pool.
+pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<Pool> {
+    create_postgres_pool_with(store_addrs, Config::new()).await
 }
 
 #[cfg(feature = "pg_kvbackend")]
-pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<Pool> {
+/// Creates a pool for the Postgres backend.
+///
+/// It only uses the first store addr, together with the given config, to create the pool.
+pub async fn create_postgres_pool_with(
+    store_addrs: &[String],
+    mut cfg: Config,
+) -> Result<Pool> {
     let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
         err_msg: "empty store addrs",
     })?;
-    let mut cfg = Config::new();
     cfg.url = Some(postgres_url.to_string());
     let pool = cfg
         .create_pool(Some(Runtime::Tokio1), NoTls)
diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs
index e4b569b99e..8163e2b9ad 100644
--- a/src/meta-srv/src/election.rs
+++ b/src/meta-srv/src/election.rs
@@ -157,6 +157,11 @@ pub trait Election: Send + Sync {
     /// but only one can be the leader at a time.
     async fn campaign(&self) -> Result<()>;
 
+    /// Resets the campaign.
+    ///
+    /// Resets the client and the leader flag if needed.
+    async fn reset_campaign(&self) {}
+
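Review note: the bootstrap hunk above now builds two pools, a keep-alive-tuned one handed to `ElectionPgClient` and a default one for `PgStore`, which keeps the election session's server-side timeouts out of ordinary kv-backend traffic. A minimal, self-contained sketch of what that keep-alive configuration amounts to (the URL and `main` scaffolding are invented for illustration; `Config`, `Runtime`, and the keep-alive fields are the real `deadpool_postgres` API used by this patch):

```rust
use std::time::Duration;

use deadpool_postgres::{Config, Runtime};
use tokio_postgres::NoTls;

// Hypothetical demo URL; the real code takes the first entry of
// MetasrvOptions::store_addrs.
const DEMO_URL: &str = "postgresql://user:pass@127.0.0.1:5432/metasrv";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut cfg = Config::new();
    cfg.url = Some(DEMO_URL.to_string());
    // TCP keep-alives stop middleboxes from silently dropping the election
    // connection while it sits idle between lease renewals.
    cfg.keepalives = Some(true);
    cfg.keepalives_idle = Some(Duration::from_secs(30));
    let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls)?;

    let client = pool.get().await?;
    let rows = client.query("SELECT 1", &[]).await?;
    assert_eq!(rows.len(), 1);
    Ok(())
}
```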
     /// Returns the leader value for the current election.
     async fn leader(&self) -> Result<Self::Leader>;
 
diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs
index 1e970c40df..b8c4ff718e 100644
--- a/src/meta-srv/src/election/rds/postgres.rs
+++ b/src/meta-srv/src/election/rds/postgres.rs
@@ -18,11 +18,12 @@ use std::time::Duration;
 
 use common_telemetry::{error, warn};
 use common_time::Timestamp;
+use deadpool_postgres::{Manager, Pool};
 use snafu::{ensure, OptionExt, ResultExt};
-use tokio::sync::broadcast;
+use tokio::sync::{broadcast, RwLock};
 use tokio::time::MissedTickBehavior;
 use tokio_postgres::types::ToSql;
-use tokio_postgres::Client;
+use tokio_postgres::Row;
 
 use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
 use crate::election::{
@@ -30,15 +31,14 @@ use crate::election::{
     CANDIDATES_ROOT, ELECTION_KEY,
 };
 use crate::error::{
-    DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
-    UnexpectedSnafu,
+    DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
+    Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
 };
 use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
 
 struct ElectionSqlFactory<'a> {
     lock_id: u64,
     table_name: &'a str,
-    meta_lease_ttl_secs: u64,
 }
 
 struct ElectionSqlSet {
@@ -88,11 +88,10 @@ struct ElectionSqlSet {
 }
 
 impl<'a> ElectionSqlFactory<'a> {
-    fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
+    fn new(lock_id: u64, table_name: &'a str) -> Self {
         Self {
             lock_id,
             table_name,
-            meta_lease_ttl_secs,
         }
     }
 
@@ -108,15 +107,6 @@ impl<'a> ElectionSqlFactory<'a> {
         }
     }
 
-    // Currently the session timeout is longer than the leader lease time.
-    // So the leader will renew the lease twice before the session timeout if everything goes well.
-    fn set_idle_session_timeout_sql(&self) -> String {
-        format!(
-            "SET idle_session_timeout = '{}s';",
-            self.meta_lease_ttl_secs + 1
-        )
-    }
-
     fn campaign_sql(&self) -> String {
         format!("SELECT pg_try_advisory_lock({})", self.lock_id)
    }
@@ -171,46 +161,165 @@ impl<'a> ElectionSqlFactory<'a> {
     }
 }
 
+/// Postgres client for election.
+pub struct ElectionPgClient {
+    current: Option<deadpool::managed::Object<Manager>>,
+    pool: Pool,
+    /// The client-side timeout for statement execution.
+    ///
+    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
+    /// If a statement takes longer than this duration to execute, the client will abort the operation.
+    execution_timeout: Duration,
+
+    /// The idle session timeout.
+    ///
+    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
+    /// If a session remains idle for longer than this duration, the server will terminate it.
+    idle_session_timeout: Duration,
+
+    /// The statement timeout.
+    ///
+    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
+    /// If a statement takes longer than this duration to execute, the server will abort it.
+    statement_timeout: Duration,
+}
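To make the relationship between the three timeouts concrete: `execution_timeout` is enforced client-side, while the other two are applied per session with plain `SET` statements. A rough sketch of the generated SQL (the helper below is illustrative only; the real code builds the strings in `set_idle_session_timeout_sql` and `set_statement_timeout_sql`):

```rust
use std::time::Duration;

// Illustrative stand-in for the two SQL builders on ElectionPgClient.
fn set_timeout_sql(setting: &str, timeout: Duration) -> String {
    format!("SET {} = '{}s';", setting, timeout.as_secs())
}

fn main() {
    let meta_lease_ttl = Duration::from_secs(5); // assumed META_LEASE_SECS
    // Server kills sessions idle longer than the lease, so a crashed
    // leader's advisory lock cannot outlive its lease for long.
    assert_eq!(
        set_timeout_sql("idle_session_timeout", meta_lease_ttl),
        "SET idle_session_timeout = '5s';"
    );
    // Server aborts statements that run longer than the lease.
    assert_eq!(
        set_timeout_sql("statement_timeout", meta_lease_ttl),
        "SET statement_timeout = '5s';"
    );
}
```

In PostgreSQL a value of zero disables the corresponding server-side timeout, which is why most tests below pass `Duration::from_secs(0)` for the idle session timeout.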
+
+impl ElectionPgClient {
+    pub fn new(
+        pool: Pool,
+        execution_timeout: Duration,
+        idle_session_timeout: Duration,
+        statement_timeout: Duration,
+    ) -> Result<Self> {
+        Ok(ElectionPgClient {
+            current: None,
+            pool,
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        })
+    }
+
+    fn set_idle_session_timeout_sql(&self) -> String {
+        format!(
+            "SET idle_session_timeout = '{}s';",
+            self.idle_session_timeout.as_secs()
+        )
+    }
+
+    fn set_statement_timeout_sql(&self) -> String {
+        format!(
+            "SET statement_timeout = '{}s';",
+            self.statement_timeout.as_secs()
+        )
+    }
+
+    async fn reset_client(&mut self) -> Result<()> {
+        self.current = None;
+        self.maybe_init_client().await
+    }
+
+    async fn maybe_init_client(&mut self) -> Result<()> {
+        if self.current.is_none() {
+            let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
+
+            self.current = Some(client);
+            // Set idle session timeout and statement timeout.
+            let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
+            self.execute(&idle_session_timeout_sql, &[]).await?;
+            let statement_timeout_sql = self.set_statement_timeout_sql();
+            self.execute(&statement_timeout_sql, &[]).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Executes the statement and returns the number of rows affected.
+    ///
+    /// # Panics
+    /// Panics if `current` is `None`.
+    async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
+        let result = tokio::time::timeout(
+            self.execution_timeout,
+            self.current.as_ref().unwrap().execute(sql, params),
+        )
+        .await
+        .map_err(|_| {
+            SqlExecutionTimeoutSnafu {
+                sql: sql.to_string(),
+                duration: self.execution_timeout,
+            }
+            .build()
+        })?;
+
+        result.context(PostgresExecutionSnafu { sql })
+    }
+
+    /// Returns the rows produced by the query.
+    ///
+    /// # Panics
+    /// Panics if `current` is `None`.
+    async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
+        let result = tokio::time::timeout(
+            self.execution_timeout,
+            self.current.as_ref().unwrap().query(sql, params),
+        )
+        .await
+        .map_err(|_| {
+            SqlExecutionTimeoutSnafu {
+                sql: sql.to_string(),
+                duration: self.execution_timeout,
+            }
+            .build()
+        })?;
+
+        result.context(PostgresExecutionSnafu { sql })
+    }
+}
+
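Both `execute` and `query` wrap the call in `tokio::time::timeout`. A stripped-down sketch of that pattern (all names here are invented for illustration, not part of the patch):

```rust
use std::time::Duration;

use tokio::time::timeout;

// Generic shape of the client-side deadline used by execute()/query() above.
// When the deadline fires the inner future is simply dropped: that abandons
// the client-side wait, while the server-side statement_timeout configured by
// ElectionPgClient is what stops a runaway statement on the server.
async fn with_deadline<T, E, F>(budget: Duration, fut: F) -> Result<T, String>
where
    F: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    match timeout(budget, fut).await {
        Err(_elapsed) => Err(format!("timed out after {budget:?}")),
        Ok(Err(e)) => Err(format!("execution failed: {e}")),
        Ok(Ok(v)) => Ok(v),
    }
}

#[tokio::main]
async fn main() {
    // Stand-in future; real callers pass client.execute(..) or client.query(..).
    let slow = async {
        tokio::time::sleep(Duration::from_secs(1)).await;
        Ok::<_, std::convert::Infallible>(42)
    };
    assert!(with_deadline(Duration::from_millis(10), slow).await.is_err());
}
```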
 /// PostgreSql implementation of Election.
 pub struct PgElection {
     leader_value: String,
-    client: Client,
+    pg_client: RwLock<ElectionPgClient>,
     is_leader: AtomicBool,
     leader_infancy: AtomicBool,
     leader_watcher: broadcast::Sender<LeaderChangeMessage>,
     store_key_prefix: String,
-    candidate_lease_ttl_secs: u64,
-    meta_lease_ttl_secs: u64,
+    candidate_lease_ttl: Duration,
+    meta_lease_ttl: Duration,
     sql_set: ElectionSqlSet,
 }
 
 impl PgElection {
+    async fn maybe_init_client(&self) -> Result<()> {
+        if self.pg_client.read().await.current.is_none() {
+            self.pg_client.write().await.maybe_init_client().await?;
+        }
+
+        Ok(())
+    }
+
     pub async fn with_pg_client(
         leader_value: String,
-        client: Client,
+        pg_client: ElectionPgClient,
         store_key_prefix: String,
-        candidate_lease_ttl_secs: u64,
-        meta_lease_ttl_secs: u64,
+        candidate_lease_ttl: Duration,
+        meta_lease_ttl: Duration,
         table_name: &str,
         lock_id: u64,
     ) -> Result<ElectionRef> {
-        let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
-        // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
-        client
-            .execute(&sql_factory.set_idle_session_timeout_sql(), &[])
-            .await
-            .context(PostgresExecutionSnafu)?;
+        let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
 
         let tx = listen_leader_change(leader_value.clone());
         Ok(Arc::new(Self {
             leader_value,
-            client,
+            pg_client: RwLock::new(pg_client),
             is_leader: AtomicBool::new(false),
             leader_infancy: AtomicBool::new(false),
             leader_watcher: tx,
             store_key_prefix,
-            candidate_lease_ttl_secs,
-            meta_lease_ttl_secs,
+            candidate_lease_ttl,
+            meta_lease_ttl,
             sql_set: sql_factory.build(),
         }))
     }
@@ -249,18 +358,17 @@ impl Election for PgElection {
             input: format!("{node_info:?}"),
         })?;
         let res = self
-            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
+            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
             .await?;
 
         // May registered before, just update the lease.
         if !res {
             self.delete_value(&key).await?;
-            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
+            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
                 .await?;
         }
 
         // Check if the current lease has expired and renew the lease.
-        let mut keep_alive_interval =
-            tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
+        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
 
         loop {
             let _ = keep_alive_interval.tick().await;
@@ -282,13 +390,8 @@ impl Election for PgElection {
             );
 
             // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
-            self.update_value_with_lease(
-                &key,
-                &lease.origin,
-                &node_info,
-                self.candidate_lease_ttl_secs,
-            )
-            .await?;
+            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
+                .await?;
         }
     }
 
@@ -321,16 +424,17 @@ impl Election for PgElection {
     /// - If the lock is not acquired (result is false), it calls the `follower_action` method
     /// to perform actions as a follower.
     async fn campaign(&self) -> Result<()> {
-        let mut keep_alive_interval =
-            tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
+        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
         keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
 
+        self.maybe_init_client().await?;
         loop {
             let res = self
-                .client
-                .query(&self.sql_set.campaign, &[])
+                .pg_client
+                .read()
                 .await
-                .context(PostgresExecutionSnafu)?;
+                .query(&self.sql_set.campaign, &[])
+                .await?;
             let row = res.first().context(UnexpectedSnafu {
                 violated: "Failed to get the result of acquiring advisory lock",
             })?;
@@ -349,6 +453,12 @@ impl Election for PgElection {
         }
     }
 
+    async fn reset_campaign(&self) {
+        if let Err(err) = self.pg_client.write().await.reset_client().await {
+            error!(err; "Failed to reset client");
+        }
+    }
+
     async fn leader(&self) -> Result<LeaderValue> {
         if self.is_leader.load(Ordering::Relaxed) {
             Ok(self.leader_value.as_bytes().into())
@@ -376,11 +486,13 @@ impl PgElection {
     /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
     async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
         let key = key.as_bytes();
+        self.maybe_init_client().await?;
         let res = self
-            .client
-            .query(&self.sql_set.get_value_with_lease, &[&key])
+            .pg_client
+            .read()
             .await
-            .context(PostgresExecutionSnafu)?;
+            .query(&self.sql_set.get_value_with_lease, &[&key])
+            .await?;
 
         if res.is_empty() {
             Ok(None)
@@ -414,11 +526,13 @@ impl PgElection {
         key_prefix: &str,
     ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
         let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
+        self.maybe_init_client().await?;
         let res = self
-            .client
-            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
+            .pg_client
+            .read()
             .await
-            .context(PostgresExecutionSnafu)?;
+            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
+            .await?;
 
         let mut values_with_leases = vec![];
         let mut current = Timestamp::default();
@@ -445,18 +559,21 @@ impl PgElection {
         key: &str,
         prev: &str,
         updated: &str,
-        lease_ttl: u64,
+        lease_ttl: Duration,
     ) -> Result<()> {
         let key = key.as_bytes();
         let prev = prev.as_bytes();
 
+        self.maybe_init_client().await?;
+        let lease_ttl_secs = lease_ttl.as_secs() as f64;
         let res = self
-            .client
+            .pg_client
+            .read()
+            .await
             .execute(
                 &self.sql_set.update_value_with_lease,
-                &[&key, &prev, &updated, &(lease_ttl as f64)],
+                &[&key, &prev, &updated, &lease_ttl_secs],
             )
-            .await
-            .context(PostgresExecutionSnafu)?;
+            .await?;
 
         ensure!(
             res == 1,
@@ -473,16 +590,18 @@ impl PgElection {
         &self,
         key: &str,
         value: &str,
-        lease_ttl_secs: u64,
+        lease_ttl: Duration,
     ) -> Result<bool> {
         let key = key.as_bytes();
-        let lease_ttl_secs = lease_ttl_secs as f64;
+        let lease_ttl_secs = lease_ttl.as_secs() as f64;
         let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
+        self.maybe_init_client().await?;
         let res = self
-            .client
-            .query(&self.sql_set.put_value_with_lease, &params)
+            .pg_client
+            .read()
             .await
-            .context(PostgresExecutionSnafu)?;
+            .query(&self.sql_set.put_value_with_lease, &params)
+            .await?;
 
         Ok(res.is_empty())
     }
 
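`update_value_with_lease` above has compare-and-swap semantics: the previous value participates in the match, and `res == 1` is the proof that the caller still owned the key. A self-contained illustration of the same CAS-update shape (the `demo_kv` table, the SQL, and the `DEMO_PG_URL` variable are invented for this example; they are not the statements `ElectionSqlFactory` actually builds):

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    let url = std::env::var("DEMO_PG_URL").expect("set DEMO_PG_URL to run this sketch");
    let (client, conn) = tokio_postgres::connect(&url, NoTls).await?;
    tokio::spawn(conn);

    client
        .execute(
            "CREATE TABLE IF NOT EXISTS demo_kv(k TEXT PRIMARY KEY, v TEXT)",
            &[],
        )
        .await?;
    // Seed the key so the CAS precondition below holds on repeat runs.
    client
        .execute(
            "INSERT INTO demo_kv VALUES ('leader', 'node-a') \
             ON CONFLICT (k) DO UPDATE SET v = 'node-a'",
            &[],
        )
        .await?;

    // CAS: only update if the current value is still what we last observed.
    let rows = client
        .execute(
            "UPDATE demo_kv SET v = $3 WHERE k = $1 AND v = $2",
            &[&"leader", &"node-a", &"node-b"],
        )
        .await?;
    // Like update_value_with_lease, treat anything but exactly one updated
    // row as having lost ownership of the key.
    assert_eq!(rows, 1);
    Ok(())
}
```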
     /// Caution: Should only delete the key if the lease is expired.
     async fn delete_value(&self, key: &str) -> Result<bool> {
         let key = key.as_bytes();
 
+        self.maybe_init_client().await?;
         let res = self
-            .client
-            .query(&self.sql_set.delete_value, &[&key])
+            .pg_client
+            .read()
             .await
-            .context(PostgresExecutionSnafu)?;
+            .query(&self.sql_set.delete_value, &[&key])
+            .await?;
 
         Ok(res.len() == 1)
     }
@@ -536,7 +657,7 @@ impl PgElection {
                 &key,
                 &lease.origin,
                 &self.leader_value,
-                self.meta_lease_ttl_secs,
+                self.meta_lease_ttl,
             )
             .await?;
         }
@@ -605,10 +726,12 @@ impl PgElection {
             ..Default::default()
         };
         self.delete_value(&key).await?;
-        self.client
-            .query(&self.sql_set.step_down, &[])
+        self.maybe_init_client().await?;
+        self.pg_client
+            .read()
             .await
-            .context(PostgresExecutionSnafu)?;
+            .query(&self.sql_set.step_down, &[])
+            .await?;
         send_leader_change_and_set_flags(
             &self.is_leader,
             &self.leader_infancy,
@@ -651,7 +774,7 @@ impl PgElection {
             ..Default::default()
         };
         self.delete_value(&key).await?;
-        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
+        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
             .await?;
 
         if self
@@ -674,15 +797,21 @@ impl PgElection {
 
 #[cfg(test)]
 mod tests {
+    use std::assert_matches::assert_matches;
     use std::env;
 
     use common_meta::maybe_skip_postgres_integration_test;
-    use tokio_postgres::{Client, NoTls};
 
     use super::*;
-    use crate::error::PostgresExecutionSnafu;
+    use crate::bootstrap::create_postgres_pool;
+    use crate::error;
 
-    async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
+    async fn create_postgres_client(
+        table_name: Option<&str>,
+        execution_timeout: Duration,
+        idle_session_timeout: Duration,
+        statement_timeout: Duration,
+    ) -> Result<ElectionPgClient> {
         let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
         if endpoint.is_empty() {
             return UnexpectedSnafu {
@@ -690,25 +819,34 @@ mod tests {
             }
             .fail();
         }
-        let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
-            .await
-            .context(PostgresExecutionSnafu)?;
-        tokio::spawn(async move {
-            connection.await.context(PostgresExecutionSnafu).unwrap();
-        });
+        let pool = create_postgres_pool(&[endpoint]).await.unwrap();
+        let mut pg_client = ElectionPgClient::new(
+            pool,
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .unwrap();
+        pg_client.maybe_init_client().await?;
         if let Some(table_name) = table_name {
             let create_table_sql = format!(
                 "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
                 table_name
             );
-            client.execute(&create_table_sql, &[]).await.unwrap();
+            pg_client.execute(&create_table_sql, &[]).await?;
         }
-        Ok(client)
+        Ok(pg_client)
     }
 
-    async fn drop_table(client: &Client, table_name: &str) {
+    async fn drop_table(pg_election: &PgElection, table_name: &str) {
         let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
-        client.execute(&sql, &[]).await.unwrap();
+        pg_election
+            .pg_client
+            .read()
+            .await
+            .execute(&sql, &[])
+            .await
+            .unwrap();
     }
 
     #[tokio::test]
@@ -719,23 +857,35 @@
         let uuid = uuid::Uuid::new_v4().to_string();
         let table_name = "test_postgres_crud_greptime_metakv";
-        let client = create_postgres_client(Some(table_name)).await.unwrap();
+        let candidate_lease_ttl = Duration::from_secs(10);
+        let execution_timeout = Duration::from_secs(10);
+        let statement_timeout = Duration::from_secs(10);
+        let meta_lease_ttl = Duration::from_secs(2);
+        let idle_session_timeout = Duration::from_secs(0);
+        let client = create_postgres_client(
+            Some(table_name),
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .await
+        .unwrap();
 
         let (tx, _) = broadcast::channel(100);
         let pg_election = PgElection {
             leader_value: "test_leader".to_string(),
-            client,
+            pg_client: RwLock::new(client),
             is_leader: AtomicBool::new(false),
             leader_infancy: AtomicBool::new(true),
             leader_watcher: tx,
             store_key_prefix: uuid,
-            candidate_lease_ttl_secs: 10,
-            meta_lease_ttl_secs: 2,
-            sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
+            candidate_lease_ttl,
+            meta_lease_ttl,
+            sql_set: ElectionSqlFactory::new(28319, table_name).build(),
         };
 
         let res = pg_election
-            .put_value_with_lease(&key, &value, 10)
+            .put_value_with_lease(&key, &value, candidate_lease_ttl)
             .await
             .unwrap();
         assert!(res);
@@ -748,7 +898,7 @@
         assert_eq!(lease.leader_value, value);
 
         pg_election
-            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
+            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
             .await
             .unwrap();
 
@@ -762,7 +912,7 @@
             let key = format!("test_key_{}", i);
             let value = format!("test_value_{}", i);
             pg_election
-                .put_value_with_lease(&key, &value, 10)
+                .put_value_with_lease(&key, &value, candidate_lease_ttl)
                 .await
                 .unwrap();
         }
@@ -787,28 +937,39 @@
         assert!(res.is_empty());
         assert!(current == Timestamp::default());
 
-        drop_table(&pg_election.client, table_name).await;
+        drop_table(&pg_election, table_name).await;
     }
 
     async fn candidate(
         leader_value: String,
-        candidate_lease_ttl_secs: u64,
+        candidate_lease_ttl: Duration,
         store_key_prefix: String,
         table_name: String,
     ) {
-        let client = create_postgres_client(None).await.unwrap();
+        let execution_timeout = Duration::from_secs(10);
+        let statement_timeout = Duration::from_secs(10);
+        let meta_lease_ttl = Duration::from_secs(2);
+        let idle_session_timeout = Duration::from_secs(0);
+        let client = create_postgres_client(
+            None,
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .await
+        .unwrap();
 
         let (tx, _) = broadcast::channel(100);
         let pg_election = PgElection {
             leader_value,
-            client,
+            pg_client: RwLock::new(client),
             is_leader: AtomicBool::new(false),
             leader_infancy: AtomicBool::new(true),
             leader_watcher: tx,
             store_key_prefix,
-            candidate_lease_ttl_secs,
-            meta_lease_ttl_secs: 2,
-            sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
+            candidate_lease_ttl,
+            meta_lease_ttl,
+            sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
         };
 
         let node_info = MetasrvNodeInfo {
@@ -824,17 +985,28 @@
     async fn test_candidate_registration() {
         maybe_skip_postgres_integration_test!();
         let leader_value_prefix = "test_leader".to_string();
-        let candidate_lease_ttl_secs = 5;
         let uuid = uuid::Uuid::new_v4().to_string();
         let table_name = "test_candidate_registration_greptime_metakv";
         let mut handles = vec![];
+        let candidate_lease_ttl = Duration::from_secs(5);
+        let execution_timeout = Duration::from_secs(10);
+        let statement_timeout = Duration::from_secs(10);
+        let meta_lease_ttl = Duration::from_secs(2);
+        let idle_session_timeout = Duration::from_secs(0);
-        let client = create_postgres_client(Some(table_name)).await.unwrap();
+        let client = create_postgres_client(
+            Some(table_name),
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .await
+        .unwrap();
 
         for i in 0..10 {
             let leader_value = format!("{}{}", leader_value_prefix, i);
             let handle = tokio::spawn(candidate(
                 leader_value,
-                candidate_lease_ttl_secs,
+                candidate_lease_ttl,
                 uuid.clone(),
                 table_name.to_string(),
             ));
@@ -847,14 +1019,14 @@
         let leader_value = "test_leader".to_string();
         let pg_election = PgElection {
             leader_value,
-            client,
+            pg_client: RwLock::new(client),
             is_leader: AtomicBool::new(false),
             leader_infancy: AtomicBool::new(true),
             leader_watcher: tx,
             store_key_prefix: uuid.clone(),
-            candidate_lease_ttl_secs,
-            meta_lease_ttl_secs: 2,
-            sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
+            candidate_lease_ttl,
+            meta_lease_ttl,
+            sql_set: ElectionSqlFactory::new(28319, table_name).build(),
         };
 
         let candidates = pg_election.all_candidates().await.unwrap();
@@ -876,29 +1048,40 @@
             assert!(res);
         }
 
-        drop_table(&pg_election.client, table_name).await;
+        drop_table(&pg_election, table_name).await;
     }
 
     #[tokio::test]
     async fn test_elected_and_step_down() {
         maybe_skip_postgres_integration_test!();
         let leader_value = "test_leader".to_string();
-        let candidate_lease_ttl_secs = 5;
         let uuid = uuid::Uuid::new_v4().to_string();
         let table_name = "test_elected_and_step_down_greptime_metakv";
-        let client = create_postgres_client(Some(table_name)).await.unwrap();
+        let candidate_lease_ttl = Duration::from_secs(5);
+        let execution_timeout = Duration::from_secs(10);
+        let statement_timeout = Duration::from_secs(10);
+        let meta_lease_ttl = Duration::from_secs(2);
+        let idle_session_timeout = Duration::from_secs(0);
+        let client = create_postgres_client(
+            Some(table_name),
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .await
+        .unwrap();
 
         let (tx, mut rx) = broadcast::channel(100);
         let leader_pg_election = PgElection {
             leader_value: leader_value.clone(),
-            client,
+            pg_client: RwLock::new(client),
             is_leader: AtomicBool::new(false),
             leader_infancy: AtomicBool::new(true),
             leader_watcher: tx,
             store_key_prefix: uuid,
-            candidate_lease_ttl_secs,
-            meta_lease_ttl_secs: 2,
-            sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
+            candidate_lease_ttl,
+            meta_lease_ttl,
+            sql_set: ElectionSqlFactory::new(28320, table_name).build(),
         };
 
         leader_pg_election.elected().await.unwrap();
@@ -990,7 +1173,7 @@
             _ => panic!("Expected LeaderChangeMessage::StepDown"),
         }
 
-        drop_table(&leader_pg_election.client, table_name).await;
+        drop_table(&leader_pg_election, table_name).await;
     }
 
     #[tokio::test]
@@ -999,25 +1182,38 @@
         let leader_value = "test_leader".to_string();
         let uuid = uuid::Uuid::new_v4().to_string();
         let table_name = "test_leader_action_greptime_metakv";
-        let candidate_lease_ttl_secs = 5;
-        let client = create_postgres_client(Some(table_name)).await.unwrap();
+        let candidate_lease_ttl = Duration::from_secs(5);
+        let execution_timeout = Duration::from_secs(10);
+        let statement_timeout = Duration::from_secs(10);
+        let meta_lease_ttl = Duration::from_secs(2);
+        let idle_session_timeout = Duration::from_secs(0);
+        let client = create_postgres_client(
+            Some(table_name),
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .await
+        .unwrap();
 
         let (tx, mut rx) = broadcast::channel(100);
         let leader_pg_election = PgElection {
             leader_value: leader_value.clone(),
-            client,
+            pg_client: RwLock::new(client),
             is_leader: AtomicBool::new(false),
             leader_infancy: AtomicBool::new(true),
             leader_watcher: tx,
             store_key_prefix: uuid,
-            candidate_lease_ttl_secs,
-            meta_lease_ttl_secs: 2,
-            sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
+            candidate_lease_ttl,
+            meta_lease_ttl,
+            sql_set: ElectionSqlFactory::new(28321, table_name).build(),
         };
 
         // Step 1: No leader exists, campaign and elected.
         let res = leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
@@ -1048,7 +1244,9 @@
 
         // Step 2: As a leader, renew the lease.
         let res = leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
@@ -1070,7 +1268,9 @@
         tokio::time::sleep(Duration::from_secs(2)).await;
 
         let res = leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
@@ -1098,7 +1298,9 @@
 
         // Step 4: Re-campaign and elected.
         let res = leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
@@ -1155,7 +1357,9 @@
 
         // Step 6: Re-campaign and elected.
         let res = leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
@@ -1186,7 +1390,9 @@
 
         // Step 7: Something wrong, the leader key changed by others.
         let res = leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
@@ -1197,7 +1403,11 @@
             .await
             .unwrap();
         leader_pg_election
-            .put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
+            .put_value_with_lease(
+                &leader_pg_election.election_key(),
+                "test",
+                Duration::from_secs(10),
+            )
             .await
             .unwrap();
         leader_pg_election.leader_action().await.unwrap();
@@ -1223,52 +1433,74 @@
 
         // Clean up
         leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.step_down, &[])
             .await
             .unwrap();
 
-        drop_table(&leader_pg_election.client, table_name).await;
+        drop_table(&leader_pg_election, table_name).await;
     }
 
     #[tokio::test]
     async fn test_follower_action() {
         maybe_skip_postgres_integration_test!();
         common_telemetry::init_default_ut_logging();
-        let candidate_lease_ttl_secs = 5;
         let uuid = uuid::Uuid::new_v4().to_string();
         let table_name = "test_follower_action_greptime_metakv";
-        let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
+        let candidate_lease_ttl = Duration::from_secs(5);
+        let execution_timeout = Duration::from_secs(10);
+        let statement_timeout = Duration::from_secs(10);
+        let meta_lease_ttl = Duration::from_secs(2);
+        let idle_session_timeout = Duration::from_secs(0);
+        let follower_client = create_postgres_client(
+            Some(table_name),
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .await
+        .unwrap();
 
         let (tx, mut rx) = broadcast::channel(100);
         let follower_pg_election = PgElection {
             leader_value: "test_follower".to_string(),
-            client: follower_client,
+            pg_client: RwLock::new(follower_client),
             is_leader: AtomicBool::new(false),
             leader_infancy: AtomicBool::new(true),
             leader_watcher: tx,
             store_key_prefix: uuid.clone(),
-            candidate_lease_ttl_secs,
-            meta_lease_ttl_secs: 2,
-            sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
+            candidate_lease_ttl,
+            meta_lease_ttl,
+            sql_set: ElectionSqlFactory::new(28322, table_name).build(),
         };
 
-        let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
+        let leader_client = create_postgres_client(
+            Some(table_name),
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .await
+        .unwrap();
 
         let (tx, _) = broadcast::channel(100);
         let leader_pg_election = PgElection {
             leader_value: "test_leader".to_string(),
-            client: leader_client,
+            pg_client: RwLock::new(leader_client),
             is_leader: AtomicBool::new(false),
             leader_infancy: AtomicBool::new(true),
             leader_watcher: tx,
             store_key_prefix: uuid,
-            candidate_lease_ttl_secs,
-            meta_lease_ttl_secs: 2,
-            sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
+            candidate_lease_ttl,
+            meta_lease_ttl,
+            sql_set: ElectionSqlFactory::new(28322, table_name).build(),
         };
 
         leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
@@ -1309,11 +1541,41 @@
 
         // Clean up
         leader_pg_election
-            .client
+            .pg_client
+            .read()
+            .await
             .query(&leader_pg_election.sql_set.step_down, &[])
             .await
             .unwrap();
 
-        drop_table(&follower_pg_election.client, table_name).await;
+        drop_table(&follower_pg_election, table_name).await;
+    }
+
+    #[tokio::test]
+    async fn test_idle_session_timeout() {
+        maybe_skip_postgres_integration_test!();
+        common_telemetry::init_default_ut_logging();
+        let execution_timeout = Duration::from_secs(10);
+        let statement_timeout = Duration::from_secs(10);
+        let idle_session_timeout = Duration::from_secs(1);
+        let mut client = create_postgres_client(
+            None,
+            execution_timeout,
+            idle_session_timeout,
+            statement_timeout,
+        )
+        .await
+        .unwrap();
+        // Wait for the idle session timeout.
+        tokio::time::sleep(Duration::from_millis(1100)).await;
+        let err = client.query("SELECT 1", &[]).await.unwrap_err();
+        assert_matches!(err, error::Error::PostgresExecution { .. });
+        let error::Error::PostgresExecution { error, .. } = err else {
+            panic!("Expected PostgresExecution error");
+        };
+        assert!(error.is_closed());
+        // Reset the client and try again.
+        client.reset_client().await.unwrap();
+        let _ = client.query("SELECT 1", &[]).await.unwrap();
     }
 }
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index 88ae7142ea..b14869587b 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -748,21 +748,31 @@ pub enum Error {
     },
 
     #[cfg(feature = "pg_kvbackend")]
-    #[snafu(display("Failed to execute via postgres"))]
+    #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
     PostgresExecution {
         #[snafu(source)]
         error: tokio_postgres::Error,
+        sql: String,
         #[snafu(implicit)]
         location: Location,
     },
 
     #[cfg(feature = "pg_kvbackend")]
-    #[snafu(display("Failed to connect to Postgres"))]
-    ConnectPostgres {
-        #[snafu(source)]
-        error: tokio_postgres::Error,
+    #[snafu(display("Failed to get Postgres client"))]
+    GetPostgresClient {
         #[snafu(implicit)]
         location: Location,
+        #[snafu(source)]
+        error: deadpool::managed::PoolError<tokio_postgres::Error>,
+    },
+
+    #[cfg(feature = "pg_kvbackend")]
+    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
+    SqlExecutionTimeout {
+        #[snafu(implicit)]
+        location: Location,
+        sql: String,
+        duration: std::time::Duration,
     },
 
     #[cfg(feature = "pg_kvbackend")]
@@ -1005,9 +1015,10 @@ impl ErrorExt for Error {
             Error::LookupPeer { source, .. } => source.status_code(),
             #[cfg(feature = "pg_kvbackend")]
             Error::CreatePostgresPool { .. }
+            | Error::GetPostgresClient { .. }
             | Error::GetPostgresConnection { .. }
             | Error::PostgresExecution { .. }
-            | Error::ConnectPostgres { .. } => StatusCode::Internal,
+            | Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
             #[cfg(feature = "mysql_kvbackend")]
             Error::MySqlExecution { .. } | Error::CreateMySqlPool { .. }
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index a26df4cc96..8225f0ada4 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -582,6 +582,7 @@ impl Metasrv {
                         if let Err(e) = res {
                             warn!(e; "Metasrv election error");
                         }
+                        election.reset_campaign().await;
                         info!("Metasrv re-initiate election");
                     }
                     info!("Metasrv stopped");
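For context, the election task that calls the new hook looks roughly like this. It is a simplified paraphrase of `Metasrv::start`'s loop, not the verbatim code: the `Election` trait stub, the `AlwaysFails` impl, and the small backoff are all invented for the sketch.

```rust
use std::sync::Arc;
use std::time::Duration;

#[async_trait::async_trait]
trait Election: Send + Sync {
    async fn campaign(&self) -> Result<(), String>;
    // Default no-op, mirroring the new trait method: implementations such as
    // PgElection drop the pooled connection so the next campaign starts clean.
    async fn reset_campaign(&self) {}
}

async fn election_loop(election: Arc<dyn Election>) {
    loop {
        // campaign() runs until something goes wrong (e.g. the session was
        // killed by idle_session_timeout); reset before re-entering the
        // campaign so a broken connection is never reused.
        if let Err(e) = election.campaign().await {
            eprintln!("Metasrv election error: {e}");
        }
        election.reset_campaign().await;
        tokio::time::sleep(Duration::from_millis(100)).await; // simple backoff
    }
}

struct AlwaysFails;

#[async_trait::async_trait]
impl Election for AlwaysFails {
    async fn campaign(&self) -> Result<(), String> {
        Err("lost connection".to_string())
    }
}

#[tokio::main]
async fn main() {
    // Run the loop briefly to show the error/reset/retry cycle.
    let _ = tokio::time::timeout(
        Duration::from_millis(350),
        election_loop(Arc::new(AlwaysFails)),
    )
    .await;
}
```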