diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 9f78257e38..33c32443e2 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -20,6 +20,8 @@ use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; use common_config::Configurable; +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] +use common_meta::distributed_time_constants::META_LEASE_SECS; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -249,6 +251,7 @@ pub async fn metasrv_builder( election_client, opts.store_key_prefix.clone(), CANDIDATE_LEASE_SECS, + META_LEASE_SECS, &opts.meta_table_name, opts.meta_election_lock_id, ) @@ -270,6 +273,7 @@ pub async fn metasrv_builder( election_client, opts.store_key_prefix.clone(), CANDIDATE_LEASE_SECS, + META_LEASE_SECS, &election_table_name, ) .await?; diff --git a/src/meta-srv/src/election/mysql.rs b/src/meta-srv/src/election/mysql.rs index 6ccf46cc4c..41151fcbcb 100644 --- a/src/meta-srv/src/election/mysql.rs +++ b/src/meta-srv/src/election/mysql.rs @@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, warn}; use common_time::Timestamp; use itertools::Itertools; @@ -25,7 +24,7 @@ use sqlx::mysql::{MySqlArguments, MySqlRow}; use sqlx::query::Query; use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row}; use tokio::sync::{broadcast, Mutex, MutexGuard}; -use tokio::time::{Interval, MissedTickBehavior}; +use tokio::time::MissedTickBehavior; use crate::election::{ listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY, @@ -41,7 +40,7 @@ const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; /// Lease information. /// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module. -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] struct Lease { leader_value: String, expire_time: Timestamp, @@ -52,6 +51,7 @@ struct Lease { struct ElectionSqlFactory<'a> { table_name: &'a str, + meta_lease_ttl_secs: u64, } struct ElectionSqlSet { @@ -99,8 +99,11 @@ struct ElectionSqlSet { } impl<'a> ElectionSqlFactory<'a> { - fn new(table_name: &'a str) -> Self { - Self { table_name } + fn new(table_name: &'a str, meta_lease_ttl_secs: u64) -> Self { + Self { + table_name, + meta_lease_ttl_secs, + } } fn build(self) -> ElectionSqlSet { @@ -117,7 +120,10 @@ impl<'a> ElectionSqlFactory<'a> { // Currently the session timeout is longer than the leader lease time. // So the leader will renew the lease twice before the session timeout if everything goes well. fn set_idle_session_timeout_sql(&self) -> String { - format!("SET SESSION wait_timeout = {};", META_LEASE_SECS + 1) + format!( + "SET SESSION wait_timeout = {};", + self.meta_lease_ttl_secs + 1 + ) } fn set_lock_wait_timeout_sql(&self) -> &str { @@ -147,6 +153,8 @@ impl<'a> ElectionSqlFactory<'a> { "SELECT @@version;" } + /// Use `SELECT FOR UPDATE` to lock for compatibility with other MySQL-compatible databases + /// instead of directly using `GET_LOCK`. fn campaign_sql(&self) -> String { format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name) } @@ -315,6 +323,7 @@ pub struct MySqlElection { leader_watcher: broadcast::Sender, store_key_prefix: String, candidate_lease_ttl_secs: u64, + meta_lease_ttl_secs: u64, sql_set: ElectionSqlSet, } @@ -324,9 +333,10 @@ impl MySqlElection { mut client: sqlx::MySqlConnection, store_key_prefix: String, candidate_lease_ttl_secs: u64, + meta_lease_ttl_secs: u64, table_name: &str, ) -> Result { - let sql_factory = ElectionSqlFactory::new(table_name); + let sql_factory = ElectionSqlFactory::new(table_name, meta_lease_ttl_secs); sqlx::query(&sql_factory.create_table_sql()) .execute(&mut client) .await @@ -365,6 +375,7 @@ impl MySqlElection { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, + meta_lease_ttl_secs, sql_set: sql_factory.build(), })) } @@ -452,8 +463,14 @@ impl Election for MySqlElection { } ); - self.update_value_with_lease(&key, &lease.origin, &node_info, &mut executor) - .await?; + self.update_value_with_lease( + &key, + &lease.origin, + &node_info, + self.candidate_lease_ttl_secs, + &mut executor, + ) + .await?; std::mem::drop(executor); } } @@ -480,10 +497,11 @@ impl Election for MySqlElection { async fn campaign(&self) -> Result<()> { let mut keep_alive_interval = - tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); + tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2)); keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { - let _ = self.do_campaign(&mut keep_alive_interval).await; + let _ = self.do_campaign().await; + keep_alive_interval.tick().await; } } @@ -514,7 +532,7 @@ impl Election for MySqlElection { } impl MySqlElection { - /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned. + /// Returns value, expire time and current time. async fn get_value_with_lease( &self, key: &str, @@ -587,6 +605,7 @@ impl MySqlElection { key: &str, prev: &str, updated: &str, + lease_ttl: u64, executor: &mut Executor<'_>, ) -> Result<()> { let key = key.as_bytes(); @@ -595,7 +614,7 @@ impl MySqlElection { let query = sqlx::query(&self.sql_set.update_value_with_lease) .bind(updated) - .bind(self.candidate_lease_ttl_secs as f64) + .bind(lease_ttl as f64) .bind(key) .bind(prev); let res = executor @@ -627,9 +646,9 @@ impl MySqlElection { .bind(value) .bind(lease_ttl_secs); let res = executor - .query(query, &self.sql_set.put_value_with_lease) + .execute(query, &self.sql_set.put_value_with_lease) .await?; - Ok(res.is_empty()) + Ok(res == 1) } /// Returns `true` if the deletion is successful. @@ -644,62 +663,82 @@ impl MySqlElection { /// Attempts to acquire leadership by executing a campaign. This function continuously checks /// if the current lease is still valid. - async fn do_campaign(&self, interval: &mut Interval) -> Result<()> { + async fn do_campaign(&self) -> Result<()> { // Need to restrict the scope of the client to avoid ambiguous overloads. use sqlx::Acquire; - loop { - let client = self.client.lock().await; - let executor = Executor::Default(client); - let mut lease = Lease::default(); - match ( - self.lease_check(executor, &mut lease).await, - self.is_leader(), - ) { - // If the leader lease is valid and I'm the leader, renew the lease. - (Ok(_), true) => { - let mut client = self.client.lock().await; - let txn = client - .begin() - .await - .context(MySqlExecutionSnafu { sql: "BEGIN" })?; - let mut executor = Executor::Txn(txn); - let query = sqlx::query(&self.sql_set.campaign); - executor.query(query, &self.sql_set.campaign).await?; - self.renew_lease(executor, lease).await?; - } - // If the leader lease expires and I'm the leader, notify the leader watcher and step down. - // Another instance should be elected as the leader in this case. - (Err(_), true) => { - warn!("Leader lease expired, re-initiate the campaign"); - self.step_down_without_lock().await?; - } - // If the leader lease expires and I'm not the leader, elect myself. - (Err(_), false) => { - warn!("Leader lease expired, re-initiate the campaign"); - let mut client = self.client.lock().await; - let txn = client - .begin() - .await - .context(MySqlExecutionSnafu { sql: "BEGIN" })?; - let mut executor = Executor::Txn(txn); - let query = sqlx::query(&self.sql_set.campaign); - executor.query(query, &self.sql_set.campaign).await?; - self.elected(&mut executor).await?; - executor.commit().await?; - } - // If the leader lease is valid and I'm not the leader, do nothing. - (Ok(_), false) => {} + let client = self.client.lock().await; + let executor = Executor::Default(client); + let mut lease = Lease::default(); + match ( + self.lease_check(executor, &mut lease).await, + self.is_leader(), + self.leader_value == lease.leader_value, + ) { + // If the leader lease is valid and I'm the leader, renew the lease. + (Ok(_), true, true) => { + let mut client = self.client.lock().await; + let txn = client + .begin() + .await + .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + let mut executor = Executor::Txn(txn); + let query = sqlx::query(&self.sql_set.campaign); + executor.query(query, &self.sql_set.campaign).await?; + self.renew_lease(executor, lease).await?; } - interval.tick().await; + // If the leader lease expires and I'm the leader, notify the leader watcher and step down. + // Another instance should be elected as the leader in this case. + (Err(_), true, _) | (Ok(_), true, false) => { + warn!("Leader lease expired, step down..."); + self.step_down_without_lock().await?; + } + // If the leader lease expires and I'm not the leader, elect myself. + (Err(_), false, _) => { + warn!("Leader lease expired, elected."); + let mut client = self.client.lock().await; + let txn = client + .begin() + .await + .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + let mut executor = Executor::Txn(txn); + let query = sqlx::query(&self.sql_set.campaign); + executor.query(query, &self.sql_set.campaign).await?; + self.elected(&mut executor).await?; + executor.commit().await?; + } + // If the leader lease is valid and I'm the leader, but I don't think I'm the leader. + // Just re-elect myself. + (Ok(_), false, true) => { + warn!("I should be the leader, but I don't think so. Something went wrong."); + let mut client = self.client.lock().await; + let txn = client + .begin() + .await + .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + let mut executor = Executor::Txn(txn); + let query = sqlx::query(&self.sql_set.campaign); + executor.query(query, &self.sql_set.campaign).await?; + self.elected(&mut executor).await?; + executor.commit().await?; + } + // If the leader lease is valid and I'm not the leader, do nothing. + (Ok(_), false, false) => {} } + Ok(()) } /// Renew the lease async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> { let key = self.election_key(); - self.update_value_with_lease(&key, &lease.origin, &self.leader_value, &mut executor) - .await?; + self.update_value_with_lease( + &key, + &lease.origin, + &self.leader_value, + self.meta_lease_ttl_secs, + &mut executor, + ) + .await?; executor.commit().await?; Ok(()) } @@ -758,7 +797,7 @@ impl MySqlElection { ..Default::default() }; self.delete_value(&key, executor).await?; - self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS, executor) + self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs, executor) .await?; if self @@ -784,7 +823,7 @@ impl MySqlElection { match query.fetch_one(client).await { Ok(row) => { let version: String = row.try_get(0).unwrap(); - if !version.starts_with("8.0") || !version.starts_with("5.7") { + if !version.starts_with("8.0") && !version.starts_with("5.7") { warn!( "Unsupported MySQL version: {}, expected: [5.7, 8.0]", version @@ -798,3 +837,589 @@ impl MySqlElection { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::env; + + use common_telemetry::init_default_ut_logging; + use sqlx::Connection; + + use super::*; + use crate::error::MySqlExecutionSnafu; + + async fn create_mysql_client(table_name: Option<&str>) -> Result> { + init_default_ut_logging(); + let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default(); + if endpoint.is_empty() { + return UnexpectedSnafu { + violated: "MySQL endpoint is empty".to_string(), + } + .fail(); + } + let mut client = MySqlConnection::connect(&endpoint).await.unwrap(); + if let Some(table_name) = table_name { + let create_table_sql = format!( + "CREATE TABLE IF NOT EXISTS {}(k VARCHAR(255) PRIMARY KEY, v BLOB);", + table_name + ); + sqlx::query(&create_table_sql) + .execute(&mut client) + .await + .context(MySqlExecutionSnafu { + sql: create_table_sql, + })?; + } + Ok(Mutex::new(client)) + } + + async fn drop_table(client: &Mutex, table_name: &str) { + let mut client = client.lock().await; + let sql = format!("DROP TABLE IF EXISTS {};", table_name); + sqlx::query(&sql) + .execute(&mut *client) + .await + .context(MySqlExecutionSnafu { sql }) + .unwrap(); + } + + #[tokio::test] + async fn test_mysql_crud() { + let key = "test_key".to_string(); + let value = "test_value".to_string(); + + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_mysql_crud_greptime_metakv"; + let client = create_mysql_client(Some(table_name)).await.unwrap(); + + { + let mut a = client.lock().await; + let txn = a.begin().await.unwrap(); + let mut executor = Executor::Txn(txn); + let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name); + let query = sqlx::query(&raw_query); + let _ = executor.query(query, &raw_query).await.unwrap(); + } + + let (tx, _) = broadcast::channel(100); + let mysql_election = MySqlElection { + leader_value: "test_leader".to_string(), + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl_secs: 10, + meta_lease_ttl_secs: 1, + sql_set: ElectionSqlFactory::new(table_name, 1).build(), + }; + let client = mysql_election.client.lock().await; + let mut executor = Executor::Default(client); + let res = mysql_election + .put_value_with_lease(&key, &value, 10, &mut executor) + .await + .unwrap(); + assert!(res); + + let lease = mysql_election + .get_value_with_lease(&key, &mut executor) + .await + .unwrap() + .unwrap(); + assert_eq!(lease.leader_value, value); + + mysql_election + .update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor) + .await + .unwrap(); + + let res = mysql_election + .delete_value(&key, &mut executor) + .await + .unwrap(); + assert!(res); + + let res = mysql_election + .get_value_with_lease(&key, &mut executor) + .await + .unwrap(); + assert!(res.is_none()); + + for i in 0..10 { + let key = format!("test_key_{}", i); + let value = format!("test_value_{}", i); + mysql_election + .put_value_with_lease(&key, &value, 10, &mut executor) + .await + .unwrap(); + } + + let key_prefix = "test_key".to_string(); + let (res, _) = mysql_election + .get_value_with_lease_by_prefix(&key_prefix, &mut executor) + .await + .unwrap(); + assert_eq!(res.len(), 10); + + for i in 0..10 { + let key = format!("test_key_{}", i); + let res = mysql_election + .delete_value(&key, &mut executor) + .await + .unwrap(); + assert!(res); + } + + let (res, current) = mysql_election + .get_value_with_lease_by_prefix(&key_prefix, &mut executor) + .await + .unwrap(); + assert!(res.is_empty()); + assert!(current == Timestamp::default()); + + // Should drop manually. + std::mem::drop(executor); + drop_table(&mysql_election.client, table_name).await; + } + + async fn candidate( + leader_value: String, + candidate_lease_ttl_secs: u64, + store_key_prefix: String, + table_name: String, + ) { + let client = create_mysql_client(None).await.unwrap(); + + let (tx, _) = broadcast::channel(100); + let mysql_election = MySqlElection { + leader_value, + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix, + candidate_lease_ttl_secs, + meta_lease_ttl_secs: 1, + sql_set: ElectionSqlFactory::new(&table_name, 1).build(), + }; + + let node_info = MetasrvNodeInfo { + addr: "test_addr".to_string(), + version: "test_version".to_string(), + git_commit: "test_git_commit".to_string(), + start_time_ms: 0, + }; + mysql_election.register_candidate(&node_info).await.unwrap(); + } + + #[tokio::test] + async fn test_candidate_registration() { + let leader_value_prefix = "test_leader".to_string(); + let candidate_lease_ttl_secs = 2; + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_candidate_registration_greptime_metakv"; + let mut handles = vec![]; + let client = create_mysql_client(Some(table_name)).await.unwrap(); + + for i in 0..10 { + let leader_value = format!("{}{}", leader_value_prefix, i); + let handle = tokio::spawn(candidate( + leader_value, + candidate_lease_ttl_secs, + uuid.clone(), + table_name.to_string(), + )); + handles.push(handle); + } + // Wait for candidates to registrate themselves and renew their leases at least once. + tokio::time::sleep(Duration::from_secs(candidate_lease_ttl_secs / 2 + 1)).await; + + let (tx, _) = broadcast::channel(100); + let leader_value = "test_leader".to_string(); + let mysql_election = MySqlElection { + leader_value, + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid.clone(), + candidate_lease_ttl_secs, + meta_lease_ttl_secs: 1, + sql_set: ElectionSqlFactory::new(table_name, 1).build(), + }; + + let candidates = mysql_election.all_candidates().await.unwrap(); + assert_eq!(candidates.len(), 10); + + for handle in handles { + handle.abort(); + } + + // Wait for the candidate leases to expire. + tokio::time::sleep(Duration::from_secs(candidate_lease_ttl_secs + 1)).await; + let candidates = mysql_election.all_candidates().await.unwrap(); + assert!(candidates.is_empty()); + + // Garbage collection + let client = mysql_election.client.lock().await; + let mut executor = Executor::Default(client); + for i in 0..10 { + let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i); + let res = mysql_election + .delete_value(&key, &mut executor) + .await + .unwrap(); + assert!(res); + } + + // Should drop manually. + std::mem::drop(executor); + drop_table(&mysql_election.client, table_name).await; + } + + async fn elected(election: &MySqlElection, table_name: &str) { + let mut client = election.client.lock().await; + let txn = client.begin().await.unwrap(); + let mut executor = Executor::Txn(txn); + let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name); + let query = sqlx::query(&raw_query); + let _ = executor.query(query, &raw_query).await.unwrap(); + election.elected(&mut executor).await.unwrap(); + executor.commit().await.unwrap(); + } + + async fn get_lease(election: &MySqlElection) -> Option { + let client = election.client.lock().await; + let mut executor = Executor::Default(client); + election + .get_value_with_lease(&election.election_key(), &mut executor) + .await + .unwrap() + } + + #[tokio::test] + async fn test_elected_and_step_down() { + let leader_value = "test_leader".to_string(); + let candidate_lease_ttl_secs = 1; + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_elected_and_step_down_greptime_metakv"; + let client = create_mysql_client(Some(table_name)).await.unwrap(); + + let (tx, mut rx) = broadcast::channel(100); + let leader_mysql_election = MySqlElection { + leader_value: leader_value.clone(), + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl_secs, + meta_lease_ttl_secs: 1, + sql_set: ElectionSqlFactory::new(table_name, 1).build(), + }; + + elected(&leader_mysql_election, table_name).await; + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(lease.expire_time > lease.current); + assert!(leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + leader_mysql_election + .step_down_without_lock() + .await + .unwrap(); + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(!leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + + elected(&leader_mysql_election, table_name).await; + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(lease.expire_time > lease.current); + assert!(leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + drop_table(&leader_mysql_election.client, table_name).await; + } + + #[tokio::test] + async fn test_campaign() { + let leader_value = "test_leader".to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_leader_action_greptime_metakv"; + let candidate_lease_ttl_secs = 5; + let meta_lease_ttl_secs = 1; + let client = create_mysql_client(Some(table_name)).await.unwrap(); + + let (tx, mut rx) = broadcast::channel(100); + let leader_mysql_election = MySqlElection { + leader_value: leader_value.clone(), + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl_secs, + meta_lease_ttl_secs, + sql_set: ElectionSqlFactory::new(table_name, 1).build(), + }; + + // Step 1: No leader exists, campaign and elected. + leader_mysql_election.do_campaign().await.unwrap(); + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(lease.expire_time > lease.current); + assert!(leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + // Step 2: As a leader, renew the lease. + leader_mysql_election.do_campaign().await.unwrap(); + let new_lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + // The lease should be renewed. + assert!(new_lease.expire_time > lease.expire_time); + assert!(new_lease.expire_time > new_lease.current); + assert!(leader_mysql_election.is_leader()); + + // Step 3: Something wrong, the leader lease expired. + tokio::time::sleep(Duration::from_secs(meta_lease_ttl_secs + 1)).await; + leader_mysql_election.do_campaign().await.unwrap(); + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(lease.expire_time <= lease.current); + assert!(!leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + + // Step 4: Re-elect itself. + leader_mysql_election.do_campaign().await.unwrap(); + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(lease.expire_time > lease.current); + assert!(leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + // Step 5: Something wrong, the leader key is deleted by other followers. + { + let client = leader_mysql_election.client.lock().await; + let mut executor = Executor::Default(client); + leader_mysql_election + .delete_value(&leader_mysql_election.election_key(), &mut executor) + .await + .unwrap(); + } + leader_mysql_election.do_campaign().await.unwrap(); + let res = get_lease(&leader_mysql_election).await; + assert!(res.is_none()); + assert!(!leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + + // Step 6: Re-elect itself. + leader_mysql_election.do_campaign().await.unwrap(); + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(lease.expire_time > lease.current); + assert!(leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + // Step 7: Something wrong, the leader key changed by others. + let another_leader_key = "another_leader"; + { + let client = leader_mysql_election.client.lock().await; + let mut executor = Executor::Default(client); + leader_mysql_election + .delete_value(&leader_mysql_election.election_key(), &mut executor) + .await + .unwrap(); + leader_mysql_election + .put_value_with_lease( + &leader_mysql_election.election_key(), + another_leader_key, + 10, + &mut executor, + ) + .await + .unwrap(); + } + leader_mysql_election.do_campaign().await.unwrap(); + let lease = get_lease(&leader_mysql_election).await.unwrap(); + // Different from pg, mysql will not delete the key, just step down. + assert_eq!(lease.leader_value, another_leader_key); + assert!(lease.expire_time > lease.current); + assert!(!leader_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::StepDown(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), leader_value); + assert_eq!( + String::from_utf8_lossy(key.key()), + leader_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::StepDown"), + } + + drop_table(&leader_mysql_election.client, table_name).await; + } + + #[tokio::test] + async fn test_follower_action() { + common_telemetry::init_default_ut_logging(); + let candidate_lease_ttl_secs = 5; + let meta_lease_ttl_secs = 1; + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_follower_action_greptime_metakv"; + + let follower_client = create_mysql_client(Some(table_name)).await.unwrap(); + let (tx, mut rx) = broadcast::channel(100); + let follower_mysql_election = MySqlElection { + leader_value: "test_follower".to_string(), + client: follower_client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid.clone(), + candidate_lease_ttl_secs, + meta_lease_ttl_secs, + sql_set: ElectionSqlFactory::new(table_name, 1).build(), + }; + + let leader_client = create_mysql_client(Some(table_name)).await.unwrap(); + let (tx, _) = broadcast::channel(100); + let leader_mysql_election = MySqlElection { + leader_value: "test_leader".to_string(), + client: leader_client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl_secs, + meta_lease_ttl_secs, + sql_set: ElectionSqlFactory::new(table_name, 1).build(), + }; + + leader_mysql_election.do_campaign().await.unwrap(); + + // Step 1: As a follower, the leader exists and the lease is not expired. Do nothing. + follower_mysql_election.do_campaign().await.unwrap(); + + // Step 2: As a follower, the leader exists but the lease expired. Re-elect itself. + tokio::time::sleep(Duration::from_secs(meta_lease_ttl_secs + 1)).await; + follower_mysql_election.do_campaign().await.unwrap(); + assert!(follower_mysql_election.is_leader()); + + match rx.recv().await { + Ok(LeaderChangeMessage::Elected(key)) => { + assert_eq!(String::from_utf8_lossy(key.name()), "test_follower"); + assert_eq!( + String::from_utf8_lossy(key.key()), + follower_mysql_election.election_key() + ); + assert_eq!(key.lease_id(), i64::default()); + assert_eq!(key.revision(), i64::default()); + } + _ => panic!("Expected LeaderChangeMessage::Elected"), + } + + drop_table(&follower_mysql_election.client, table_name).await; + } +} diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index e0824a9a51..ed53d96036 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, warn}; use common_time::Timestamp; use itertools::Itertools; @@ -41,6 +40,7 @@ const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; struct ElectionSqlFactory<'a> { lock_id: u64, table_name: &'a str, + meta_lease_ttl_secs: u64, } struct ElectionSqlSet { @@ -90,10 +90,11 @@ struct ElectionSqlSet { } impl<'a> ElectionSqlFactory<'a> { - fn new(lock_id: u64, table_name: &'a str) -> Self { + fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self { Self { lock_id, table_name, + meta_lease_ttl_secs, } } @@ -112,7 +113,10 @@ impl<'a> ElectionSqlFactory<'a> { // Currently the session timeout is longer than the leader lease time. // So the leader will renew the lease twice before the session timeout if everything goes well. fn set_idle_session_timeout_sql(&self) -> String { - format!("SET idle_session_timeout = '{}s';", META_LEASE_SECS + 1) + format!( + "SET idle_session_timeout = '{}s';", + self.meta_lease_ttl_secs + 1 + ) } fn campaign_sql(&self) -> String { @@ -226,6 +230,7 @@ pub struct PgElection { leader_watcher: broadcast::Sender, store_key_prefix: String, candidate_lease_ttl_secs: u64, + meta_lease_ttl_secs: u64, sql_set: ElectionSqlSet, } @@ -235,10 +240,11 @@ impl PgElection { client: Client, store_key_prefix: String, candidate_lease_ttl_secs: u64, + meta_lease_ttl_secs: u64, table_name: &str, lock_id: u64, ) -> Result { - let sql_factory = ElectionSqlFactory::new(lock_id, table_name); + let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs); // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. client .execute(&sql_factory.set_idle_session_timeout_sql(), &[]) @@ -254,6 +260,7 @@ impl PgElection { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, + meta_lease_ttl_secs, sql_set: sql_factory.build(), })) } @@ -326,7 +333,7 @@ impl Election for PgElection { // Safety: origin is Some since we are using `get_value_with_lease` with `true`. let origin = origin.unwrap(); - self.update_value_with_lease(&key, &origin, &node_info) + self.update_value_with_lease(&key, &origin, &node_info, self.candidate_lease_ttl_secs) .await?; } } @@ -361,7 +368,7 @@ impl Election for PgElection { /// to perform actions as a follower. async fn campaign(&self) -> Result<()> { let mut keep_alive_interval = - tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); + tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2)); keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { @@ -489,19 +496,20 @@ impl PgElection { Ok((values_with_leases, current)) } - async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> { + async fn update_value_with_lease( + &self, + key: &str, + prev: &str, + updated: &str, + lease_ttl: u64, + ) -> Result<()> { let key = key.as_bytes(); let prev = prev.as_bytes(); let res = self .client .execute( &self.sql_set.update_value_with_lease, - &[ - &key, - &prev, - &updated, - &(self.candidate_lease_ttl_secs as f64), - ], + &[&key, &prev, &updated, &(lease_ttl as f64)], ) .await .context(PostgresExecutionSnafu)?; @@ -578,8 +586,13 @@ impl PgElection { (true, true) => { // Safety: prev is Some since we are using `get_value_with_lease` with `true`. let prev = prev.unwrap(); - self.update_value_with_lease(&key, &prev, &self.leader_value) - .await?; + self.update_value_with_lease( + &key, + &prev, + &self.leader_value, + self.meta_lease_ttl_secs, + ) + .await?; } // Case 1.2 (true, false) => { @@ -698,7 +711,7 @@ impl PgElection { ..Default::default() }; self.delete_value(&key).await?; - self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS) + self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs) .await?; if self @@ -775,7 +788,8 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid, candidate_lease_ttl_secs: 10, - sql_set: ElectionSqlFactory::new(28319, table_name).build(), + meta_lease_ttl_secs: 2, + sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(), }; let res = pg_election @@ -793,7 +807,7 @@ mod tests { let prev = prev.unwrap(); pg_election - .update_value_with_lease(&key, &prev, &value) + .update_value_with_lease(&key, &prev, &value, pg_election.meta_lease_ttl_secs) .await .unwrap(); @@ -852,7 +866,8 @@ mod tests { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28319, &table_name).build(), + meta_lease_ttl_secs: 2, + sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(), }; let node_info = MetasrvNodeInfo { @@ -896,7 +911,8 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid.clone(), candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28319, table_name).build(), + meta_lease_ttl_secs: 2, + sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(), }; let candidates = pg_election.all_candidates().await.unwrap(); @@ -938,7 +954,8 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid, candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28320, table_name).build(), + meta_lease_ttl_secs: 2, + sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(), }; leader_pg_election.elected().await.unwrap(); @@ -1050,7 +1067,8 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid, candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28321, table_name).build(), + meta_lease_ttl_secs: 2, + sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(), }; // Step 1: No leader exists, campaign and elected. @@ -1103,7 +1121,7 @@ mod tests { assert!(leader_pg_election.is_leader()); // Step 3: Something wrong, the leader lease expired. - tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await; + tokio::time::sleep(Duration::from_secs(2)).await; let res = leader_pg_election .client @@ -1284,7 +1302,8 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid.clone(), candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28322, table_name).build(), + meta_lease_ttl_secs: 2, + sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(), }; let leader_client = create_postgres_client(Some(table_name)).await.unwrap(); @@ -1297,7 +1316,8 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid, candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28322, table_name).build(), + meta_lease_ttl_secs: 2, + sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(), }; leader_pg_election @@ -1311,7 +1331,7 @@ mod tests { follower_pg_election.follower_action().await.unwrap(); // Step 2: As a follower, the leader exists but the lease expired. - tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await; + tokio::time::sleep(Duration::from_secs(2)).await; assert!(follower_pg_election.follower_action().await.is_err()); // Step 3: As a follower, the leader does not exist.